Index: src/libchcore/TBufferList.cpp =================================================================== diff -u -r593ca68706f3a3c7c2a3820b02a56fc24d5bea11 -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TBufferList.cpp (.../TBufferList.cpp) (revision 593ca68706f3a3c7c2a3820b02a56fc24d5bea11) +++ src/libchcore/TBufferList.cpp (.../TBufferList.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -74,6 +74,17 @@ return m_listBuffers.empty(); } + void TBufferList::SetExpectedBuffersCount(size_t stExpectedBuffers) + { + m_stExpectedBuffers = stExpectedBuffers; + UpdateEvent(); + } + + HANDLE TBufferList::GetAllBuffersAccountedForEvent() const + { + return m_eventAllBuffersAccountedFor.Handle(); + } + boost::signals2::signal& TBufferList::GetNotifier() { return m_notifier; Index: src/libchcore/TBufferList.h =================================================================== diff -u -rb941384e121190b6107f1c99b3233667e3daf4ce -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TBufferList.h (.../TBufferList.h) (revision b941384e121190b6107f1c99b3233667e3daf4ce) +++ src/libchcore/TBufferList.h (.../TBufferList.h) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -39,8 +39,8 @@ size_t GetCount() const; bool IsEmpty() const; - void SetExpectedBuffersCount(size_t stExpectedBuffers) { m_stExpectedBuffers = stExpectedBuffers; } - HANDLE GetAllBuffersAccountedForEvent() const { return m_eventAllBuffersAccountedFor.Handle(); } + void SetExpectedBuffersCount(size_t stExpectedBuffers); + HANDLE GetAllBuffersAccountedForEvent() const; boost::signals2::signal& GetNotifier(); Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -r734408890246965d47e6bbf2c2978371269dd1fd -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 734408890246965d47e6bbf2c2978371269dd1fd) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -44,6 +44,11 @@ { } + void TOverlappedReaderFB::SetReleaseMode() + { + m_spReader->ReleaseBuffers(); + } + TSubTaskBase::ESubOperationResult TOverlappedReaderFB::OnReadPossible(bool& bStopProcessing, bool& bProcessedFlag) { TOverlappedDataBuffer* pBuffer = m_spReader->GetEmptyBuffer(); Index: src/libchcore/TOverlappedReaderFB.h =================================================================== diff -u -r734408890246965d47e6bbf2c2978371269dd1fd -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision 734408890246965d47e6bbf2c2978371269dd1fd) +++ src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -36,6 +36,7 @@ ~TOverlappedReaderFB(); TOverlappedReaderPtr GetReader() const { return m_spReader; } + void SetReleaseMode(); TSubTaskBase::ESubOperationResult OnReadPossible(bool& bStopProcessing, bool& bProcessedFlag); TSubTaskBase::ESubOperationResult OnReadFailed(bool& bStopProcessing, bool& bProcessedFlag); Index: src/libchcore/TOverlappedReaderWriterFB.cpp =================================================================== diff -u -r734408890246965d47e6bbf2c2978371269dd1fd -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision 734408890246965d47e6bbf2c2978371269dd1fd) +++ src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -43,32 +43,67 @@ { } - void TOverlappedReaderWriterFB::WaitForMissingBuffersAndResetState(HANDLE hKillEvent) + TSubTaskBase::ESubOperationResult TOverlappedReaderWriterFB::WaitForMissingBuffersAndResetState(bool& bProcessed) { - m_spReader->GetReader()->ReleaseBuffers(); - m_spWriter->GetWriter()->ReleaseBuffers(); + m_spReader->SetReleaseMode(); + m_spWriter->SetReleaseMode(); - enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; - std::array arrHandles = { hKillEvent, m_spMemoryPool->GetBufferList()->GetAllBuffersAccountedForEvent() }; + enum + { + eAllBuffersAccountedFor, eWriteFinished, eWriteFailed, eWritePossible, eHandleCount + }; + std::array arrHandles = { + m_spMemoryPool->GetBufferList()->GetAllBuffersAccountedForEvent(), + m_spWriter->GetWriter()->GetEventWriteFinishedHandle(), + m_spWriter->GetWriter()->GetEventWriteFailedHandle(), + m_spWriter->GetWriter()->GetEventWritePossibleHandle() + }; - bool bExit = false; - while (!bExit) + TSubTaskBase::ESubOperationResult eResult = TSubTaskBase::eSubResult_Continue; + bool bStopProcessing = false; + while(!bStopProcessing) { + bool bIgnoreStop = false; + DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); - switch (dwResult) + switch(dwResult) { case STATUS_USER_APC: break; - case WAIT_OBJECT_0 + eAllBuffersReturned: - bExit = true; + case WAIT_OBJECT_0 + eAllBuffersAccountedFor: + { + LOG_DEBUG(m_spLog) << L"All buffer accounted for."; + + eResult = TSubTaskBase::eSubResult_KillRequest; + bStopProcessing = true; break; + } - case WAIT_OBJECT_0 + eKillThread: - bExit = true; + case WAIT_OBJECT_0 + eWritePossible: + { + eResult = m_spWriter->OnWritePossible(bIgnoreStop, bProcessed); break; } + + case WAIT_OBJECT_0 + eWriteFailed: + { + eResult = m_spWriter->OnWriteFailed(bIgnoreStop, bProcessed); + break; + } + + case WAIT_OBJECT_0 + eWriteFinished: + { + eResult = m_spWriter->OnWriteFinished(bIgnoreStop, bProcessed); + break; + } + + default: + throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); + } } + + return eResult; } TSubTaskBase::ESubOperationResult TOverlappedReaderWriterFB::Start(HANDLE hKill, bool& bProcessed) @@ -146,7 +181,7 @@ } } - WaitForMissingBuffersAndResetState(hKill); + WaitForMissingBuffersAndResetState(bProcessed); return eResult; } Index: src/libchcore/TOverlappedReaderWriterFB.h =================================================================== diff -u -r734408890246965d47e6bbf2c2978371269dd1fd -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TOverlappedReaderWriterFB.h (.../TOverlappedReaderWriterFB.h) (revision 734408890246965d47e6bbf2c2978371269dd1fd) +++ src/libchcore/TOverlappedReaderWriterFB.h (.../TOverlappedReaderWriterFB.h) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -47,7 +47,7 @@ TOverlappedWriterFBPtr GetWriter() const { return m_spWriter; } // event access - void WaitForMissingBuffersAndResetState(HANDLE hKillEvent); + TSubTaskBase::ESubOperationResult WaitForMissingBuffersAndResetState(bool& bProcessed); private: logger::TLoggerPtr m_spLog; Index: src/libchcore/TOverlappedWriterFB.cpp =================================================================== diff -u -racd7bcfa7355db4a0d9af99a1bb99d685810d790 -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision acd7bcfa7355db4a0d9af99a1bb99d685810d790) +++ src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -56,6 +56,12 @@ if(!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write was possible, but no buffer is available", LOCATION); + if(m_bReleaseMode) + { + m_spEmptyBuffers->Push(pBuffer); + return TSubTaskBase::eSubResult_Continue; + } + bool bSkip = false; TSubTaskBase::ESubOperationResult eResult = m_spDstFile->WriteFileFB(*pBuffer, bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) @@ -82,6 +88,12 @@ if(!pBuffer) throw TCoreException(eErr_InternalProblem, L"Failed to retrieve write failed buffer", LOCATION); + if(m_bReleaseMode) + { + m_spEmptyBuffers->Push(pBuffer); + return TSubTaskBase::eSubResult_Continue; + } + bool bSkip = false; TSubTaskBase::ESubOperationResult eResult = m_spDstFile->HandleWriteError(*pBuffer, bSkip); if(eResult == TSubTaskBase::eSubResult_Retry) @@ -110,6 +122,18 @@ if(!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write finished was possible, but no buffer is available", LOCATION); + file_size_t fsWritten = pBuffer->GetRealDataSize(); + + if(m_bReleaseMode) + { + AdjustProcessedSize(fsWritten); + + m_spEmptyBuffers->Push(pBuffer); + bProcessedFlag = pBuffer->IsLastPart() && (pBuffer->GetBytesTransferred() == fsWritten); + + return TSubTaskBase::eSubResult_Continue; + } + TSubTaskBase::ESubOperationResult eResult = TSubTaskBase::eSubResult_Continue; if(pBuffer->IsLastPart()) { @@ -133,8 +157,6 @@ } } - file_size_t fsWritten = pBuffer->GetRealDataSize(); - // in case we read past the original eof, try to get new file size from filesystem AdjustProcessedSize(fsWritten); Index: src/libchcore/TOverlappedWriterFB.h =================================================================== diff -u -r734408890246965d47e6bbf2c2978371269dd1fd -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision 734408890246965d47e6bbf2c2978371269dd1fd) +++ src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -35,6 +35,8 @@ TOverlappedWriterPtr GetWriter() const { return m_spWriter; } + void SetReleaseMode() { m_bReleaseMode = true; } + TSubTaskBase::ESubOperationResult OnWritePossible(bool& bStopProcessing, bool& bProcessedFlag); TSubTaskBase::ESubOperationResult OnWriteFailed(bool& bStopProcessing, bool& bProcessedFlag); TSubTaskBase::ESubOperationResult OnWriteFinished(bool& bStopProcessing, bool& bProcessedFlag); @@ -51,6 +53,7 @@ TFileInfoPtr m_spSrcFileInfo; TFileInfoPtr m_spDstFileInfo; TBufferListPtr m_spEmptyBuffers; + bool m_bReleaseMode = false; }; using TOverlappedWriterFBPtr = std::shared_ptr; Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -r685d0da3259dd94327ee8d644a88c155585b8249 -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 685d0da3259dd94327ee8d644a88c155585b8249) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -50,16 +50,7 @@ throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); if(!bKeepPosition) - { - if(IsDataSourceFinished()) - m_spUnorderedQueue->Push(pBuffer); - else - { - pBuffer->InitForRead(m_ullNextReadPosition, m_dwChunkSize); - m_ullNextReadPosition += m_dwChunkSize; - m_tClaimedQueue.Push(pBuffer); - } - } + m_spUnorderedQueue->Push(pBuffer); else if(IsDataSourceFinished()) { if(!pBuffer->IsLastPart()) Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -r734408890246965d47e6bbf2c2978371269dd1fd -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 734408890246965d47e6bbf2c2978371269dd1fd) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -182,6 +182,9 @@ // next file to be copied TFileInfoPtr spFileInfo = rFilesCache.GetAt(fcIndex); + if(spFileInfo->IsProcessed()) + continue; + TSmartPath pathCurrent = spFileInfo->GetFullFilePath(); // new stats Index: src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp =================================================================== diff -u -r71bc7ffbd5b707e2cbb78eb30677d82577d62ee1 -r980c1a0de537813728871676200a0960410b11fb --- src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (.../TReadBufferQueueWrapperTests.cpp) (revision 71bc7ffbd5b707e2cbb78eb30677d82577d62ee1) +++ src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (.../TReadBufferQueueWrapperTests.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) @@ -76,10 +76,10 @@ TOverlappedDataBuffer buffer2(1024, nullptr); TOverlappedDataBuffer buffer3(1024, nullptr); TOverlappedDataBuffer buffer4(1024, nullptr); - queue.Push(&buffer4, false); - queue.Push(&buffer3, false); - queue.Push(&buffer2, false); queue.Push(&buffer1, false); + queue.Push(&buffer2, false); + queue.Push(&buffer3, false); + queue.Push(&buffer4, false); EXPECT_SIGNALED(queue.GetHasBuffersEvent()); EXPECT_EQ(&buffer4, queue.Pop()); @@ -121,16 +121,16 @@ queue.Push(&buffer4, false); EXPECT_SIGNALED(queue.GetHasBuffersEvent()); - EXPECT_EQ(&buffer3, queue.Pop()); - EXPECT_EQ(0, buffer3.GetFilePosition()); - EXPECT_EQ(1024, buffer3.GetRequestedDataSize()); - - EXPECT_SIGNALED(queue.GetHasBuffersEvent()); EXPECT_EQ(&buffer4, queue.Pop()); - EXPECT_EQ(1024, buffer4.GetFilePosition()); + EXPECT_EQ(0, buffer4.GetFilePosition()); EXPECT_EQ(1024, buffer4.GetRequestedDataSize()); EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer3, queue.Pop()); + EXPECT_EQ(1024, buffer3.GetFilePosition()); + EXPECT_EQ(1024, buffer3.GetRequestedDataSize()); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); EXPECT_EQ(&buffer2, queue.Pop()); EXPECT_EQ(2048, buffer2.GetFilePosition()); EXPECT_EQ(1024, buffer2.GetRequestedDataSize());