Index: src/libchcore/TOverlappedDataBufferQueue.cpp =================================================================== diff -u -N -radf2d680643ef85665b042e03fed274ab8f11180 -re96806b7f8ff7ca7e9f4afbea603e6351a3dc3e3 --- src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision adf2d680643ef85665b042e03fed274ab8f11180) +++ src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision e96806b7f8ff7ca7e9f4afbea603e6351a3dc3e3) @@ -24,315 +24,314 @@ #include #include -BEGIN_CHCORE_NAMESPACE - -bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) +namespace chcore { - return pBufferA->GetBufferOrder() < pBufferB->GetBufferOrder(); -} + bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) + { + return pBufferA->GetBufferOrder() < pBufferB->GetBufferOrder(); + } -TOverlappedDataBufferQueue::TOverlappedDataBufferQueue() : - m_eventReadPossible(true, false), - m_eventWritePossible(true, false), - m_eventWriteFinished(true, false), - m_eventAllBuffersAccountedFor(true, true), - m_bDataSourceFinished(false), - m_bDataWritingFinished(false), - m_ullNextReadBufferOrder(0), - m_ullNextWriteBufferOrder(0), - m_ullNextFinishedBufferOrder(0) -{ -} + TOverlappedDataBufferQueue::TOverlappedDataBufferQueue() : + m_eventReadPossible(true, false), + m_eventWritePossible(true, false), + m_eventWriteFinished(true, false), + m_eventAllBuffersAccountedFor(true, true), + m_bDataSourceFinished(false), + m_bDataWritingFinished(false), + m_ullNextReadBufferOrder(0), + m_ullNextWriteBufferOrder(0), + m_ullNextFinishedBufferOrder(0) + { + } -TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(size_t stCount, size_t stBufferSize) : - m_eventReadPossible(true, false), - m_eventWritePossible(true, false), - m_eventWriteFinished(true, false), - m_eventAllBuffersAccountedFor(true, false), - m_bDataSourceFinished(false), - m_bDataWritingFinished(false), - m_ullNextReadBufferOrder(0), - m_ullNextWriteBufferOrder(0), - m_ullNextFinishedBufferOrder(0) -{ - ReinitializeBuffers(stCount, stBufferSize); -} + TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(size_t stCount, size_t stBufferSize) : + m_eventReadPossible(true, false), + m_eventWritePossible(true, false), + m_eventWriteFinished(true, false), + m_eventAllBuffersAccountedFor(true, false), + m_bDataSourceFinished(false), + m_bDataWritingFinished(false), + m_ullNextReadBufferOrder(0), + m_ullNextWriteBufferOrder(0), + m_ullNextFinishedBufferOrder(0) + { + ReinitializeBuffers(stCount, stBufferSize); + } -TOverlappedDataBufferQueue::~TOverlappedDataBufferQueue() -{ -} + TOverlappedDataBufferQueue::~TOverlappedDataBufferQueue() + { + } -TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetEmptyBuffer() -{ - if (!m_listEmptyBuffers.empty()) + TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetEmptyBuffer() { - TOverlappedDataBuffer* pBuffer = m_listEmptyBuffers.front(); - m_listEmptyBuffers.pop_front(); + if (!m_listEmptyBuffers.empty()) + { + TOverlappedDataBuffer* pBuffer = m_listEmptyBuffers.front(); + m_listEmptyBuffers.pop_front(); - pBuffer->SetBufferOrder(m_ullNextReadBufferOrder++); + pBuffer->SetBufferOrder(m_ullNextReadBufferOrder++); - UpdateReadPossibleEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); + UpdateReadPossibleEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); - return pBuffer; + return pBuffer; + } + + return nullptr; } - return nullptr; -} + void TOverlappedDataBufferQueue::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); -void TOverlappedDataBufferQueue::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) -{ - if (!pBuffer) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); + m_listEmptyBuffers.push_back(pBuffer); + UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); + } - m_listEmptyBuffers.push_back(pBuffer); - UpdateReadPossibleEvent(); - UpdateAllBuffersAccountedFor(); -} + void TOverlappedDataBufferQueue::UpdateReadPossibleEvent() + { + if (!m_listEmptyBuffers.empty() && !m_bDataSourceFinished) + m_eventReadPossible.SetEvent(); + else + m_eventReadPossible.ResetEvent(); + } -void TOverlappedDataBufferQueue::UpdateReadPossibleEvent() -{ - if (!m_listEmptyBuffers.empty() && !m_bDataSourceFinished) - m_eventReadPossible.SetEvent(); - else - m_eventReadPossible.ResetEvent(); -} - -TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetFullBuffer() -{ - if (!m_setFullBuffers.empty()) + TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetFullBuffer() { - TOverlappedDataBuffer* pBuffer = *m_setFullBuffers.begin(); - if (pBuffer->GetBufferOrder() != m_ullNextWriteBufferOrder) - return nullptr; + if (!m_setFullBuffers.empty()) + { + TOverlappedDataBuffer* pBuffer = *m_setFullBuffers.begin(); + if (pBuffer->GetBufferOrder() != m_ullNextWriteBufferOrder) + return nullptr; - m_setFullBuffers.erase(m_setFullBuffers.begin()); + m_setFullBuffers.erase(m_setFullBuffers.begin()); - // if this is the last part - mark that writing is finished, so that no other buffer will be written - if (pBuffer->IsLastPart()) - m_bDataWritingFinished = true; + // if this is the last part - mark that writing is finished, so that no other buffer will be written + if (pBuffer->IsLastPart()) + m_bDataWritingFinished = true; - ++m_ullNextWriteBufferOrder; + ++m_ullNextWriteBufferOrder; - UpdateWritePossibleEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); + UpdateWritePossibleEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); - return pBuffer; + return pBuffer; + } + + return nullptr; } - return nullptr; -} + void TOverlappedDataBufferQueue::AddFullBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); -void TOverlappedDataBufferQueue::AddFullBuffer(TOverlappedDataBuffer* pBuffer) -{ - if (!pBuffer) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); + std::pair pairInsertInfo = m_setFullBuffers.insert(pBuffer); + if (!pairInsertInfo.second) + THROW_CORE_EXCEPTION(eErr_InvalidOverlappedPosition); - std::pair pairInsertInfo = m_setFullBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - THROW_CORE_EXCEPTION(eErr_InvalidOverlappedPosition); + if (pBuffer->IsLastPart()) + m_bDataSourceFinished = true; - if(pBuffer->IsLastPart()) - m_bDataSourceFinished = true; + UpdateWritePossibleEvent(); + UpdateAllBuffersAccountedFor(); + } - UpdateWritePossibleEvent(); - UpdateAllBuffersAccountedFor(); -} - -void TOverlappedDataBufferQueue::UpdateWritePossibleEvent() -{ - if (m_bDataWritingFinished || m_setFullBuffers.empty()) - m_eventWritePossible.ResetEvent(); - else + void TOverlappedDataBufferQueue::UpdateWritePossibleEvent() { - TOverlappedDataBuffer* pFirstBuffer = *m_setFullBuffers.begin(); - if (pFirstBuffer->GetBufferOrder() == m_ullNextWriteBufferOrder) - m_eventWritePossible.SetEvent(); - else + if (m_bDataWritingFinished || m_setFullBuffers.empty()) m_eventWritePossible.ResetEvent(); + else + { + TOverlappedDataBuffer* pFirstBuffer = *m_setFullBuffers.begin(); + if (pFirstBuffer->GetBufferOrder() == m_ullNextWriteBufferOrder) + m_eventWritePossible.SetEvent(); + else + m_eventWritePossible.ResetEvent(); + } } -} -TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetFinishedBuffer() -{ - if(!m_setFinishedBuffers.empty()) + TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetFinishedBuffer() { - TOverlappedDataBuffer* pBuffer = *m_setFinishedBuffers.begin(); - if(pBuffer->GetBufferOrder() != m_ullNextFinishedBufferOrder) - return nullptr; + if (!m_setFinishedBuffers.empty()) + { + TOverlappedDataBuffer* pBuffer = *m_setFinishedBuffers.begin(); + if (pBuffer->GetBufferOrder() != m_ullNextFinishedBufferOrder) + return nullptr; - m_setFinishedBuffers.erase(m_setFinishedBuffers.begin()); + m_setFinishedBuffers.erase(m_setFinishedBuffers.begin()); - ++m_ullNextFinishedBufferOrder; + ++m_ullNextFinishedBufferOrder; - UpdateWriteFinishedEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); + UpdateWriteFinishedEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); - return pBuffer; + return pBuffer; + } + + return nullptr; } - return nullptr; -} + void TOverlappedDataBufferQueue::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); -void TOverlappedDataBufferQueue::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) -{ - if (!pBuffer) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); + std::pair pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); + if (!pairInsertInfo.second) + THROW_CORE_EXCEPTION(eErr_InvalidOverlappedPosition); - std::pair pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - THROW_CORE_EXCEPTION(eErr_InvalidOverlappedPosition); + UpdateWriteFinishedEvent(); + UpdateAllBuffersAccountedFor(); + } - UpdateWriteFinishedEvent(); - UpdateAllBuffersAccountedFor(); -} - -void TOverlappedDataBufferQueue::UpdateWriteFinishedEvent() -{ - if (m_setFinishedBuffers.empty()) - m_eventWriteFinished.ResetEvent(); - else + void TOverlappedDataBufferQueue::UpdateWriteFinishedEvent() { - TOverlappedDataBuffer* pFirstBuffer = *m_setFinishedBuffers.begin(); - if (pFirstBuffer->GetBufferOrder() == m_ullNextFinishedBufferOrder) - m_eventWriteFinished.SetEvent(); - else + if (m_setFinishedBuffers.empty()) m_eventWriteFinished.ResetEvent(); + else + { + TOverlappedDataBuffer* pFirstBuffer = *m_setFinishedBuffers.begin(); + if (pFirstBuffer->GetBufferOrder() == m_ullNextFinishedBufferOrder) + m_eventWriteFinished.SetEvent(); + else + m_eventWriteFinished.ResetEvent(); + } } -} -void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() -{ - size_t stCurrentBuffers = m_listEmptyBuffers.size() + m_setFullBuffers.size() + m_setFinishedBuffers.size(); - if (stCurrentBuffers == m_listAllBuffers.size()) - m_eventAllBuffersAccountedFor.SetEvent(); - else - m_eventAllBuffersAccountedFor.ResetEvent(); -} - -void TOverlappedDataBufferQueue::ReinitializeBuffers(size_t stCount, size_t stBufferSize) -{ - // sanity check - if any of the buffers are still in use, we can't change the sizes - if (m_listAllBuffers.size() != m_listEmptyBuffers.size()) - THROW_CORE_EXCEPTION(eErr_InternalProblem); - if (stBufferSize == 0) - THROW_CORE_EXCEPTION(eErr_InvalidArgument); - - if (stBufferSize != GetSingleBufferSize()) + void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() { - // buffer sizes increased - clear current buffers and proceed with creating new ones - m_listAllBuffers.clear(); - m_listEmptyBuffers.clear(); + size_t stCurrentBuffers = m_listEmptyBuffers.size() + m_setFullBuffers.size() + m_setFinishedBuffers.size(); + if (stCurrentBuffers == m_listAllBuffers.size()) + m_eventAllBuffersAccountedFor.SetEvent(); + else + m_eventAllBuffersAccountedFor.ResetEvent(); } - else if (stCount == m_listAllBuffers.size()) - return; // nothing really changed - else if (stCount > m_listAllBuffers.size()) - stCount -= m_listAllBuffers.size(); // allocate only the missing buffers - else if (stCount < m_listAllBuffers.size()) + + void TOverlappedDataBufferQueue::ReinitializeBuffers(size_t stCount, size_t stBufferSize) { - // there are too many buffers - reduce - m_listEmptyBuffers.clear(); + // sanity check - if any of the buffers are still in use, we can't change the sizes + if (m_listAllBuffers.size() != m_listEmptyBuffers.size()) + THROW_CORE_EXCEPTION(eErr_InternalProblem); + if (stBufferSize == 0) + THROW_CORE_EXCEPTION(eErr_InvalidArgument); - size_t stCountToRemove = m_listAllBuffers.size() - stCount; + if (stBufferSize != GetSingleBufferSize()) + { + // buffer sizes increased - clear current buffers and proceed with creating new ones + m_listAllBuffers.clear(); + m_listEmptyBuffers.clear(); + } + else if (stCount == m_listAllBuffers.size()) + return; // nothing really changed + else if (stCount > m_listAllBuffers.size()) + stCount -= m_listAllBuffers.size(); // allocate only the missing buffers + else if (stCount < m_listAllBuffers.size()) + { + // there are too many buffers - reduce + m_listEmptyBuffers.clear(); - m_listAllBuffers.erase(m_listAllBuffers.begin(), m_listAllBuffers.begin() + stCountToRemove); - for (const auto& upElement : m_listAllBuffers) + size_t stCountToRemove = m_listAllBuffers.size() - stCount; + + m_listAllBuffers.erase(m_listAllBuffers.begin(), m_listAllBuffers.begin() + stCountToRemove); + for (const auto& upElement : m_listAllBuffers) + { + m_listEmptyBuffers.push_back(upElement.get()); + } + + UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); + return; + } + + // allocate buffers + while (stCount--) { - m_listEmptyBuffers.push_back(upElement.get()); + auto upBuffer = std::make_unique(stBufferSize, this); + m_listEmptyBuffers.push_back(upBuffer.get()); + m_listAllBuffers.push_back(std::move(upBuffer)); } UpdateReadPossibleEvent(); UpdateAllBuffersAccountedFor(); - return; } - // allocate buffers - while (stCount--) + size_t TOverlappedDataBufferQueue::GetTotalBufferCount() const { - auto upBuffer = std::make_unique(stBufferSize, this); - m_listEmptyBuffers.push_back(upBuffer.get()); - m_listAllBuffers.push_back(std::move(upBuffer)); + return m_listAllBuffers.size(); } - UpdateReadPossibleEvent(); - UpdateAllBuffersAccountedFor(); -} + size_t TOverlappedDataBufferQueue::GetSingleBufferSize() const + { + if (m_listAllBuffers.empty()) + return 0; -size_t TOverlappedDataBufferQueue::GetTotalBufferCount() const -{ - return m_listAllBuffers.size(); -} + return (*m_listAllBuffers.begin())->GetBufferSize(); + } -size_t TOverlappedDataBufferQueue::GetSingleBufferSize() const -{ - if (m_listAllBuffers.empty()) - return 0; + void TOverlappedDataBufferQueue::DataSourceChanged() + { + CleanupBuffers(); - return (*m_listAllBuffers.begin())->GetBufferSize(); -} + if (m_listAllBuffers.size() != m_listEmptyBuffers.size()) + THROW_CORE_EXCEPTION(eErr_InternalProblem); -void TOverlappedDataBufferQueue::DataSourceChanged() -{ - CleanupBuffers(); + m_bDataSourceFinished = false; + m_bDataWritingFinished = false; + m_ullNextReadBufferOrder = 0; + m_ullNextWriteBufferOrder = 0; + m_ullNextFinishedBufferOrder = 0; - if (m_listAllBuffers.size() != m_listEmptyBuffers.size()) - THROW_CORE_EXCEPTION(eErr_InternalProblem); + UpdateReadPossibleEvent(); + m_eventWritePossible.ResetEvent(); + m_eventWriteFinished.ResetEvent(); + } - m_bDataSourceFinished = false; - m_bDataWritingFinished = false; - m_ullNextReadBufferOrder = 0; - m_ullNextWriteBufferOrder = 0; - m_ullNextFinishedBufferOrder = 0; - - UpdateReadPossibleEvent(); - m_eventWritePossible.ResetEvent(); - m_eventWriteFinished.ResetEvent(); -} - -void TOverlappedDataBufferQueue::CleanupBuffers() -{ - // function sanitizes the buffer locations (empty/full/finished) - i.e. when there is full buffer that have no data, is marked eof and we are in the eof state - // then this buffer is really the empty one - if (m_bDataSourceFinished && !m_setFullBuffers.empty()) + void TOverlappedDataBufferQueue::CleanupBuffers() { - auto iterCurrent = m_setFullBuffers.begin(); - while (iterCurrent != m_setFullBuffers.end()) + // function sanitizes the buffer locations (empty/full/finished) - i.e. when there is full buffer that have no data, is marked eof and we are in the eof state + // then this buffer is really the empty one + if (m_bDataSourceFinished && !m_setFullBuffers.empty()) { - if ((*iterCurrent)->IsLastPart()) + auto iterCurrent = m_setFullBuffers.begin(); + while (iterCurrent != m_setFullBuffers.end()) { - m_listEmptyBuffers.push_back(*iterCurrent); - iterCurrent = m_setFullBuffers.erase(iterCurrent); + if ((*iterCurrent)->IsLastPart()) + { + m_listEmptyBuffers.push_back(*iterCurrent); + iterCurrent = m_setFullBuffers.erase(iterCurrent); + } + else + ++iterCurrent; } - else - ++iterCurrent; } } -} -void TOverlappedDataBufferQueue::WaitForMissingBuffers(HANDLE hKillEvent) -{ - enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; - std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; - - bool bExit = false; - while (!bExit) + void TOverlappedDataBufferQueue::WaitForMissingBuffers(HANDLE hKillEvent) { - DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); - switch (dwResult) + enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; + std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; + + bool bExit = false; + while (!bExit) { - case STATUS_USER_APC: - ATLTRACE(_T("STATUS_USER_APC while waiting for missing buffers\n")); - break; + DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + switch (dwResult) + { + case STATUS_USER_APC: + ATLTRACE(_T("STATUS_USER_APC while waiting for missing buffers\n")); + break; - case WAIT_OBJECT_0 + eAllBuffersReturned: - bExit = true; - break; + case WAIT_OBJECT_0 + eAllBuffersReturned: + bExit = true; + break; - case WAIT_OBJECT_0 + eKillThread: - bExit = true; - break; + case WAIT_OBJECT_0 + eKillThread: + bExit = true; + break; + } } } } - -END_CHCORE_NAMESPACE