Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -N -rda7737de7046ba0ecd255240fb36b4a46584ebf2 -rc4cbf6cd567821f9a981586ab5d8294a26f873be --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision da7737de7046ba0ecd255240fb36b4a46584ebf2) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision c4cbf6cd567821f9a981586ab5d8294a26f873be) @@ -25,16 +25,28 @@ { TOrderedBufferQueue::TOrderedBufferQueue(unsigned long long ullExpectedPosition) : m_eventHasBuffers(true, false), + m_eventHasError(true, false), m_ullExpectedBufferPosition(ullExpectedPosition) { } void TOrderedBufferQueue::Push(TOverlappedDataBuffer* pBuffer) { + if(pBuffer->HasError()) + throw TCoreException(eErr_InvalidArgument, L"Cannot push buffer with error", LOCATION); + auto pairInsert = m_setBuffers.insert(pBuffer); if (!pairInsert.second) throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION); + if(pBuffer->GetFilePosition() == m_ullErrorPosition) + { + if(m_pFirstErrorBuffer != nullptr) + throw TCoreException(eErr_InternalProblem, L"Buffer with error was not retrieved prior to adding same-by-position buffer without error", LOCATION); + m_ullErrorPosition = NoPosition; + UpdateHasErrors(); + } + UpdateHasBuffers(); } @@ -54,6 +66,18 @@ return pBuffer; } + TOverlappedDataBuffer* TOrderedBufferQueue::PopError() + { + if(!m_pFirstErrorBuffer) + return nullptr; + + TOverlappedDataBuffer* pBuffer = m_pFirstErrorBuffer; + m_pFirstErrorBuffer = nullptr; + UpdateHasErrors(); + + return pBuffer; + } + const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const { if(!m_setBuffers.empty()) @@ -68,7 +92,7 @@ size_t TOrderedBufferQueue::GetCount() const { - return m_setBuffers.size(); + return m_setBuffers.size() + (m_pFirstErrorBuffer ? 1 : 0); } bool TOrderedBufferQueue::IsEmpty() const @@ -81,13 +105,28 @@ return m_eventHasBuffers.Handle(); } + HANDLE TOrderedBufferQueue::GetHasErrorEvent() const + { + return m_eventHasError.Handle(); + } + void TOrderedBufferQueue::ReleaseBuffers(const TBufferListPtr& spBuffers) { for(TOverlappedDataBuffer* pBuffer : m_setBuffers) { spBuffers->Push(pBuffer); } m_setBuffers.clear(); + + if(m_pFirstErrorBuffer) + { + spBuffers->Push(m_pFirstErrorBuffer); + m_pFirstErrorBuffer = nullptr; + m_ullErrorPosition = NoPosition; + } + + UpdateHasBuffers(); + UpdateHasErrors(); } void TOrderedBufferQueue::UpdateHasBuffers() @@ -97,4 +136,9 @@ else m_eventHasBuffers.ResetEvent(); } + + void TOrderedBufferQueue::UpdateHasErrors() + { + m_eventHasError.SetEvent(m_pFirstErrorBuffer != nullptr); + } }