Index: src/libchcore/TOverlappedReaderWriter.cpp =================================================================== diff -u -N -re0588f4598dea526e0869360a0f5ee278e7902a0 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision e0588f4598dea526e0869360a0f5ee278e7902a0) +++ src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -26,19 +26,13 @@ namespace chcore { TOverlappedReaderWriter::TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedMemoryPoolPtr& spMemoryPool, - file_size_t ullFilePos, DWORD dwChunkSize) : + unsigned long long ullFilePos, DWORD dwChunkSize) : m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), m_spMemoryPool(spMemoryPool), - m_eventReadPossible(true, true), - m_eventWritePossible(true, false), - m_eventWriteFinished(true, false), - m_eventAllBuffersAccountedFor(true, true), - m_bDataSourceFinished(false), + m_tReader(spLogFileData, spMemoryPool->GetBufferList(), ullFilePos, dwChunkSize), + m_tWriter(spLogFileData, m_tReader.GetFinishedQueue(), ullFilePos), m_bDataWritingFinished(false), - m_dwDataChunkSize(dwChunkSize), - m_ullNextReadBufferOrder(ullFilePos), - m_ullNextWriteBufferOrder(ullFilePos), - m_ullNextFinishedBufferOrder(ullFilePos) + m_eventAllBuffersAccountedFor(true, true) { if(!spMemoryPool) throw TCoreException(eErr_InvalidArgument, L"spMemoryPool", LOCATION); @@ -50,156 +44,72 @@ TOverlappedDataBuffer* TOverlappedReaderWriter::GetEmptyBuffer() { - TOverlappedDataBuffer* pBuffer = nullptr; - - // return buffers to re-read if exists - if(!m_setEmptyBuffers.empty()) - pBuffer = m_setEmptyBuffers.pop_front(); - else + TOverlappedDataBuffer* pBuffer = m_tReader.GetEmptyBuffer(); + if(pBuffer) { - // get empty buffer and initialize - pBuffer = m_spMemoryPool->GetBuffer(); - if(pBuffer) - { - pBuffer->SetParam(this); - pBuffer->InitForRead(m_ullNextReadBufferOrder, m_dwDataChunkSize); - - m_ullNextReadBufferOrder += m_dwDataChunkSize; - } + pBuffer->SetParam(this); + UpdateAllBuffersAccountedFor(); } - // reset the accounted-for event only if we managed to get the pointer, otherwise nothing is changing - if(pBuffer) - m_eventAllBuffersAccountedFor.ResetEvent(); - - UpdateReadPossibleEvent(); // update read-possible always - if we're getting null with read-possible event set (which we should not), we need to reset it - return pBuffer; } - void TOverlappedReaderWriter::AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - LOG_TRACE(m_spLog) << L"Queuing buffer for re-read; buffer-order: " << pBuffer->GetFilePosition(); - - m_setEmptyBuffers.insert(pBuffer); - - m_eventReadPossible.SetEvent(); + m_tReader.AddEmptyBuffer(pBuffer, bKeepPosition); UpdateAllBuffersAccountedFor(); } - void TOverlappedReaderWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFailedReadBuffer() { - if(!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + TOverlappedDataBuffer* pBuffer = m_tReader.GetFailedReadBuffer(); - LOG_TRACE(m_spLog) << L"Releasing empty buffer; buffer-order: " << pBuffer->GetFilePosition(); + if(pBuffer) + UpdateAllBuffersAccountedFor(); - m_spMemoryPool->AddBuffer(pBuffer); - - UpdateReadPossibleEvent(); - UpdateAllBuffersAccountedFor(); + return pBuffer; } - void TOverlappedReaderWriter::UpdateReadPossibleEvent() + void TOverlappedReaderWriter::AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer) { - if(!m_setEmptyBuffers.empty() || (!m_bDataSourceFinished && m_spMemoryPool->HasBuffers())) - m_eventReadPossible.SetEvent(); - else - m_eventReadPossible.ResetEvent(); + m_tReader.AddFailedReadBuffer(pBuffer); + UpdateAllBuffersAccountedFor(); } - TOverlappedDataBuffer* TOverlappedReaderWriter::GetFullBuffer() + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFinishedReadBuffer() { - if (!m_setFullBuffers.empty()) + TOverlappedDataBuffer* pBuffer = m_tReader.GetFullBuffer(); + + if(pBuffer) { - TOverlappedDataBuffer* pBuffer = *m_setFullBuffers.begin(); - if (pBuffer->GetFilePosition() != m_ullNextWriteBufferOrder) - return nullptr; + if (pBuffer->IsLastPart()) + m_bDataWritingFinished = true; - m_setFullBuffers.erase(m_setFullBuffers.begin()); + pBuffer->SetParam(this); - if(!pBuffer->HasError()) - { - // if this is the last part - mark that writing is finished, so that no other buffer will be written - if(pBuffer->IsLastPart()) - m_bDataWritingFinished = true; - - m_ullNextWriteBufferOrder += m_dwDataChunkSize; - } - - UpdateWritePossibleEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); - - return pBuffer; + UpdateAllBuffersAccountedFor(); } - return nullptr; + return pBuffer; } - void TOverlappedReaderWriter::AddFullBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddFinishedReadBuffer(TOverlappedDataBuffer* pBuffer) { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + m_tReader.AddFullBuffer(pBuffer); - if(pBuffer->HasError()) - { - if(pBuffer->GetFilePosition() < m_ullReadErrorOrder) - { - // case: new buffer failed at even earlier position in file than the one that failed previously (should also work for numeric_limits::max()) - // - move existing buffers with errors to failed read buffers, add current one to full queue - m_ullReadErrorOrder = pBuffer->GetFilePosition(); + UpdateAllBuffersAccountedFor(); + } - TOrderedBufferQueue newQueue; + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFailedWriteBuffer() + { + TOverlappedDataBuffer* pBuffer = m_tWriter.GetFailedWriteBuffer(); - for(TOverlappedDataBuffer* pBuf : m_setFullBuffers) - { - if(pBuf->HasError()) - AddFailedReadBuffer(pBuf); - else - newQueue.insert(pBuf); - } - - if(newQueue.size() != m_setFullBuffers.size()) - std::swap(m_setFullBuffers, newQueue); - } - else if(pBuffer->GetFilePosition() > m_ullReadErrorOrder) - { - // case: new buffer failed at position later than the one that failed before - add to failed buffers - // for retry - AddFailedReadBuffer(pBuffer); - return; - } - //else -> case: we've received the same buffer that failed before; add to normal full queue for user to handle that - } - else if(m_ullReadErrorOrder == pBuffer->GetFilePosition()) - { - // case: adding correctly read buffer that previously failed to read; clear the error flag and add full buffer - m_ullReadErrorOrder = NoIoError; - } - - LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); - - auto pairInsertInfo = m_setFullBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); - - if (pBuffer->IsLastPart()) - m_bDataSourceFinished = true; - - UpdateWritePossibleEvent(); UpdateAllBuffersAccountedFor(); + + return pBuffer; } - void TOverlappedReaderWriter::AddFailedFullBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddFailedWriteBuffer(TOverlappedDataBuffer* pBuffer) { if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); @@ -214,48 +124,18 @@ // overwrite error code (to avoid treating the buffer as failed read) pBuffer->SetErrorCode(ERROR_SUCCESS); + m_tWriter.AddFailedWriteBuffer(pBuffer); - auto pairInsertInfo = m_setFullBuffers.insert(pBuffer); - if(!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); - - if(pBuffer->IsLastPart()) - m_bDataSourceFinished = true; - - UpdateWritePossibleEvent(); UpdateAllBuffersAccountedFor(); } - void TOverlappedReaderWriter::UpdateWritePossibleEvent() + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFinishedWriteBuffer() { - if (m_bDataWritingFinished || m_setFullBuffers.empty()) - m_eventWritePossible.ResetEvent(); - else - { - TOverlappedDataBuffer* pFirstBuffer = *m_setFullBuffers.begin(); - if (pFirstBuffer->GetFilePosition() == m_ullNextWriteBufferOrder) - m_eventWritePossible.SetEvent(); - else - m_eventWritePossible.ResetEvent(); - } - } + TOverlappedDataBuffer* pBuffer = m_tWriter.GetFinishedBuffer(); - TOverlappedDataBuffer* TOverlappedReaderWriter::GetFinishedBuffer() - { - if (!m_setFinishedBuffers.empty()) - { - TOverlappedDataBuffer* pBuffer = *m_setFinishedBuffers.begin(); - if (pBuffer->GetFilePosition() != m_ullNextFinishedBufferOrder) - return nullptr; + if(!pBuffer) + UpdateAllBuffersAccountedFor(); - m_setFinishedBuffers.erase(m_setFinishedBuffers.begin()); - - m_eventWriteFinished.ResetEvent(); // faster than UpdateWriteFinishedEvent() and the final effect should be the same - m_eventAllBuffersAccountedFor.ResetEvent(); - - return pBuffer; - } - return nullptr; } @@ -265,11 +145,10 @@ throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); // allow next finished buffer to be processed - m_ullNextFinishedBufferOrder += m_dwDataChunkSize; - UpdateWriteFinishedEvent(); + //m_ullNextFinishedBufferOrder += m_dwDataChunkSize; } - void TOverlappedReaderWriter::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddFinishedWriteBuffer(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); @@ -282,74 +161,20 @@ L", status-code: " << pBuffer->GetStatusCode() << L", is-last-part: " << pBuffer->IsLastPart(); - auto pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); + m_tWriter.AddFinishedBuffer(pBuffer); - UpdateWriteFinishedEvent(); UpdateAllBuffersAccountedFor(); } - void TOverlappedReaderWriter::UpdateWriteFinishedEvent() - { - if (m_setFinishedBuffers.empty()) - m_eventWriteFinished.ResetEvent(); - else - { - TOverlappedDataBuffer* pFirstBuffer = *m_setFinishedBuffers.begin(); - if (pFirstBuffer->GetFilePosition() == m_ullNextFinishedBufferOrder) - m_eventWriteFinished.SetEvent(); - else - m_eventWriteFinished.ResetEvent(); - } - } - void TOverlappedReaderWriter::UpdateAllBuffersAccountedFor() { - size_t stCurrentBuffers = m_spMemoryPool->GetAvailableBufferCount() + m_setFullBuffers.size() + m_setFinishedBuffers.size() + m_setEmptyBuffers.size(); + size_t stCurrentBuffers = m_spMemoryPool->GetAvailableBufferCount() + m_tReader.GetBufferCount() + m_tWriter.GetBufferCount(); if (stCurrentBuffers == m_spMemoryPool->GetTotalBufferCount()) m_eventAllBuffersAccountedFor.SetEvent(); else m_eventAllBuffersAccountedFor.ResetEvent(); } - void TOverlappedReaderWriter::DataSourceChanged() - { - CleanupBuffers(); - - if (!m_spMemoryPool->AreAllBuffersAccountedFor()) - throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); - - m_bDataSourceFinished = false; - m_bDataWritingFinished = false; - m_ullNextReadBufferOrder = 0; - m_ullNextWriteBufferOrder = 0; - m_ullNextFinishedBufferOrder = 0; - - m_eventWritePossible.ResetEvent(); - m_eventWriteFinished.ResetEvent(); - } - - void TOverlappedReaderWriter::CleanupBuffers() - { - // function sanitizes the buffer locations (empty/full/finished) - i.e. when there is full buffer that have no data, is marked eof and we are in the eof state - // then this buffer is really the empty one - if (m_bDataSourceFinished && !m_setFullBuffers.empty()) - { - auto iterCurrent = m_setFullBuffers.begin(); - while (iterCurrent != m_setFullBuffers.end()) - { - if ((*iterCurrent)->IsLastPart()) - { - m_spMemoryPool->AddBuffer(*iterCurrent); - iterCurrent = m_setFullBuffers.erase(iterCurrent); - } - else - ++iterCurrent; - } - } - } - void TOverlappedReaderWriter::WaitForMissingBuffersAndResetState(HANDLE hKillEvent) { enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; @@ -373,13 +198,14 @@ break; } } +/* auto funcAdd = [&](TOverlappedDataBuffer* pBuffer) { m_spMemoryPool->AddBuffer(pBuffer); }; - std::for_each(m_setFullBuffers.begin(), m_setFullBuffers.end(), funcAdd); - std::for_each(m_setFinishedBuffers.begin(), m_setFinishedBuffers.end(), funcAdd); + std::for_each(m_spFailedWriteBuffers->begin(), m_spFailedWriteBuffers->end(), funcAdd); + std::for_each(m_spFinishedBuffers.begin(), m_spFinishedBuffers.end(), funcAdd); - m_setFinishedBuffers.clear(); - m_setFullBuffers.clear(); + m_spFinishedBuffers.clear(); + m_spFailedWriteBuffers->clear();*/ } }