Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -N -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -23,25 +23,46 @@ namespace chcore { - TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spEmptyBuffers, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : + TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spEmptyBuffers, + unsigned long long ullNextReadPosition, DWORD dwChunkSize, + size_t stMaxOtfBuffers, size_t stMaxReadAheadBuffers, + TSharedCountPtr spOtfBuffersCount, TSharedCountMTPtr spCurrentReadAheadBuffers) : m_spEmptyBuffers(spEmptyBuffers), - m_eventHasBuffers(true, false), m_ullNextReadPosition(ullNextReadPosition), - m_dwChunkSize(dwChunkSize) + m_dwChunkSize(dwChunkSize), + m_stMaxOtfBuffers(stMaxOtfBuffers), + m_stMaxReadAheadBuffers(stMaxReadAheadBuffers), + m_spOtfBuffersCount(spOtfBuffersCount), + m_spCurrentReadAheadBuffers(spCurrentReadAheadBuffers), + m_eventHasBuffers(true, false) { if(!spEmptyBuffers) throw TCoreException(eErr_InvalidArgument, L"spEmptyBuffers is NULL", LOCATION); + if(!spOtfBuffersCount) + throw TCoreException(eErr_InvalidArgument, L"spOtfBuffersCount is NULL", LOCATION); + if(!spCurrentReadAheadBuffers) + throw TCoreException(eErr_InvalidArgument, L"spCurrentReadAheadBuffers is NULL", LOCATION); if(dwChunkSize == 0) throw TCoreException(eErr_InvalidArgument, L"dwChunkSize cannot be 0", LOCATION); + if(stMaxOtfBuffers == 0) + throw TCoreException(eErr_InvalidArgument, L"stMaxOtfBuffers cannot be 0", LOCATION); + if(stMaxReadAheadBuffers == 0) + throw TCoreException(eErr_InvalidArgument, L"stMaxReadAheadBuffers cannot be 0", LOCATION); - m_emptyBuffersQueueConnector = m_spEmptyBuffers->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); + m_emptyBuffersQueueConnector = m_spEmptyBuffers->GetSharedCount()->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); + m_currentReadAheadConnector = m_spCurrentReadAheadBuffers->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); + m_retryBuffersConnector = m_tRetryBuffers.GetSharedCount()->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); + m_otfBuffersConnector = m_spOtfBuffersCount->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); UpdateHasBuffers(); } TReadBufferQueueWrapper::~TReadBufferQueueWrapper() { m_emptyBuffersQueueConnector.disconnect(); + m_currentReadAheadConnector.disconnect(); + m_retryBuffersConnector.disconnect(); + m_otfBuffersConnector.disconnect(); } void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) @@ -63,8 +84,6 @@ } else m_tRetryBuffers.Push(pBuffer); - - UpdateHasBuffers(); } void TReadBufferQueueWrapper::PushEmpty(TOverlappedDataBuffer* pBuffer) @@ -73,8 +92,6 @@ throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); m_spEmptyBuffers->Push(pBuffer); - - //UpdateHasBuffers(); // already updated using notifier } TOverlappedDataBuffer* TReadBufferQueueWrapper::Pop() @@ -94,23 +111,21 @@ } } - if(pBuffer) - UpdateHasBuffers(); - return pBuffer; } bool TReadBufferQueueWrapper::IsBufferReady() const { - if(IsDataSourceFinished()) - return !m_tRetryBuffers.IsEmpty(); + if(m_spOtfBuffersCount->GetValue() >= m_stMaxOtfBuffers) + return false; - return !m_tRetryBuffers.IsEmpty() || !m_spEmptyBuffers->IsEmpty(); - } + if(m_spCurrentReadAheadBuffers->GetValue() >= m_stMaxReadAheadBuffers) + return false; - size_t TReadBufferQueueWrapper::GetCount() const - { - return m_tRetryBuffers.GetCount(); + if(!m_tRetryBuffers.IsEmpty()) + return true; + + return !IsDataSourceFinished() && !m_spEmptyBuffers->IsEmpty(); } void TReadBufferQueueWrapper::SetDataSourceFinished(TOverlappedDataBuffer* pBuffer)