Index: src/libchcore/TBufferList.cpp =================================================================== diff -u -N --- src/libchcore/TBufferList.cpp (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) +++ src/libchcore/TBufferList.cpp (revision 0) @@ -1,117 +0,0 @@ -// ============================================================================ -// Copyright (C) 2001-2016 by Jozef Starosczyk -// ixen@copyhandler.com -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU Library General Public License -// (version 2) as published by the Free Software Foundation; -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU Library General Public -// License along with this program; if not, write to the -// Free Software Foundation, Inc., -// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -// ============================================================================ -#include "stdafx.h" -#include "TBufferList.h" -#include "TCoreException.h" -#include - -namespace chcore -{ - TBufferList::TBufferList() : - m_eventAllBuffersAccountedFor(true, true) - { - } - - void TBufferList::Push(TOverlappedDataBuffer* pBuffer) - { - if(!pBuffer) - throw TCoreException(eErr_InvalidArgument, L"pBuffer", LOCATION); - - { - boost::unique_lock lock(m_mutex); - - m_listBuffers.push_front(pBuffer); - UpdateEvent(); - } - - m_notifier(); - } - - TOverlappedDataBuffer* TBufferList::Pop() - { - TOverlappedDataBuffer* pBuffer = nullptr; - - { - boost::unique_lock lock(m_mutex); - - if(m_listBuffers.empty()) - return nullptr; - - pBuffer = m_listBuffers.front(); - m_listBuffers.pop_front(); - - UpdateEvent(); - } - - m_notifier(); - - return pBuffer; - } - - void TBufferList::Clear() - { - bool bRemoved = false; - { - boost::unique_lock lock(m_mutex); - - bRemoved = !m_listBuffers.empty(); - m_listBuffers.clear(); - - if(bRemoved) - UpdateEvent(); - } - - if(bRemoved) - m_notifier(); - } - - size_t TBufferList::GetCount() const - { - boost::shared_lock lock(m_mutex); - return m_listBuffers.size(); - } - - bool TBufferList::IsEmpty() const - { - boost::shared_lock lock(m_mutex); - return m_listBuffers.empty(); - } - - void TBufferList::SetExpectedBuffersCount(size_t stExpectedBuffers) - { - boost::shared_lock lock(m_mutex); - m_stExpectedBuffers = stExpectedBuffers; - UpdateEvent(); - } - - HANDLE TBufferList::GetAllBuffersAccountedForEvent() const - { - return m_eventAllBuffersAccountedFor.Handle(); - } - - boost::signals2::signal& TBufferList::GetNotifier() - { - return m_notifier; - } - - void TBufferList::UpdateEvent() - { - m_eventAllBuffersAccountedFor.SetEvent(m_listBuffers.size() == m_stExpectedBuffers); - } -} Index: src/libchcore/TBufferList.h =================================================================== diff -u -N -r10d42e85d810f6da082cb2ce4415dcb72903410e -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TBufferList.h (.../TBufferList.h) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) +++ src/libchcore/TBufferList.h (.../TBufferList.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -20,7 +20,9 @@ #define __TBUFFERLIST_H__ #include -#include "TEvent.h" +#include +#include "TCoreException.h" +#include namespace chcore { @@ -29,32 +31,93 @@ class TBufferList { public: - TBufferList(); + TBufferList() + { + } - void Push(TOverlappedDataBuffer* pBuffer); - TOverlappedDataBuffer* Pop(); + void Push(TOverlappedDataBuffer* pBuffer) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidArgument, L"pBuffer", LOCATION); - void Clear(); + { + boost::unique_lock lock(m_mutex); - size_t GetCount() const; - bool IsEmpty() const; + m_queueBuffers.push_front(pBuffer); + } - void SetExpectedBuffersCount(size_t stExpectedBuffers); - HANDLE GetAllBuffersAccountedForEvent() const; + m_notifier(); + } - boost::signals2::signal& GetNotifier(); + TOverlappedDataBuffer* Pop() + { + TOverlappedDataBuffer* pBuffer = nullptr; - private: - void UpdateEvent(); + { + boost::unique_lock lock(m_mutex); + if(m_queueBuffers.empty()) + return nullptr; + + pBuffer = m_queueBuffers.front(); + m_queueBuffers.pop_front(); + } + + m_notifier(); + + return pBuffer; + } + + void Clear() + { + bool bRemoved = false; + { + boost::unique_lock lock(m_mutex); + + bRemoved = !m_queueBuffers.empty(); + m_queueBuffers.clear(); + } + + if(bRemoved) + m_notifier(); + } + + size_t GetCount() const + { + boost::shared_lock lock(m_mutex); + return m_queueBuffers.size(); + } + + bool IsEmpty() const + { + boost::shared_lock lock(m_mutex); + return m_queueBuffers.empty(); + } + + void SetExpectedBuffersCount(size_t stExpectedBuffers) // thread-unsafe by design + { + boost::unique_lock lock(m_mutex); + m_stExpectedBuffers = stExpectedBuffers; + } + + bool AreAllBuffersAccountedFor() const + { + boost::shared_lock lock(m_mutex); + return m_stExpectedBuffers == m_queueBuffers.size(); + } + + boost::signals2::signal& GetNotifier() + { + return m_notifier; + } + private: mutable boost::shared_mutex m_mutex; - size_t m_stExpectedBuffers = 0; // count of buffers there should be in m_listBuffers when no buffer is in use - std::list m_listBuffers; + size_t m_stExpectedBuffers = 0; // count of buffers there should be in m_queueBuffers when no buffer is in use + std::deque m_queueBuffers; boost::signals2::signal m_notifier; - TEvent m_eventAllBuffersAccountedFor; }; using TBufferListPtr = std::shared_ptr; Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -25,7 +25,6 @@ #include "TBufferList.h" #include "TCoreException.h" #include -#include namespace chcore { Index: src/libchcore/TOverlappedReader.cpp =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -28,7 +28,6 @@ const TOverlappedProcessorRangePtr& spDataRange, DWORD dwChunkSize) : m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), - m_spEmptyBuffers(spEmptyBuffers), m_tInputBuffers(spEmptyBuffers, spDataRange ? spDataRange->GetResumePosition() : 0, dwChunkSize), m_spFullBuffers(std::make_shared(spDataRange ? spDataRange->GetResumePosition() : 0)) { @@ -173,11 +172,6 @@ return m_spFullBuffers->GetHasErrorEvent(); } - HANDLE TOverlappedReader::GetEventReadFinishedHandle() const - { - return m_spFullBuffers->GetHasBuffersEvent(); - } - HANDLE TOverlappedReader::GetEventDataSourceFinishedHandle() const { return m_spFullBuffers->GetHasReadingFinished(); Index: src/libchcore/TOverlappedReader.h =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -58,7 +58,6 @@ // event access HANDLE GetEventReadPossibleHandle() const; HANDLE GetEventReadFailedHandle() const; - HANDLE GetEventReadFinishedHandle() const; HANDLE GetEventDataSourceFinishedHandle() const; void ClearBuffers(); @@ -69,7 +68,6 @@ logger::TLoggerPtr m_spLog; // queues - TBufferListPtr m_spEmptyBuffers; TReadBufferQueueWrapper m_tInputBuffers; TOrderedBufferQueuePtr m_spFullBuffers; // buffers with data Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -71,6 +71,7 @@ TSubTaskBase::ESubOperationResult TOverlappedReaderFB::StopThreaded() { + WaitForSingleObjectEx(m_eventProcessingFinished.Handle(), INFINITE, FALSE); return m_eThreadResult; } @@ -81,6 +82,7 @@ void TOverlappedReaderFB::StartThreaded() { + m_eventProcessingFinished.ResetEvent(); TEventGuard guardProcessingFinished(m_eventProcessingFinished, true); TEvent eventNonSignaled(true, false); @@ -104,40 +106,47 @@ bool bDataSourceFinished = false; - while(m_eThreadResult == TSubTaskBase::eSubResult_Continue && !bDataSourceFinished) + try { - DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); - switch(dwResult) + while(m_eThreadResult == TSubTaskBase::eSubResult_Continue && !bDataSourceFinished) { - case STATUS_USER_APC: - break; + DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); + switch(dwResult) + { + case STATUS_USER_APC: + break; - case WAIT_OBJECT_0 + eKillThread: - m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; - break; + case WAIT_OBJECT_0 + eKillThread: + m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; + break; - case WAIT_OBJECT_0 + eReadPossible: - m_eThreadResult = OnReadPossible(); - break; + case WAIT_OBJECT_0 + eReadPossible: + m_eThreadResult = OnReadPossible(); + break; - case WAIT_OBJECT_0 + eReadFailed: - m_eThreadResult = OnReadFailed(); - break; + case WAIT_OBJECT_0 + eReadFailed: + m_eThreadResult = OnReadFailed(); + break; - case WAIT_OBJECT_0 + eDataSourceFinished: - bDataSourceFinished = true; - m_eThreadResult = TSubTaskBase::eSubResult_Continue; - break; + case WAIT_OBJECT_0 + eDataSourceFinished: + bDataSourceFinished = true; + m_eThreadResult = TSubTaskBase::eSubResult_Continue; + break; - default: - throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); + default: + throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); + } } } + catch(const std::exception&) + { + m_eThreadResult = TSubTaskBase::eSubResult_Error; + } WaitForOnTheFlyBuffers(); ClearQueues(); - if(bDataSourceFinished) + if(m_eThreadResult == TSubTaskBase::eSubResult_Continue && bDataSourceFinished) m_eventReadingFinished.SetEvent(); } Index: src/libchcore/TOverlappedReaderFB.h =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -44,7 +44,6 @@ bool bNoBuffering, bool bProtectReadOnlyFiles); TOverlappedReaderFB(const TOverlappedReaderFB& rSrc) = delete; - ~TOverlappedReaderFB(); TOverlappedReaderFB& operator=(const TOverlappedReaderFB& rSrc) = delete; Index: src/libchcore/TOverlappedReaderWriterFB.cpp =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -57,33 +57,8 @@ { } - void TOverlappedReaderWriterFB::WaitForMissingBuffersAndResetState() + TSubTaskBase::ESubOperationResult TOverlappedReaderWriterFB::Process() { - bool bStopProcessing = false; - while(!bStopProcessing) - { - DWORD dwResult = WaitForSingleObjectEx(m_spMemoryPool->GetBufferList()->GetAllBuffersAccountedForEvent(), INFINITE, TRUE); - switch(dwResult) - { - case STATUS_USER_APC: - break; - - case WAIT_OBJECT_0: - { - LOG_DEBUG(m_spLog) << L"All buffer accounted for."; - - bStopProcessing = true; - break; - } - - default: - throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); - } - } - } - - TSubTaskBase::ESubOperationResult TOverlappedReaderWriterFB::Start() - { TSubTaskBase::ESubOperationResult eResult = m_spReader->Start(); if(eResult != TSubTaskBase::eSubResult_Continue) return eResult; @@ -146,7 +121,9 @@ } } - WaitForMissingBuffersAndResetState(); + // ensure that no buffer was lost in the process + if(!m_spMemoryPool->GetBufferList()->AreAllBuffersAccountedFor()) + throw TCoreException(eErr_InternalProblem, L"", LOCATION); return eResult; } Index: src/libchcore/TOverlappedReaderWriterFB.h =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOverlappedReaderWriterFB.h (.../TOverlappedReaderWriterFB.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedReaderWriterFB.h (.../TOverlappedReaderWriterFB.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -51,15 +51,8 @@ TOverlappedReaderWriterFB& operator=(const TOverlappedReaderWriterFB&) = delete; - TSubTaskBase::ESubOperationResult Start(); + TSubTaskBase::ESubOperationResult Process(); - // reader/writer - TOverlappedReaderFBPtr GetReader() const { return m_spReader; } - TOverlappedWriterFBPtr GetWriter() const { return m_spWriter; } - - // event access - void WaitForMissingBuffersAndResetState(); - private: logger::TLoggerPtr m_spLog; TOverlappedThreadPool& m_rThreadPool; Index: src/libchcore/TOverlappedWriterFB.cpp =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -309,6 +309,7 @@ void TOverlappedWriterFB::StartThreaded() { + m_eventProcessingFinished.ResetEvent(); TEventGuard guardProcessingFinished(m_eventProcessingFinished, true); TEvent eventNonSignaled(true, false); @@ -325,35 +326,43 @@ }; bool bWrittenLastBuffer = false; - while(!bWrittenLastBuffer && m_eThreadResult == TSubTaskBase::eSubResult_Continue) + + try { - DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); - switch(dwResult) + while(!bWrittenLastBuffer && m_eThreadResult == TSubTaskBase::eSubResult_Continue) { - case STATUS_USER_APC: - break; + DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); + switch(dwResult) + { + case STATUS_USER_APC: + break; - case WAIT_OBJECT_0 + eKillThread: - m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; - break; + case WAIT_OBJECT_0 + eKillThread: + m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; + break; - case WAIT_OBJECT_0 + eWritePossible: - m_eThreadResult = OnWritePossible(); - break; + case WAIT_OBJECT_0 + eWritePossible: + m_eThreadResult = OnWritePossible(); + break; - case WAIT_OBJECT_0 + eWriteFailed: - m_eThreadResult = OnWriteFailed(); - break; + case WAIT_OBJECT_0 + eWriteFailed: + m_eThreadResult = OnWriteFailed(); + break; - case WAIT_OBJECT_0 + eWriteFinished: - m_eThreadResult = OnWriteFinished(bWrittenLastBuffer); - break; + case WAIT_OBJECT_0 + eWriteFinished: + m_eThreadResult = OnWriteFinished(bWrittenLastBuffer); + break; - default: - DWORD dwLastError = GetLastError(); - throw TCoreWin32Exception(eErr_UnhandledCase, dwLastError, L"Unknown result from async waiting function", LOCATION); + default: + DWORD dwLastError = GetLastError(); + throw TCoreWin32Exception(eErr_UnhandledCase, dwLastError, L"Unknown result from async waiting function", LOCATION); + } } } + catch(const std::exception&) + { + m_eThreadResult = TSubTaskBase::eSubResult_Error; + } WaitForOnTheFlyBuffers(); ClearBuffers(); @@ -380,11 +389,7 @@ TSubTaskBase::ESubOperationResult TOverlappedWriterFB::StopThreaded() { + WaitForSingleObjectEx(m_eventProcessingFinished.Handle(), INFINITE, FALSE); return m_eThreadResult; } - - TOverlappedWriterPtr TOverlappedWriterFB::GetWriter() const - { - return m_spWriter; - } } Index: src/libchcore/TOverlappedWriterFB.h =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -45,7 +45,6 @@ bool bProtectReadOnlyFiles); TOverlappedWriterFB(const TOverlappedWriterFB& rSrc) = delete; - ~TOverlappedWriterFB(); TOverlappedWriterFB& operator=(const TOverlappedWriterFB& rSrc) = delete; @@ -55,12 +54,6 @@ void StartThreaded(); TSubTaskBase::ESubOperationResult StopThreaded(); - TOverlappedWriterPtr GetWriter() const; - - TSubTaskBase::ESubOperationResult OnWritePossible(); - TSubTaskBase::ESubOperationResult OnWriteFailed(); - TSubTaskBase::ESubOperationResult OnWriteFinished(bool& bStopProcessing); - HANDLE GetEventWritingFinishedHandle() const; HANDLE GetEventProcessingFinishedHandle() const; @@ -72,6 +65,10 @@ void WaitForOnTheFlyBuffers(); void ClearBuffers(); + TSubTaskBase::ESubOperationResult OnWritePossible(); + TSubTaskBase::ESubOperationResult OnWriteFailed(); + TSubTaskBase::ESubOperationResult OnWriteFinished(bool& bStopProcessing); + private: TOverlappedWriterPtr m_spWriter; TFilesystemFileFeedbackWrapperPtr m_spDstFile; Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -23,14 +23,14 @@ namespace chcore { - TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : - m_spEmptyBuffers(spUnorderedQueue), + TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spEmptyBuffers, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : + m_spEmptyBuffers(spEmptyBuffers), m_eventHasBuffers(true, false), m_ullNextReadPosition(ullNextReadPosition), m_dwChunkSize(dwChunkSize) { - if(!spUnorderedQueue) - throw TCoreException(eErr_InvalidArgument, L"spUnorderedQueue is NULL", LOCATION); + if(!spEmptyBuffers) + throw TCoreException(eErr_InvalidArgument, L"spEmptyBuffers is NULL", LOCATION); if(dwChunkSize == 0) throw TCoreException(eErr_InvalidArgument, L"dwChunkSize cannot be 0", LOCATION); @@ -103,14 +103,14 @@ bool TReadBufferQueueWrapper::IsBufferReady() const { if(IsDataSourceFinished()) - return !m_tRetryBuffers.empty(); + return !m_tRetryBuffers.IsEmpty(); - return !m_tRetryBuffers.empty() || !m_spEmptyBuffers->IsEmpty(); + return !m_tRetryBuffers.IsEmpty() || !m_spEmptyBuffers->IsEmpty(); } size_t TReadBufferQueueWrapper::GetCount() const { - return m_tRetryBuffers.size(); + return m_tRetryBuffers.GetCount(); } void TReadBufferQueueWrapper::SetDataSourceFinished(TOverlappedDataBuffer* pBuffer) @@ -123,25 +123,6 @@ if(pBuffer->GetFilePosition() < m_ullDataSourceFinishedPos) { m_ullDataSourceFinishedPos = pBuffer->GetFilePosition(); - - // release superfluous finished buffers - auto iterFind = std::find_if(m_tRetryBuffers.begin(), m_tRetryBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return pBuffer->IsLastPart(); }); - if(iterFind == m_tRetryBuffers.end() || ++iterFind == m_tRetryBuffers.end()) - { - UpdateHasBuffers(); - return; - } - - auto iterInvalidParts = std::find_if(iterFind, m_tRetryBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return !pBuffer->IsLastPart(); }); - if(iterInvalidParts != m_tRetryBuffers.end()) - throw TCoreException(eErr_InvalidArgument, L"Found non-last-parts after last-part", LOCATION); - - for(auto iter = iterFind; iter != m_tRetryBuffers.end(); ++iter) - { - m_spEmptyBuffers->Push(*iter); - } - m_tRetryBuffers.erase(iterFind, m_tRetryBuffers.end()); - UpdateHasBuffers(); } } @@ -168,7 +149,7 @@ void TReadBufferQueueWrapper::UpdateProcessingRange(unsigned long long ullNewPosition) { - if(!m_tRetryBuffers.empty()) + if(!m_tRetryBuffers.IsEmpty()) throw TCoreException(eErr_InvalidData, L"Cannot update processing range when processing already started", LOCATION); m_ullNextReadPosition = ullNewPosition; } Index: src/libchcore/TReadBufferQueueWrapper.h =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -33,7 +33,7 @@ static const unsigned long long NoPosition = 0xffffffffffffffff; public: - TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize); + TReadBufferQueueWrapper(const TBufferListPtr& spEmptyBuffers, unsigned long long ullNextReadPosition, DWORD dwChunkSize); ~TReadBufferQueueWrapper(); void Push(TOverlappedDataBuffer* pBuffer); Index: src/libchcore/TSimpleOrderedBufferQueue.h =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TSimpleOrderedBufferQueue.h (.../TSimpleOrderedBufferQueue.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TSimpleOrderedBufferQueue.h (.../TSimpleOrderedBufferQueue.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -26,7 +26,7 @@ namespace chcore { - class TSimpleOrderedBufferQueue : public std::set + class TSimpleOrderedBufferQueue : private std::set { public: void Push(TOverlappedDataBuffer* pBuffer) @@ -65,6 +65,16 @@ } clear(); } + + bool IsEmpty() const + { + return empty(); + } + + size_t GetCount() const + { + return size(); + } }; } Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -N -ra4635addad389b9e117679437a3e1b64a739ea96 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -385,7 +385,7 @@ GetTaskPropValue(rConfig), pData->bOnlyCreate); - ESubOperationResult eResult = tReaderWriter.Start(); + ESubOperationResult eResult = tReaderWriter.Process(); return eResult; } Index: src/libchcore/TWriteBufferQueueWrapper.cpp =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -32,12 +32,12 @@ UpdateHasBuffers(); - m_emptyBuffersQueueConnector = m_spDataQueue->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this, _1)); + m_dataQueueConnector = m_spDataQueue->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this, _1)); } TWriteBufferQueueWrapper::~TWriteBufferQueueWrapper() { - m_emptyBuffersQueueConnector.disconnect(); + m_dataQueueConnector.disconnect(); } void TWriteBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) @@ -95,7 +95,7 @@ void TWriteBufferQueueWrapper::UpdateHasBuffers(bool bDataQueueHasPoppableBuffer) { - bool bIsReady = bDataQueueHasPoppableBuffer || !m_tRetryBuffers.empty(); + bool bIsReady = bDataQueueHasPoppableBuffer || !m_tRetryBuffers.IsEmpty(); m_eventHasBuffers.SetEvent(bIsReady); } Index: src/libchcore/TWriteBufferQueueWrapper.h =================================================================== diff -u -N -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -50,7 +50,7 @@ private: TOrderedBufferQueuePtr m_spDataQueue; // external queue of buffers to use - boost::signals2::connection m_emptyBuffersQueueConnector; + boost::signals2::connection m_dataQueueConnector; TSimpleOrderedBufferQueue m_tRetryBuffers; // internal queue of claimed buffers Index: src/libchcore/Tests/TOverlappedReaderTests.cpp =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/Tests/TOverlappedReaderTests.cpp (.../TOverlappedReaderTests.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/Tests/TOverlappedReaderTests.cpp (.../TOverlappedReaderTests.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -22,11 +22,9 @@ EXPECT_EQ(nullptr, tReader.GetFailedReadBuffer()); EXPECT_NE(nullptr, tReader.GetEventReadPossibleHandle()); - EXPECT_NE(nullptr, tReader.GetEventReadFinishedHandle()); EXPECT_NE(nullptr, tReader.GetEventReadFailedHandle()); EXPECT_TIMEOUT(tReader.GetEventReadPossibleHandle()); - EXPECT_TIMEOUT(tReader.GetEventReadFinishedHandle()); EXPECT_TIMEOUT(tReader.GetEventReadFailedHandle()); EXPECT_FALSE(tReader.IsDataSourceFinished()); @@ -42,7 +40,6 @@ EXPECT_SIGNALED(tReader.GetEventReadPossibleHandle()); EXPECT_TIMEOUT(tReader.GetEventReadFailedHandle()); - EXPECT_TIMEOUT(tReader.GetEventReadFinishedHandle()); EXPECT_NE(nullptr, tReader.GetEmptyBuffer()); EXPECT_NE(nullptr, tReader.GetEmptyBuffer()); @@ -121,10 +118,7 @@ TOverlappedDataBuffer* pBuffer = tReader.GetEmptyBuffer(); tReader.AddFinishedReadBuffer(pBuffer); - EXPECT_SIGNALED(tReader.GetEventReadFinishedHandle()); - EXPECT_NE(nullptr, tReader.GetFinishedReadBuffer()); - EXPECT_TIMEOUT(tReader.GetEventReadFinishedHandle()); } TEST(TOverlappedReaderTests, GetFullBuffer_WrongOrder) @@ -191,37 +185,3 @@ tReader.AddFinishedReadBuffer(pBuffer); EXPECT_THROW(tReader.AddFinishedReadBuffer(pBuffer), TCoreException); } - -TEST(TOverlappedReaderTests, GetFullBuffer_AddFullBuffer_OutOfOrder) -{ - logger::TLogFileDataPtr spLogData(std::make_shared()); - - TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); - TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); - TOverlappedDataBuffer* pBuffers[ 3 ] = { tReader.GetEmptyBuffer(), tReader.GetEmptyBuffer(), tReader.GetEmptyBuffer() }; - - pBuffers[ 0 ]->InitForRead(0, 1000); - pBuffers[ 0 ]->SetBytesTransferred(1000); - pBuffers[ 0 ]->SetStatusCode(0); - - pBuffers[ 1 ]->InitForRead(1000, 1200); - pBuffers[ 1 ]->SetBytesTransferred(1200); - pBuffers[ 1 ]->SetStatusCode(0); - - pBuffers[ 2 ]->InitForRead(2200, 1400); - pBuffers[ 2 ]->SetBytesTransferred(800); - pBuffers[ 2 ]->SetStatusCode(0); - pBuffers[ 2 ]->SetLastPart(true); - - EXPECT_TIMEOUT(tReader.GetEventReadFinishedHandle()); - - tReader.AddFinishedReadBuffer(pBuffers[ 1 ]); - EXPECT_TIMEOUT(tReader.GetEventReadFinishedHandle()); - - tReader.AddFinishedReadBuffer(pBuffers[ 2 ]); - EXPECT_TIMEOUT(tReader.GetEventReadFinishedHandle()); - - tReader.AddFinishedReadBuffer(pBuffers[ 0 ]); - EXPECT_SIGNALED(tReader.GetEventReadFinishedHandle()); -} Index: src/libchcore/libchcore.vc140.vcxproj =================================================================== diff -u -N -ra4635addad389b9e117679437a3e1b64a739ea96 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -909,7 +909,6 @@ - Index: src/libchcore/libchcore.vc140.vcxproj.filters =================================================================== diff -u -N -ra4635addad389b9e117679437a3e1b64a739ea96 -rc719644bb4360fcf7ccf6f1139bcae852bd6effd --- src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) @@ -965,9 +965,6 @@ Source Files\Filesystems\OverlappedIO\Data Buffer\Buffers - - Source Files\Filesystems\OverlappedIO\ReaderWriter\Queues\Simple - Source Files\Filesystems\OverlappedIO\ReaderWriter\Queues\Simple