Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -20,12 +20,15 @@ #include "TOrderedBufferQueue.h" #include "TOverlappedDataBuffer.h" #include "TCoreException.h" +#include +#include namespace chcore { TOrderedBufferQueue::TOrderedBufferQueue(unsigned long long ullExpectedPosition) : m_eventHasBuffers(true, false), m_eventHasError(true, false), + m_eventHasReadingFinished(true, false), m_ullExpectedBufferPosition(ullExpectedPosition) { } @@ -37,10 +40,18 @@ if(pBuffer->HasError()) throw TCoreException(eErr_InvalidArgument, L"Cannot push buffer with error", LOCATION); + boost::unique_lock lock(m_mutex); + auto pairInsert = m_setBuffers.insert(pBuffer); if (!pairInsert.second) throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION); + if(pBuffer->IsLastPart()) + m_bDataSourceFinished = true; + + if(m_bDataSourceFinished) + UpdateReadingFinished(); + if(pBuffer->GetFilePosition() == m_ullErrorPosition) { if(m_pFirstErrorBuffer != nullptr) @@ -54,7 +65,9 @@ TOverlappedDataBuffer* TOrderedBufferQueue::Pop() { - if(!HasPoppableBuffer()) + boost::unique_lock lock(m_mutex); + + if(!InternalHasPoppableBuffer()) return nullptr; TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin(); @@ -69,6 +82,8 @@ TOverlappedDataBuffer* TOrderedBufferQueue::PopError() { + boost::unique_lock lock(m_mutex); + if(!m_pFirstErrorBuffer) return nullptr; @@ -81,23 +96,34 @@ const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const { + boost::shared_lock lock(m_mutex); + if(!m_setBuffers.empty()) return *m_setBuffers.begin(); return nullptr; } size_t TOrderedBufferQueue::GetCount() const { + boost::shared_lock lock(m_mutex); return m_setBuffers.size() + (m_pFirstErrorBuffer ? 1 : 0); } bool TOrderedBufferQueue::IsEmpty() const { + boost::shared_lock lock(m_mutex); return m_setBuffers.empty(); } bool TOrderedBufferQueue::HasPoppableBuffer() const { + boost::shared_lock lock(m_mutex); + + return InternalHasPoppableBuffer(); + } + + bool TOrderedBufferQueue::InternalHasPoppableBuffer() const + { if(m_setBuffers.empty()) return false; @@ -115,11 +141,18 @@ return m_eventHasError.Handle(); } + HANDLE TOrderedBufferQueue::GetHasReadingFinished() const + { + return m_eventHasReadingFinished.Handle(); + } + void TOrderedBufferQueue::ReleaseBuffers(const TBufferListPtr& spBuffers) { if(!spBuffers) throw TCoreException(eErr_InvalidArgument, L"spBuffers is NULL", LOCATION); + boost::unique_lock lock(m_mutex); + for(TOverlappedDataBuffer* pBuffer : m_setBuffers) { spBuffers->Push(pBuffer); @@ -139,15 +172,15 @@ void TOrderedBufferQueue::UpdateHasBuffers() { - if(HasPoppableBuffer()) + if(InternalHasPoppableBuffer()) { m_eventHasBuffers.SetEvent(); - m_notifier(); + m_notifier(true); } else { m_eventHasBuffers.ResetEvent(); - m_notifier(); + m_notifier(false); } } @@ -156,13 +189,33 @@ m_eventHasError.SetEvent(m_pFirstErrorBuffer != nullptr); } - boost::signals2::signal& TOrderedBufferQueue::GetNotifier() + void TOrderedBufferQueue::UpdateReadingFinished() { + bool bFullSequence = true; + unsigned long long ullExpected = m_ullExpectedBufferPosition; + for(TOverlappedDataBuffer* pBuffer : m_setBuffers) + { + if(pBuffer->GetFilePosition() != ullExpected) + { + bFullSequence = false; + break; + } + + ullExpected += pBuffer->GetRequestedDataSize(); + } + + m_eventHasReadingFinished.SetEvent(bFullSequence); + } + + boost::signals2::signal& TOrderedBufferQueue::GetNotifier() + { return m_notifier; } void TOrderedBufferQueue::UpdateProcessingRange(unsigned long long ullNewPosition) { + boost::unique_lock lock(m_mutex); + if(!m_setBuffers.empty()) throw TCoreException(eErr_InvalidData, L"Cannot update processing range when processing already started", LOCATION);