Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -N -r980c1a0de537813728871676200a0960410b11fb -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -24,7 +24,7 @@ namespace chcore { TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : - m_spUnorderedQueue(spUnorderedQueue), + m_spEmptyBuffers(spUnorderedQueue), m_eventHasBuffers(true, false), m_ullNextReadPosition(ullNextReadPosition), m_dwChunkSize(dwChunkSize) @@ -34,7 +34,7 @@ if(dwChunkSize == 0) throw TCoreException(eErr_InvalidArgument, L"dwChunkSize cannot be 0", LOCATION); - m_emptyBuffersQueueConnector = m_spUnorderedQueue->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this, _1)); + m_emptyBuffersQueueConnector = m_spEmptyBuffers->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); UpdateHasBuffers(); } @@ -44,41 +44,49 @@ m_emptyBuffersQueueConnector.disconnect(); } - void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) + void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(!bKeepPosition) - m_spUnorderedQueue->Push(pBuffer); - else if(IsDataSourceFinished()) + if(IsDataSourceFinished()) { if(!pBuffer->IsLastPart()) { if(pBuffer->GetFilePosition() > m_ullDataSourceFinishedPos) throw TCoreException(eErr_InvalidArgument, L"Adding regular buffer after the queue was marked as finished", LOCATION); - m_tClaimedQueue.Push(pBuffer); + m_tRetryBuffers.Push(pBuffer); } else - m_spUnorderedQueue->Push(pBuffer); + m_spEmptyBuffers->Push(pBuffer); } else - m_tClaimedQueue.Push(pBuffer); + m_tRetryBuffers.Push(pBuffer); UpdateHasBuffers(); } + void TReadBufferQueueWrapper::PushEmpty(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + m_spEmptyBuffers->Push(pBuffer); + + //UpdateHasBuffers(); // already updated using notifier + } + TOverlappedDataBuffer* TReadBufferQueueWrapper::Pop() { if(!IsBufferReady()) return nullptr; // always return retry buffers first - TOverlappedDataBuffer* pBuffer = m_tClaimedQueue.Pop(); + TOverlappedDataBuffer* pBuffer = m_tRetryBuffers.Pop(); if(!pBuffer) { - pBuffer = m_spUnorderedQueue->Pop(); + pBuffer = m_spEmptyBuffers->Pop(); if(pBuffer) { pBuffer->InitForRead(m_ullNextReadPosition, m_dwChunkSize); @@ -95,14 +103,14 @@ bool TReadBufferQueueWrapper::IsBufferReady() const { if(IsDataSourceFinished()) - return !m_tClaimedQueue.empty(); + return !m_tRetryBuffers.empty(); - return !m_tClaimedQueue.empty() || !m_spUnorderedQueue->IsEmpty(); + return !m_tRetryBuffers.empty() || !m_spEmptyBuffers->IsEmpty(); } size_t TReadBufferQueueWrapper::GetCount() const { - return m_tClaimedQueue.size(); + return m_tRetryBuffers.size(); } void TReadBufferQueueWrapper::SetDataSourceFinished(TOverlappedDataBuffer* pBuffer) @@ -117,22 +125,22 @@ m_ullDataSourceFinishedPos = pBuffer->GetFilePosition(); // release superfluous finished buffers - auto iterFind = std::find_if(m_tClaimedQueue.begin(), m_tClaimedQueue.end(), [](TOverlappedDataBuffer* pBuffer) { return pBuffer->IsLastPart(); }); - if(iterFind == m_tClaimedQueue.end() || ++iterFind == m_tClaimedQueue.end()) + auto iterFind = std::find_if(m_tRetryBuffers.begin(), m_tRetryBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return pBuffer->IsLastPart(); }); + if(iterFind == m_tRetryBuffers.end() || ++iterFind == m_tRetryBuffers.end()) { UpdateHasBuffers(); return; } - auto iterInvalidParts = std::find_if(iterFind, m_tClaimedQueue.end(), [](TOverlappedDataBuffer* pBuffer) { return !pBuffer->IsLastPart(); }); - if(iterInvalidParts != m_tClaimedQueue.end()) + auto iterInvalidParts = std::find_if(iterFind, m_tRetryBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return !pBuffer->IsLastPart(); }); + if(iterInvalidParts != m_tRetryBuffers.end()) throw TCoreException(eErr_InvalidArgument, L"Found non-last-parts after last-part", LOCATION); - for(auto iter = iterFind; iter != m_tClaimedQueue.end(); ++iter) + for(auto iter = iterFind; iter != m_tRetryBuffers.end(); ++iter) { - m_spUnorderedQueue->Push(*iter); + m_spEmptyBuffers->Push(*iter); } - m_tClaimedQueue.erase(iterFind, m_tClaimedQueue.end()); + m_tRetryBuffers.erase(iterFind, m_tRetryBuffers.end()); UpdateHasBuffers(); } @@ -153,13 +161,8 @@ m_eventHasBuffers.SetEvent(IsBufferReady()); } - void TReadBufferQueueWrapper::UpdateHasBuffers(bool /*bAdded*/) + void TReadBufferQueueWrapper::ReleaseBuffers() { - UpdateHasBuffers(); + m_tRetryBuffers.ReleaseBuffers(m_spEmptyBuffers); } - - void TReadBufferQueueWrapper::ReleaseBuffers(const TBufferListPtr& spBuffers) - { - m_tClaimedQueue.ReleaseBuffers(spBuffers); - } }