Index: src/libchcore/TBufferList.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TBufferList.cpp (.../TBufferList.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TBufferList.cpp (.../TBufferList.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -32,6 +32,7 @@ throw TCoreException(eErr_InvalidArgument, L"pBuffer", LOCATION); m_listBuffers.push_front(pBuffer); + m_notifier(true); } TOverlappedDataBuffer* TBufferList::Pop() @@ -42,12 +43,18 @@ TOverlappedDataBuffer* pBuffer = m_listBuffers.front(); m_listBuffers.pop_front(); + m_notifier(false); + return pBuffer; } void TBufferList::Clear() { + bool bRemoved = !m_listBuffers.empty(); m_listBuffers.clear(); + + if (bRemoved) + m_notifier(false); } size_t TBufferList::GetCount() const @@ -59,4 +66,9 @@ { return m_listBuffers.empty(); } + + boost::signals2::signal& TBufferList::GetNotifier() + { + return m_notifier; + } } Index: src/libchcore/TBufferList.h =================================================================== diff -u -rbef894e38e5c1486824787cf8c47a87a0828b228 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TBufferList.h (.../TBufferList.h) (revision bef894e38e5c1486824787cf8c47a87a0828b228) +++ src/libchcore/TBufferList.h (.../TBufferList.h) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -19,6 +19,8 @@ #ifndef __TBUFFERLIST_H__ #define __TBUFFERLIST_H__ +#include + namespace chcore { class TOverlappedDataBuffer; @@ -36,8 +38,11 @@ size_t GetCount() const; bool IsEmpty() const; + boost::signals2::signal& GetNotifier(); + private: std::list m_listBuffers; + boost::signals2::signal m_notifier; }; using TBufferListPtr = std::shared_ptr; Index: src/libchcore/TFilesystemFileFeedbackWrapper.cpp =================================================================== diff -u -rb6a48931b8155a01d871d050f52d915abb2df8ca -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TFilesystemFileFeedbackWrapper.cpp (.../TFilesystemFileFeedbackWrapper.cpp) (revision b6a48931b8155a01d871d050f52d915abb2df8ca) +++ src/libchcore/TFilesystemFileFeedbackWrapper.cpp (.../TFilesystemFileFeedbackWrapper.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -324,7 +324,7 @@ return TSubTaskBase::eSubResult_Continue; } - TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::TruncateFileFB(file_size_t fsNewSize, const TSmartPath& pathFile, bool& bSkip) + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::TruncateFileFB(file_size_t fsNewSize, bool& bSkip) { bSkip = false; @@ -345,10 +345,10 @@ TString strFormat = _T("Error %errno while truncating file %path to 0"); strFormat.Replace(_T("%errno"), boost::lexical_cast(dwLastError).c_str()); - strFormat.Replace(_T("%path"), pathFile.ToString()); + strFormat.Replace(_T("%path"), m_spFile->GetFilePath().ToString()); LOG_ERROR(m_spLog) << strFormat.c_str(); - TFeedbackResult frResult = m_spFeedbackHandler->FileError(pathFile.ToWString(), TString(), EFileError::eResizeError, dwLastError); + TFeedbackResult frResult = m_spFeedbackHandler->FileError(m_spFile->GetFilePath().ToWString(), TString(), EFileError::eResizeError, dwLastError); switch (frResult.GetResult()) { case EFeedbackResult::eResult_Cancel: @@ -378,7 +378,7 @@ return TSubTaskBase::eSubResult_Continue; } - TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::ReadFileFB(TOverlappedDataBuffer& rBuffer, const TSmartPath& pathFile, bool& bSkip) + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::ReadFileFB(TOverlappedDataBuffer& rBuffer, bool& bSkip) { bSkip = false; bool bRetry = false; @@ -401,10 +401,10 @@ TString strFormat = _T("Error %errno while requesting read of %count bytes from source file %path (CustomCopyFileFB)"); strFormat.Replace(_T("%errno"), boost::lexical_cast(dwLastError).c_str()); strFormat.Replace(_T("%count"), boost::lexical_cast(rBuffer.GetRequestedDataSize()).c_str()); - strFormat.Replace(_T("%path"), pathFile.ToString()); + strFormat.Replace(_T("%path"), m_spFile->GetFilePath().ToString()); LOG_ERROR(m_spLog) << strFormat.c_str(); - TFeedbackResult frResult = m_spFeedbackHandler->FileError(pathFile.ToWString(), TString(), EFileError::eReadError, dwLastError); + TFeedbackResult frResult = m_spFeedbackHandler->FileError(m_spFile->GetFilePath().ToWString(), TString(), EFileError::eReadError, dwLastError); switch (frResult.GetResult()) { case EFeedbackResult::eResult_Cancel: @@ -434,7 +434,7 @@ return TSubTaskBase::eSubResult_Continue; } - TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::WriteFileFB(TOverlappedDataBuffer& rBuffer, const TSmartPath& pathFile, bool& bSkip) + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::WriteFileFB(TOverlappedDataBuffer& rBuffer, bool& bSkip) { bSkip = false; @@ -458,10 +458,10 @@ TString strFormat = _T("Error %errno while trying to write %count bytes to destination file %path (CustomCopyFileFB)"); strFormat.Replace(_T("%errno"), boost::lexical_cast(dwLastError).c_str()); strFormat.Replace(_T("%count"), boost::lexical_cast(rBuffer.GetBytesTransferred()).c_str()); - strFormat.Replace(_T("%path"), pathFile.ToString()); + strFormat.Replace(_T("%path"), m_spFile->GetFilePath().ToString()); LOG_ERROR(m_spLog) << strFormat.c_str(); - TFeedbackResult frResult = m_spFeedbackHandler->FileError(pathFile.ToWString(), TString(), EFileError::eWriteError, dwLastError); + TFeedbackResult frResult = m_spFeedbackHandler->FileError(m_spFile->GetFilePath().ToWString(), TString(), EFileError::eWriteError, dwLastError); switch (frResult.GetResult()) { case EFeedbackResult::eResult_Cancel: @@ -491,7 +491,7 @@ return TSubTaskBase::eSubResult_Continue; } - TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::FinalizeFileFB(TOverlappedDataBuffer& rBuffer, const TSmartPath& pathFile, bool& bSkip) + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::FinalizeFileFB(TOverlappedDataBuffer& rBuffer, bool& bSkip) { bSkip = false; @@ -514,10 +514,10 @@ TString strFormat = _T("Error %errno while trying to finalize file %path (CustomCopyFileFB)"); strFormat.Replace(_T("%errno"), boost::lexical_cast(dwLastError).c_str()); - strFormat.Replace(_T("%path"), pathFile.ToString()); + strFormat.Replace(_T("%path"), m_spFile->GetFilePath().ToString()); LOG_ERROR(m_spLog) << strFormat.c_str(); - TFeedbackResult frResult = m_spFeedbackHandler->FileError(pathFile.ToWString(), TString(), EFileError::eFinalizeError, dwLastError); + TFeedbackResult frResult = m_spFeedbackHandler->FileError(m_spFile->GetFilePath().ToWString(), TString(), EFileError::eFinalizeError, dwLastError); switch (frResult.GetResult()) { case EFeedbackResult::eResult_Cancel: Index: src/libchcore/TFilesystemFileFeedbackWrapper.h =================================================================== diff -u -rb6a48931b8155a01d871d050f52d915abb2df8ca -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TFilesystemFileFeedbackWrapper.h (.../TFilesystemFileFeedbackWrapper.h) (revision b6a48931b8155a01d871d050f52d915abb2df8ca) +++ src/libchcore/TFilesystemFileFeedbackWrapper.h (.../TFilesystemFileFeedbackWrapper.h) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -42,13 +42,12 @@ TSubTaskBase::ESubOperationResult OpenDestinationFileFB(const TFileInfoPtr& spSrcFileInfo, unsigned long long& ullSeekTo, bool& bFreshlyCreated, bool& bSkip, bool bProtectReadOnlyFiles); - TSubTaskBase::ESubOperationResult TruncateFileFB(file_size_t fsNewSize, - const TSmartPath& pathFile, bool& bSkip); + TSubTaskBase::ESubOperationResult TruncateFileFB(file_size_t fsNewSize, bool& bSkip); - TSubTaskBase::ESubOperationResult ReadFileFB(TOverlappedDataBuffer& rBuffer, const TSmartPath& pathFile, bool& bSkip); - TSubTaskBase::ESubOperationResult WriteFileFB(TOverlappedDataBuffer& rBuffer, const TSmartPath& pathFile, bool& bSkip); + TSubTaskBase::ESubOperationResult ReadFileFB(TOverlappedDataBuffer& rBuffer, bool& bSkip); + TSubTaskBase::ESubOperationResult WriteFileFB(TOverlappedDataBuffer& rBuffer, bool& bSkip); - TSubTaskBase::ESubOperationResult FinalizeFileFB(TOverlappedDataBuffer& rBuffer, const TSmartPath& pathFile, bool& bSkip); + TSubTaskBase::ESubOperationResult FinalizeFileFB(TOverlappedDataBuffer& rBuffer, bool& bSkip); TSmartPath GetFilePath() const { return m_spFile->GetFilePath(); } file_size_t GetFileSize() const { return m_spFile->GetFileSize(); } Index: src/libchcore/TOverlappedMemoryPool.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TOverlappedMemoryPool.cpp (.../TOverlappedMemoryPool.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOverlappedMemoryPool.cpp (.../TOverlappedMemoryPool.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -27,8 +27,7 @@ namespace chcore { TOverlappedMemoryPool::TOverlappedMemoryPool() : - m_spQueueBuffers(std::make_shared()), - m_eventAllBuffersAccountedFor(true, true) + m_spQueueBuffers(std::make_shared()) { } Index: src/libchcore/TOverlappedMemoryPool.h =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TOverlappedMemoryPool.h (.../TOverlappedMemoryPool.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOverlappedMemoryPool.h (.../TOverlappedMemoryPool.h) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -47,8 +47,6 @@ std::vector> m_listAllBuffers; TBufferListPtr m_spQueueBuffers; - - TEvent m_eventAllBuffersAccountedFor; }; using TOverlappedMemoryPoolPtr = std::shared_ptr; Index: src/libchcore/TOverlappedReaderWriter.cpp =================================================================== diff -u -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) +++ src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -32,7 +32,6 @@ m_spMemoryPool(spMemoryPool), m_tReader(spLogFileData, spMemoryPool->GetBufferList(), ullFilePos, dwChunkSize), m_tWriter(spLogFileData, m_tReader.GetFinishedQueue(), ullFilePos), - m_bDataWritingFinished(false), m_eventAllBuffersAccountedFor(true, true) { if(!spMemoryPool) @@ -105,9 +104,6 @@ if(pBuffer) { - if (pBuffer->IsLastPart()) - m_bDataWritingFinished = true; - pBuffer->SetParam(this); UpdateAllBuffersAccountedFor(); @@ -206,6 +202,7 @@ m_tWriter.MarkAsFinalized(pBuffer); } + void TOverlappedReaderWriter::UpdateAllBuffersAccountedFor() { size_t stCurrentBuffers = m_spMemoryPool->GetAvailableBufferCount() + m_tReader.GetBufferCount() + m_tWriter.GetBufferCount(); Index: src/libchcore/TOverlappedReaderWriter.h =================================================================== diff -u -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TOverlappedReaderWriter.h (.../TOverlappedReaderWriter.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) +++ src/libchcore/TOverlappedReaderWriter.h (.../TOverlappedReaderWriter.h) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -23,7 +23,6 @@ #include "../liblogger/TLogFileData.h" #include "../liblogger/TLogger.h" #include "TOverlappedMemoryPool.h" -#include "TOrderedBufferQueue.h" #include "TOverlappedReader.h" #include "TOverlappedWriter.h" @@ -64,7 +63,6 @@ // processing info bool IsDataSourceFinished() const { return m_tReader.IsDataSourceFinished(); } - //bool IsDataWritingFinished() const { return m_bDataWritingFinished; } // event access HANDLE GetEventReadPossibleHandle() const { return m_tReader.GetEventReadPossibleHandle(); } @@ -88,8 +86,6 @@ TOverlappedReader m_tReader; TOverlappedWriter m_tWriter; - bool m_bDataWritingFinished = false; // output file was already written to the end - TEvent m_eventAllBuffersAccountedFor; }; } Index: src/libchcore/TOverlappedWriter.cpp =================================================================== diff -u -rc4cbf6cd567821f9a981586ab5d8294a26f873be -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision c4cbf6cd567821f9a981586ab5d8294a26f873be) +++ src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -28,8 +28,7 @@ unsigned long long ullFilePos) : m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), m_tBuffersToWrite(spBuffersToWrite), - m_tFinishedBuffers(ullFilePos), - m_bDataWritingFinished(false) + m_tFinishedBuffers(ullFilePos) { if(!spBuffersToWrite) throw TCoreException(eErr_InvalidArgument, L"spBuffersToWrite", LOCATION); @@ -90,7 +89,6 @@ if (pBuffer != m_pLastPartBuffer) throw TCoreException(eErr_InvalidArgument, L"Trying to mark different buffer as finalized", LOCATION); - m_bDataWritingFinished = true; m_pLastPartBuffer = nullptr; } Index: src/libchcore/TOverlappedWriter.h =================================================================== diff -u -rc4cbf6cd567821f9a981586ab5d8294a26f873be -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision c4cbf6cd567821f9a981586ab5d8294a26f873be) +++ src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -51,7 +51,6 @@ // processing info void MarkAsFinalized(TOverlappedDataBuffer* pBuffer); - bool IsDataWritingFinished() const { return m_bDataWritingFinished; } // event access HANDLE GetEventWritePossibleHandle() const { return m_tBuffersToWrite.GetHasBuffersEvent(); } @@ -67,7 +66,6 @@ TWriteBufferQueueWrapper m_tBuffersToWrite; TOrderedBufferQueue m_tFinishedBuffers; - bool m_bDataWritingFinished = false; // output file was already written to the end TOverlappedDataBuffer* m_pLastPartBuffer = nullptr; }; } Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -rda7737de7046ba0ecd255240fb36b4a46584ebf2 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision da7737de7046ba0ecd255240fb36b4a46584ebf2) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -25,18 +25,25 @@ { TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : m_spUnorderedQueue(spUnorderedQueue), + m_eventHasBuffers(true, false), m_ullNextReadPosition(ullNextReadPosition), - m_dwChunkSize(dwChunkSize), - m_eventHasBuffers(true, false) + m_dwChunkSize(dwChunkSize) { if(!spUnorderedQueue) throw TCoreException(eErr_InvalidArgument, L"spUnorderedQueue is NULL", LOCATION); if(dwChunkSize == 0) throw TCoreException(eErr_InvalidArgument, L"dwChunkSize cannot be 0", LOCATION); + m_emptyBuffersQueueConnector = m_spUnorderedQueue->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this, _1)); + UpdateHasBuffers(); } + TReadBufferQueueWrapper::~TReadBufferQueueWrapper() + { + m_emptyBuffersQueueConnector.disconnect(); + } + void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) { if (!pBuffer) @@ -98,8 +105,8 @@ { if(IsDataSourceFinished()) return !m_tClaimedQueue.empty(); - else - return !m_tClaimedQueue.empty() || !m_spUnorderedQueue->IsEmpty(); + + return !m_tClaimedQueue.empty() || !m_spUnorderedQueue->IsEmpty(); } size_t TReadBufferQueueWrapper::GetCount() const @@ -153,6 +160,11 @@ m_eventHasBuffers.SetEvent(IsBufferReady()); } + void TReadBufferQueueWrapper::UpdateHasBuffers(bool /*bAdded*/) + { + UpdateHasBuffers(); + } + void TReadBufferQueueWrapper::ReleaseBuffers(const TBufferListPtr& spBuffers) { m_tClaimedQueue.ReleaseBuffers(spBuffers); Index: src/libchcore/TReadBufferQueueWrapper.h =================================================================== diff -u -rda7737de7046ba0ecd255240fb36b4a46584ebf2 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision da7737de7046ba0ecd255240fb36b4a46584ebf2) +++ src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -34,6 +34,7 @@ public: TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize); + ~TReadBufferQueueWrapper(); void Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition); TOverlappedDataBuffer* Pop(); @@ -49,9 +50,12 @@ private: bool IsBufferReady() const; void UpdateHasBuffers(); + void UpdateHasBuffers(bool bAdded); private: TBufferListPtr m_spUnorderedQueue; // external queue of buffers to use + boost::signals2::connection m_emptyBuffersQueueConnector; + TSimpleOrderedBufferQueue m_tClaimedQueue; // internal queue of claimed buffers TEvent m_eventHasBuffers; Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -rb6a48931b8155a01d871d050f52d915abb2df8ca -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision b6a48931b8155a01d871d050f52d915abb2df8ca) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -427,7 +427,7 @@ if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Read was possible, but no buffer is available", LOCATION); - eResult = srcFileWrapper.ReadFileFB(*pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); + eResult = srcFileWrapper.ReadFileFB(*pBuffer, bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { tReaderWriter.AddEmptyBuffer(pBuffer, false); @@ -477,7 +477,7 @@ if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write was possible, but no buffer is available", LOCATION); - eResult = dstFileWrapper.WriteFileFB(*pBuffer, pData->pathDstFile, bSkip); + eResult = dstFileWrapper.WriteFileFB(*pBuffer, bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { tReaderWriter.AddEmptyBuffer(pBuffer, false); @@ -531,7 +531,7 @@ if(pBuffer->IsLastPart()) { - eResult = dstFileWrapper.FinalizeFileFB(*pBuffer, pData->pathDstFile, bSkip); + eResult = dstFileWrapper.FinalizeFileFB(*pBuffer, bSkip); if (eResult != TSubTaskBase::eSubResult_Continue) { tReaderWriter.AddEmptyBuffer(pBuffer, false); @@ -748,7 +748,7 @@ if(!bDstFileFreshlyCreated) { // if destination file was opened (as opposed to newly created) - eResult = rDstFile.TruncateFileFB(fsMoveTo, pData->pathDstFile, bSkip); + eResult = rDstFile.TruncateFileFB(fsMoveTo, bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) return eResult; else if(bSkip)