Index: src/libchcore/TEvent.h =================================================================== diff -u -ra4635addad389b9e117679437a3e1b64a739ea96 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TEvent.h (.../TEvent.h) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/TEvent.h (.../TEvent.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -27,8 +27,11 @@ { public: TEvent(bool bManualReset, bool bInitialState); + TEvent(const TEvent& rSrc) = delete; virtual ~TEvent(); + TEvent& operator=(const TEvent& rSrc) = delete; + HANDLE Get() const { return m_hEvent; } void SetEvent(bool bSet); Index: src/libchcore/TEventCounter.h =================================================================== diff -u -r3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TEventCounter.h (.../TEventCounter.h) (revision 3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4) +++ src/libchcore/TEventCounter.h (.../TEventCounter.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -40,6 +40,9 @@ UpdateEvent(); } + TEventCounter(const TEventCounter& rSrc) = delete; + TEventCounter& operator=(const TEventCounter& rSrc) = delete; + void Increase() { ++m_tCounter; @@ -48,7 +51,7 @@ void Decrease() { - ++m_tCounter; + --m_tCounter; UpdateEvent(); } @@ -57,6 +60,11 @@ return m_tCounter; } + HANDLE GetEventHandle() const + { + return m_event.Handle(); + } + private: void UpdateEvent() { Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -r10d42e85d810f6da082cb2ce4415dcb72903410e -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -146,7 +146,7 @@ return m_eventHasReadingFinished.Handle(); } - void TOrderedBufferQueue::ReleaseBuffers(const TBufferListPtr& spBuffers) + void TOrderedBufferQueue::ClearBuffers(const TBufferListPtr& spBuffers) { if(!spBuffers) throw TCoreException(eErr_InvalidArgument, L"spBuffers is NULL", LOCATION); Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -r10d42e85d810f6da082cb2ce4415dcb72903410e -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -55,7 +55,7 @@ HANDLE GetHasErrorEvent() const; HANDLE GetHasReadingFinished() const; - void ReleaseBuffers(const TBufferListPtr& spBuffers); + void ClearBuffers(const TBufferListPtr& spBuffers); boost::signals2::signal& GetNotifier(); Index: src/libchcore/TOverlappedReader.cpp =================================================================== diff -u -r10d42e85d810f6da082cb2ce4415dcb72903410e -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) +++ src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -49,9 +49,6 @@ TOverlappedDataBuffer* TOverlappedReader::GetEmptyBuffer() { - if(m_bReleaseMode) - return nullptr; - TOverlappedDataBuffer* pBuffer = m_tInputBuffers.Pop(); if (pBuffer) pBuffer->SetParam(this); @@ -63,16 +60,13 @@ if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(!m_bReleaseMode) - { - LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as really-empty; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); - } + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as really-empty; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); m_tInputBuffers.PushEmpty(pBuffer); } @@ -82,48 +76,35 @@ if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(m_bReleaseMode) - m_tInputBuffers.PushEmpty(pBuffer); - else - { - LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as empty; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as empty; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); - m_tInputBuffers.Push(pBuffer); - } + m_tInputBuffers.Push(pBuffer); } void TOverlappedReader::AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(m_bReleaseMode) - m_tInputBuffers.PushEmpty(pBuffer); - else - { - LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as failed-read; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as failed-read; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); - m_spFullBuffers->PushError(pBuffer, m_tInputBuffers); - } + m_spFullBuffers->PushError(pBuffer, m_tInputBuffers); } TOverlappedDataBuffer* TOverlappedReader::GetFailedReadBuffer() { - if(m_bReleaseMode) - return nullptr; - TOverlappedDataBuffer* pBuffer = m_spFullBuffers->PopError(); if (pBuffer) pBuffer->SetParam(this); @@ -136,30 +117,22 @@ if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(m_bReleaseMode) - m_tInputBuffers.PushEmpty(pBuffer); - else - { - LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as finished-read; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as finished-read; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); - if(pBuffer->IsLastPart()) - m_tInputBuffers.SetDataSourceFinished(pBuffer); + if(pBuffer->IsLastPart()) + m_tInputBuffers.SetDataSourceFinished(pBuffer); - m_spFullBuffers->Push(pBuffer); - } + m_spFullBuffers->Push(pBuffer); } TOverlappedDataBuffer* TOverlappedReader::GetFinishedReadBuffer() { - if(m_bReleaseMode) - return nullptr; - TOverlappedDataBuffer* pBuffer = m_spFullBuffers->Pop(); if(pBuffer) pBuffer->SetParam(this); @@ -177,11 +150,11 @@ return m_tInputBuffers.IsDataSourceFinished(); } - void TOverlappedReader::ReleaseBuffers() + void TOverlappedReader::ClearBuffers() { - m_bReleaseMode = true; - m_tInputBuffers.ReleaseBuffers(); - m_spFullBuffers->ReleaseBuffers(m_spEmptyBuffers); + m_tInputBuffers.ClearBuffers(); + // Do not clear full buffers as they might be in use + //m_spFullBuffers->ClearBuffers(m_spEmptyBuffers); } void TOverlappedReader::UpdateProcessingRange(unsigned long long ullNewPosition) Index: src/libchcore/TOverlappedReader.h =================================================================== diff -u -r10d42e85d810f6da082cb2ce4415dcb72903410e -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) +++ src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -61,7 +61,7 @@ HANDLE GetEventReadFinishedHandle() const; HANDLE GetEventDataSourceFinishedHandle() const; - void ReleaseBuffers(); + void ClearBuffers(); void UpdateProcessingRange(unsigned long long ullNewPosition); @@ -73,8 +73,6 @@ TReadBufferQueueWrapper m_tInputBuffers; TOrderedBufferQueuePtr m_spFullBuffers; // buffers with data - bool m_bReleaseMode = false; // when set, all incoming buffers will go to empty buffers - boost::signals2::connection m_dataRangeChanged; }; Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -r3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -82,6 +82,7 @@ void TOverlappedReaderFB::StartThreaded() { TEventGuard guardProcessingFinished(m_eventProcessingFinished, true); + TEvent eventNonSignaled(true, false); m_eThreadResult = TSubTaskBase::eSubResult_Continue; @@ -92,19 +93,18 @@ // that also means that we don't want to queue reads or writes anymore - all the data that were read until now, will be lost // - write possible - we're prioritizing write queuing here to empty buffers as soon as possible // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data - enum - { - eKillThread, eReadFailed, eReadPossible, eDataSourceFinished - }; + enum { eKillThread, eReadFailed, eReadPossible, eDataSourceFinished }; std::vector vHandles = { m_rThreadController.GetKillThreadHandle(), - GetReader()->GetEventReadFailedHandle(), - GetReader()->GetEventReadPossibleHandle(), - GetReader()->GetEventDataSourceFinishedHandle() + m_spReader->GetEventReadFailedHandle(), + m_spReader->GetEventReadPossibleHandle(), + m_spReader->GetEventDataSourceFinishedHandle() }; - while(m_eThreadResult == TSubTaskBase::eSubResult_Continue) + bool bDataSourceFinished = false; + + while(m_eThreadResult == TSubTaskBase::eSubResult_Continue && !bDataSourceFinished) { DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); switch(dwResult) @@ -125,21 +125,43 @@ break; case WAIT_OBJECT_0 + eDataSourceFinished: + bDataSourceFinished = true; m_eThreadResult = TSubTaskBase::eSubResult_Continue; - m_eventReadingFinished.SetEvent(); - return; + break; default: throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); } } + + WaitForOnTheFlyBuffers(); + ClearQueues(); + + if(bDataSourceFinished) + m_eventReadingFinished.SetEvent(); } - TOverlappedReaderPtr TOverlappedReaderFB::GetReader() const + void TOverlappedReaderFB::WaitForOnTheFlyBuffers() { - return m_spReader; + DWORD dwResult = WaitForSingleObjectEx(m_counterOnTheFly.GetEventHandle(), INFINITE, TRUE); + switch(dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0: + return; + + default: + throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); + } } + void TOverlappedReaderFB::ClearQueues() + { + m_spReader->ClearBuffers(); + } + TSubTaskBase::ESubOperationResult TOverlappedReaderFB::UpdateFileStats() { // update the source file size (it might differ from the time this file was originally scanned). @@ -163,11 +185,6 @@ return eResult; } - void TOverlappedReaderFB::SetReleaseMode() - { - m_spReader->ReleaseBuffers(); - } - HANDLE TOverlappedReaderFB::GetEventReadingFinishedHandle() const { return m_eventReadingFinished.Handle(); Index: src/libchcore/TOverlappedReaderFB.h =================================================================== diff -u -r3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision 3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4) +++ src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -55,7 +55,6 @@ TSubTaskBase::ESubOperationResult StopThreaded(); TOrderedBufferQueuePtr GetFinishedQueue() const; - void SetReleaseMode(); HANDLE GetEventReadingFinishedHandle() const; HANDLE GetEventProcessingFinishedHandle() const; @@ -68,7 +67,8 @@ TSubTaskBase::ESubOperationResult OnReadPossible(); TSubTaskBase::ESubOperationResult OnReadFailed(); - TOverlappedReaderPtr GetReader() const; + void WaitForOnTheFlyBuffers(); + void ClearQueues(); private: TOverlappedReaderPtr m_spReader; Index: src/libchcore/TOverlappedReaderWriterFB.cpp =================================================================== diff -u -r3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision 3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4) +++ src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -20,7 +20,6 @@ #include "TOverlappedReaderWriterFB.h" #include "TCoreException.h" #include "ErrorCodes.h" -#include #include "TWorkerThreadController.h" #include "TOverlappedThreadPool.h" #include "TCoreWin32Exception.h" @@ -58,66 +57,29 @@ { } - TSubTaskBase::ESubOperationResult TOverlappedReaderWriterFB::WaitForMissingBuffersAndResetState() + void TOverlappedReaderWriterFB::WaitForMissingBuffersAndResetState() { - m_spReader->SetReleaseMode(); - m_spWriter->SetReleaseMode(); - - enum - { - eAllBuffersAccountedFor, eWriteFinished, eWriteFailed, eWritePossible, eHandleCount - }; - std::array arrHandles = { - m_spMemoryPool->GetBufferList()->GetAllBuffersAccountedForEvent(), - m_spWriter->GetWriter()->GetEventWriteFinishedHandle(), - m_spWriter->GetWriter()->GetEventWriteFailedHandle(), - m_spWriter->GetWriter()->GetEventWritePossibleHandle() - }; - - TSubTaskBase::ESubOperationResult eResult = TSubTaskBase::eSubResult_Continue; bool bStopProcessing = false; while(!bStopProcessing) { - DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + DWORD dwResult = WaitForSingleObjectEx(m_spMemoryPool->GetBufferList()->GetAllBuffersAccountedForEvent(), INFINITE, TRUE); switch(dwResult) { case STATUS_USER_APC: break; - case WAIT_OBJECT_0 + eAllBuffersAccountedFor: + case WAIT_OBJECT_0: { LOG_DEBUG(m_spLog) << L"All buffer accounted for."; - eResult = TSubTaskBase::eSubResult_KillRequest; bStopProcessing = true; break; } - case WAIT_OBJECT_0 + eWritePossible: - { - m_spWriter->OnWritePossible(); - break; - } - - case WAIT_OBJECT_0 + eWriteFailed: - { - m_spWriter->OnWriteFailed(); - break; - } - - case WAIT_OBJECT_0 + eWriteFinished: - { - bool bIgnoreStop = false; - m_spWriter->OnWriteFinished(bIgnoreStop); - break; - } - default: throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); } } - - return eResult; } TSubTaskBase::ESubOperationResult TOverlappedReaderWriterFB::Start() Index: src/libchcore/TOverlappedReaderWriterFB.h =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedReaderWriterFB.h (.../TOverlappedReaderWriterFB.h) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/TOverlappedReaderWriterFB.h (.../TOverlappedReaderWriterFB.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -58,7 +58,7 @@ TOverlappedWriterFBPtr GetWriter() const { return m_spWriter; } // event access - TSubTaskBase::ESubOperationResult WaitForMissingBuffersAndResetState(); + void WaitForMissingBuffersAndResetState(); private: logger::TLoggerPtr m_spLog; Index: src/libchcore/TOverlappedWriter.cpp =================================================================== diff -u -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -160,6 +160,12 @@ m_tFinishedBuffers.UpdateProcessingRange(ullNewPosition); } + void TOverlappedWriter::ClearBuffers() + { + m_tBuffersToWrite.ClearBuffers(m_spEmptyBuffers); + m_tFinishedBuffers.ClearBuffers(m_spEmptyBuffers); + } + void TOverlappedWriter::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) Index: src/libchcore/TOverlappedWriter.h =================================================================== diff -u -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -59,6 +59,7 @@ HANDLE GetEventWriteFinishedHandle() const; void UpdateProcessingRange(unsigned long long ullNewPosition); + void ClearBuffers(); private: logger::TLoggerPtr m_spLog; Index: src/libchcore/TOverlappedWriterFB.cpp =================================================================== diff -u -r3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision 3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4) +++ src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -191,6 +191,11 @@ m_counterOnTheFly.Decrease(); } + void TOverlappedWriterFB::ClearBuffers() + { + m_spWriter->ClearBuffers(); + } + void TOverlappedWriterFB::AdjustProcessedSize(file_size_t fsWritten) { // in case we read past the original eof, try to get new file size from filesystem @@ -305,20 +310,22 @@ void TOverlappedWriterFB::StartThreaded() { TEventGuard guardProcessingFinished(m_eventProcessingFinished, true); + TEvent eventNonSignaled(true, false); m_eThreadResult = TSubTaskBase::eSubResult_Continue; - enum { eKillThread, eWriteFinished, eWriteFailed, eWritePossible }; + enum { eKillThread, eWriteFinished, eWriteFailed, eWritePossible, eNoBuffersOnTheFly }; std::vector vHandles = { m_rThreadController.GetKillThreadHandle(), m_spWriter->GetEventWriteFinishedHandle(), m_spWriter->GetEventWriteFailedHandle(), - m_spWriter->GetEventWritePossibleHandle() + m_spWriter->GetEventWritePossibleHandle(), + eventNonSignaled.Handle() }; - bool bStopProcessing = false; - while(!bStopProcessing && m_eThreadResult == TSubTaskBase::eSubResult_Continue) + bool bWrittenLastBuffer = false; + while(!bWrittenLastBuffer && m_eThreadResult == TSubTaskBase::eSubResult_Continue) { DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); switch(dwResult) @@ -328,7 +335,6 @@ case WAIT_OBJECT_0 + eKillThread: m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; - bStopProcessing = true; break; case WAIT_OBJECT_0 + eWritePossible: @@ -340,20 +346,38 @@ break; case WAIT_OBJECT_0 + eWriteFinished: - { - m_eThreadResult = OnWriteFinished(bStopProcessing); - if(m_eThreadResult == TSubTaskBase::eSubResult_Continue && bStopProcessing) - m_eventWritingFinished.SetEvent(); - break; - } + m_eThreadResult = OnWriteFinished(bWrittenLastBuffer); + break; default: DWORD dwLastError = GetLastError(); throw TCoreWin32Exception(eErr_UnhandledCase, dwLastError, L"Unknown result from async waiting function", LOCATION); } } + + WaitForOnTheFlyBuffers(); + ClearBuffers(); + + if(m_eThreadResult == TSubTaskBase::eSubResult_Continue && bWrittenLastBuffer) + m_eventWritingFinished.SetEvent(); } + void TOverlappedWriterFB::WaitForOnTheFlyBuffers() + { + DWORD dwResult = WaitForSingleObjectEx(m_counterOnTheFly.GetEventHandle(), INFINITE, TRUE); + switch(dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0: + return; + + default: + throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); + } + } + TSubTaskBase::ESubOperationResult TOverlappedWriterFB::StopThreaded() { return m_eThreadResult; @@ -363,9 +387,4 @@ { return m_spWriter; } - - void TOverlappedWriterFB::SetReleaseMode() - { - m_bReleaseMode = true; - } } Index: src/libchcore/TOverlappedWriterFB.h =================================================================== diff -u -r3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision 3d7d129eda4a42e9f9318ae6b6f3b873dc9290d4) +++ src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -57,8 +57,6 @@ TOverlappedWriterPtr GetWriter() const; - void SetReleaseMode(); - TSubTaskBase::ESubOperationResult OnWritePossible(); TSubTaskBase::ESubOperationResult OnWriteFailed(); TSubTaskBase::ESubOperationResult OnWriteFinished(bool& bStopProcessing); @@ -71,6 +69,8 @@ private: void AdjustProcessedSize(file_size_t fsWritten); TSubTaskBase::ESubOperationResult AdjustFinalSize(); + void WaitForOnTheFlyBuffers(); + void ClearBuffers(); private: TOverlappedWriterPtr m_spWriter; Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -161,9 +161,9 @@ m_eventHasBuffers.SetEvent(IsBufferReady()); } - void TReadBufferQueueWrapper::ReleaseBuffers() + void TReadBufferQueueWrapper::ClearBuffers() { - m_tRetryBuffers.ReleaseBuffers(m_spEmptyBuffers); + m_tRetryBuffers.ClearBuffers(m_spEmptyBuffers); } void TReadBufferQueueWrapper::UpdateProcessingRange(unsigned long long ullNewPosition) Index: src/libchcore/TReadBufferQueueWrapper.h =================================================================== diff -u -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -47,7 +47,7 @@ bool IsDataSourceFinished() const; HANDLE GetHasBuffersEvent() const; - void ReleaseBuffers(); + void ClearBuffers(); void UpdateProcessingRange(unsigned long long ullNewPosition); Index: src/libchcore/TSimpleOrderedBufferQueue.h =================================================================== diff -u -r685d0da3259dd94327ee8d644a88c155585b8249 -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TSimpleOrderedBufferQueue.h (.../TSimpleOrderedBufferQueue.h) (revision 685d0da3259dd94327ee8d644a88c155585b8249) +++ src/libchcore/TSimpleOrderedBufferQueue.h (.../TSimpleOrderedBufferQueue.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -54,7 +54,7 @@ return *begin(); } - void ReleaseBuffers(const TBufferListPtr& spBuffers) + void ClearBuffers(const TBufferListPtr& spBuffers) { if(!spBuffers) throw TCoreException(eErr_InvalidArgument, L"spBuffers is NULL", LOCATION); Index: src/libchcore/TWriteBufferQueueWrapper.cpp =================================================================== diff -u -r10d42e85d810f6da082cb2ce4415dcb72903410e -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) +++ src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -87,6 +87,12 @@ return m_eventHasBuffers.Handle(); } + void TWriteBufferQueueWrapper::ClearBuffers(const TBufferListPtr& spEmptyBuffers) + { + m_spDataQueue->ClearBuffers(spEmptyBuffers); + m_tRetryBuffers.ClearBuffers(spEmptyBuffers); + } + void TWriteBufferQueueWrapper::UpdateHasBuffers(bool bDataQueueHasPoppableBuffer) { bool bIsReady = bDataQueueHasPoppableBuffer || !m_tRetryBuffers.empty(); Index: src/libchcore/TWriteBufferQueueWrapper.h =================================================================== diff -u -r10d42e85d810f6da082cb2ce4415dcb72903410e -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) +++ src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -40,6 +40,7 @@ size_t GetCount() const; HANDLE GetHasBuffersEvent() const; + void ClearBuffers(const TBufferListPtr& spEmptyBuffers); private: void UpdateHasBuffers(bool bDataQueueHasPoppableBuffer); Index: src/libchcore/Tests/TOrderedBufferQueueTests.cpp =================================================================== diff -u -ra1f5b3d99f2f175b102d81379698ea1f08e42cce -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/Tests/TOrderedBufferQueueTests.cpp (.../TOrderedBufferQueueTests.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) +++ src/libchcore/Tests/TOrderedBufferQueueTests.cpp (.../TOrderedBufferQueueTests.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -99,7 +99,7 @@ TOverlappedDataBuffer buffer(1024, nullptr); queue.Push(&buffer); - queue.ReleaseBuffers(spReleaseList); + queue.ClearBuffers(spReleaseList); EXPECT_EQ(1, spReleaseList->GetCount()); } @@ -112,7 +112,7 @@ buffer.SetFilePosition(1000); queue.Push(&buffer); - queue.ReleaseBuffers(spReleaseList); + queue.ClearBuffers(spReleaseList); EXPECT_EQ(1, spReleaseList->GetCount()); } Index: src/libchcore/Tests/TSimpleOrderedBufferQueueTests.cpp =================================================================== diff -u -rc4cbf6cd567821f9a981586ab5d8294a26f873be -rb051cbac8dac8c448507aa7c64753af9cf793af5 --- src/libchcore/Tests/TSimpleOrderedBufferQueueTests.cpp (.../TSimpleOrderedBufferQueueTests.cpp) (revision c4cbf6cd567821f9a981586ab5d8294a26f873be) +++ src/libchcore/Tests/TSimpleOrderedBufferQueueTests.cpp (.../TSimpleOrderedBufferQueueTests.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) @@ -70,7 +70,7 @@ TOverlappedDataBuffer buffer(1024, nullptr); queue.Push(&buffer); - queue.ReleaseBuffers(spReleaseList); + queue.ClearBuffers(spReleaseList); EXPECT_EQ(1, spReleaseList->GetCount()); }