Index: src/libchcore/TOverlappedDataBufferQueue.cpp =================================================================== diff -u -N -r7fd37811dbce76d429b80e4703e88925982f5859 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision 7fd37811dbce76d429b80e4703e88925982f5859) +++ src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -27,266 +27,73 @@ namespace chcore { - /////////////////////////////////////////////////////////////////////////////////// - // class TOverlappedDataBuffer - VOID CALLBACK OverlappedReadCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) + TOverlappedDataBufferQueue::TOverlappedDataBufferQueue() : + m_eventHasBuffers(true, false), + m_eventAllBuffersAccountedFor(true, true) { - _ASSERTE(dwNumberOfBytesTransfered == lpOverlapped->InternalHigh); - - TOverlappedDataBuffer* pBuffer = (TOverlappedDataBuffer*)lpOverlapped; - TOverlappedDataBufferQueue* pQueue = (TOverlappedDataBufferQueue*)pBuffer->GetParam(); - - // determine if this is the last packet - bool bEof = (dwErrorCode == ERROR_HANDLE_EOF || - pBuffer->GetStatusCode() == STATUS_END_OF_FILE || - (dwErrorCode == ERROR_SUCCESS && dwNumberOfBytesTransfered != pBuffer->GetRequestedDataSize())); - - // reset status code and error code if they pointed out to EOF - if(pBuffer->GetStatusCode() == STATUS_END_OF_FILE) - pBuffer->SetStatusCode(0); - - pBuffer->SetErrorCode(dwErrorCode == ERROR_HANDLE_EOF ? ERROR_SUCCESS : dwErrorCode); - - pBuffer->SetRealDataSize(dwNumberOfBytesTransfered); - pBuffer->SetLastPart(bEof); - - pQueue->AddFullBuffer(pBuffer); } - VOID CALLBACK OverlappedWriteCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) + TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(size_t stCount, size_t stBufferSize) : + TOverlappedDataBufferQueue() { - _ASSERTE(dwNumberOfBytesTransfered == lpOverlapped->InternalHigh); - - TOverlappedDataBuffer* pBuffer = (TOverlappedDataBuffer*)lpOverlapped; - TOverlappedDataBufferQueue* pQueue = (TOverlappedDataBufferQueue*)pBuffer->GetParam(); - - pBuffer->SetErrorCode(dwErrorCode); - - pQueue->AddFinishedBuffer(pBuffer); - } - - bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) - { - return pBufferA->GetBufferOrder() < pBufferB->GetBufferOrder(); - } - - TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(const logger::TLogFileDataPtr& spLogFileData) : - m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), - m_eventReadPossible(true, false), - m_eventWritePossible(true, false), - m_eventWriteFinished(true, false), - m_eventAllBuffersAccountedFor(true, true), - m_bDataSourceFinished(false), - m_bDataWritingFinished(false), - m_ullNextReadBufferOrder(0), - m_ullNextWriteBufferOrder(0), - m_ullNextFinishedBufferOrder(0) - { - } - - TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(const logger::TLogFileDataPtr& spLogFileData, size_t stCount, size_t stBufferSize) : - m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), - m_eventReadPossible(true, false), - m_eventWritePossible(true, false), - m_eventWriteFinished(true, false), - m_eventAllBuffersAccountedFor(true, false), - m_bDataSourceFinished(false), - m_bDataWritingFinished(false), - m_ullNextReadBufferOrder(0), - m_ullNextWriteBufferOrder(0), - m_ullNextFinishedBufferOrder(0) - { ReinitializeBuffers(stCount, stBufferSize); } TOverlappedDataBufferQueue::~TOverlappedDataBufferQueue() { } - TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetEmptyBuffer() + TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetBuffer() { - if (!m_listEmptyBuffers.empty()) + if (!m_dequeBuffers.empty()) { - TOverlappedDataBuffer* pBuffer = m_listEmptyBuffers.front(); - m_listEmptyBuffers.pop_front(); + TOverlappedDataBuffer* pBuffer = m_dequeBuffers.front(); + m_dequeBuffers.pop_front(); - pBuffer->SetBufferOrder(m_ullNextReadBufferOrder++); + UpdateHasBuffers(); + UpdateAllBuffersAccountedFor(); - UpdateReadPossibleEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); - return pBuffer; } return nullptr; } - void TOverlappedDataBufferQueue::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + bool TOverlappedDataBufferQueue::AreAllBuffersAccountedFor() const { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - LOG_TRACE(m_spLog) << L"Queuing buffer as empty; buffer-order: " << pBuffer->GetBufferOrder(); - - m_listEmptyBuffers.push_back(pBuffer); - UpdateReadPossibleEvent(); - UpdateAllBuffersAccountedFor(); + return m_dequeBuffers.size() == m_listAllBuffers.size(); } - void TOverlappedDataBufferQueue::UpdateReadPossibleEvent() + void TOverlappedDataBufferQueue::AddBuffer(TOverlappedDataBuffer* pBuffer) { - if (!m_listEmptyBuffers.empty() && !m_bDataSourceFinished) - m_eventReadPossible.SetEvent(); - else - m_eventReadPossible.ResetEvent(); - } - - TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetFullBuffer() - { - if (!m_setFullBuffers.empty()) - { - TOverlappedDataBuffer* pBuffer = *m_setFullBuffers.begin(); - if (pBuffer->GetBufferOrder() != m_ullNextWriteBufferOrder) - return nullptr; - - m_setFullBuffers.erase(m_setFullBuffers.begin()); - - if(pBuffer->GetErrorCode() == ERROR_SUCCESS) - { - // if this is the last part - mark that writing is finished, so that no other buffer will be written - if(pBuffer->IsLastPart()) - m_bDataWritingFinished = true; - - ++m_ullNextWriteBufferOrder; - } - - UpdateWritePossibleEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); - - return pBuffer; - } - - return nullptr; - } - - void TOverlappedDataBufferQueue::AddFullBuffer(TOverlappedDataBuffer* pBuffer) - { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetBufferOrder() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); - - std::pair pairInsertInfo = m_setFullBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); - - if (pBuffer->IsLastPart()) - { - m_bDataSourceFinished = true; - UpdateReadPossibleEvent(); - } - - UpdateWritePossibleEvent(); + m_dequeBuffers.push_back(pBuffer); + UpdateHasBuffers(); UpdateAllBuffersAccountedFor(); } - void TOverlappedDataBufferQueue::UpdateWritePossibleEvent() + void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() { - if (m_bDataWritingFinished || m_setFullBuffers.empty()) - m_eventWritePossible.ResetEvent(); + if (AreAllBuffersAccountedFor()) + m_eventAllBuffersAccountedFor.SetEvent(); else - { - TOverlappedDataBuffer* pFirstBuffer = *m_setFullBuffers.begin(); - if (pFirstBuffer->GetBufferOrder() == m_ullNextWriteBufferOrder) - m_eventWritePossible.SetEvent(); - else - m_eventWritePossible.ResetEvent(); - } - } - - TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetFinishedBuffer() - { - if (!m_setFinishedBuffers.empty()) - { - TOverlappedDataBuffer* pBuffer = *m_setFinishedBuffers.begin(); - if (pBuffer->GetBufferOrder() != m_ullNextFinishedBufferOrder) - return nullptr; - - m_setFinishedBuffers.erase(m_setFinishedBuffers.begin()); - - m_eventWriteFinished.ResetEvent(); // faster than UpdateWriteFinishedEvent() and the final effect should be the same m_eventAllBuffersAccountedFor.ResetEvent(); - - return pBuffer; - } - - return nullptr; } - void TOverlappedDataBufferQueue::MarkFinishedBufferAsComplete(TOverlappedDataBuffer* pBuffer) + void TOverlappedDataBufferQueue::UpdateHasBuffers() { - if(!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - // allow next finished buffer to be processed - ++m_ullNextFinishedBufferOrder; - UpdateWriteFinishedEvent(); - } - - void TOverlappedDataBufferQueue::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) - { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - LOG_TRACE(m_spLog) << L"Queuing buffer as finished; buffer-order: " << pBuffer->GetBufferOrder() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); - - std::pair pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); - - UpdateWriteFinishedEvent(); - UpdateAllBuffersAccountedFor(); - } - - void TOverlappedDataBufferQueue::UpdateWriteFinishedEvent() - { - if (m_setFinishedBuffers.empty()) - m_eventWriteFinished.ResetEvent(); + if(!m_dequeBuffers.empty()) + m_eventHasBuffers.SetEvent(); else - { - TOverlappedDataBuffer* pFirstBuffer = *m_setFinishedBuffers.begin(); - if (pFirstBuffer->GetBufferOrder() == m_ullNextFinishedBufferOrder) - m_eventWriteFinished.SetEvent(); - else - m_eventWriteFinished.ResetEvent(); - } + m_eventHasBuffers.ResetEvent(); } - void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() - { - size_t stCurrentBuffers = m_listEmptyBuffers.size() + m_setFullBuffers.size() + m_setFinishedBuffers.size(); - if (stCurrentBuffers == m_listAllBuffers.size()) - m_eventAllBuffersAccountedFor.SetEvent(); - else - m_eventAllBuffersAccountedFor.ResetEvent(); - } - void TOverlappedDataBufferQueue::ReinitializeBuffers(size_t stCount, size_t stBufferSize) { // sanity check - if any of the buffers are still in use, we can't change the sizes - if (m_listAllBuffers.size() != m_listEmptyBuffers.size()) + if (m_listAllBuffers.size() != m_dequeBuffers.size()) throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); if (stBufferSize == 0) throw TCoreException(eErr_InvalidArgument, L"stBufferSize", LOCATION); @@ -295,7 +102,7 @@ { // buffer sizes increased - clear current buffers and proceed with creating new ones m_listAllBuffers.clear(); - m_listEmptyBuffers.clear(); + m_dequeBuffers.clear(); } else if (stCount == m_listAllBuffers.size()) return; // nothing really changed @@ -304,30 +111,30 @@ else if (stCount < m_listAllBuffers.size()) { // there are too many buffers - reduce - m_listEmptyBuffers.clear(); + m_dequeBuffers.clear(); size_t stCountToRemove = m_listAllBuffers.size() - stCount; m_listAllBuffers.erase(m_listAllBuffers.begin(), m_listAllBuffers.begin() + stCountToRemove); for (const auto& upElement : m_listAllBuffers) { - m_listEmptyBuffers.push_back(upElement.get()); + m_dequeBuffers.push_back(upElement.get()); } - UpdateReadPossibleEvent(); + UpdateHasBuffers(); UpdateAllBuffersAccountedFor(); return; } // allocate buffers while (stCount--) { - auto upBuffer = std::make_unique(stBufferSize, this); - m_listEmptyBuffers.push_back(upBuffer.get()); + auto upBuffer = std::make_unique(stBufferSize, nullptr); + m_dequeBuffers.push_back(upBuffer.get()); m_listAllBuffers.push_back(std::move(upBuffer)); } - UpdateReadPossibleEvent(); + UpdateHasBuffers(); UpdateAllBuffersAccountedFor(); } @@ -336,6 +143,11 @@ return m_listAllBuffers.size(); } + size_t TOverlappedDataBufferQueue::GetAvailableBufferCount() const + { + return m_dequeBuffers.size(); + } + size_t TOverlappedDataBufferQueue::GetSingleBufferSize() const { if (m_listAllBuffers.empty()) @@ -344,46 +156,8 @@ return (*m_listAllBuffers.begin())->GetBufferSize(); } - void TOverlappedDataBufferQueue::DataSourceChanged() + void TOverlappedDataBufferQueue::WaitForMissingBuffers(HANDLE hKillEvent) const { - CleanupBuffers(); - - if (m_listAllBuffers.size() != m_listEmptyBuffers.size()) - throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); - - m_bDataSourceFinished = false; - m_bDataWritingFinished = false; - m_ullNextReadBufferOrder = 0; - m_ullNextWriteBufferOrder = 0; - m_ullNextFinishedBufferOrder = 0; - - UpdateReadPossibleEvent(); - m_eventWritePossible.ResetEvent(); - m_eventWriteFinished.ResetEvent(); - } - - void TOverlappedDataBufferQueue::CleanupBuffers() - { - // function sanitizes the buffer locations (empty/full/finished) - i.e. when there is full buffer that have no data, is marked eof and we are in the eof state - // then this buffer is really the empty one - if (m_bDataSourceFinished && !m_setFullBuffers.empty()) - { - auto iterCurrent = m_setFullBuffers.begin(); - while (iterCurrent != m_setFullBuffers.end()) - { - if ((*iterCurrent)->IsLastPart()) - { - m_listEmptyBuffers.push_back(*iterCurrent); - iterCurrent = m_setFullBuffers.erase(iterCurrent); - } - else - ++iterCurrent; - } - } - } - - void TOverlappedDataBufferQueue::WaitForMissingBuffersAndResetState(HANDLE hKillEvent) - { enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; @@ -405,11 +179,5 @@ break; } } - - std::copy(m_setFullBuffers.begin(), m_setFullBuffers.end(), std::back_inserter(m_listEmptyBuffers)); - std::copy(m_setFinishedBuffers.begin(), m_setFinishedBuffers.end(), std::back_inserter(m_listEmptyBuffers)); - - m_setFinishedBuffers.clear(); - m_setFullBuffers.clear(); } }