Index: src/libchcore/TFailedBufferQueue.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TFailedBufferQueue.cpp (.../TFailedBufferQueue.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TFailedBufferQueue.cpp (.../TFailedBufferQueue.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -87,6 +87,15 @@ return m_eventHasBuffers.Handle(); } + void TFailedBufferQueue::ReleaseBuffers(const TBufferListPtr& spBuffers) + { + for(TOverlappedDataBuffer* pBuffer : m_setBuffers) + { + spBuffers->Push(pBuffer); + } + m_setBuffers.clear(); + } + void TFailedBufferQueue::UpdateHasBuffers() { if(IsBufferReady()) Index: src/libchcore/TFailedBufferQueue.h =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TFailedBufferQueue.h (.../TFailedBufferQueue.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TFailedBufferQueue.h (.../TFailedBufferQueue.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -22,6 +22,8 @@ #include #include "TEvent.h" #include "TOverlappedDataBuffer.h" +#include "TCoreException.h" +#include "TBufferList.h" namespace chcore { @@ -51,7 +53,11 @@ if(pBuf->HasError()) rRetryQueue.Push(pBuf, true); else - newQueue.insert(pBuf); + { + auto pairInsert = newQueue.insert(pBuf); + if (!pairInsert.second) + throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION); + } } if(newQueue.size() != m_setBuffers.size()) @@ -72,7 +78,10 @@ m_ullErrorPosition = NoPosition; } - m_setBuffers.insert(pBuffer); + auto pairInsert = m_setBuffers.insert(pBuffer); + if (!pairInsert.second) + throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION); + UpdateHasBuffers(); } @@ -85,6 +94,7 @@ bool IsEmpty() const; HANDLE GetHasBuffersEvent() const; + void ReleaseBuffers(const TBufferListPtr& spBuffers); private: bool IsBufferReady() const; Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -19,6 +19,7 @@ #include "stdafx.h" #include "TOrderedBufferQueue.h" #include "TOverlappedDataBuffer.h" +#include "TCoreException.h" namespace chcore { @@ -35,7 +36,10 @@ void TOrderedBufferQueue::Push(TOverlappedDataBuffer* pBuffer) { - m_setBuffers.insert(pBuffer); + auto pairInsert = m_setBuffers.insert(pBuffer); + if (!pairInsert.second) + throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION); + UpdateHasBuffers(); } @@ -47,7 +51,7 @@ TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin(); m_setBuffers.erase(m_setBuffers.begin()); - if(!pBuffer->HasError()) + if(!pBuffer->HasError() && m_ullExpectedBufferPosition != NoPosition) m_ullExpectedBufferPosition += pBuffer->GetRequestedDataSize(); UpdateHasBuffers(); @@ -89,6 +93,15 @@ return m_eventHasBuffers.Handle(); } + void TOrderedBufferQueue::ReleaseBuffers(const TBufferListPtr& spBuffers) + { + for(TOverlappedDataBuffer* pBuffer : m_setBuffers) + { + spBuffers->Push(pBuffer); + } + m_setBuffers.clear(); + } + void TOrderedBufferQueue::UpdateHasBuffers() { if(!m_setBuffers.empty() && (m_ullExpectedBufferPosition == NoPosition || Peek()->GetFilePosition() == m_ullExpectedBufferPosition)) Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -22,6 +22,7 @@ #include #include "TEvent.h" #include "TOverlappedDataBuffer.h" +#include "TBufferList.h" namespace chcore { @@ -44,6 +45,7 @@ bool IsEmpty() const; HANDLE GetHasBuffersEvent() const; + void ReleaseBuffers(const TBufferListPtr& spBuffers); private: bool IsBufferReady() const; Index: src/libchcore/TOverlappedReader.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -73,6 +73,9 @@ void TOverlappedReader::AddFullBuffer(TOverlappedDataBuffer* pBuffer) { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetFilePosition() << L", requested-data-size: " << pBuffer->GetRequestedDataSize() << L", real-data-size: " << pBuffer->GetRealDataSize() << @@ -87,18 +90,20 @@ m_spFullBuffers->Push(pBuffer); } - TOverlappedDataBuffer* TOverlappedReader::GetFullBuffer() - { - return m_spFullBuffers->Pop(); - } - TOrderedBufferQueuePtr TOverlappedReader::GetFinishedQueue() const { return m_spFullBuffers; } size_t TOverlappedReader::GetBufferCount() const { - return m_tFailedReadBuffers.GetCount() + m_spFullBuffers->GetCount(); + return m_tEmptyBuffers.GetCount() + m_tFailedReadBuffers.GetCount() + m_spFullBuffers->GetCount(); } + + void TOverlappedReader::ReleaseBuffers(const TBufferListPtr& spBuffers) + { + m_tEmptyBuffers.ReleaseBuffers(spBuffers); + m_tFailedReadBuffers.ReleaseBuffers(spBuffers); + m_spFullBuffers->ReleaseBuffers(spBuffers); + } } Index: src/libchcore/TOverlappedReader.h =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -48,7 +48,6 @@ TOverlappedDataBuffer* GetFailedReadBuffer(); void AddFullBuffer(TOverlappedDataBuffer* pBuffer); - TOverlappedDataBuffer* GetFullBuffer(); TOrderedBufferQueuePtr GetFinishedQueue() const; @@ -57,10 +56,11 @@ // event access HANDLE GetEventReadPossibleHandle() const { return m_tEmptyBuffers.GetHasBuffersEvent(); } - HANDLE GetEventReadFailedHandle() const { return m_tEmptyBuffers.GetHasBuffersEvent(); } + HANDLE GetEventReadFailedHandle() const { return m_tFailedReadBuffers.GetHasBuffersEvent(); } HANDLE GetEventReadFinishedHandle() const { return m_spFullBuffers->GetHasBuffersEvent(); } size_t GetBufferCount() const; + void ReleaseBuffers(const TBufferListPtr& spBuffers); private: logger::TLoggerPtr m_spLog; Index: src/libchcore/TOverlappedReaderWriter.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -22,6 +22,7 @@ #include "TCoreException.h" #include "ErrorCodes.h" #include +#include namespace chcore { @@ -56,6 +57,17 @@ void TOverlappedReaderWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as empty; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + m_tReader.AddEmptyBuffer(pBuffer, bKeepPosition); UpdateAllBuffersAccountedFor(); } @@ -72,13 +84,24 @@ void TOverlappedReaderWriter::AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer) { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as failed-read; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + m_tReader.AddFailedReadBuffer(pBuffer); UpdateAllBuffersAccountedFor(); } - TOverlappedDataBuffer* TOverlappedReaderWriter::GetFinishedReadBuffer() + TOverlappedDataBuffer* TOverlappedReaderWriter::GetWriteBuffer() { - TOverlappedDataBuffer* pBuffer = m_tReader.GetFullBuffer(); + TOverlappedDataBuffer* pBuffer = m_tWriter.GetWriteBuffer(); if(pBuffer) { @@ -95,6 +118,16 @@ void TOverlappedReaderWriter::AddFinishedReadBuffer(TOverlappedDataBuffer* pBuffer) { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as finished-read; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); m_tReader.AddFullBuffer(pBuffer); UpdateAllBuffersAccountedFor(); @@ -103,9 +136,9 @@ TOverlappedDataBuffer* TOverlappedReaderWriter::GetFailedWriteBuffer() { TOverlappedDataBuffer* pBuffer = m_tWriter.GetFailedWriteBuffer(); + if(pBuffer) + UpdateAllBuffersAccountedFor(); - UpdateAllBuffersAccountedFor(); - return pBuffer; } @@ -114,7 +147,7 @@ if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - LOG_TRACE(m_spLog) << L"Queuing buffer as full (failed); buffer-order: " << pBuffer->GetFilePosition() << + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as failed-write; buffer-order: " << pBuffer->GetFilePosition() << L", requested-data-size: " << pBuffer->GetRequestedDataSize() << L", real-data-size: " << pBuffer->GetRealDataSize() << L", file-position: " << pBuffer->GetFilePosition() << @@ -133,27 +166,19 @@ { TOverlappedDataBuffer* pBuffer = m_tWriter.GetFinishedBuffer(); - if(!pBuffer) + if(pBuffer) UpdateAllBuffersAccountedFor(); - return nullptr; + return pBuffer; } - void TOverlappedReaderWriter::MarkFinishedBufferAsComplete(TOverlappedDataBuffer* pBuffer) - { - if(!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - // allow next finished buffer to be processed - //m_ullNextFinishedBufferOrder += m_dwDataChunkSize; - } - void TOverlappedReaderWriter::AddFinishedWriteBuffer(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - LOG_TRACE(m_spLog) << L"Queuing buffer as finished; buffer-order: " << pBuffer->GetFilePosition() << + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as finished-write; buffer-order: " << pBuffer->GetFilePosition() << L", requested-data-size: " << pBuffer->GetRequestedDataSize() << L", real-data-size: " << pBuffer->GetRealDataSize() << L", file-position: " << pBuffer->GetFilePosition() << @@ -166,6 +191,21 @@ UpdateAllBuffersAccountedFor(); } + void TOverlappedReaderWriter::MarkFinishedBufferAsComplete(TOverlappedDataBuffer* pBuffer) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Marking buffer " << pBuffer << L" as finalized-write; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + + m_tWriter.MarkAsFinalized(pBuffer); + } void TOverlappedReaderWriter::UpdateAllBuffersAccountedFor() { size_t stCurrentBuffers = m_spMemoryPool->GetAvailableBufferCount() + m_tReader.GetBufferCount() + m_tWriter.GetBufferCount(); @@ -198,14 +238,8 @@ break; } } -/* - auto funcAdd = [&](TOverlappedDataBuffer* pBuffer) { m_spMemoryPool->AddBuffer(pBuffer); }; - - std::for_each(m_spFailedWriteBuffers->begin(), m_spFailedWriteBuffers->end(), funcAdd); - std::for_each(m_spFinishedBuffers.begin(), m_spFinishedBuffers.end(), funcAdd); - - m_spFinishedBuffers.clear(); - m_spFailedWriteBuffers->clear();*/ + m_tReader.ReleaseBuffers(m_spMemoryPool->GetBufferList()); + m_tWriter.ReleaseBuffers(m_spMemoryPool->GetBufferList()); } } Index: src/libchcore/TOverlappedReaderWriter.h =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TOverlappedReaderWriter.h (.../TOverlappedReaderWriter.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOverlappedReaderWriter.h (.../TOverlappedReaderWriter.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -49,10 +49,11 @@ TOverlappedDataBuffer* GetFailedReadBuffer(); void AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer); - TOverlappedDataBuffer* GetFinishedReadBuffer(); void AddFinishedReadBuffer(TOverlappedDataBuffer* pBuffer); // buffer management - writer + TOverlappedDataBuffer* GetWriteBuffer(); + TOverlappedDataBuffer* GetFailedWriteBuffer(); void AddFailedWriteBuffer(TOverlappedDataBuffer* pBuffer); Index: src/libchcore/TOverlappedWriter.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -40,6 +40,11 @@ { } + TOverlappedDataBuffer* TOverlappedWriter::GetWriteBuffer() + { + return m_tBuffersToWrite.Pop(); + } + void TOverlappedWriter::AddFailedWriteBuffer(TOverlappedDataBuffer* pBuffer) { if(!pBuffer) @@ -66,9 +71,30 @@ TOverlappedDataBuffer* TOverlappedWriter::GetFinishedBuffer() { - return m_tFinishedBuffers.Pop(); + TOverlappedDataBuffer* pBuffer = m_tFinishedBuffers.Pop(); + + if (pBuffer && pBuffer->IsLastPart()) + { + if (m_pLastPartBuffer != nullptr) + throw TCoreException(eErr_InternalProblem, L"Encountered another 'last-part' finished buffer", LOCATION); + m_pLastPartBuffer = pBuffer; + } + + return pBuffer; } + void TOverlappedWriter::MarkAsFinalized(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + if (pBuffer != m_pLastPartBuffer) + throw TCoreException(eErr_InvalidArgument, L"Trying to mark different buffer as finalized", LOCATION); + + m_bDataWritingFinished = true; + m_pLastPartBuffer = nullptr; + } + void TOverlappedWriter::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) @@ -87,6 +113,13 @@ size_t TOverlappedWriter::GetBufferCount() const { - return m_tBuffersToWrite.GetCount() + m_tFailedWriteBuffers.GetCount() + m_tFinishedBuffers.GetCount(); + return m_tFailedWriteBuffers.GetCount() + m_tFinishedBuffers.GetCount(); } + + void TOverlappedWriter::ReleaseBuffers(const TBufferListPtr& spList) + { + m_tBuffersToWrite.ReleaseBuffers(spList); + m_tFailedWriteBuffers.ReleaseBuffers(spList); + m_tFinishedBuffers.ReleaseBuffers(spList); + } } Index: src/libchcore/TOverlappedWriter.h =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -24,6 +24,7 @@ #include "TOrderedBufferQueue.h" #include "TFailedBufferQueue.h" #include "TWriteBufferQueueWrapper.h" +#include "TBufferList.h" namespace chcore { @@ -40,6 +41,8 @@ TOverlappedWriter& operator=(const TOverlappedWriter&) = delete; + TOverlappedDataBuffer* GetWriteBuffer(); + // buffer management - writer void AddFailedWriteBuffer(TOverlappedDataBuffer* pBuffer); TOverlappedDataBuffer* GetFailedWriteBuffer(); @@ -48,6 +51,7 @@ TOverlappedDataBuffer* GetFinishedBuffer(); // processing info + void MarkAsFinalized(TOverlappedDataBuffer* pBuffer); bool IsDataWritingFinished() const { return m_bDataWritingFinished; } // event access @@ -56,6 +60,7 @@ HANDLE GetEventWriteFinishedHandle() const { return m_tFinishedBuffers.GetHasBuffersEvent(); } size_t GetBufferCount() const; + void ReleaseBuffers(const TBufferListPtr& spList); private: logger::TLoggerPtr m_spLog; @@ -66,6 +71,7 @@ TOrderedBufferQueue m_tFinishedBuffers; bool m_bDataWritingFinished = false; // output file was already written to the end + TOverlappedDataBuffer* m_pLastPartBuffer = nullptr; }; } Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -19,6 +19,7 @@ #include "stdafx.h" #include "TReadBufferQueueWrapper.h" #include "TOverlappedDataBuffer.h" +#include "TCoreException.h" namespace chcore { @@ -28,10 +29,14 @@ m_dwChunkSize(dwChunkSize), m_eventHasBuffers(true, false) { + UpdateHasBuffers(); } void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + if(!bKeepPosition) { if(IsDataSourceFinished()) @@ -100,7 +105,7 @@ size_t TReadBufferQueueWrapper::GetCount() const { - return m_spUnorderedQueue->GetCount(); + return m_tClaimedQueue.GetCount(); } bool TReadBufferQueueWrapper::IsEmpty() const @@ -116,6 +121,7 @@ { m_ullDataSourceFinishedPos = pBuffer->GetFilePosition(); // #todo: release excessive claimed buffers + UpdateHasBuffers(); } } } @@ -134,4 +140,9 @@ { m_eventHasBuffers.SetEvent(IsBufferReady()); } + + void TReadBufferQueueWrapper::ReleaseBuffers(const TBufferListPtr& spBuffers) + { + m_tClaimedQueue.ReleaseBuffers(spBuffers); + } } Index: src/libchcore/TReadBufferQueueWrapper.h =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -49,6 +49,7 @@ bool IsDataSourceFinished() const; HANDLE GetHasBuffersEvent() const; + void ReleaseBuffers(const TBufferListPtr& spBuffers); private: void UpdateHasBuffers(); Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -384,7 +384,10 @@ // that also means that we don't want to queue reads or writes anymore - all the data that were read until now, will be lost // - write possible - we're prioritizing write queuing here to empty buffers as soon as possible // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data - enum { eWriteFinished, eKillThread, eWriteFailed, eWritePossible, eReadFailed, eReadPossible, eHandleCount }; + enum + { + eKillThread, eWriteFinished, eWriteFailed, eWritePossible, eReadFailed, eReadPossible, eHandleCount + }; std::array arrHandles = { rThreadController.GetKillThreadHandle(), tReaderWriter.GetEventWriteFinishedHandle(), @@ -469,13 +472,10 @@ } case WAIT_OBJECT_0 + eWritePossible: { - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedReadBuffer(); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetWriteBuffer(); if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write was possible, but no buffer is available", LOCATION); - // was there an error reported? - pBuffer->InitForWrite(); - eResult = tFileFBWrapper.WriteFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { @@ -528,44 +528,47 @@ if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write finished was possible, but no buffer is available", LOCATION); - eResult = tFileFBWrapper.FinalizeFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); - if (eResult != TSubTaskBase::eSubResult_Continue) + if(pBuffer->IsLastPart()) { - tReaderWriter.AddEmptyBuffer(pBuffer, false); - bStopProcessing = true; - } - else if (bSkip) - { - tReaderWriter.AddEmptyBuffer(pBuffer, false); + eResult = tFileFBWrapper.FinalizeFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); + if (eResult != TSubTaskBase::eSubResult_Continue) + { + tReaderWriter.AddEmptyBuffer(pBuffer, false); + bStopProcessing = true; + break; + } + else if (bSkip) + { + tReaderWriter.AddEmptyBuffer(pBuffer, false); - AdjustProcessedSizeForSkip(pData->spSrcFile); + AdjustProcessedSizeForSkip(pData->spSrcFile); - pData->bProcessed = false; - bStopProcessing = true; + pData->bProcessed = false; + bStopProcessing = true; + break; + } } - else - { - file_size_t fsWritten = pBuffer->GetRealDataSize(); - // in case we read past the original eof, try to get new file size from filesystem - AdjustProcessedSize(fsWritten, pData->spSrcFile, fileSrc); + file_size_t fsWritten = pBuffer->GetRealDataSize(); - // stop iterating through file - bStopProcessing = pBuffer->IsLastPart(); + // in case we read past the original eof, try to get new file size from filesystem + AdjustProcessedSize(fsWritten, pData->spSrcFile, fileSrc); + // stop iterating through file + bStopProcessing = pBuffer->IsLastPart(); + if(bStopProcessing) + { tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); - tReaderWriter.AddEmptyBuffer(pBuffer, false); - if(bStopProcessing) - { - // this is the end of copying of src file - in case it is smaller than expected fix the stats so that difference is accounted for - AdjustFinalSize(pData->spSrcFile, fileSrc); + // this is the end of copying of src file - in case it is smaller than expected fix the stats so that difference is accounted for + AdjustFinalSize(pData->spSrcFile, fileSrc); - pData->bProcessed = true; - m_tSubTaskStats.ResetCurrentItemProcessedSize(); - } + pData->bProcessed = true; + m_tSubTaskStats.ResetCurrentItemProcessedSize(); } + tReaderWriter.AddEmptyBuffer(pBuffer, false); + break; } Index: src/libchcore/TWriteBufferQueueWrapper.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -19,17 +19,22 @@ #include "stdafx.h" #include "TWriteBufferQueueWrapper.h" #include "TOverlappedDataBuffer.h" +#include "TCoreException.h" namespace chcore { TWriteBufferQueueWrapper::TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue) : m_spDataQueue(spQueue), m_eventHasBuffers(true, false) { + UpdateHasBuffers(); } void TWriteBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + m_tClaimedQueue.Push(pBuffer); UpdateHasBuffers(); } @@ -80,4 +85,10 @@ { m_eventHasBuffers.SetEvent(IsBufferReady()); } + + void TWriteBufferQueueWrapper::ReleaseBuffers(const TBufferListPtr& spBuffers) + { + m_spDataQueue->ReleaseBuffers(spBuffers); + m_tClaimedQueue.ReleaseBuffers(spBuffers); + } } Index: src/libchcore/TWriteBufferQueueWrapper.h =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -21,6 +21,7 @@ #include "TEvent.h" #include "TOrderedBufferQueue.h" +#include "TBufferList.h" namespace chcore { @@ -45,6 +46,7 @@ bool IsEmpty() const; HANDLE GetHasBuffersEvent() const; + void ReleaseBuffers(const TBufferListPtr& spBuffers); private: void UpdateHasBuffers(); Index: src/libchcore/Tests/OverlappedCallbacksTests.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/Tests/OverlappedCallbacksTests.cpp (.../OverlappedCallbacksTests.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/Tests/OverlappedCallbacksTests.cpp (.../OverlappedCallbacksTests.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -1,11 +1,71 @@ -#include "stdafx.h" -#include "gtest/gtest.h" -#include "gmock/gmock.h" -#include "../OverlappedCallbacks.h" - -using namespace chcore; - -TEST(OverlappedCallbackTests, DefaultTest) -{ - -} +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../OverlappedCallbacks.h" +#include "../TOverlappedMemoryPool.h" +#include "../../liblogger/TLogFileData.h" +#include "../TOverlappedReaderWriter.h" + +using namespace chcore; + +TEST(OverlappedCallbackTests, OverlappedReadCompleted_Success) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); + TOverlappedDataBuffer buffer(16384, &queue); + + buffer.InitForRead(0, 1024); + buffer.SetStatusCode(0); + buffer.SetBytesTransferred(234); + + OverlappedReadCompleted(ERROR_SUCCESS, 234, &buffer); + + EXPECT_TRUE(buffer.IsLastPart()); + EXPECT_EQ(ERROR_SUCCESS, buffer.GetErrorCode()); + EXPECT_EQ(234, buffer.GetRealDataSize()); + + EXPECT_EQ(queue.GetWriteBuffer(), &buffer); +} + +TEST(OverlappedCallbackTests, OverlappedReadCompleted_Failure) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); + TOverlappedDataBuffer buffer(16384, &queue); + + buffer.InitForRead(0, 1024); + buffer.SetStatusCode(0); + buffer.SetBytesTransferred(0); + + OverlappedReadCompleted(ERROR_ACCESS_DENIED, 0, &buffer); + + EXPECT_FALSE(buffer.IsLastPart()); + EXPECT_EQ(ERROR_ACCESS_DENIED, buffer.GetErrorCode()); + EXPECT_EQ(0, buffer.GetRealDataSize()); + + EXPECT_EQ(queue.GetWriteBuffer(), &buffer); +} + +TEST(OverlappedCallbackTests, OverlappedWriteCompleted_Success) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); + TOverlappedDataBuffer buffer(16384, &queue); + + buffer.InitForRead(0, 1024); + buffer.SetStatusCode(0); + buffer.SetBytesTransferred(234); + buffer.SetLastPart(true); + buffer.SetRealDataSize(234); + + OverlappedWriteCompleted(ERROR_SUCCESS, 234, &buffer); + + EXPECT_EQ(ERROR_SUCCESS, buffer.GetErrorCode()); + EXPECT_EQ(queue.GetFinishedWriteBuffer(), &buffer); +} Index: src/libchcore/Tests/TOverlappedDataBufferTests.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/Tests/TOverlappedDataBufferTests.cpp (.../TOverlappedDataBufferTests.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/Tests/TOverlappedDataBufferTests.cpp (.../TOverlappedDataBufferTests.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -241,64 +241,3 @@ } /////////////////////////////////////////////////////////////////////////////////////////////////// -TEST(TOverlappedDataBufferTests, OverlappedReadCompleted_Success) -{ - logger::TLogFileDataPtr spLogData(std::make_shared()); - - TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); - TOverlappedDataBuffer buffer(16384, &queue); - - buffer.InitForRead(0, 1024); - buffer.SetStatusCode(0); - buffer.SetBytesTransferred(234); - - OverlappedReadCompleted(ERROR_SUCCESS, 234, &buffer); - - EXPECT_TRUE(buffer.IsLastPart()); - EXPECT_EQ(ERROR_SUCCESS, buffer.GetErrorCode()); - EXPECT_EQ(234, buffer.GetRealDataSize()); - - EXPECT_EQ(queue.GetFinishedReadBuffer(), &buffer); -} - -TEST(TOverlappedDataBufferTests, OverlappedReadCompleted_Failure) -{ - logger::TLogFileDataPtr spLogData(std::make_shared()); - - TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); - TOverlappedDataBuffer buffer(16384, &queue); - - buffer.InitForRead(0, 1024); - buffer.SetStatusCode(0); - buffer.SetBytesTransferred(0); - - OverlappedReadCompleted(ERROR_ACCESS_DENIED, 0, &buffer); - - EXPECT_FALSE(buffer.IsLastPart()); - EXPECT_EQ(ERROR_ACCESS_DENIED, buffer.GetErrorCode()); - EXPECT_EQ(0, buffer.GetRealDataSize()); - - EXPECT_EQ(queue.GetFinishedReadBuffer(), &buffer); -} - -TEST(TOverlappedDataBufferTests, OverlappedWriteCompleted_Success) -{ - logger::TLogFileDataPtr spLogData(std::make_shared()); - - TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); - TOverlappedDataBuffer buffer(16384, &queue); - - buffer.InitForRead(0, 1024); - buffer.SetStatusCode(0); - buffer.SetBytesTransferred(234); - buffer.SetLastPart(true); - buffer.SetRealDataSize(234); - - OverlappedWriteCompleted(ERROR_SUCCESS, 234, &buffer); - - EXPECT_EQ(ERROR_SUCCESS, buffer.GetErrorCode()); - EXPECT_EQ(queue.GetFinishedWriteBuffer(), &buffer); -} Index: src/libchcore/Tests/TOverlappedReaderWriterTests.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 --- src/libchcore/Tests/TOverlappedReaderWriterTests.cpp (.../TOverlappedReaderWriterTests.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/Tests/TOverlappedReaderWriterTests.cpp (.../TOverlappedReaderWriterTests.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) @@ -29,7 +29,7 @@ TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_EQ(nullptr, tReaderWriter.GetEmptyBuffer()); - EXPECT_EQ(nullptr, tReaderWriter.GetFinishedReadBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetWriteBuffer()); EXPECT_EQ(nullptr, tReaderWriter.GetFinishedWriteBuffer()); EXPECT_NE(nullptr, tReaderWriter.GetEventReadPossibleHandle()); @@ -55,7 +55,7 @@ TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_NE(nullptr, tReaderWriter.GetEmptyBuffer()); - EXPECT_EQ(nullptr, tReaderWriter.GetFinishedReadBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetWriteBuffer()); EXPECT_EQ(nullptr, tReaderWriter.GetFinishedWriteBuffer()); EXPECT_NE(nullptr, tReaderWriter.GetEventReadPossibleHandle()); @@ -266,7 +266,7 @@ tReaderWriter.AddFinishedReadBuffer(pBuffer); EXPECT_SIGNALED(tReaderWriter.GetEventWritePossibleHandle()); - tReaderWriter.GetFinishedReadBuffer(); + tReaderWriter.GetWriteBuffer(); EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); } @@ -279,13 +279,13 @@ TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; tReaderWriter.AddFinishedReadBuffer(pBuffers[1]); - EXPECT_EQ(nullptr, tReaderWriter.GetFinishedReadBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetWriteBuffer()); tReaderWriter.AddFinishedReadBuffer(pBuffers[2]); - EXPECT_EQ(nullptr, tReaderWriter.GetFinishedReadBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetWriteBuffer()); tReaderWriter.AddFinishedReadBuffer(pBuffers[0]); - EXPECT_NE(nullptr, tReaderWriter.GetFinishedReadBuffer()); + EXPECT_NE(nullptr, tReaderWriter.GetWriteBuffer()); } TEST(TOverlappedReaderWriterTests, AddFullBuffer_HandlingSrcEof) @@ -319,14 +319,14 @@ tReaderWriter.AddFinishedReadBuffer(pBuffers[1]); tReaderWriter.AddFinishedReadBuffer(pBuffers[2]); - tReaderWriter.GetFinishedReadBuffer(); + tReaderWriter.GetWriteBuffer(); // EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); - tReaderWriter.GetFinishedReadBuffer(); + tReaderWriter.GetWriteBuffer(); // EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); // getting the last buffer (marked as eof) causes setting the data-writing-finished flag - tReaderWriter.GetFinishedReadBuffer(); + tReaderWriter.GetWriteBuffer(); // EXPECT_TRUE(tReaderWriter.IsDataWritingFinished()); } @@ -445,19 +445,13 @@ tReaderWriter.AddFinishedWriteBuffer(pBuffers[0]); TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedWriteBuffer(); - EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); - tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); EXPECT_SIGNALED(tReaderWriter.GetEventWriteFinishedHandle()); pBuffer = tReaderWriter.GetFinishedWriteBuffer(); - EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); - tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); EXPECT_SIGNALED(tReaderWriter.GetEventWriteFinishedHandle()); pBuffer = tReaderWriter.GetFinishedWriteBuffer(); EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); - tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); - EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); } TEST(TOverlappedReaderWriterTests, GetFinishedBuffer_WrongOrder)