Index: src/libchcore/TBufferList.cpp =================================================================== diff -u -r980c1a0de537813728871676200a0960410b11fb -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TBufferList.cpp (.../TBufferList.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) +++ src/libchcore/TBufferList.cpp (.../TBufferList.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -34,7 +34,7 @@ m_listBuffers.push_front(pBuffer); UpdateEvent(); - m_notifier(true); + m_notifier(); } TOverlappedDataBuffer* TBufferList::Pop() @@ -47,7 +47,7 @@ UpdateEvent(); - m_notifier(false); + m_notifier(); return pBuffer; } @@ -60,7 +60,7 @@ if (bRemoved) { UpdateEvent(); - m_notifier(false); + m_notifier(); } } @@ -85,7 +85,7 @@ return m_eventAllBuffersAccountedFor.Handle(); } - boost::signals2::signal& TBufferList::GetNotifier() + boost::signals2::signal& TBufferList::GetNotifier() { return m_notifier; } Index: src/libchcore/TBufferList.h =================================================================== diff -u -r980c1a0de537813728871676200a0960410b11fb -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TBufferList.h (.../TBufferList.h) (revision 980c1a0de537813728871676200a0960410b11fb) +++ src/libchcore/TBufferList.h (.../TBufferList.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -42,7 +42,7 @@ void SetExpectedBuffersCount(size_t stExpectedBuffers); HANDLE GetAllBuffersAccountedForEvent() const; - boost::signals2::signal& GetNotifier(); + boost::signals2::signal& GetNotifier(); private: void UpdateEvent(); @@ -51,7 +51,7 @@ size_t m_stExpectedBuffers = 0; // count of buffers there should be in m_listBuffers when no buffer is in use std::list m_listBuffers; - boost::signals2::signal m_notifier; + boost::signals2::signal m_notifier; TEvent m_eventAllBuffersAccountedFor; }; Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -r685d0da3259dd94327ee8d644a88c155585b8249 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 685d0da3259dd94327ee8d644a88c155585b8249) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -142,12 +142,12 @@ if(HasPoppableBuffer()) { m_eventHasBuffers.SetEvent(); - m_notifier(true); + m_notifier(); } else { m_eventHasBuffers.ResetEvent(); - m_notifier(false); + m_notifier(); } } @@ -156,7 +156,7 @@ m_eventHasError.SetEvent(m_pFirstErrorBuffer != nullptr); } - boost::signals2::signal& TOrderedBufferQueue::GetNotifier() + boost::signals2::signal& TOrderedBufferQueue::GetNotifier() { return m_notifier; } Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -racd7bcfa7355db4a0d9af99a1bb99d685810d790 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision acd7bcfa7355db4a0d9af99a1bb99d685810d790) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -54,7 +54,7 @@ void ReleaseBuffers(const TBufferListPtr& spBuffers); - boost::signals2::signal& GetNotifier(); + boost::signals2::signal& GetNotifier(); private: void UpdateHasBuffers(); @@ -72,7 +72,7 @@ unsigned long long m_ullExpectedBufferPosition = 0; - boost::signals2::signal m_notifier; + boost::signals2::signal m_notifier; }; template @@ -97,15 +97,15 @@ { // if there is no ptr set then it is being processed somewhere and will be handled separately m_pFirstErrorBuffer->SetErrorCode(ERROR_SUCCESS); - rRetryQueue.Push(m_pFirstErrorBuffer, true); + rRetryQueue.Push(m_pFirstErrorBuffer); } m_pFirstErrorBuffer = pBuffer; m_ullErrorPosition = pBuffer->GetFilePosition(); } else if(pBuffer->GetFilePosition() > m_ullErrorPosition) { pBuffer->SetErrorCode(ERROR_SUCCESS); - rRetryQueue.Push(pBuffer, true); + rRetryQueue.Push(pBuffer); } else if(!m_pFirstErrorBuffer) m_pFirstErrorBuffer = pBuffer; // restore the buffer Index: src/libchcore/TOverlappedReader.cpp =================================================================== diff -u -rdcbfdc95eedacd24d8b1d78fa507029ce12a5a63 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision dcbfdc95eedacd24d8b1d78fa507029ce12a5a63) +++ src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -52,13 +52,32 @@ return pBuffer; } - void TOverlappedReader::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) + void TOverlappedReader::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) { if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + if(!m_bReleaseMode) + { + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as really-empty; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + } + + m_tInputBuffers.PushEmpty(pBuffer); + } + + void TOverlappedReader::AddRetryBuffer(TOverlappedDataBuffer* pBuffer) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + if(m_bReleaseMode) - m_tInputBuffers.Push(pBuffer, false); + m_tInputBuffers.PushEmpty(pBuffer); else { LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as empty; buffer-order: " << pBuffer->GetFilePosition() << @@ -69,7 +88,7 @@ L", status-code: " << pBuffer->GetStatusCode() << L", is-last-part: " << pBuffer->IsLastPart(); - m_tInputBuffers.Push(pBuffer, bKeepPosition); + m_tInputBuffers.Push(pBuffer); } } @@ -79,7 +98,7 @@ throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); if(m_bReleaseMode) - m_tInputBuffers.Push(pBuffer, false); + m_tInputBuffers.PushEmpty(pBuffer); else { LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as failed-read; buffer-order: " << pBuffer->GetFilePosition() << @@ -112,9 +131,7 @@ throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); if(m_bReleaseMode) - { - m_tInputBuffers.Push(pBuffer, false); - } + m_tInputBuffers.PushEmpty(pBuffer); else { LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as finished-read; buffer-order: " << pBuffer->GetFilePosition() << @@ -149,10 +166,30 @@ return m_spFullBuffers; } + bool TOverlappedReader::IsDataSourceFinished() const + { + return m_tInputBuffers.IsDataSourceFinished(); + } + void TOverlappedReader::ReleaseBuffers() { m_bReleaseMode = true; - m_tInputBuffers.ReleaseBuffers(m_spEmptyBuffers); + m_tInputBuffers.ReleaseBuffers(); m_spFullBuffers->ReleaseBuffers(m_spEmptyBuffers); } + + HANDLE TOverlappedReader::GetEventReadPossibleHandle() const + { + return m_tInputBuffers.GetHasBuffersEvent(); + } + + HANDLE TOverlappedReader::GetEventReadFailedHandle() const + { + return m_spFullBuffers->GetHasErrorEvent(); + } + + HANDLE TOverlappedReader::GetEventReadFinishedHandle() const + { + return m_spFullBuffers->GetHasBuffersEvent(); + } } Index: src/libchcore/TOverlappedReader.h =================================================================== diff -u -rdcbfdc95eedacd24d8b1d78fa507029ce12a5a63 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision dcbfdc95eedacd24d8b1d78fa507029ce12a5a63) +++ src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -37,7 +37,8 @@ TOverlappedReader& operator=(const TOverlappedReader&) = delete; // buffer management - void AddEmptyBuffer(TOverlappedDataBuffer* pBuffer, bool bKeepPosition); + void AddEmptyBuffer(TOverlappedDataBuffer* pBuffer); + void AddRetryBuffer(TOverlappedDataBuffer* pBuffer); TOverlappedDataBuffer* GetEmptyBuffer(); void AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer); @@ -49,12 +50,12 @@ TOrderedBufferQueuePtr GetFinishedQueue() const; // processing info - bool IsDataSourceFinished() const { return m_tInputBuffers.IsDataSourceFinished(); } + bool IsDataSourceFinished() const; // event access - HANDLE GetEventReadPossibleHandle() const { return m_tInputBuffers.GetHasBuffersEvent(); } - HANDLE GetEventReadFailedHandle() const { return m_spFullBuffers->GetHasErrorEvent(); } - HANDLE GetEventReadFinishedHandle() const { return m_spFullBuffers->GetHasBuffersEvent(); } + HANDLE GetEventReadPossibleHandle() const; + HANDLE GetEventReadFailedHandle() const; + HANDLE GetEventReadFinishedHandle() const; void ReleaseBuffers(); Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -rc0d9a798f9fbbeda239b84721ed864f9727e1ddc -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision c0d9a798f9fbbeda239b84721ed864f9727e1ddc) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -19,7 +19,6 @@ #include "stdafx.h" #include "TOverlappedReaderFB.h" #include "TCoreException.h" -#include "TFileInfo.h" namespace chcore { @@ -57,7 +56,7 @@ TSubTaskBase::ESubOperationResult eResult = m_spSrcFile->ReadFileFB(*pBuffer); if(eResult != TSubTaskBase::eSubResult_Continue) - m_spReader->AddEmptyBuffer(pBuffer, false); + m_spReader->AddEmptyBuffer(pBuffer); return eResult; } @@ -73,11 +72,11 @@ if(eResult == TSubTaskBase::eSubResult_Retry) { m_spSrcFile->Close(); - m_spReader->AddEmptyBuffer(pBuffer, true); + m_spReader->AddRetryBuffer(pBuffer); eResult = TSubTaskBase::eSubResult_Continue; } else if(eResult != TSubTaskBase::eSubResult_Continue) - m_spReader->AddEmptyBuffer(pBuffer, false); + m_spReader->AddEmptyBuffer(pBuffer); return eResult; } Index: src/libchcore/TOverlappedReaderWriterFB.cpp =================================================================== diff -u -rc0d9a798f9fbbeda239b84721ed864f9727e1ddc -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision c0d9a798f9fbbeda239b84721ed864f9727e1ddc) +++ src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -117,6 +117,7 @@ { eKillThread, eWriteFinished, eWriteFailed, eWritePossible, eReadFailed, eReadPossible, eHandleCount }; + std::array arrHandles = { hKill, m_spWriter->GetWriter()->GetEventWriteFinishedHandle(), Index: src/libchcore/TOverlappedWriter.cpp =================================================================== diff -u -re7ca9e2c6306cd94e5e5caecc9cfdb3b562d3cdf -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision e7ca9e2c6306cd94e5e5caecc9cfdb3b562d3cdf) +++ src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -27,9 +27,9 @@ TOverlappedWriter::TOverlappedWriter(const logger::TLogFileDataPtr& spLogFileData, const TOrderedBufferQueuePtr& spBuffersToWrite, unsigned long long ullFilePos, const TBufferListPtr& spEmptyBuffers) : m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), + m_spEmptyBuffers(spEmptyBuffers), m_tBuffersToWrite(spBuffersToWrite), - m_tFinishedBuffers(ullFilePos), - m_spEmptyBuffers(spEmptyBuffers) + m_tFinishedBuffers(ullFilePos) { if(!spLogFileData) throw TCoreException(eErr_InvalidArgument, L"spLogFileData is NULL", LOCATION); @@ -44,32 +44,29 @@ { } + void TOverlappedWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + { + m_spEmptyBuffers->Push(pBuffer); + } + void TOverlappedWriter::AddRetryBuffer(TOverlappedDataBuffer* pBuffer) { if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(m_bReleaseMode) - m_spEmptyBuffers->Push(pBuffer); - else - { - LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as write-retry; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as write-retry; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); - m_tBuffersToWrite.Push(pBuffer); - } + m_tBuffersToWrite.Push(pBuffer); } TOverlappedDataBuffer* TOverlappedWriter::GetWriteBuffer() { - if(m_bReleaseMode) - return nullptr; - TOverlappedDataBuffer* pBuffer = m_tBuffersToWrite.Pop(); if (pBuffer) pBuffer->SetParam(this); @@ -82,27 +79,19 @@ if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(m_bReleaseMode) - m_spEmptyBuffers->Push(pBuffer); - else - { - LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as failed-write; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as failed-write; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); - m_tFinishedBuffers.PushError(pBuffer, m_tBuffersToWrite); - } + m_tFinishedBuffers.PushError(pBuffer, m_tBuffersToWrite); } TOverlappedDataBuffer* TOverlappedWriter::GetFailedWriteBuffer() { - if(m_bReleaseMode) - return nullptr; - TOverlappedDataBuffer* pBuffer = m_tFinishedBuffers.PopError(); if (pBuffer) pBuffer->SetParam(this); @@ -112,9 +101,6 @@ TOverlappedDataBuffer* TOverlappedWriter::GetFinishedBuffer() { - if(m_bReleaseMode) - return nullptr; - TOverlappedDataBuffer* pBuffer = m_tFinishedBuffers.Pop(); if (pBuffer) @@ -137,9 +123,6 @@ if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(m_bReleaseMode) - return; - if (pBuffer != m_pLastPartBuffer) throw TCoreException(eErr_InvalidArgument, L"Trying to mark different buffer as finalized", LOCATION); @@ -154,32 +137,34 @@ m_pLastPartBuffer = nullptr; } + HANDLE TOverlappedWriter::GetEventWritePossibleHandle() const + { + return m_tBuffersToWrite.GetHasBuffersEvent(); + } + + HANDLE TOverlappedWriter::GetEventWriteFailedHandle() const + { + return m_tFinishedBuffers.GetHasErrorEvent(); + } + + HANDLE TOverlappedWriter::GetEventWriteFinishedHandle() const + { + return m_tFinishedBuffers.GetHasBuffersEvent(); + } + void TOverlappedWriter::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(m_bReleaseMode) - m_spEmptyBuffers->Push(pBuffer); - else - { - LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as finished-write; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); + LOG_TRACE(m_spLog) << L"Queuing buffer " << pBuffer << L" as finished-write; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); - m_tFinishedBuffers.Push(pBuffer); - } + m_tFinishedBuffers.Push(pBuffer); } - - void TOverlappedWriter::ReleaseBuffers() - { - m_bReleaseMode = true; - m_pLastPartBuffer = nullptr; - m_tBuffersToWrite.ReleaseBuffers(m_spEmptyBuffers); - m_tFinishedBuffers.ReleaseBuffers(m_spEmptyBuffers); - } } Index: src/libchcore/TOverlappedWriter.h =================================================================== diff -u -re7ca9e2c6306cd94e5e5caecc9cfdb3b562d3cdf -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision e7ca9e2c6306cd94e5e5caecc9cfdb3b562d3cdf) +++ src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -37,6 +37,8 @@ TOverlappedWriter& operator=(const TOverlappedWriter&) = delete; + void AddEmptyBuffer(TOverlappedDataBuffer* pBuffer); + void AddRetryBuffer(TOverlappedDataBuffer* pBuffer); TOverlappedDataBuffer* GetWriteBuffer(); @@ -51,12 +53,10 @@ void MarkAsFinalized(TOverlappedDataBuffer* pBuffer); // event access - HANDLE GetEventWritePossibleHandle() const { return m_tBuffersToWrite.GetHasBuffersEvent(); } - HANDLE GetEventWriteFailedHandle() const { return m_tFinishedBuffers.GetHasErrorEvent(); } - HANDLE GetEventWriteFinishedHandle() const { return m_tFinishedBuffers.GetHasBuffersEvent(); } + HANDLE GetEventWritePossibleHandle() const; + HANDLE GetEventWriteFailedHandle() const; + HANDLE GetEventWriteFinishedHandle() const; - void ReleaseBuffers(); - private: logger::TLoggerPtr m_spLog; @@ -66,8 +66,6 @@ TOrderedBufferQueue m_tFinishedBuffers; TOverlappedDataBuffer* m_pLastPartBuffer = nullptr; - - bool m_bReleaseMode = false; }; using TOverlappedWriterPtr = std::shared_ptr; Index: src/libchcore/TOverlappedWriterFB.cpp =================================================================== diff -u -rc0d9a798f9fbbeda239b84721ed864f9727e1ddc -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision c0d9a798f9fbbeda239b84721ed864f9727e1ddc) +++ src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -33,8 +33,7 @@ m_spSrcFile(spSrcFile), m_spDstFile(spDstFile), m_spStats(spStats), - m_spSrcFileInfo(spSrcFileInfo), - m_spEmptyBuffers(spEmptyBuffers) + m_spSrcFileInfo(spSrcFileInfo) { if(!spDstFile) throw TCoreException(eErr_InvalidArgument, L"spDstFile is NULL", LOCATION); @@ -58,13 +57,13 @@ if(m_bReleaseMode) { - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return TSubTaskBase::eSubResult_Continue; } TSubTaskBase::ESubOperationResult eResult = m_spDstFile->WriteFileFB(*pBuffer); if(eResult != TSubTaskBase::eSubResult_Continue) - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return eResult; } @@ -77,7 +76,7 @@ if(m_bReleaseMode) { - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return TSubTaskBase::eSubResult_Continue; } @@ -89,7 +88,7 @@ eResult = TSubTaskBase::eSubResult_Continue; } else if(eResult != TSubTaskBase::eSubResult_Continue) - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return eResult; } @@ -106,7 +105,7 @@ { AdjustProcessedSize(fsWritten); // ignore return value as we're already in release mode - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return TSubTaskBase::eSubResult_Continue; } @@ -117,7 +116,7 @@ eResult = m_spDstFile->FinalizeFileFB(*pBuffer); if (eResult != TSubTaskBase::eSubResult_Continue) { - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return eResult; } } @@ -126,7 +125,7 @@ eResult = AdjustProcessedSize(fsWritten); if(eResult != TSubTaskBase::eSubResult_Continue) { - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return eResult; } @@ -140,14 +139,14 @@ eResult = AdjustFinalSize(); if(eResult != TSubTaskBase::eSubResult_Continue) { - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return eResult; } m_spStats->ResetCurrentItemProcessedSize(); } - m_spEmptyBuffers->Push(pBuffer); + m_spWriter->AddEmptyBuffer(pBuffer); return eResult; } Index: src/libchcore/TOverlappedWriterFB.h =================================================================== diff -u -rc0d9a798f9fbbeda239b84721ed864f9727e1ddc -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision c0d9a798f9fbbeda239b84721ed864f9727e1ddc) +++ src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -52,7 +52,6 @@ TSubTaskStatsInfoPtr m_spStats; TFileInfoPtr m_spSrcFileInfo; TFileInfoPtr m_spDstFileInfo; - TBufferListPtr m_spEmptyBuffers; bool m_bReleaseMode = false; }; Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -r980c1a0de537813728871676200a0960410b11fb -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -24,7 +24,7 @@ namespace chcore { TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : - m_spUnorderedQueue(spUnorderedQueue), + m_spEmptyBuffers(spUnorderedQueue), m_eventHasBuffers(true, false), m_ullNextReadPosition(ullNextReadPosition), m_dwChunkSize(dwChunkSize) @@ -34,7 +34,7 @@ if(dwChunkSize == 0) throw TCoreException(eErr_InvalidArgument, L"dwChunkSize cannot be 0", LOCATION); - m_emptyBuffersQueueConnector = m_spUnorderedQueue->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this, _1)); + m_emptyBuffersQueueConnector = m_spEmptyBuffers->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); UpdateHasBuffers(); } @@ -44,41 +44,49 @@ m_emptyBuffersQueueConnector.disconnect(); } - void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) + void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - if(!bKeepPosition) - m_spUnorderedQueue->Push(pBuffer); - else if(IsDataSourceFinished()) + if(IsDataSourceFinished()) { if(!pBuffer->IsLastPart()) { if(pBuffer->GetFilePosition() > m_ullDataSourceFinishedPos) throw TCoreException(eErr_InvalidArgument, L"Adding regular buffer after the queue was marked as finished", LOCATION); - m_tClaimedQueue.Push(pBuffer); + m_tRetryBuffers.Push(pBuffer); } else - m_spUnorderedQueue->Push(pBuffer); + m_spEmptyBuffers->Push(pBuffer); } else - m_tClaimedQueue.Push(pBuffer); + m_tRetryBuffers.Push(pBuffer); UpdateHasBuffers(); } + void TReadBufferQueueWrapper::PushEmpty(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + m_spEmptyBuffers->Push(pBuffer); + + //UpdateHasBuffers(); // already updated using notifier + } + TOverlappedDataBuffer* TReadBufferQueueWrapper::Pop() { if(!IsBufferReady()) return nullptr; // always return retry buffers first - TOverlappedDataBuffer* pBuffer = m_tClaimedQueue.Pop(); + TOverlappedDataBuffer* pBuffer = m_tRetryBuffers.Pop(); if(!pBuffer) { - pBuffer = m_spUnorderedQueue->Pop(); + pBuffer = m_spEmptyBuffers->Pop(); if(pBuffer) { pBuffer->InitForRead(m_ullNextReadPosition, m_dwChunkSize); @@ -95,14 +103,14 @@ bool TReadBufferQueueWrapper::IsBufferReady() const { if(IsDataSourceFinished()) - return !m_tClaimedQueue.empty(); + return !m_tRetryBuffers.empty(); - return !m_tClaimedQueue.empty() || !m_spUnorderedQueue->IsEmpty(); + return !m_tRetryBuffers.empty() || !m_spEmptyBuffers->IsEmpty(); } size_t TReadBufferQueueWrapper::GetCount() const { - return m_tClaimedQueue.size(); + return m_tRetryBuffers.size(); } void TReadBufferQueueWrapper::SetDataSourceFinished(TOverlappedDataBuffer* pBuffer) @@ -117,22 +125,22 @@ m_ullDataSourceFinishedPos = pBuffer->GetFilePosition(); // release superfluous finished buffers - auto iterFind = std::find_if(m_tClaimedQueue.begin(), m_tClaimedQueue.end(), [](TOverlappedDataBuffer* pBuffer) { return pBuffer->IsLastPart(); }); - if(iterFind == m_tClaimedQueue.end() || ++iterFind == m_tClaimedQueue.end()) + auto iterFind = std::find_if(m_tRetryBuffers.begin(), m_tRetryBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return pBuffer->IsLastPart(); }); + if(iterFind == m_tRetryBuffers.end() || ++iterFind == m_tRetryBuffers.end()) { UpdateHasBuffers(); return; } - auto iterInvalidParts = std::find_if(iterFind, m_tClaimedQueue.end(), [](TOverlappedDataBuffer* pBuffer) { return !pBuffer->IsLastPart(); }); - if(iterInvalidParts != m_tClaimedQueue.end()) + auto iterInvalidParts = std::find_if(iterFind, m_tRetryBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return !pBuffer->IsLastPart(); }); + if(iterInvalidParts != m_tRetryBuffers.end()) throw TCoreException(eErr_InvalidArgument, L"Found non-last-parts after last-part", LOCATION); - for(auto iter = iterFind; iter != m_tClaimedQueue.end(); ++iter) + for(auto iter = iterFind; iter != m_tRetryBuffers.end(); ++iter) { - m_spUnorderedQueue->Push(*iter); + m_spEmptyBuffers->Push(*iter); } - m_tClaimedQueue.erase(iterFind, m_tClaimedQueue.end()); + m_tRetryBuffers.erase(iterFind, m_tRetryBuffers.end()); UpdateHasBuffers(); } @@ -153,13 +161,8 @@ m_eventHasBuffers.SetEvent(IsBufferReady()); } - void TReadBufferQueueWrapper::UpdateHasBuffers(bool /*bAdded*/) + void TReadBufferQueueWrapper::ReleaseBuffers() { - UpdateHasBuffers(); + m_tRetryBuffers.ReleaseBuffers(m_spEmptyBuffers); } - - void TReadBufferQueueWrapper::ReleaseBuffers(const TBufferListPtr& spBuffers) - { - m_tClaimedQueue.ReleaseBuffers(spBuffers); - } } Index: src/libchcore/TReadBufferQueueWrapper.h =================================================================== diff -u -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) +++ src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -36,7 +36,9 @@ TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize); ~TReadBufferQueueWrapper(); - void Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition); + void Push(TOverlappedDataBuffer* pBuffer); + void PushEmpty(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* Pop(); size_t GetCount() const; @@ -45,18 +47,17 @@ bool IsDataSourceFinished() const; HANDLE GetHasBuffersEvent() const; - void ReleaseBuffers(const TBufferListPtr& spBuffers); + void ReleaseBuffers(); private: bool IsBufferReady() const; void UpdateHasBuffers(); - void UpdateHasBuffers(bool bAdded); private: - TBufferListPtr m_spUnorderedQueue; // external queue of buffers to use + TBufferListPtr m_spEmptyBuffers; // external queue of buffers to use boost::signals2::connection m_emptyBuffersQueueConnector; - TSimpleOrderedBufferQueue m_tClaimedQueue; // internal queue of claimed buffers + TSimpleOrderedBufferQueue m_tRetryBuffers; // internal queue of claimed buffers TEvent m_eventHasBuffers; Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -r0fab495f4e17f067b303cc677056a666609622d5 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 0fab495f4e17f067b303cc677056a666609622d5) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -365,7 +365,7 @@ // recreate buffer if needed AdjustBufferIfNeeded(pData->spMemoryPool, pData->tBufferSizes); - ATLTRACE(_T("CustomCopyFile: %s\n"), pData->spSrcFile->GetFullFilePath().ToString()); + //ATLTRACE(_T("CustomCopyFile: %s\n"), pData->spSrcFile->GetFullFilePath().ToString()); // establish count of data to read TBufferSizes::EBufferType eBufferIndex = GetBufferIndex(pData->tBufferSizes, pData->spSrcFile); Index: src/libchcore/TWriteBufferQueueWrapper.cpp =================================================================== diff -u -r685d0da3259dd94327ee8d644a88c155585b8249 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision 685d0da3259dd94327ee8d644a88c155585b8249) +++ src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -32,7 +32,7 @@ UpdateHasBuffers(); - m_emptyBuffersQueueConnector = m_spDataQueue->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this, _1)); + m_emptyBuffersQueueConnector = m_spDataQueue->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this)); } TWriteBufferQueueWrapper::~TWriteBufferQueueWrapper() @@ -45,7 +45,7 @@ if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - m_tClaimedQueue.Push(pBuffer); + m_tRetryBuffers.Push(pBuffer); UpdateHasBuffers(); } @@ -63,23 +63,23 @@ TOverlappedDataBuffer* TWriteBufferQueueWrapper::InternalPop() { - const TOverlappedDataBuffer* pClaimedQueueBuffer = m_tClaimedQueue.Peek(); + const TOverlappedDataBuffer* pClaimedQueueBuffer = m_tRetryBuffers.Peek(); if (!pClaimedQueueBuffer) return m_spDataQueue->Pop(); const TOverlappedDataBuffer* pDataQueueBuffer = m_spDataQueue->Peek(); if (!pDataQueueBuffer) - return m_tClaimedQueue.Pop(); + return m_tRetryBuffers.Pop(); if (pClaimedQueueBuffer->GetFilePosition() < pDataQueueBuffer->GetFilePosition()) - return m_tClaimedQueue.Pop(); + return m_tRetryBuffers.Pop(); else return m_spDataQueue->Pop(); } bool TWriteBufferQueueWrapper::IsBufferReady() const { - return !m_tClaimedQueue.empty() || m_spDataQueue->HasPoppableBuffer(); + return !m_tRetryBuffers.empty() || m_spDataQueue->HasPoppableBuffer(); } size_t TWriteBufferQueueWrapper::GetCount() const @@ -96,12 +96,4 @@ { m_eventHasBuffers.SetEvent(IsBufferReady()); } - - void TWriteBufferQueueWrapper::ReleaseBuffers(const TBufferListPtr& spBuffers) - { - if(!spBuffers) - throw TCoreException(eErr_InvalidArgument, L"spBuffers is NULL", LOCATION); - m_spDataQueue->ReleaseBuffers(spBuffers); - m_tClaimedQueue.ReleaseBuffers(spBuffers); - } } Index: src/libchcore/TWriteBufferQueueWrapper.h =================================================================== diff -u -racd7bcfa7355db4a0d9af99a1bb99d685810d790 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision acd7bcfa7355db4a0d9af99a1bb99d685810d790) +++ src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -34,26 +34,23 @@ explicit TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue); ~TWriteBufferQueueWrapper(); - void Push(TOverlappedDataBuffer* pBuffer, bool /*bKeepPosition*/) { Push(pBuffer); } void Push(TOverlappedDataBuffer* pBuffer); TOverlappedDataBuffer* Pop(); size_t GetCount() const; HANDLE GetHasBuffersEvent() const; - void ReleaseBuffers(const TBufferListPtr& spBuffers); private: bool IsBufferReady() const; void UpdateHasBuffers(); - void UpdateHasBuffers(bool /*bAdded*/) { UpdateHasBuffers(); } TOverlappedDataBuffer* InternalPop(); private: TOrderedBufferQueuePtr m_spDataQueue; // external queue of buffers to use boost::signals2::connection m_emptyBuffersQueueConnector; - TSimpleOrderedBufferQueue m_tClaimedQueue; // internal queue of claimed buffers + TSimpleOrderedBufferQueue m_tRetryBuffers; // internal queue of claimed buffers TEvent m_eventHasBuffers; }; Index: src/libchcore/Tests/TOrderedBufferQueueTests.cpp =================================================================== diff -u -r685d0da3259dd94327ee8d644a88c155585b8249 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/Tests/TOrderedBufferQueueTests.cpp (.../TOrderedBufferQueueTests.cpp) (revision 685d0da3259dd94327ee8d644a88c155585b8249) +++ src/libchcore/Tests/TOrderedBufferQueueTests.cpp (.../TOrderedBufferQueueTests.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -8,7 +8,7 @@ class FallbackCollection : public std::vector { public: - void Push(chcore::TOverlappedDataBuffer* pBuffer, bool /*bKeepPos*/) + void Push(chcore::TOverlappedDataBuffer* pBuffer) { push_back(pBuffer); } Index: src/libchcore/Tests/TOverlappedReaderTests.cpp =================================================================== diff -u -rdcbfdc95eedacd24d8b1d78fa507029ce12a5a63 -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/Tests/TOverlappedReaderTests.cpp (.../TOverlappedReaderTests.cpp) (revision dcbfdc95eedacd24d8b1d78fa507029ce12a5a63) +++ src/libchcore/Tests/TOverlappedReaderTests.cpp (.../TOverlappedReaderTests.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -84,13 +84,13 @@ EXPECT_TIMEOUT(tReader.GetEventReadPossibleHandle()); - tReader.AddEmptyBuffer(pBuffers[ 0 ], false); + tReader.AddEmptyBuffer(pBuffers[ 0 ]); EXPECT_SIGNALED(tReader.GetEventReadPossibleHandle()); - tReader.AddEmptyBuffer(pBuffers[ 1 ], false); + tReader.AddEmptyBuffer(pBuffers[ 1 ]); EXPECT_SIGNALED(tReader.GetEventReadPossibleHandle()); - tReader.AddEmptyBuffer(pBuffers[ 2 ], false); + tReader.AddEmptyBuffer(pBuffers[ 2 ]); EXPECT_SIGNALED(tReader.GetEventReadPossibleHandle()); } @@ -101,8 +101,8 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), 0, 4096); - EXPECT_THROW(tReader.AddEmptyBuffer(nullptr, false), TCoreException); - EXPECT_THROW(tReader.AddEmptyBuffer(nullptr, true), TCoreException); + EXPECT_THROW(tReader.AddEmptyBuffer(nullptr), TCoreException); + EXPECT_THROW(tReader.AddRetryBuffer(nullptr), TCoreException); } /////////////////////////////////////////////////////////////////////////////////////////////////// Index: src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp =================================================================== diff -u -r980c1a0de537813728871676200a0960410b11fb -ra1f5b3d99f2f175b102d81379698ea1f08e42cce --- src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (.../TReadBufferQueueWrapperTests.cpp) (revision 980c1a0de537813728871676200a0960410b11fb) +++ src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (.../TReadBufferQueueWrapperTests.cpp) (revision a1f5b3d99f2f175b102d81379698ea1f08e42cce) @@ -76,10 +76,10 @@ TOverlappedDataBuffer buffer2(1024, nullptr); TOverlappedDataBuffer buffer3(1024, nullptr); TOverlappedDataBuffer buffer4(1024, nullptr); - queue.Push(&buffer1, false); - queue.Push(&buffer2, false); - queue.Push(&buffer3, false); - queue.Push(&buffer4, false); + queue.PushEmpty(&buffer1); + queue.PushEmpty(&buffer2); + queue.PushEmpty(&buffer3); + queue.PushEmpty(&buffer4); EXPECT_SIGNALED(queue.GetHasBuffersEvent()); EXPECT_EQ(&buffer4, queue.Pop()); @@ -117,8 +117,8 @@ TOverlappedDataBuffer buffer3(1024, nullptr); TOverlappedDataBuffer buffer4(1024, nullptr); - queue.Push(&buffer3, false); - queue.Push(&buffer4, false); + queue.PushEmpty(&buffer3); + queue.PushEmpty(&buffer4); EXPECT_SIGNALED(queue.GetHasBuffersEvent()); EXPECT_EQ(&buffer4, queue.Pop()); @@ -183,11 +183,11 @@ TOverlappedDataBuffer buffer1(1024, nullptr); buffer1.SetFilePosition(0); buffer1.SetLastPart(true); - queue.Push(&buffer1, true); + queue.Push(&buffer1); TOverlappedDataBuffer buffer2(1024, nullptr); buffer2.SetFilePosition(1024); buffer2.SetLastPart(true); - queue.Push(&buffer2, true); + queue.Push(&buffer2); queue.SetDataSourceFinished(&buffer1); @@ -205,7 +205,7 @@ TOverlappedDataBuffer buffer1(1024, nullptr); buffer1.SetLastPart(true); - queue.Push(&buffer1, true); + queue.Push(&buffer1); queue.SetDataSourceFinished(&buffer1); @@ -214,7 +214,7 @@ TOverlappedDataBuffer buffer2(1024, nullptr); buffer2.SetLastPart(true); - queue.Push(&buffer2, true); + queue.Push(&buffer2); EXPECT_EQ(1, queue.GetCount()); EXPECT_EQ(&buffer1, queue.Pop()); @@ -230,7 +230,7 @@ TOverlappedDataBuffer buffer1(1024, nullptr); buffer1.SetLastPart(true); buffer1.SetFilePosition(0); - queue.Push(&buffer1, true); + queue.Push(&buffer1); queue.SetDataSourceFinished(&buffer1); @@ -240,5 +240,5 @@ TOverlappedDataBuffer buffer2(1024, nullptr); buffer2.SetFilePosition(1000); - EXPECT_THROW(queue.Push(&buffer2, true), TCoreException); + EXPECT_THROW(queue.Push(&buffer2), TCoreException); }