Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -N -rf1d25f23712f5de7459f690ab51b2640d0f81b91 -r10d42e85d810f6da082cb2ce4415dcb72903410e --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision f1d25f23712f5de7459f690ab51b2640d0f81b91) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 10d42e85d810f6da082cb2ce4415dcb72903410e) @@ -20,6 +20,7 @@ #include "TOverlappedReaderFB.h" #include "TCoreException.h" #include "TFileInfo.h" +#include "TWorkerThreadController.h" namespace chcore { @@ -38,7 +39,8 @@ m_spFilesystem(spFilesystem), m_spSrcFile(), m_spStats(spStats), - m_spSrcFileInfo(spSrcFileInfo) + m_spSrcFileInfo(spSrcFileInfo), + m_rThreadController(rThreadController) { if(!spFeedbackHandler) throw TCoreException(eErr_InvalidArgument, L"spFeedbackHandler is NULL", LOCATION); @@ -63,6 +65,24 @@ return eResult; } + TSubTaskBase::ESubOperationResult TOverlappedReaderFB::StopThreaded() + { + if(m_spReadThread) + { + if(m_spReadThread->joinable()) + m_spReadThread->join(); + m_spReadThread.reset(); + } + + return m_eThreadResult; + } + + void TOverlappedReaderFB::StartThreaded() + { + m_eThreadResult = TSubTaskBase::eSubResult_Continue; + m_spReadThread = std::make_unique(&TOverlappedReaderFB::ThreadProc, this); + } + TOverlappedReaderPtr TOverlappedReaderFB::GetReader() const { return m_spReader; @@ -91,6 +111,57 @@ return eResult; } + void TOverlappedReaderFB::ThreadProc() + { + // read data from file to buffer + // NOTE: order is critical here: + // - write finished is first, so that all the data that were already queued to be written, will be written and accounted for (in stats) + // - kill request is second, so that we can stop processing as soon as all the data is written to destination location; + // that also means that we don't want to queue reads or writes anymore - all the data that were read until now, will be lost + // - write possible - we're prioritizing write queuing here to empty buffers as soon as possible + // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data + enum + { + eKillThread, eReadFailed, eReadPossible, eDataSourceFinished + }; + + std::vector vHandles = { + m_rThreadController.GetKillThreadHandle(), + GetReader()->GetEventReadFailedHandle(), + GetReader()->GetEventReadPossibleHandle(), + GetReader()->GetEventDataSourceFinishedHandle() + }; + + while(m_eThreadResult == TSubTaskBase::eSubResult_Continue) + { + DWORD dwResult = WaitForMultipleObjectsEx(boost::numeric_cast(vHandles.size()), vHandles.data(), false, INFINITE, true); + switch(dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0 + eKillThread: + m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; + break; + + case WAIT_OBJECT_0 + eReadPossible: + m_eThreadResult = OnReadPossible(); + break; + + case WAIT_OBJECT_0 + eReadFailed: + m_eThreadResult = OnReadFailed(); + break; + + case WAIT_OBJECT_0 + eDataSourceFinished: + m_eThreadResult = TSubTaskBase::eSubResult_Continue; + return; + + default: + throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); + } + } + } + void TOverlappedReaderFB::SetReleaseMode() { m_spReader->ReleaseBuffers();