Index: src/libchcore/TEventGuard.cpp =================================================================== diff -u --- src/libchcore/TEventGuard.cpp (revision 0) +++ src/libchcore/TEventGuard.cpp (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -0,0 +1,34 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TEventGuard.h" + +namespace chcore +{ + TEventGuard::TEventGuard(TEvent& rEvent, bool bValueToSetAtExit) : + m_event(rEvent), + m_bValueToSetAtExit(bValueToSetAtExit) + { + } + + TEventGuard::~TEventGuard() + { + m_event.SetEvent(m_bValueToSetAtExit); + } +} Index: src/libchcore/TEventGuard.h =================================================================== diff -u --- src/libchcore/TEventGuard.h (revision 0) +++ src/libchcore/TEventGuard.h (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -0,0 +1,38 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TEVENTGUARD_H__ +#define __TEVENTGUARD_H__ + +#include "TEvent.h" + +namespace chcore +{ + class LIBCHCORE_API TEventGuard + { + public: + TEventGuard(TEvent& rEvent, bool bValueToSetAtExit); + ~TEventGuard(); + + private: + TEvent& m_event; + bool m_bValueToSetAtExit = false; + }; +} + +#endif Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -21,6 +21,7 @@ #include "TCoreException.h" #include "TFileInfo.h" #include "TWorkerThreadController.h" +#include "TEventGuard.h" namespace chcore { @@ -41,7 +42,8 @@ m_spStats(spStats), m_spSrcFileInfo(spSrcFileInfo), m_rThreadController(rThreadController), - m_eventDataSourceFinished(true, false) + m_eventReadingFinished(true, false), + m_eventProcessingFinished(true, false) { if(!spFeedbackHandler) throw TCoreException(eErr_InvalidArgument, L"spFeedbackHandler is NULL", LOCATION); @@ -78,6 +80,8 @@ void TOverlappedReaderFB::StartThreaded() { + TEventGuard guardProcessingFinished(m_eventProcessingFinished, true); + m_eThreadResult = TSubTaskBase::eSubResult_Continue; // read data from file to buffer @@ -121,7 +125,7 @@ case WAIT_OBJECT_0 + eDataSourceFinished: m_eThreadResult = TSubTaskBase::eSubResult_Continue; - m_eventDataSourceFinished.SetEvent(); + m_eventReadingFinished.SetEvent(); return; default: @@ -163,11 +167,16 @@ m_spReader->ReleaseBuffers(); } - HANDLE TOverlappedReaderFB::GetEventDataSourceFinishedHandle() const + HANDLE TOverlappedReaderFB::GetEventReadingFinishedHandle() const { - return m_eventDataSourceFinished.Handle(); + return m_eventReadingFinished.Handle(); } + HANDLE TOverlappedReaderFB::GetEventProcessingFinishedHandle() const + { + return m_eventProcessingFinished.Handle(); + } + TSubTaskBase::ESubOperationResult TOverlappedReaderFB::OnReadPossible() { TOverlappedDataBuffer* pBuffer = m_spReader->GetEmptyBuffer(); Index: src/libchcore/TOverlappedReaderFB.h =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -57,7 +57,8 @@ TOrderedBufferQueuePtr GetFinishedQueue() const; void SetReleaseMode(); - HANDLE GetEventDataSourceFinishedHandle() const; + HANDLE GetEventReadingFinishedHandle() const; + HANDLE GetEventProcessingFinishedHandle() const; private: TSubTaskBase::ESubOperationResult UpdateFileStats(); @@ -69,7 +70,8 @@ private: TOverlappedReaderPtr m_spReader; - TEvent m_eventDataSourceFinished; + TEvent m_eventReadingFinished; + TEvent m_eventProcessingFinished; IFilesystemPtr m_spFilesystem; TFilesystemFileFeedbackWrapperPtr m_spSrcFile; Index: src/libchcore/TOverlappedReaderWriterFB.cpp =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -131,6 +131,7 @@ return eResult; m_rThreadPool.QueueRead(m_spReader); + m_rThreadPool.QueueWrite(m_spWriter); // read data from file to buffer // NOTE: order is critical here: @@ -141,17 +142,15 @@ // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data enum { - eKillThread, eDataSourceFinished, eWriteFinished, eWriteFailed, eWritePossible + eKillThread, eReadingFinished, eWritingFinished }; TEvent unsignaledEvent(true, false); std::vector vHandles = { - m_rThreadController.GetKillThreadHandle(), - m_spReader->GetEventDataSourceFinishedHandle(), - m_spWriter->GetWriter()->GetEventWriteFinishedHandle(), - m_spWriter->GetWriter()->GetEventWriteFailedHandle(), - m_spWriter->GetWriter()->GetEventWritePossibleHandle() + m_spReader->GetEventProcessingFinishedHandle(), + m_spWriter->GetEventProcessingFinishedHandle(), + m_rThreadController.GetKillThreadHandle() // kill is last to allow reader and writer to exit first }; bool bStopProcessing = false; @@ -164,28 +163,19 @@ break; case WAIT_OBJECT_0 + eKillThread: - // log - LOG_INFO(m_spLog) << L"Received kill request while copying file"; - eResult = TSubTaskBase::eSubResult_KillRequest; bStopProcessing = true; break; - case WAIT_OBJECT_0 + eWritePossible: - eResult = m_spWriter->OnWritePossible(); + case WAIT_OBJECT_0 + eWritingFinished: + eResult = m_spWriter->StopThreaded(); + vHandles[eWritingFinished] = unsignaledEvent.Handle(); + bStopProcessing = true; break; - case WAIT_OBJECT_0 + eWriteFailed: - eResult = m_spWriter->OnWriteFailed(); - break; - - case WAIT_OBJECT_0 + eWriteFinished: - eResult = m_spWriter->OnWriteFinished(bStopProcessing); - break; - - case WAIT_OBJECT_0 + eDataSourceFinished: + case WAIT_OBJECT_0 + eReadingFinished: eResult = m_spReader->StopThreaded(); - vHandles[eDataSourceFinished] = unsignaledEvent.Handle(); + vHandles[eReadingFinished] = unsignaledEvent.Handle(); break; default: Index: src/libchcore/TOverlappedThreadPool.cpp =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/TOverlappedThreadPool.cpp (.../TOverlappedThreadPool.cpp) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/TOverlappedThreadPool.cpp (.../TOverlappedThreadPool.cpp) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -41,4 +41,9 @@ { ReaderThread().PushTask(std::function(std::bind(&TOverlappedReaderFB::StartThreaded, std::ref(*spReader.get())))); } + + void TOverlappedThreadPool::QueueWrite(const TOverlappedWriterFBPtr& spWriter) + { + WriterThread().PushTask(std::function(std::bind(&TOverlappedWriterFB::StartThreaded, std::ref(*spWriter.get())))); + } } Index: src/libchcore/TOverlappedThreadPool.h =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/TOverlappedThreadPool.h (.../TOverlappedThreadPool.h) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/TOverlappedThreadPool.h (.../TOverlappedThreadPool.h) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -22,6 +22,7 @@ #include #include "TThreadedQueueRunner.h" #include "TOverlappedReaderFB.h" +#include "TOverlappedWriterFB.h" namespace chcore { @@ -37,6 +38,7 @@ TWriterThread& WriterThread(); void QueueRead(const TOverlappedReaderFBPtr& spReader); + void QueueWrite(const TOverlappedWriterFBPtr& spWriter); private: TReaderThread m_threadReader; Index: src/libchcore/TOverlappedWriterFB.cpp =================================================================== diff -u -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -21,6 +21,9 @@ #include "TSubTaskStatsInfo.h" #include "TFilesystemFileFeedbackWrapper.h" #include "TFileInfo.h" +#include "TCoreWin32Exception.h" +#include "TWorkerThreadController.h" +#include "TEventGuard.h" namespace chcore { @@ -41,7 +44,10 @@ m_spStats(spStats), m_spSrcFileInfo(spSrcFileInfo), m_spDataRange(spRange), - m_bOnlyCreate(bOnlyCreate) + m_bOnlyCreate(bOnlyCreate), + m_eventProcessingFinished(true, false), + m_eventWritingFinished(true, false), + m_rThreadController(rThreadController) { if(!spFilesystem) throw TCoreException(eErr_InvalidArgument, L"spFilesystem is NULL", LOCATION); @@ -157,9 +163,20 @@ } m_spWriter->AddEmptyBuffer(pBuffer); + return eResult; } + HANDLE TOverlappedWriterFB::GetEventWritingFinishedHandle() const + { + return m_eventWritingFinished.Handle(); + } + + HANDLE TOverlappedWriterFB::GetEventProcessingFinishedHandle() const + { + return m_eventProcessingFinished.Handle(); + } + void TOverlappedWriterFB::AdjustProcessedSize(file_size_t fsWritten) { // in case we read past the original eof, try to get new file size from filesystem @@ -271,6 +288,63 @@ return eResult; } + void TOverlappedWriterFB::StartThreaded() + { + TEventGuard guardProcessingFinished(m_eventProcessingFinished, true); + + m_eThreadResult = TSubTaskBase::eSubResult_Continue; + + enum { eKillThread, eWriteFinished, eWriteFailed, eWritePossible }; + + std::vector vHandles = { + m_rThreadController.GetKillThreadHandle(), + m_spWriter->GetEventWriteFinishedHandle(), + m_spWriter->GetEventWriteFailedHandle(), + m_spWriter->GetEventWritePossibleHandle() + }; + + bool bStopProcessing = false; + while(!bStopProcessing && m_eThreadResult == TSubTaskBase::eSubResult_Continue) + { + DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); + switch(dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0 + eKillThread: + m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; + bStopProcessing = true; + break; + + case WAIT_OBJECT_0 + eWritePossible: + m_eThreadResult = OnWritePossible(); + break; + + case WAIT_OBJECT_0 + eWriteFailed: + m_eThreadResult = OnWriteFailed(); + break; + + case WAIT_OBJECT_0 + eWriteFinished: + { + m_eThreadResult = OnWriteFinished(bStopProcessing); + if(m_eThreadResult == TSubTaskBase::eSubResult_Continue && bStopProcessing) + m_eventWritingFinished.SetEvent(); + break; + } + + default: + DWORD dwLastError = GetLastError(); + throw TCoreWin32Exception(eErr_UnhandledCase, dwLastError, L"Unknown result from async waiting function", LOCATION); + } + } + } + + TSubTaskBase::ESubOperationResult TOverlappedWriterFB::StopThreaded() + { + return m_eThreadResult; + } + TOverlappedWriterPtr TOverlappedWriterFB::GetWriter() const { return m_spWriter; Index: src/libchcore/TOverlappedWriterFB.h =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -51,6 +51,9 @@ TSubTaskBase::ESubOperationResult Start(); + void StartThreaded(); + TSubTaskBase::ESubOperationResult StopThreaded(); + TOverlappedWriterPtr GetWriter() const; void SetReleaseMode(); @@ -59,6 +62,9 @@ TSubTaskBase::ESubOperationResult OnWriteFailed(); TSubTaskBase::ESubOperationResult OnWriteFinished(bool& bStopProcessing); + HANDLE GetEventWritingFinishedHandle() const; + HANDLE GetEventProcessingFinishedHandle() const; + private: void AdjustProcessedSize(file_size_t fsWritten); TSubTaskBase::ESubOperationResult AdjustFinalSize(); @@ -71,6 +77,12 @@ TOverlappedProcessorRangePtr m_spDataRange; bool m_bReleaseMode = false; bool m_bOnlyCreate = false; + + TEvent m_eventProcessingFinished; + TEvent m_eventWritingFinished; + + TWorkerThreadController& m_rThreadController; + TSubTaskBase::ESubOperationResult m_eThreadResult = TSubTaskBase::eSubResult_Continue; }; using TOverlappedWriterFBPtr = std::shared_ptr; Index: src/libchcore/libchcore.vc140.vcxproj =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -504,6 +504,7 @@ + @@ -818,6 +819,7 @@ true + Index: src/libchcore/libchcore.vc140.vcxproj.filters =================================================================== diff -u -rb0a003dc39e6d21e34779cf1cf5d8a07318c1f5f -r7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0 --- src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision b0a003dc39e6d21e34779cf1cf5d8a07318c1f5f) +++ src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision 7892d3d5ca43da7dca4d9e8e0c321c21c3e13ea0) @@ -383,9 +383,6 @@ Source Files\Serialization\Fake - - Source Files\Tools - Source Files\Tools @@ -558,6 +555,12 @@ Source Files\Filesystems\OverlappedIO\ThreadSupport + + Source Files\Tools\Threading + + + Source Files\Tools\Threading + @@ -824,9 +827,6 @@ Source Files\Serialization\Fake - - Source Files\Tools - Source Files\Tools @@ -1010,6 +1010,12 @@ Source Files\Filesystems\OverlappedIO\ThreadSupport + + Source Files\Tools\Threading + + + Source Files\Tools\Threading +