Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -26,6 +26,7 @@ namespace chcore { TOrderedBufferQueue::TOrderedBufferQueue(const TBufferListPtr& spEmptyBuffers, unsigned long long ullExpectedPosition) : + m_spBuffersCount(std::make_shared>()), m_spEmptyBuffers(spEmptyBuffers), m_eventHasBuffers(true, false), m_eventHasError(true, false), @@ -48,12 +49,14 @@ if(pBuffer->HasError()) throw TCoreException(eErr_InvalidArgument, L"Cannot push buffer with error", LOCATION); - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); auto pairInsert = m_setBuffers.insert(pBuffer); if (!pairInsert.second) throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION); + m_spBuffersCount->Increase(); + if(pBuffer->IsLastPart()) m_bDataSourceFinished = true; @@ -73,14 +76,16 @@ TOverlappedDataBuffer* TOrderedBufferQueue::Pop() { - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!InternalHasPoppableBuffer()) return nullptr; TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin(); m_setBuffers.erase(m_setBuffers.begin()); + m_spBuffersCount->Decrease(); + m_ullExpectedBufferPosition += pBuffer->GetRequestedDataSize(); UpdateHasBuffers(); @@ -90,7 +95,7 @@ TOverlappedDataBuffer* TOrderedBufferQueue::PopError() { - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!m_pFirstErrorBuffer) return nullptr; @@ -104,7 +109,7 @@ const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const { - boost::shared_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!m_setBuffers.empty()) return *m_setBuffers.begin(); @@ -113,19 +118,19 @@ size_t TOrderedBufferQueue::GetCount() const { - boost::shared_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); return m_setBuffers.size() + (m_pFirstErrorBuffer ? 1 : 0); } bool TOrderedBufferQueue::IsEmpty() const { - boost::shared_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); return m_setBuffers.empty(); } bool TOrderedBufferQueue::HasPoppableBuffer() const { - boost::shared_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); return InternalHasPoppableBuffer(); } @@ -156,13 +161,14 @@ void TOrderedBufferQueue::ClearBuffers() { - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); for(TOverlappedDataBuffer* pBuffer : m_setBuffers) { m_spEmptyBuffers->Push(pBuffer); } m_setBuffers.clear(); + m_spBuffersCount->SetValue(0); if(m_pFirstErrorBuffer) { @@ -177,16 +183,7 @@ void TOrderedBufferQueue::UpdateHasBuffers() { - if(InternalHasPoppableBuffer()) - { - m_eventHasBuffers.SetEvent(); - m_notifier(true); - } - else - { - m_eventHasBuffers.ResetEvent(); - m_notifier(false); - } + m_eventHasBuffers.SetEvent(InternalHasPoppableBuffer()); } void TOrderedBufferQueue::UpdateHasErrors() @@ -212,18 +209,18 @@ m_eventHasReadingFinished.SetEvent(bFullSequence); } - boost::signals2::signal& TOrderedBufferQueue::GetNotifier() - { - return m_notifier; - } - void TOrderedBufferQueue::UpdateProcessingRange(unsigned long long ullNewPosition) { - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!m_setBuffers.empty()) throw TCoreException(eErr_InvalidData, L"Cannot update processing range when processing already started", LOCATION); m_ullExpectedBufferPosition = ullNewPosition; } + + TSharedCountMTPtr TOrderedBufferQueue::GetSharedCount() + { + return m_spBuffersCount; + } }