Index: src/libchcore/TOverlappedDataBufferQueue.cpp =================================================================== diff -u -N -refe016ef1d0cb0cf1ba379dbe3693e35f6a2361e -re4005a958c9412d890eeff1e8087c8298aa7bcf7 --- src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision efe016ef1d0cb0cf1ba379dbe3693e35f6a2361e) +++ src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision e4005a958c9412d890eeff1e8087c8298aa7bcf7) @@ -21,6 +21,8 @@ #include "TOverlappedDataBuffer.h" #include "TCoreException.h" #include "ErrorCodes.h" +#include +#include BEGIN_CHCORE_NAMESPACE @@ -33,9 +35,11 @@ m_eventReadPossible(true, false), m_eventWritePossible(true, false), m_eventWriteFinished(true, false), + m_eventAllBuffersAccountedFor(true, true), m_stBufferSize(0), m_ullNextExpectedWritePosition(0), m_bDataSourceFinished(false), + m_bDataWritingFinished(false), m_ullNextReadBufferOrder(0), m_ullNextWriteBufferOrder(0), m_ullNextFinishedBufferOrder(0) @@ -46,9 +50,11 @@ m_eventReadPossible(true, false), m_eventWritePossible(true, false), m_eventWriteFinished(true, false), + m_eventAllBuffersAccountedFor(true, false), m_stBufferSize(0), m_ullNextExpectedWritePosition(0), m_bDataSourceFinished(false), + m_bDataWritingFinished(false), m_ullNextReadBufferOrder(0), m_ullNextWriteBufferOrder(0), m_ullNextFinishedBufferOrder(0) @@ -70,6 +76,7 @@ pBuffer->SetBufferOrder(m_ullNextReadBufferOrder++); UpdateReadPossibleEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); return pBuffer; } @@ -84,6 +91,7 @@ m_listEmptyBuffers.push_back(pBuffer); UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); } void TOverlappedDataBufferQueue::UpdateReadPossibleEvent() @@ -108,6 +116,7 @@ ++m_ullNextWriteBufferOrder; UpdateWritePossibleEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); return pBuffer; } @@ -124,22 +133,24 @@ // then it can be treated as an empty buffer if (pBuffer->IsLastPart() && m_bDataSourceFinished && pBuffer->GetBytesTransferred() == 0) { - // not using AddEmptyBuffer() as we there is no need for changing the signals (they are already set correctly) - m_listEmptyBuffers.push_back(pBuffer); + AddEmptyBuffer(pBuffer); return; } - m_setFullBuffers.insert(pBuffer); + std::pair pairInsertInfo = m_setFullBuffers.insert(pBuffer); + if (!pairInsertInfo.second) + THROW_CORE_EXCEPTION(eErr_InvalidOverlappedPosition); if(pBuffer->IsLastPart()) m_bDataSourceFinished = true; UpdateWritePossibleEvent(); + UpdateAllBuffersAccountedFor(); } void TOverlappedDataBufferQueue::UpdateWritePossibleEvent() { - if (m_setFullBuffers.empty()) + if (m_bDataWritingFinished || m_setFullBuffers.empty()) m_eventWritePossible.ResetEvent(); else { @@ -163,7 +174,12 @@ ++m_ullNextFinishedBufferOrder; + // if this is the last part - mark that writing is finished, so that no other buffer will be written + if (pBuffer->IsLastPart()) + m_bDataWritingFinished = true; + UpdateWriteFinishedEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); return pBuffer; } @@ -176,9 +192,12 @@ if (!pBuffer) THROW_CORE_EXCEPTION(eErr_InvalidPointer); - m_setFinishedBuffers.insert(pBuffer); + std::pair pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); + if (!pairInsertInfo.second) + THROW_CORE_EXCEPTION(eErr_InvalidOverlappedPosition); UpdateWriteFinishedEvent(); + UpdateAllBuffersAccountedFor(); } void TOverlappedDataBufferQueue::UpdateWriteFinishedEvent() @@ -195,6 +214,15 @@ } } +void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() +{ + size_t stCurrentBuffers = m_listEmptyBuffers.size() + m_setFullBuffers.size() + m_setFinishedBuffers.size(); + if (stCurrentBuffers == m_listAllBuffers.size()) + m_eventAllBuffersAccountedFor.SetEvent(); + else + m_eventAllBuffersAccountedFor.ResetEvent(); +} + void TOverlappedDataBufferQueue::ReinitializeBuffers(size_t stCount, size_t stBufferSize) { // sanity check - if any of the buffers are still in use, we can't change the sizes @@ -227,6 +255,7 @@ } UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); return; } @@ -240,6 +269,7 @@ m_stBufferSize = stCount; UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); } void TOverlappedDataBufferQueue::DataSourceChanged() @@ -250,11 +280,12 @@ THROW_CORE_EXCEPTION(eErr_InternalProblem); m_bDataSourceFinished = false; + m_bDataWritingFinished = false; m_ullNextReadBufferOrder = 0; m_ullNextWriteBufferOrder = 0; m_ullNextFinishedBufferOrder = 0; - m_eventReadPossible.SetEvent(); + UpdateReadPossibleEvent(); m_eventWritePossible.ResetEvent(); m_eventWriteFinished.ResetEvent(); } @@ -279,4 +310,30 @@ } } +void TOverlappedDataBufferQueue::WaitForMissingBuffers(HANDLE hKillEvent) +{ + enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; + std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; + + bool bExit = false; + while (!bExit) + { + DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + switch (dwResult) + { + case STATUS_USER_APC: + ATLTRACE(_T("STATUS_USER_APC while waiting for missing buffers\n")); + break; + + case WAIT_OBJECT_0 + eAllBuffersReturned: + bExit = true; + break; + + case WAIT_OBJECT_0 + eKillThread: + bExit = true; + break; + } + } +} + END_CHCORE_NAMESPACE