Index: src/libchcore/TBufferList.cpp =================================================================== diff -u -N -ra1f5b3d99f2f175b102d81379698ea1f08e42cce -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TBufferList.cpp (.../TBufferList.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) +++ src/libchcore/TBufferList.cpp (.../TBufferList.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -19,6 +19,7 @@ #include "stdafx.h" #include "TBufferList.h" #include "TCoreException.h" +#include namespace chcore { @@ -32,50 +33,69 @@ if(!pBuffer) throw TCoreException(eErr_InvalidArgument, L"pBuffer", LOCATION); - m_listBuffers.push_front(pBuffer); - UpdateEvent(); + { + boost::unique_lock lock(m_mutex); + + m_listBuffers.push_front(pBuffer); + UpdateEvent(); + } + m_notifier(); } TOverlappedDataBuffer* TBufferList::Pop() { - if(m_listBuffers.empty()) - return nullptr; + TOverlappedDataBuffer* pBuffer = nullptr; - TOverlappedDataBuffer* pBuffer = m_listBuffers.front(); - m_listBuffers.pop_front(); + { + boost::unique_lock lock(m_mutex); - UpdateEvent(); + if(m_listBuffers.empty()) + return nullptr; + pBuffer = m_listBuffers.front(); + m_listBuffers.pop_front(); + + UpdateEvent(); + } + m_notifier(); return pBuffer; } void TBufferList::Clear() { - bool bRemoved = !m_listBuffers.empty(); - m_listBuffers.clear(); - - if (bRemoved) + bool bRemoved = false; { - UpdateEvent(); - m_notifier(); + boost::unique_lock lock(m_mutex); + + bRemoved = !m_listBuffers.empty(); + m_listBuffers.clear(); + + if(bRemoved) + UpdateEvent(); } + + if(bRemoved) + m_notifier(); } size_t TBufferList::GetCount() const { + boost::shared_lock lock(m_mutex); return m_listBuffers.size(); } bool TBufferList::IsEmpty() const { + boost::shared_lock lock(m_mutex); return m_listBuffers.empty(); } void TBufferList::SetExpectedBuffersCount(size_t stExpectedBuffers) { + boost::shared_lock lock(m_mutex); m_stExpectedBuffers = stExpectedBuffers; UpdateEvent(); } Index: src/libchcore/TBufferList.h =================================================================== diff -u -N -ra1f5b3d99f2f175b102d81379698ea1f08e42cce -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TBufferList.h (.../TBufferList.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) +++ src/libchcore/TBufferList.h (.../TBufferList.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -48,6 +48,8 @@ void UpdateEvent(); private: + mutable boost::shared_mutex m_mutex; + size_t m_stExpectedBuffers = 0; // count of buffers there should be in m_listBuffers when no buffer is in use std::list m_listBuffers; Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -20,12 +20,15 @@ #include "TOrderedBufferQueue.h" #include "TOverlappedDataBuffer.h" #include "TCoreException.h" +#include +#include namespace chcore { TOrderedBufferQueue::TOrderedBufferQueue(unsigned long long ullExpectedPosition) : m_eventHasBuffers(true, false), m_eventHasError(true, false), + m_eventHasReadingFinished(true, false), m_ullExpectedBufferPosition(ullExpectedPosition) { } @@ -37,10 +40,18 @@ if(pBuffer->HasError()) throw TCoreException(eErr_InvalidArgument, L"Cannot push buffer with error", LOCATION); + boost::unique_lock lock(m_mutex); + auto pairInsert = m_setBuffers.insert(pBuffer); if (!pairInsert.second) throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION); + if(pBuffer->IsLastPart()) + m_bDataSourceFinished = true; + + if(m_bDataSourceFinished) + UpdateReadingFinished(); + if(pBuffer->GetFilePosition() == m_ullErrorPosition) { if(m_pFirstErrorBuffer != nullptr) @@ -54,7 +65,9 @@ TOverlappedDataBuffer* TOrderedBufferQueue::Pop() { - if(!HasPoppableBuffer()) + boost::unique_lock lock(m_mutex); + + if(!InternalHasPoppableBuffer()) return nullptr; TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin(); @@ -69,6 +82,8 @@ TOverlappedDataBuffer* TOrderedBufferQueue::PopError() { + boost::unique_lock lock(m_mutex); + if(!m_pFirstErrorBuffer) return nullptr; @@ -81,23 +96,34 @@ const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const { + boost::shared_lock lock(m_mutex); + if(!m_setBuffers.empty()) return *m_setBuffers.begin(); return nullptr; } size_t TOrderedBufferQueue::GetCount() const { + boost::shared_lock lock(m_mutex); return m_setBuffers.size() + (m_pFirstErrorBuffer ? 1 : 0); } bool TOrderedBufferQueue::IsEmpty() const { + boost::shared_lock lock(m_mutex); return m_setBuffers.empty(); } bool TOrderedBufferQueue::HasPoppableBuffer() const { + boost::shared_lock lock(m_mutex); + + return InternalHasPoppableBuffer(); + } + + bool TOrderedBufferQueue::InternalHasPoppableBuffer() const + { if(m_setBuffers.empty()) return false; @@ -115,11 +141,18 @@ return m_eventHasError.Handle(); } + HANDLE TOrderedBufferQueue::GetHasReadingFinished() const + { + return m_eventHasReadingFinished.Handle(); + } + void TOrderedBufferQueue::ReleaseBuffers(const TBufferListPtr& spBuffers) { if(!spBuffers) throw TCoreException(eErr_InvalidArgument, L"spBuffers is NULL", LOCATION); + boost::unique_lock lock(m_mutex); + for(TOverlappedDataBuffer* pBuffer : m_setBuffers) { spBuffers->Push(pBuffer); @@ -139,15 +172,15 @@ void TOrderedBufferQueue::UpdateHasBuffers() { - if(HasPoppableBuffer()) + if(InternalHasPoppableBuffer()) { m_eventHasBuffers.SetEvent(); - m_notifier(); + m_notifier(true); } else { m_eventHasBuffers.ResetEvent(); - m_notifier(); + m_notifier(false); } } @@ -156,13 +189,33 @@ m_eventHasError.SetEvent(m_pFirstErrorBuffer != nullptr); } - boost::signals2::signal& TOrderedBufferQueue::GetNotifier() + void TOrderedBufferQueue::UpdateReadingFinished() { + bool bFullSequence = true; + unsigned long long ullExpected = m_ullExpectedBufferPosition; + for(TOverlappedDataBuffer* pBuffer : m_setBuffers) + { + if(pBuffer->GetFilePosition() != ullExpected) + { + bFullSequence = false; + break; + } + + ullExpected += pBuffer->GetRequestedDataSize(); + } + + m_eventHasReadingFinished.SetEvent(bFullSequence); + } + + boost::signals2::signal& TOrderedBufferQueue::GetNotifier() + { return m_notifier; } void TOrderedBufferQueue::UpdateProcessingRange(unsigned long long ullNewPosition) { + boost::unique_lock lock(m_mutex); + if(!m_setBuffers.empty()) throw TCoreException(eErr_InvalidData, L"Cannot update processing range when processing already started", LOCATION); Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -24,6 +24,8 @@ #include "TOverlappedDataBuffer.h" #include "TBufferList.h" #include "TCoreException.h" +#include +#include namespace chcore { @@ -51,30 +53,38 @@ HANDLE GetHasBuffersEvent() const; HANDLE GetHasErrorEvent() const; + HANDLE GetHasReadingFinished() const; void ReleaseBuffers(const TBufferListPtr& spBuffers); - boost::signals2::signal& GetNotifier(); + boost::signals2::signal& GetNotifier(); void UpdateProcessingRange(unsigned long long ullNewPosition); private: void UpdateHasBuffers(); void UpdateHasErrors(); + void UpdateReadingFinished(); + bool InternalHasPoppableBuffer() const; + private: using BufferCollection = std::set; BufferCollection m_setBuffers; + mutable boost::shared_mutex m_mutex; + TOverlappedDataBuffer* m_pFirstErrorBuffer = nullptr; unsigned long long m_ullErrorPosition = NoPosition; TEvent m_eventHasBuffers; TEvent m_eventHasError; + TEvent m_eventHasReadingFinished; unsigned long long m_ullExpectedBufferPosition = 0; + bool m_bDataSourceFinished = false; - boost::signals2::signal m_notifier; + boost::signals2::signal m_notifier; }; template @@ -85,6 +95,8 @@ if(!pBuffer->HasError()) throw TCoreException(eErr_InvalidArgument, L"Cannot push successful buffer to failed queue", LOCATION); + boost::unique_lock lock(m_mutex); + if(!m_pFirstErrorBuffer && m_ullErrorPosition == NoPosition) { m_pFirstErrorBuffer = pBuffer; Index: src/libchcore/TOverlappedReader.cpp =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -204,4 +204,9 @@ { return m_spFullBuffers->GetHasBuffersEvent(); } + + HANDLE TOverlappedReader::GetEventDataSourceFinishedHandle() const + { + return m_spFullBuffers->GetHasReadingFinished(); + } } Index: src/libchcore/TOverlappedReader.h =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -59,6 +59,7 @@ HANDLE GetEventReadPossibleHandle() const; HANDLE GetEventReadFailedHandle() const; HANDLE GetEventReadFinishedHandle() const; + HANDLE GetEventDataSourceFinishedHandle() const; void ReleaseBuffers(); Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -20,6 +20,7 @@ #include "TOverlappedReaderFB.h" #include "TCoreException.h" #include "TFileInfo.h" +#include "TWorkerThreadController.h" namespace chcore { @@ -38,7 +39,8 @@ m_spFilesystem(spFilesystem), m_spSrcFile(), m_spStats(spStats), - m_spSrcFileInfo(spSrcFileInfo) + m_spSrcFileInfo(spSrcFileInfo), + m_rThreadController(rThreadController) { if(!spFeedbackHandler) throw TCoreException(eErr_InvalidArgument, L"spFeedbackHandler is NULL", LOCATION); @@ -63,6 +65,24 @@ return eResult; } + TSubTaskBase::ESubOperationResult TOverlappedReaderFB::StopThreaded() + { + if(m_spReadThread) + { + if(m_spReadThread->joinable()) + m_spReadThread->join(); + m_spReadThread.reset(); + } + + return m_eThreadResult; + } + + void TOverlappedReaderFB::StartThreaded() + { + m_eThreadResult = TSubTaskBase::eSubResult_Continue; + m_spReadThread = std::make_unique(&TOverlappedReaderFB::ThreadProc, this); + } + TOverlappedReaderPtr TOverlappedReaderFB::GetReader() const { return m_spReader; @@ -91,6 +111,57 @@ return eResult; } + void TOverlappedReaderFB::ThreadProc() + { + // read data from file to buffer + // NOTE: order is critical here: + // - write finished is first, so that all the data that were already queued to be written, will be written and accounted for (in stats) + // - kill request is second, so that we can stop processing as soon as all the data is written to destination location; + // that also means that we don't want to queue reads or writes anymore - all the data that were read until now, will be lost + // - write possible - we're prioritizing write queuing here to empty buffers as soon as possible + // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data + enum + { + eKillThread, eReadFailed, eReadPossible, eDataSourceFinished + }; + + std::vector vHandles = { + m_rThreadController.GetKillThreadHandle(), + GetReader()->GetEventReadFailedHandle(), + GetReader()->GetEventReadPossibleHandle(), + GetReader()->GetEventDataSourceFinishedHandle() + }; + + while(m_eThreadResult == TSubTaskBase::eSubResult_Continue) + { + DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); + switch(dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0 + eKillThread: + m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; + break; + + case WAIT_OBJECT_0 + eReadPossible: + m_eThreadResult = OnReadPossible(); + break; + + case WAIT_OBJECT_0 + eReadFailed: + m_eThreadResult = OnReadFailed(); + break; + + case WAIT_OBJECT_0 + eDataSourceFinished: + m_eThreadResult = TSubTaskBase::eSubResult_Continue; + return; + + default: + throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); + } + } + } + void TOverlappedReaderFB::SetReleaseMode() { m_spReader->ReleaseBuffers(); Index: src/libchcore/TOverlappedReaderFB.h =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -22,6 +22,7 @@ #include "TOverlappedReader.h" #include "TFilesystemFileFeedbackWrapper.h" #include "TOverlappedProcessorRange.h" +#include namespace chcore { @@ -44,6 +45,9 @@ ~TOverlappedReaderFB(); TSubTaskBase::ESubOperationResult Start(); + + void StartThreaded(); + TSubTaskBase::ESubOperationResult StopThreaded(); TOverlappedReaderPtr GetReader() const; void SetReleaseMode(); @@ -53,6 +57,7 @@ private: TSubTaskBase::ESubOperationResult UpdateFileStats(); + void ThreadProc(); private: TOverlappedReaderPtr m_spReader; @@ -61,6 +66,10 @@ TFilesystemFileFeedbackWrapperPtr m_spSrcFile; TSubTaskStatsInfoPtr m_spStats; TFileInfoPtr m_spSrcFileInfo; + + TWorkerThreadController& m_rThreadController; + std::unique_ptr m_spReadThread; + TSubTaskBase::ESubOperationResult m_eThreadResult = TSubTaskBase::eSubResult_Continue; }; using TOverlappedReaderFBPtr = std::shared_ptr; Index: src/libchcore/TOverlappedReaderWriterFB.cpp =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -22,6 +22,7 @@ #include "ErrorCodes.h" #include #include "TWorkerThreadController.h" +#include namespace chcore { @@ -126,6 +127,8 @@ if(eResult != TSubTaskBase::eSubResult_Continue) return eResult; + m_spReader->StartThreaded(); + // read data from file to buffer // NOTE: order is critical here: // - write finished is first, so that all the data that were already queued to be written, will be written and accounted for (in stats) @@ -135,70 +138,61 @@ // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data enum { - eKillThread, eWriteFinished, eWriteFailed, eWritePossible, eReadFailed, eReadPossible, eHandleCount + eKillThread, eDataSourceFinished, eWriteFinished, eWriteFailed, eWritePossible }; - std::array arrHandles = { + TEvent unsignaledEvent(true, false); + + std::vector vHandles = { m_rThreadController.GetKillThreadHandle(), + m_spReader->GetReader()->GetEventDataSourceFinishedHandle(), m_spWriter->GetWriter()->GetEventWriteFinishedHandle(), m_spWriter->GetWriter()->GetEventWriteFailedHandle(), - m_spWriter->GetWriter()->GetEventWritePossibleHandle(), - m_spReader->GetReader()->GetEventReadFailedHandle(), - m_spReader->GetReader()->GetEventReadPossibleHandle() + m_spWriter->GetWriter()->GetEventWritePossibleHandle() }; bool bStopProcessing = false; while(!bStopProcessing && eResult == TSubTaskBase::eSubResult_Continue) { - DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); switch(dwResult) { case STATUS_USER_APC: break; case WAIT_OBJECT_0 + eKillThread: - { // log LOG_INFO(m_spLog) << L"Received kill request while copying file"; eResult = TSubTaskBase::eSubResult_KillRequest; bStopProcessing = true; break; - } - case WAIT_OBJECT_0 + eReadPossible: - { - eResult = m_spReader->OnReadPossible(); - break; - } - case WAIT_OBJECT_0 + eReadFailed: - { - eResult = m_spReader->OnReadFailed(); - break; - } case WAIT_OBJECT_0 + eWritePossible: - { eResult = m_spWriter->OnWritePossible(); break; - } case WAIT_OBJECT_0 + eWriteFailed: - { eResult = m_spWriter->OnWriteFailed(); break; - } case WAIT_OBJECT_0 + eWriteFinished: - { eResult = m_spWriter->OnWriteFinished(bStopProcessing); break; - } + case WAIT_OBJECT_0 + eDataSourceFinished: + eResult = m_spReader->StopThreaded(); + vHandles[eDataSourceFinished] = unsignaledEvent.Handle(); + break; + default: throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); } } + // ensure the reading thread is stopped (in case the switch version won't be called) + m_spReader->StopThreaded(); + WaitForMissingBuffersAndResetState(); return eResult; Index: src/libchcore/TSubTaskStatsInfo.h =================================================================== diff -u -N -r734408890246965d47e6bbf2c2978371269dd1fd -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TSubTaskStatsInfo.h (.../TSubTaskStatsInfo.h) (revision 734408890246965d47e6bbf2c2978371269dd1fd) +++ src/libchcore/TSubTaskStatsInfo.h (.../TSubTaskStatsInfo.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -161,18 +161,17 @@ TSharedModificationTracker m_fcProcessedCount; mutable TSharedModificationTracker m_tCountSpeed; - TSharedModificationTracker m_fcCurrentIndex; + TSharedModificationTracker m_fcCurrentIndex; //?? - TSharedModificationTracker m_ullCurrentItemProcessedSize; - TSharedModificationTracker m_ullCurrentItemTotalSize; - TSharedModificationTracker m_bCurrentItemSilentResume; + TSharedModificationTracker m_ullCurrentItemProcessedSize; //?? + TSharedModificationTracker m_ullCurrentItemTotalSize; //?? + TSharedModificationTracker m_bCurrentItemSilentResume; //?? mutable TSharedModificationTracker m_tTimer; - TSharedModificationTracker m_iCurrentBufferIndex; + TSharedModificationTracker m_iCurrentBufferIndex; //?? + TSharedModificationTracker m_strCurrentPath; //?? - TSharedModificationTracker m_strCurrentPath; // currently processed path - TSharedModificationTracker m_bIsInitialized; const ESubOperationType m_eSubOperationType; Index: src/libchcore/TWriteBufferQueueWrapper.cpp =================================================================== diff -u -N -ra1f5b3d99f2f175b102d81379698ea1f08e42cce -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) +++ src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -32,7 +32,7 @@ UpdateHasBuffers(); - m_emptyBuffersQueueConnector = m_spDataQueue->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this)); + m_emptyBuffersQueueConnector = m_spDataQueue->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this, _1)); } TWriteBufferQueueWrapper::~TWriteBufferQueueWrapper() @@ -77,11 +77,6 @@ return m_spDataQueue->Pop(); } - bool TWriteBufferQueueWrapper::IsBufferReady() const - { - return !m_tRetryBuffers.empty() || m_spDataQueue->HasPoppableBuffer(); - } - size_t TWriteBufferQueueWrapper::GetCount() const { return m_spDataQueue->GetCount(); @@ -92,8 +87,14 @@ return m_eventHasBuffers.Handle(); } + void TWriteBufferQueueWrapper::UpdateHasBuffers(bool bDataQueueHasPoppableBuffer) + { + bool bIsReady = bDataQueueHasPoppableBuffer || !m_tRetryBuffers.empty(); + m_eventHasBuffers.SetEvent(bIsReady); + } + void TWriteBufferQueueWrapper::UpdateHasBuffers() { - m_eventHasBuffers.SetEvent(IsBufferReady()); + UpdateHasBuffers(m_spDataQueue->HasPoppableBuffer()); } } Index: src/libchcore/TWriteBufferQueueWrapper.h =================================================================== diff -u -N -ra1f5b3d99f2f175b102d81379698ea1f08e42cce -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) +++ src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -42,8 +42,9 @@ HANDLE GetHasBuffersEvent() const; private: - bool IsBufferReady() const; + void UpdateHasBuffers(bool bDataQueueHasPoppableBuffer); void UpdateHasBuffers(); + TOverlappedDataBuffer* InternalPop(); private: Index: src/libchcore/Tests/OverlappedCallbacksTests.cpp =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/Tests/OverlappedCallbacksTests.cpp (.../OverlappedCallbacksTests.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/Tests/OverlappedCallbacksTests.cpp (.../OverlappedCallbacksTests.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -35,7 +35,7 @@ logger::TLogFileDataPtr spLogData(std::make_shared()); TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); - TOverlappedProcessorRangePtr spRange(std::make_shared(0)); + TOverlappedProcessorRangePtr spRange(std::make_shared(0)); TOverlappedReader queue(spLogData, spBuffers->GetBufferList(), spRange, 4096); TOverlappedDataBuffer buffer(16384, &queue); @@ -58,7 +58,7 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); TOrderedBufferQueuePtr spBuffersToWrite(std::make_shared(0)); - TOverlappedProcessorRangePtr spRange(std::make_shared(0)); + TOverlappedProcessorRangePtr spRange(std::make_shared(0)); TOverlappedWriter queue(spLogData, spBuffersToWrite, spRange, spBuffers->GetBufferList()); TOverlappedDataBuffer buffer(16384, &queue);