Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -24,6 +24,8 @@ #include "TOverlappedDataBuffer.h" #include "TBufferList.h" #include "TCoreException.h" +#include +#include namespace chcore { @@ -51,30 +53,38 @@ HANDLE GetHasBuffersEvent() const; HANDLE GetHasErrorEvent() const; + HANDLE GetHasReadingFinished() const; void ReleaseBuffers(const TBufferListPtr& spBuffers); - boost::signals2::signal& GetNotifier(); + boost::signals2::signal& GetNotifier(); void UpdateProcessingRange(unsigned long long ullNewPosition); private: void UpdateHasBuffers(); void UpdateHasErrors(); + void UpdateReadingFinished(); + bool InternalHasPoppableBuffer() const; + private: using BufferCollection = std::set; BufferCollection m_setBuffers; + mutable boost::shared_mutex m_mutex; + TOverlappedDataBuffer* m_pFirstErrorBuffer = nullptr; unsigned long long m_ullErrorPosition = NoPosition; TEvent m_eventHasBuffers; TEvent m_eventHasError; + TEvent m_eventHasReadingFinished; unsigned long long m_ullExpectedBufferPosition = 0; + bool m_bDataSourceFinished = false; - boost::signals2::signal m_notifier; + boost::signals2::signal m_notifier; }; template @@ -85,6 +95,8 @@ if(!pBuffer->HasError()) throw TCoreException(eErr_InvalidArgument, L"Cannot push successful buffer to failed queue", LOCATION); + boost::unique_lock lock(m_mutex); + if(!m_pFirstErrorBuffer && m_ullErrorPosition == NoPosition) { m_pFirstErrorBuffer = pBuffer;