Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -N -rda7737de7046ba0ecd255240fb36b4a46584ebf2 -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision da7737de7046ba0ecd255240fb36b4a46584ebf2) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) @@ -25,18 +25,25 @@ { TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : m_spUnorderedQueue(spUnorderedQueue), + m_eventHasBuffers(true, false), m_ullNextReadPosition(ullNextReadPosition), - m_dwChunkSize(dwChunkSize), - m_eventHasBuffers(true, false) + m_dwChunkSize(dwChunkSize) { if(!spUnorderedQueue) throw TCoreException(eErr_InvalidArgument, L"spUnorderedQueue is NULL", LOCATION); if(dwChunkSize == 0) throw TCoreException(eErr_InvalidArgument, L"dwChunkSize cannot be 0", LOCATION); + m_emptyBuffersQueueConnector = m_spUnorderedQueue->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this, _1)); + UpdateHasBuffers(); } + TReadBufferQueueWrapper::~TReadBufferQueueWrapper() + { + m_emptyBuffersQueueConnector.disconnect(); + } + void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) { if (!pBuffer) @@ -98,8 +105,8 @@ { if(IsDataSourceFinished()) return !m_tClaimedQueue.empty(); - else - return !m_tClaimedQueue.empty() || !m_spUnorderedQueue->IsEmpty(); + + return !m_tClaimedQueue.empty() || !m_spUnorderedQueue->IsEmpty(); } size_t TReadBufferQueueWrapper::GetCount() const @@ -153,6 +160,11 @@ m_eventHasBuffers.SetEvent(IsBufferReady()); } + void TReadBufferQueueWrapper::UpdateHasBuffers(bool /*bAdded*/) + { + UpdateHasBuffers(); + } + void TReadBufferQueueWrapper::ReleaseBuffers(const TBufferListPtr& spBuffers) { m_tClaimedQueue.ReleaseBuffers(spBuffers);