Index: src/libchcore/TWriteBufferQueueWrapper.cpp =================================================================== diff -u -N -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -23,21 +23,29 @@ namespace chcore { - TWriteBufferQueueWrapper::TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue) : + TWriteBufferQueueWrapper::TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue, size_t stMaxOtfBuffers, TSharedCountPtr spOtfBuffersCount) : m_spDataQueue(spQueue), + m_stMaxOtfBuffers(stMaxOtfBuffers), + m_spOtfBuffersCount(spOtfBuffersCount), m_eventHasBuffers(true, false) { if (!spQueue) throw TCoreException(eErr_InvalidArgument, L"spQueue is NULL", LOCATION); + if (!spOtfBuffersCount) + throw TCoreException(eErr_InvalidArgument, L"spOtfBuffersCount is NULL", LOCATION); + if (stMaxOtfBuffers == 0) + throw TCoreException(eErr_InvalidArgument, L"stMaxOtfBuffers cannot be 0", LOCATION); UpdateHasBuffers(); - m_dataQueueConnector = m_spDataQueue->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this, _1)); + m_dataQueueConnector = m_spDataQueue->GetSharedCount()->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this)); + m_retryBuffersConnector = m_tRetryBuffers.GetSharedCount()->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this)); } TWriteBufferQueueWrapper::~TWriteBufferQueueWrapper() { m_dataQueueConnector.disconnect(); + m_retryBuffersConnector.disconnect(); } void TWriteBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) @@ -63,18 +71,11 @@ TOverlappedDataBuffer* TWriteBufferQueueWrapper::InternalPop() { - const TOverlappedDataBuffer* pClaimedQueueBuffer = m_tRetryBuffers.Peek(); - if (!pClaimedQueueBuffer) - return m_spDataQueue->Pop(); + TOverlappedDataBuffer* pBuffer = m_tRetryBuffers.Pop(); + if(!pBuffer) + pBuffer = m_spDataQueue->Pop(); - const TOverlappedDataBuffer* pDataQueueBuffer = m_spDataQueue->Peek(); - if (!pDataQueueBuffer) - return m_tRetryBuffers.Pop(); - - if (pClaimedQueueBuffer->GetFilePosition() < pDataQueueBuffer->GetFilePosition()) - return m_tRetryBuffers.Pop(); - else - return m_spDataQueue->Pop(); + return pBuffer; } size_t TWriteBufferQueueWrapper::GetCount() const @@ -93,14 +94,19 @@ m_tRetryBuffers.ClearBuffers(spEmptyBuffers); } - void TWriteBufferQueueWrapper::UpdateHasBuffers(bool bDataQueueHasPoppableBuffer) + bool TWriteBufferQueueWrapper::HasBuffersToProcess() const { - bool bIsReady = bDataQueueHasPoppableBuffer || !m_tRetryBuffers.IsEmpty(); - m_eventHasBuffers.SetEvent(bIsReady); + if(m_spOtfBuffersCount->GetValue() >= m_stMaxOtfBuffers) + return false; + + if(!m_tRetryBuffers.IsEmpty()) + return true; + + return m_spDataQueue->HasPoppableBuffer(); } void TWriteBufferQueueWrapper::UpdateHasBuffers() { - UpdateHasBuffers(m_spDataQueue->HasPoppableBuffer()); + m_eventHasBuffers.SetEvent(HasBuffersToProcess()); } }