Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -N -rda7737de7046ba0ecd255240fb36b4a46584ebf2 -rc4cbf6cd567821f9a981586ab5d8294a26f873be --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision da7737de7046ba0ecd255240fb36b4a46584ebf2) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision c4cbf6cd567821f9a981586ab5d8294a26f873be) @@ -23,6 +23,7 @@ #include "TEvent.h" #include "TOverlappedDataBuffer.h" #include "TBufferList.h" +#include "TCoreException.h" namespace chcore { @@ -35,27 +36,70 @@ TOrderedBufferQueue(unsigned long long ullExpectedPosition); void Push(TOverlappedDataBuffer* pBuffer); + + template + void PushError(TOverlappedDataBuffer* pBuffer, T& rRetryQueue); + TOverlappedDataBuffer* Pop(); + TOverlappedDataBuffer* PopError(); + const TOverlappedDataBuffer* const Peek() const; size_t GetCount() const; + bool IsEmpty() const; HANDLE GetHasBuffersEvent() const; + HANDLE GetHasErrorEvent() const; + void ReleaseBuffers(const TBufferListPtr& spBuffers); private: bool IsBufferReady() const; void UpdateHasBuffers(); + void UpdateHasErrors(); private: using BufferCollection = std::set; - BufferCollection m_setBuffers; + + TOverlappedDataBuffer* m_pFirstErrorBuffer = nullptr; + unsigned long long m_ullErrorPosition = NoPosition; + TEvent m_eventHasBuffers; - unsigned long long m_ullExpectedBufferPosition = NoPosition; + TEvent m_eventHasError; + + unsigned long long m_ullExpectedBufferPosition = 0; }; + template + void TOrderedBufferQueue::PushError(TOverlappedDataBuffer* pBuffer, T& rRetryQueue) + { + if(!pBuffer->HasError()) + throw TCoreException(eErr_InvalidArgument, L"Cannot push successful buffer to failed queue", LOCATION); + + if(!m_pFirstErrorBuffer && m_ullErrorPosition == NoPosition) + { + m_pFirstErrorBuffer = pBuffer; + m_ullErrorPosition = pBuffer->GetFilePosition(); + UpdateHasErrors(); + return; + } + + if(pBuffer->GetFilePosition() < m_ullErrorPosition) + { + rRetryQueue.Push(m_pFirstErrorBuffer, true); + m_pFirstErrorBuffer = pBuffer; + m_ullErrorPosition = pBuffer->GetFilePosition(); + } + else if(pBuffer->GetFilePosition() > m_ullErrorPosition) + rRetryQueue.Push(pBuffer, true); + else + throw TCoreException(eErr_InvalidArgument, L"Trying to push the same buffer again", LOCATION); + + UpdateHasErrors(); + } + using TOrderedBufferQueuePtr = std::shared_ptr; }