Index: src/libchcore/TOrderedBufferQueue.cpp
===================================================================
diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4
--- src/libchcore/TOrderedBufferQueue.cpp	(.../TOrderedBufferQueue.cpp)	(revision 518d1a3484cde91ff17c071211df5e40cb3487e3)
+++ src/libchcore/TOrderedBufferQueue.cpp	(.../TOrderedBufferQueue.cpp)	(revision d99302fce795dbb5139659016a5da7948f141fb4)
@@ -26,6 +26,7 @@
 namespace chcore
 {
 	TOrderedBufferQueue::TOrderedBufferQueue(const TBufferListPtr& spEmptyBuffers, unsigned long long ullExpectedPosition) :
+		m_spBuffersCount(std::make_shared<TSharedCountMT<size_t>>()),
 		m_spEmptyBuffers(spEmptyBuffers),
 		m_eventHasBuffers(true, false),
 		m_eventHasError(true, false),
@@ -48,12 +49,14 @@
 		if(pBuffer->HasError())
 			throw TCoreException(eErr_InvalidArgument, L"Cannot push buffer with error", LOCATION);
 
-		boost::unique_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 
 		auto pairInsert = m_setBuffers.insert(pBuffer);
 		if (!pairInsert.second)
 			throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION);
 
+		m_spBuffersCount->Increase();
+
 		if(pBuffer->IsLastPart())
 			m_bDataSourceFinished = true;
 
@@ -73,14 +76,16 @@
 
 	TOverlappedDataBuffer* TOrderedBufferQueue::Pop()
 	{
-		boost::unique_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 
 		if(!InternalHasPoppableBuffer())
 			return nullptr;
 
 		TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin();
 		m_setBuffers.erase(m_setBuffers.begin());
 
+		m_spBuffersCount->Decrease();
+
 		m_ullExpectedBufferPosition += pBuffer->GetRequestedDataSize();
 
 		UpdateHasBuffers();
@@ -90,7 +95,7 @@
 
 	TOverlappedDataBuffer* TOrderedBufferQueue::PopError()
 	{
-		boost::unique_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 
 		if(!m_pFirstErrorBuffer)
 			return nullptr;
@@ -104,7 +109,7 @@
 
 	const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const
 	{
-		boost::shared_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 
 		if(!m_setBuffers.empty())
 			return *m_setBuffers.begin();
@@ -113,19 +118,19 @@
 
 	size_t TOrderedBufferQueue::GetCount() const
 	{
-		boost::shared_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 		return m_setBuffers.size() + (m_pFirstErrorBuffer ? 1 : 0);
 	}
 
 	bool TOrderedBufferQueue::IsEmpty() const
 	{
-		boost::shared_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 		return m_setBuffers.empty();
 	}
 
 	bool TOrderedBufferQueue::HasPoppableBuffer() const
 	{
-		boost::shared_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 
 		return InternalHasPoppableBuffer();
 	}
@@ -156,13 +161,14 @@
 
 	void TOrderedBufferQueue::ClearBuffers()
 	{
-		boost::unique_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 
 		for(TOverlappedDataBuffer* pBuffer : m_setBuffers)
 		{
 			m_spEmptyBuffers->Push(pBuffer);
 		}
 		m_setBuffers.clear();
+		m_spBuffersCount->SetValue(0);
 
 		if(m_pFirstErrorBuffer)
 		{
@@ -177,16 +183,7 @@
 
 	void TOrderedBufferQueue::UpdateHasBuffers()
 	{
-		if(InternalHasPoppableBuffer())
-		{
-			m_eventHasBuffers.SetEvent();
-			m_notifier(true);
-		}
-		else
-		{
-			m_eventHasBuffers.ResetEvent();
-			m_notifier(false);
-		}
+		m_eventHasBuffers.SetEvent(InternalHasPoppableBuffer());
 	}
 
 	void TOrderedBufferQueue::UpdateHasErrors()
@@ -212,18 +209,18 @@
 		m_eventHasReadingFinished.SetEvent(bFullSequence);
 	}
 
-	boost::signals2::signal<void(bool)>& TOrderedBufferQueue::GetNotifier()
-	{
-		return m_notifier;
-	}
-
 	void TOrderedBufferQueue::UpdateProcessingRange(unsigned long long ullNewPosition)
 	{
-		boost::unique_lock<boost::shared_mutex> lock(m_mutex);
+		boost::unique_lock<boost::recursive_mutex> lock(m_mutex);
 
 		if(!m_setBuffers.empty())
 			throw TCoreException(eErr_InvalidData, L"Cannot update processing range when processing already started", LOCATION);
 
 		m_ullExpectedBufferPosition = ullNewPosition;
 	}
+
+	TSharedCountMTPtr<size_t> TOrderedBufferQueue::GetSharedCount()
+	{
+		return m_spBuffersCount;
+	}
 }