Index: src/libchcore/OverlappedCallbacks.cpp =================================================================== diff -u -N -rb941384e121190b6107f1c99b3233667e3daf4ce -r734408890246965d47e6bbf2c2978371269dd1fd --- src/libchcore/OverlappedCallbacks.cpp (.../OverlappedCallbacks.cpp) (revision b941384e121190b6107f1c99b3233667e3daf4ce) +++ src/libchcore/OverlappedCallbacks.cpp (.../OverlappedCallbacks.cpp) (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -19,7 +19,8 @@ #include "stdafx.h" #include "OverlappedCallbacks.h" #include "TOverlappedDataBuffer.h" -#include "TOverlappedReaderWriter.h" +#include "TOverlappedReader.h" +#include "TOverlappedWriter.h" #define STATUS_END_OF_FILE 0xc0000011 Index: src/libchcore/TFilesystemFileFeedbackWrapper.cpp =================================================================== diff -u -N -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b -r734408890246965d47e6bbf2c2978371269dd1fd --- src/libchcore/TFilesystemFileFeedbackWrapper.cpp (.../TFilesystemFileFeedbackWrapper.cpp) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) +++ src/libchcore/TFilesystemFileFeedbackWrapper.cpp (.../TFilesystemFileFeedbackWrapper.cpp) (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -547,6 +547,76 @@ return TSubTaskBase::eSubResult_Continue; } + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::HandleReadError(TOverlappedDataBuffer& rBuffer, bool& bSkip) + { + DWORD dwLastError = rBuffer.GetErrorCode(); + + bSkip = false; + + // log + TString strFormat = _T("Error %errno while requesting read of %count bytes from source file %path (CustomCopyFileFB)"); + strFormat.Replace(_T("%errno"), boost::lexical_cast(dwLastError).c_str()); + strFormat.Replace(_T("%count"), boost::lexical_cast(rBuffer.GetRequestedDataSize()).c_str()); + strFormat.Replace(_T("%path"), m_spFile->GetFilePath().ToString()); + LOG_ERROR(m_spLog) << strFormat.c_str(); + + TFeedbackResult frResult = m_spFeedbackHandler->FileError(m_spFile->GetFilePath().ToWString(), TString(), EFileError::eReadError, dwLastError); + switch(frResult.GetResult()) + { + case EFeedbackResult::eResult_Cancel: + return TSubTaskBase::eSubResult_CancelRequest; + + case EFeedbackResult::eResult_Retry: + return TSubTaskBase::eSubResult_Retry; + + case EFeedbackResult::eResult_Pause: + return TSubTaskBase::eSubResult_PauseRequest; + + case EFeedbackResult::eResult_Skip: + bSkip = true; + return TSubTaskBase::eSubResult_Continue; + + default: + BOOST_ASSERT(FALSE); // unknown result + throw TCoreException(eErr_UnhandledCase, L"Unknown feedback result", LOCATION); + } + } + + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::HandleWriteError(TOverlappedDataBuffer& rBuffer, bool& bSkip) + { + DWORD dwLastError = rBuffer.GetErrorCode(); + + bSkip = false; + + // log + TString strFormat = _T("Error %errno while trying to write %count bytes to destination file %path (CustomCopyFileFB)"); + strFormat.Replace(_T("%errno"), boost::lexical_cast(rBuffer.GetErrorCode()).c_str()); + strFormat.Replace(_T("%count"), boost::lexical_cast(rBuffer.GetBytesTransferred()).c_str()); + strFormat.Replace(_T("%path"), m_spFile->GetFilePath().ToString()); + LOG_ERROR(m_spLog) << strFormat.c_str(); + + TFeedbackResult frResult = m_spFeedbackHandler->FileError(m_spFile->GetFilePath().ToWString(), TString(), EFileError::eWriteError, dwLastError); + switch(frResult.GetResult()) + { + case EFeedbackResult::eResult_Cancel: + return TSubTaskBase::eSubResult_CancelRequest; + + case EFeedbackResult::eResult_Retry: + return TSubTaskBase::eSubResult_Retry; + + case EFeedbackResult::eResult_Pause: + return TSubTaskBase::eSubResult_PauseRequest; + + case EFeedbackResult::eResult_Skip: + bSkip = true; + return TSubTaskBase::eSubResult_Continue; + + default: + BOOST_ASSERT(FALSE); // unknown result + throw TCoreException(eErr_UnhandledCase, L"Unknown feedback result", LOCATION); + } + } + bool TFilesystemFileFeedbackWrapper::WasKillRequested(const TFeedbackResult& rFeedbackResult) const { if(m_rThreadController.KillRequested(rFeedbackResult.IsAutomatedReply() ? m_spFeedbackHandler->GetRetryInterval() : 0)) Index: src/libchcore/TFilesystemFileFeedbackWrapper.h =================================================================== diff -u -N -r6c41e7b3cf7711c6f5027c0c0154013f50f08e7b -r734408890246965d47e6bbf2c2978371269dd1fd --- src/libchcore/TFilesystemFileFeedbackWrapper.h (.../TFilesystemFileFeedbackWrapper.h) (revision 6c41e7b3cf7711c6f5027c0c0154013f50f08e7b) +++ src/libchcore/TFilesystemFileFeedbackWrapper.h (.../TFilesystemFileFeedbackWrapper.h) (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -49,6 +49,9 @@ TSubTaskBase::ESubOperationResult FinalizeFileFB(TOverlappedDataBuffer& rBuffer, bool& bSkip); + TSubTaskBase::ESubOperationResult HandleReadError(TOverlappedDataBuffer& rBuffer, bool& bSkip); + TSubTaskBase::ESubOperationResult HandleWriteError(TOverlappedDataBuffer& rBuffer, bool& bSkip); + TSmartPath GetFilePath() const { return m_spFile->GetFilePath(); } file_size_t GetFileSize() const { return m_spFile->GetFileSize(); } file_size_t GetSeekPositionForResume(file_size_t fsLastAvailablePosition) { return m_spFile->GetSeekPositionForResume(fsLastAvailablePosition); } @@ -65,6 +68,8 @@ logger::TLoggerPtr m_spLog; TWorkerThreadController& m_rThreadController; }; + + using TFilesystemFileFeedbackWrapperPtr = std::shared_ptr; } #endif Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -N --- src/libchcore/TOverlappedReaderFB.cpp (revision 0) +++ src/libchcore/TOverlappedReaderFB.cpp (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -0,0 +1,101 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TOverlappedReaderFB.h" +#include "TCoreException.h" +#include "TFileInfo.h" + +namespace chcore +{ + TOverlappedReaderFB::TOverlappedReaderFB(const TFilesystemFileFeedbackWrapperPtr& spSrcFile, const TSubTaskStatsInfoPtr& spStats, + const TFileInfoPtr& spSrcFileInfo, + const logger::TLogFileDataPtr& spLogFileData, const TBufferListPtr& spEmptyBuffers, + unsigned long long ullFilePos, DWORD dwChunkSize) : + m_spReader(std::make_shared(spLogFileData, spEmptyBuffers, ullFilePos, dwChunkSize)), + m_spSrcFile(spSrcFile), + m_spStats(spStats), + m_spSrcFileInfo(spSrcFileInfo) + { + if(!spSrcFile) + throw TCoreException(eErr_InvalidArgument, L"spSrcFile is NULL", LOCATION); + if(!spStats) + throw TCoreException(eErr_InvalidArgument, L"spStats is NULL", LOCATION); + if(!spSrcFileInfo) + throw TCoreException(eErr_InvalidArgument, L"spSrcFileInfo is NULL", LOCATION); + } + + TOverlappedReaderFB::~TOverlappedReaderFB() + { + } + + TSubTaskBase::ESubOperationResult TOverlappedReaderFB::OnReadPossible(bool& bStopProcessing, bool& bProcessedFlag) + { + TOverlappedDataBuffer* pBuffer = m_spReader->GetEmptyBuffer(); + if(!pBuffer) + throw TCoreException(eErr_InternalProblem, L"Read was possible, but no buffer is available", LOCATION); + + bool bSkip = false; + TSubTaskBase::ESubOperationResult eResult = m_spSrcFile->ReadFileFB(*pBuffer, bSkip); + if(eResult != TSubTaskBase::eSubResult_Continue) + { + m_spReader->AddEmptyBuffer(pBuffer, false); + bStopProcessing = true; + } + else if(bSkip) + { + m_spReader->AddEmptyBuffer(pBuffer, false); + + m_spStats->AdjustProcessedSize(m_spStats->GetCurrentItemProcessedSize(), m_spSrcFileInfo->GetLength64()); + + bProcessedFlag = false; + bStopProcessing = true; + } + + return eResult; + } + + TSubTaskBase::ESubOperationResult TOverlappedReaderFB::OnReadFailed(bool& bStopProcessing, bool& bProcessedFlag) + { + TOverlappedDataBuffer* pBuffer = m_spReader->GetFailedReadBuffer(); + if(!pBuffer) + throw TCoreException(eErr_InternalProblem, L"Cannot retrieve failed read buffer", LOCATION); + + // read error encountered - handle it + bool bSkip = false; + TSubTaskBase::ESubOperationResult eResult = m_spSrcFile->HandleReadError(*pBuffer, bSkip); + if(eResult == TSubTaskBase::eSubResult_Retry) + m_spReader->AddEmptyBuffer(pBuffer, true); + else if(eResult != TSubTaskBase::eSubResult_Continue) + { + m_spReader->AddEmptyBuffer(pBuffer, false); + bStopProcessing = true; + } + else if(bSkip) + { + m_spReader->AddEmptyBuffer(pBuffer, false); + + m_spStats->AdjustProcessedSize(m_spStats->GetCurrentItemProcessedSize(), m_spSrcFileInfo->GetLength64()); + + bProcessedFlag = false; + bStopProcessing = true; + } + + return eResult; + } +} Index: src/libchcore/TOverlappedReaderFB.h =================================================================== diff -u -N --- src/libchcore/TOverlappedReaderFB.h (revision 0) +++ src/libchcore/TOverlappedReaderFB.h (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -0,0 +1,53 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TOVERLAPPEDREADERFB_H__ +#define __TOVERLAPPEDREADERFB_H__ + +#include "TOverlappedReader.h" +#include "TFilesystemFileFeedbackWrapper.h" + +namespace chcore +{ + class TFilesystemFileFeedbackWrapper; + + class TOverlappedReaderFB + { + public: + TOverlappedReaderFB(const TFilesystemFileFeedbackWrapperPtr& spSrcFile, const TSubTaskStatsInfoPtr& spStats, + const TFileInfoPtr& spSrcFileInfo, + const logger::TLogFileDataPtr& spLogFileData, const TBufferListPtr& spEmptyBuffers, + unsigned long long ullFilePos, DWORD dwChunkSize); + ~TOverlappedReaderFB(); + + TOverlappedReaderPtr GetReader() const { return m_spReader; } + + TSubTaskBase::ESubOperationResult OnReadPossible(bool& bStopProcessing, bool& bProcessedFlag); + TSubTaskBase::ESubOperationResult OnReadFailed(bool& bStopProcessing, bool& bProcessedFlag); + + private: + TOverlappedReaderPtr m_spReader; + TFilesystemFileFeedbackWrapperPtr m_spSrcFile; + TSubTaskStatsInfoPtr m_spStats; + TFileInfoPtr m_spSrcFileInfo; + }; + + using TOverlappedReaderFBPtr = std::shared_ptr; +} + +#endif Index: src/libchcore/TOverlappedReaderWriter.cpp =================================================================== diff -u -N --- src/libchcore/TOverlappedReaderWriter.cpp (revision b941384e121190b6107f1c99b3233667e3daf4ce) +++ src/libchcore/TOverlappedReaderWriter.cpp (revision 0) @@ -1,71 +0,0 @@ -// ============================================================================ -// Copyright (C) 2001-2015 by Jozef Starosczyk -// ixen@copyhandler.com -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU Library General Public License -// (version 2) as published by the Free Software Foundation; -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU Library General Public -// License along with this program; if not, write to the -// Free Software Foundation, Inc., -// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -// ============================================================================ -#include "stdafx.h" -#include "TOverlappedReaderWriter.h" -#include "TOverlappedDataBuffer.h" -#include "TCoreException.h" -#include "ErrorCodes.h" -#include -#include - -namespace chcore -{ - TOverlappedReaderWriter::TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedMemoryPoolPtr& spMemoryPool, - unsigned long long ullFilePos, DWORD dwChunkSize) : - m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), - m_spMemoryPool(spMemoryPool), - m_spReader(std::make_shared(spLogFileData, spMemoryPool->GetBufferList(), ullFilePos, dwChunkSize)), - m_spWriter(std::make_shared(spLogFileData, m_spReader->GetFinishedQueue(), ullFilePos, spMemoryPool->GetBufferList())) - { - if(!spMemoryPool) - throw TCoreException(eErr_InvalidArgument, L"spMemoryPool", LOCATION); - } - - TOverlappedReaderWriter::~TOverlappedReaderWriter() - { - } - - void TOverlappedReaderWriter::WaitForMissingBuffersAndResetState(HANDLE hKillEvent) - { - m_spReader->ReleaseBuffers(); - m_spWriter->ReleaseBuffers(); - - enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; - std::array arrHandles = { hKillEvent, m_spMemoryPool->GetBufferList()->GetAllBuffersAccountedForEvent() }; - - bool bExit = false; - while (!bExit) - { - DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); - switch (dwResult) - { - case STATUS_USER_APC: - break; - - case WAIT_OBJECT_0 + eAllBuffersReturned: - bExit = true; - break; - - case WAIT_OBJECT_0 + eKillThread: - bExit = true; - break; - } - } - } -} Index: src/libchcore/TOverlappedReaderWriterFB.cpp =================================================================== diff -u -N --- src/libchcore/TOverlappedReaderWriterFB.cpp (revision 0) +++ src/libchcore/TOverlappedReaderWriterFB.cpp (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -0,0 +1,153 @@ +// ============================================================================ +// Copyright (C) 2001-2015 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TOverlappedReaderWriterFB.h" +#include "TCoreException.h" +#include "ErrorCodes.h" +#include +#include + +namespace chcore +{ + TOverlappedReaderWriterFB::TOverlappedReaderWriterFB(const TFilesystemFileFeedbackWrapperPtr& spSrcFile, const TFileInfoPtr& spSrcFileInfo, + const TFilesystemFileFeedbackWrapperPtr& spDstFile, + const TSubTaskStatsInfoPtr& spStats, + const logger::TLogFileDataPtr& spLogFileData, const TOverlappedMemoryPoolPtr& spMemoryPool, unsigned long long ullFilePos, DWORD dwChunkSize) : + + m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), + m_spMemoryPool(spMemoryPool), + m_spReader(std::make_shared(spSrcFile, spStats, spSrcFileInfo, spLogFileData, spMemoryPool->GetBufferList(), ullFilePos, dwChunkSize)), + m_spWriter(std::make_shared(spSrcFile, spDstFile, spStats, spSrcFileInfo, spLogFileData, m_spReader->GetReader()->GetFinishedQueue(), ullFilePos, spMemoryPool->GetBufferList())) + { + if(!spMemoryPool) + throw TCoreException(eErr_InvalidArgument, L"spMemoryPool", LOCATION); + } + + TOverlappedReaderWriterFB::~TOverlappedReaderWriterFB() + { + } + + void TOverlappedReaderWriterFB::WaitForMissingBuffersAndResetState(HANDLE hKillEvent) + { + m_spReader->GetReader()->ReleaseBuffers(); + m_spWriter->GetWriter()->ReleaseBuffers(); + + enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; + std::array arrHandles = { hKillEvent, m_spMemoryPool->GetBufferList()->GetAllBuffersAccountedForEvent() }; + + bool bExit = false; + while (!bExit) + { + DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + switch (dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0 + eAllBuffersReturned: + bExit = true; + break; + + case WAIT_OBJECT_0 + eKillThread: + bExit = true; + break; + } + } + } + + TSubTaskBase::ESubOperationResult TOverlappedReaderWriterFB::Start(HANDLE hKill, bool& bProcessed) + { + // read data from file to buffer + // NOTE: order is critical here: + // - write finished is first, so that all the data that were already queued to be written, will be written and accounted for (in stats) + // - kill request is second, so that we can stop processing as soon as all the data is written to destination location; + // that also means that we don't want to queue reads or writes anymore - all the data that were read until now, will be lost + // - write possible - we're prioritizing write queuing here to empty buffers as soon as possible + // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data + enum + { + eKillThread, eWriteFinished, eWriteFailed, eWritePossible, eReadFailed, eReadPossible, eHandleCount + }; + std::array arrHandles = { + hKill, + m_spWriter->GetWriter()->GetEventWriteFinishedHandle(), + m_spWriter->GetWriter()->GetEventWriteFailedHandle(), + m_spWriter->GetWriter()->GetEventWritePossibleHandle(), + m_spReader->GetReader()->GetEventReadFailedHandle(), + m_spReader->GetReader()->GetEventReadPossibleHandle() + }; + + TSubTaskBase::ESubOperationResult eResult = TSubTaskBase::eSubResult_Continue; + bool bStopProcessing = false; + while(!bStopProcessing) + { + DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + switch(dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0 + eKillThread: + { + // log + LOG_INFO(m_spLog) << L"Received kill request while copying file"; + + eResult = TSubTaskBase::eSubResult_KillRequest; + bStopProcessing = true; + break; + } + + case WAIT_OBJECT_0 + eReadPossible: + { + eResult = m_spReader->OnReadPossible(bStopProcessing, bProcessed); + break; + } + case WAIT_OBJECT_0 + eReadFailed: + { + eResult = m_spReader->OnReadFailed(bStopProcessing, bProcessed); + break; + } + case WAIT_OBJECT_0 + eWritePossible: + { + eResult = m_spWriter->OnWritePossible(bStopProcessing, bProcessed); + break; + } + + case WAIT_OBJECT_0 + eWriteFailed: + { + eResult = m_spWriter->OnWriteFailed(bStopProcessing, bProcessed); + break; + } + + case WAIT_OBJECT_0 + eWriteFinished: + { + eResult = m_spWriter->OnWriteFinished(bStopProcessing, bProcessed); + break; + } + + default: + throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); + } + } + + WaitForMissingBuffersAndResetState(hKill); + + return eResult; + } +} Index: src/libchcore/TOverlappedReaderWriterFB.h =================================================================== diff -u -N --- src/libchcore/TOverlappedReaderWriterFB.h (revision 0) +++ src/libchcore/TOverlappedReaderWriterFB.h (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -0,0 +1,61 @@ +// ============================================================================ +// Copyright (C) 2001-2014 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TOVERLAPPEDREADERWRITERFB_H__ +#define __TOVERLAPPEDREADERWRITER_H__ + +#include "../liblogger/TLogFileData.h" +#include "../liblogger/TLogger.h" +#include "TOverlappedMemoryPool.h" +#include "TOverlappedReaderFB.h" +#include "TOverlappedWriterFB.h" + +namespace chcore +{ + class TOverlappedReaderWriterFB + { + public: + explicit TOverlappedReaderWriterFB(const TFilesystemFileFeedbackWrapperPtr& spSrcFile, const TFileInfoPtr& spSrcFileInfo, + const TFilesystemFileFeedbackWrapperPtr& spDstFile, + const TSubTaskStatsInfoPtr& spStats, + const logger::TLogFileDataPtr& spLogFileData, const TOverlappedMemoryPoolPtr& spBuffers, + unsigned long long ullFilePos, DWORD dwChunkSize); + TOverlappedReaderWriterFB(const TOverlappedReaderWriterFB&) = delete; + ~TOverlappedReaderWriterFB(); + + TOverlappedReaderWriterFB& operator=(const TOverlappedReaderWriterFB&) = delete; + + TSubTaskBase::ESubOperationResult Start(HANDLE hKill, bool& bProcessed); + + // reader/writer + TOverlappedReaderFBPtr GetReader() const { return m_spReader; } + TOverlappedWriterFBPtr GetWriter() const { return m_spWriter; } + + // event access + void WaitForMissingBuffersAndResetState(HANDLE hKillEvent); + + private: + logger::TLoggerPtr m_spLog; + + TOverlappedMemoryPoolPtr m_spMemoryPool; + TOverlappedReaderFBPtr m_spReader; + TOverlappedWriterFBPtr m_spWriter; + }; +} + +#endif Index: src/libchcore/TOverlappedWriterFB.cpp =================================================================== diff -u -N --- src/libchcore/TOverlappedWriterFB.cpp (revision 0) +++ src/libchcore/TOverlappedWriterFB.cpp (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -0,0 +1,191 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TOverlappedWriterFB.h" +#include "TSubTaskStatsInfo.h" +#include "TFilesystemFileFeedbackWrapper.h" +#include "TFileInfo.h" + +namespace chcore +{ + TOverlappedWriterFB::TOverlappedWriterFB(const TFilesystemFileFeedbackWrapperPtr& spSrcFile, const TFilesystemFileFeedbackWrapperPtr& spDstFile, + const TSubTaskStatsInfoPtr& spStats, + const TFileInfoPtr& spSrcFileInfo, + const logger::TLogFileDataPtr& spLogFileData, const TOrderedBufferQueuePtr& spBuffersToWrite, + unsigned long long ullFilePos, const TBufferListPtr& spEmptyBuffers) : + m_spWriter(std::make_shared(spLogFileData, spBuffersToWrite, ullFilePos, spEmptyBuffers)), + m_spSrcFile(spSrcFile), + m_spDstFile(spDstFile), + m_spStats(spStats), + m_spSrcFileInfo(spSrcFileInfo), + m_spEmptyBuffers(spEmptyBuffers) + { + if(!spDstFile) + throw TCoreException(eErr_InvalidArgument, L"spDstFile is NULL", LOCATION); + if(!spStats) + throw TCoreException(eErr_InvalidArgument, L"spStats is NULL", LOCATION); + if(!spSrcFileInfo) + throw TCoreException(eErr_InvalidArgument, L"spSrcFileInfo is NULL", LOCATION); + if(!spEmptyBuffers) + throw TCoreException(eErr_InvalidArgument, L"spEmptyBuffers is NULL", LOCATION); + } + + TOverlappedWriterFB::~TOverlappedWriterFB() + { + } + + TSubTaskBase::ESubOperationResult TOverlappedWriterFB::OnWritePossible(bool& bStopProcessing, bool& bProcessedFlag) + { + TOverlappedDataBuffer* pBuffer = m_spWriter->GetWriteBuffer(); + if(!pBuffer) + throw TCoreException(eErr_InternalProblem, L"Write was possible, but no buffer is available", LOCATION); + + bool bSkip = false; + TSubTaskBase::ESubOperationResult eResult = m_spDstFile->WriteFileFB(*pBuffer, bSkip); + if(eResult != TSubTaskBase::eSubResult_Continue) + { + m_spEmptyBuffers->Push(pBuffer); + bStopProcessing = true; + } + else if(bSkip) + { + m_spEmptyBuffers->Push(pBuffer); + + m_spStats->AdjustProcessedSize(m_spStats->GetCurrentItemProcessedSize(), m_spSrcFileInfo->GetLength64()); + + bProcessedFlag = false; + bStopProcessing = true; + } + + return eResult; + } + + TSubTaskBase::ESubOperationResult TOverlappedWriterFB::OnWriteFailed(bool& bStopProcessing, bool& bProcessedFlag) + { + TOverlappedDataBuffer* pBuffer = m_spWriter->GetFailedWriteBuffer(); + if(!pBuffer) + throw TCoreException(eErr_InternalProblem, L"Failed to retrieve write failed buffer", LOCATION); + + bool bSkip = false; + TSubTaskBase::ESubOperationResult eResult = m_spDstFile->HandleWriteError(*pBuffer, bSkip); + if(eResult == TSubTaskBase::eSubResult_Retry) + m_spWriter->AddFailedWriteBuffer(pBuffer); + else if(eResult != TSubTaskBase::eSubResult_Continue) + { + m_spEmptyBuffers->Push(pBuffer); + bStopProcessing = true; + } + else if(bSkip) + { + m_spEmptyBuffers->Push(pBuffer); + + m_spStats->AdjustProcessedSize(m_spStats->GetCurrentItemProcessedSize(), m_spSrcFileInfo->GetLength64()); + + bProcessedFlag = false; + bStopProcessing = true; + } + + return eResult; + } + + TSubTaskBase::ESubOperationResult TOverlappedWriterFB::OnWriteFinished(bool& bStopProcessing, bool& bProcessedFlag) + { + TOverlappedDataBuffer* pBuffer = m_spWriter->GetFinishedBuffer(); + if(!pBuffer) + throw TCoreException(eErr_InternalProblem, L"Write finished was possible, but no buffer is available", LOCATION); + + bool bSkip = false; + TSubTaskBase::ESubOperationResult eResult = TSubTaskBase::eSubResult_Continue; + if(pBuffer->IsLastPart()) + { + eResult = m_spDstFile->FinalizeFileFB(*pBuffer, bSkip); + if(eResult != TSubTaskBase::eSubResult_Continue) + { + m_spEmptyBuffers->Push(pBuffer); + bStopProcessing = true; + return eResult; + } + else if(bSkip) + { + m_spEmptyBuffers->Push(pBuffer); + + m_spStats->AdjustProcessedSize(m_spStats->GetCurrentItemProcessedSize(), m_spSrcFileInfo->GetLength64()); + + bProcessedFlag = false; + bStopProcessing = true; + return eResult; + } + } + + file_size_t fsWritten = pBuffer->GetRealDataSize(); + + // in case we read past the original eof, try to get new file size from filesystem + AdjustProcessedSize(fsWritten); + + // stop iterating through file + bStopProcessing = pBuffer->IsLastPart(); + if(bStopProcessing) + { + m_spWriter->MarkAsFinalized(pBuffer); + + // this is the end of copying of src file - in case it is smaller than expected fix the stats so that difference is accounted for + AdjustFinalSize(); + + bProcessedFlag = true; + m_spStats->ResetCurrentItemProcessedSize(); + } + + m_spEmptyBuffers->Push(pBuffer); + return eResult; + } + + void TOverlappedWriterFB::AdjustProcessedSize(file_size_t fsWritten) + { + // in case we read past the original eof, try to get new file size from filesystem + if(m_spStats->WillAdjustProcessedSizeExceedTotalSize(0, fsWritten)) + { + file_size_t fsNewSize = m_spSrcFile->GetFileSize(); + if(fsNewSize == m_spSrcFileInfo->GetLength64()) + throw TCoreException(eErr_InternalProblem, L"Read more data from file than it really contained. Possible destination file corruption.", LOCATION); + + m_spStats->AdjustTotalSize(m_spSrcFileInfo->GetLength64(), fsNewSize); + m_spSrcFileInfo->SetLength64(m_spStats->GetCurrentItemTotalSize()); + } + + m_spStats->AdjustProcessedSize(0, fsWritten); + } + + void TOverlappedWriterFB::AdjustFinalSize() + { + unsigned long long ullCITotalSize = m_spStats->GetCurrentItemTotalSize(); + unsigned long long ullCIProcessedSize = m_spStats->GetCurrentItemProcessedSize(); + if(ullCIProcessedSize < ullCITotalSize) + { + file_size_t fsNewSize = m_spSrcFile->GetFileSize(); + if(fsNewSize == m_spSrcFileInfo->GetLength64()) + throw TCoreException(eErr_InternalProblem, L"Read less data from file than it really contained. Possible destination file corruption.", LOCATION); + + if(fsNewSize != ullCIProcessedSize) + throw TCoreException(eErr_InternalProblem, L"Updated file size still does not match the count of data read. Possible destination file corruption.", LOCATION); + + m_spStats->AdjustTotalSize(ullCITotalSize, fsNewSize); + m_spSrcFileInfo->SetLength64(fsNewSize); + } + } +} Index: src/libchcore/TOverlappedWriterFB.h =================================================================== diff -u -N --- src/libchcore/TOverlappedWriterFB.h (revision 0) +++ src/libchcore/TOverlappedWriterFB.h (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -0,0 +1,59 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TOVERLAPPEDWRITERFB_H__ +#define __TOVERLAPPEDWRITERFB_H__ + +#include "TOverlappedWriter.h" +#include "TFilesystemFileFeedbackWrapper.h" + +namespace chcore +{ + class TOverlappedWriterFB + { + public: + TOverlappedWriterFB(const TFilesystemFileFeedbackWrapperPtr& spSrcFile, const TFilesystemFileFeedbackWrapperPtr& spDstFile, const TSubTaskStatsInfoPtr& spStats, + const TFileInfoPtr& spSrcFileInfo, + const logger::TLogFileDataPtr& spLogFileData, const TOrderedBufferQueuePtr& spBuffersToWrite, + unsigned long long ullFilePos, const TBufferListPtr& spEmptyBuffers); + ~TOverlappedWriterFB(); + + TOverlappedWriterPtr GetWriter() const { return m_spWriter; } + + TSubTaskBase::ESubOperationResult OnWritePossible(bool& bStopProcessing, bool& bProcessedFlag); + TSubTaskBase::ESubOperationResult OnWriteFailed(bool& bStopProcessing, bool& bProcessedFlag); + TSubTaskBase::ESubOperationResult OnWriteFinished(bool& bStopProcessing, bool& bProcessedFlag); + + private: + void AdjustProcessedSize(file_size_t fsWritten); + void AdjustFinalSize(); + + private: + TOverlappedWriterPtr m_spWriter; + TFilesystemFileFeedbackWrapperPtr m_spSrcFile; + TFilesystemFileFeedbackWrapperPtr m_spDstFile; + TSubTaskStatsInfoPtr m_spStats; + TFileInfoPtr m_spSrcFileInfo; + TFileInfoPtr m_spDstFileInfo; + TBufferListPtr m_spEmptyBuffers; + }; + + using TOverlappedWriterFBPtr = std::shared_ptr; +} + +#endif Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -N -rb941384e121190b6107f1c99b3233667e3daf4ce -r734408890246965d47e6bbf2c2978371269dd1fd --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision b941384e121190b6107f1c99b3233667e3daf4ce) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -40,13 +40,12 @@ #include "TOverlappedMemoryPool.h" #include "TOverlappedDataBuffer.h" #include "RoundingFunctions.h" -#include #include "TTaskConfigBufferSizes.h" #include "TFileException.h" #include "TFilesystemFeedbackWrapper.h" #include "TFilesystemFileFeedbackWrapper.h" #include "TDestinationPathProvider.h" -#include "TOverlappedReaderWriter.h" +#include "TOverlappedReaderWriterFB.h" namespace chcore { @@ -71,14 +70,14 @@ TSubTaskCopyMove::TSubTaskCopyMove(TSubTaskContext& rContext) : TSubTaskBase(rContext), - m_tSubTaskStats(eSubOperation_Copying), + m_spSubTaskStats(std::make_shared(eSubOperation_Copying)), m_spLog(std::make_unique(rContext.GetLogFileData(), L"ST-CopyMove")) { } void TSubTaskCopyMove::Reset() { - m_tSubTaskStats.Clear(); + m_spSubTaskStats->Clear(); } void TSubTaskCopyMove::InitBeforeExec() @@ -88,21 +87,21 @@ file_count_t fcCount = rFilesCache.GetSize(); if(fcCount == 0) { - m_tSubTaskStats.SetCurrentPath(TString()); + m_spSubTaskStats->SetCurrentPath(TString()); return; } - file_count_t fcIndex = m_tSubTaskStats.GetCurrentIndex(); + file_count_t fcIndex = m_spSubTaskStats->GetCurrentIndex(); if(fcIndex >= fcCount) fcIndex = 0; TFileInfoPtr spFileInfo = rFilesCache.GetAt(fcIndex); - m_tSubTaskStats.SetCurrentPath(spFileInfo->GetFullFilePath().ToString()); + m_spSubTaskStats->SetCurrentPath(spFileInfo->GetFullFilePath().ToString()); } TSubTaskBase::ESubOperationResult TSubTaskCopyMove::Exec(const IFeedbackHandlerPtr& spFeedback) { - TScopedRunningTimeTracker guard(m_tSubTaskStats); + TScopedRunningTimeTracker guard(*m_spSubTaskStats); TFeedbackHandlerWrapperPtr spFeedbackHandler(std::make_shared(spFeedback, guard)); TFileInfoArray& rFilesCache = GetContext().GetFilesCache(); @@ -120,27 +119,27 @@ // initialize stats if not resuming (when resuming we have already initialized // the stats once - it is being restored in Load() too). - if (!m_tSubTaskStats.IsInitialized()) - m_tSubTaskStats.Init(TBufferSizes::eBuffer_Default, rFilesCache.GetSize(), 0, rFilesCache.CalculateTotalSize(), rFilesCache.CalculatePartialSize(m_tSubTaskStats.GetCurrentIndex()), TString()); + if (!m_spSubTaskStats->IsInitialized()) + m_spSubTaskStats->Init(TBufferSizes::eBuffer_Default, rFilesCache.GetSize(), 0, rFilesCache.CalculateTotalSize(), rFilesCache.CalculatePartialSize(m_spSubTaskStats->GetCurrentIndex()), TString()); else { - _ASSERTE(rFilesCache.GetSize() == m_tSubTaskStats.GetTotalCount()); - if (rFilesCache.GetSize() != m_tSubTaskStats.GetTotalCount()) + _ASSERTE(rFilesCache.GetSize() == m_spSubTaskStats->GetTotalCount()); + if (rFilesCache.GetSize() != m_spSubTaskStats->GetTotalCount()) throw TCoreException(eErr_InternalProblem, L"Size of files' cache differs from stats information", LOCATION); } // now it's time to check if there is enough space on destination device - unsigned long long ullNeededSize = rFilesCache.CalculateTotalSize() - rFilesCache.CalculatePartialSize(m_tSubTaskStats.GetCurrentIndex()); + unsigned long long ullNeededSize = rFilesCache.CalculateTotalSize() - rFilesCache.CalculatePartialSize(m_spSubTaskStats->GetCurrentIndex()); TSmartPath pathSingleSrc = spSrcPaths->GetAt(0)->GetSrcPath(); TSubTaskBase::ESubOperationResult eResult = tFilesystemFBWrapper.CheckForFreeSpaceFB(pathSingleSrc, pathDestination, ullNeededSize); if(eResult != TSubTaskBase::eSubResult_Continue) return eResult; // begin at index which wasn't processed previously file_count_t fcSize = rFilesCache.GetSize(); - file_count_t fcIndex = m_tSubTaskStats.GetCurrentIndex(); - unsigned long long ullCurrentItemProcessedSize = m_tSubTaskStats.GetCurrentItemProcessedSize(); - bool bCurrentFileSilentResume = m_tSubTaskStats.CanCurrentItemSilentResume(); + file_count_t fcIndex = m_spSubTaskStats->GetCurrentIndex(); + unsigned long long ullCurrentItemProcessedSize = m_spSubTaskStats->GetCurrentItemProcessedSize(); + bool bCurrentFileSilentResume = m_spSubTaskStats->CanCurrentItemSilentResume(); // create a buffer of size m_nBufferSize CUSTOM_COPY_PARAMS ccp; @@ -186,12 +185,12 @@ TSmartPath pathCurrent = spFileInfo->GetFullFilePath(); // new stats - m_tSubTaskStats.SetCurrentIndex(fcIndex); - m_tSubTaskStats.SetProcessedCount(fcIndex); - m_tSubTaskStats.SetCurrentPath(pathCurrent.ToString()); - m_tSubTaskStats.SetCurrentItemSizes(ullCurrentItemProcessedSize, spFileInfo->GetLength64()); // preserve the processed size for the first item + m_spSubTaskStats->SetCurrentIndex(fcIndex); + m_spSubTaskStats->SetProcessedCount(fcIndex); + m_spSubTaskStats->SetCurrentPath(pathCurrent.ToString()); + m_spSubTaskStats->SetCurrentItemSizes(ullCurrentItemProcessedSize, spFileInfo->GetLength64()); // preserve the processed size for the first item ullCurrentItemProcessedSize = 0; // in next iteration we're not resuming anymore - m_tSubTaskStats.SetCurrentItemSilentResume(bCurrentFileSilentResume); + m_spSubTaskStats->SetCurrentItemSilentResume(bCurrentFileSilentResume); bCurrentFileSilentResume = false; // if the file was already processed (e.g. by fast-move), just consider the file skipped @@ -280,9 +279,9 @@ } // stats - m_tSubTaskStats.SetCurrentIndex(fcIndex); - m_tSubTaskStats.SetProcessedCount(fcIndex); - m_tSubTaskStats.SetCurrentPath(TString()); + m_spSubTaskStats->SetCurrentIndex(fcIndex); + m_spSubTaskStats->SetProcessedCount(fcIndex); + m_spSubTaskStats->SetCurrentPath(TString()); // log LOG_INFO(m_spLog) << _T("Finished processing in ProcessFiles"); @@ -292,7 +291,7 @@ void TSubTaskCopyMove::GetStatsSnapshot(TSubTaskStatsSnapshotPtr& spStats) const { - m_tSubTaskStats.GetSnapshot(spStats); + m_spSubTaskStats->GetSnapshot(spStats); // if this subtask is not started yet, try to get the most fresh information for processing if(!spStats->IsRunning() && spStats->GetTotalCount() == 0 && spStats->GetTotalSize() == 0) { @@ -350,11 +349,11 @@ IFilesystemFilePtr fileSrc = spFilesystem->CreateFileObject(pData->spSrcFile->GetFullFilePath(), bNoBuffer); IFilesystemFilePtr fileDst = spFilesystem->CreateFileObject(pData->pathDstFile, bNoBuffer); - TFilesystemFileFeedbackWrapper srcFileWrapper(fileSrc, spFeedbackHandler, GetContext().GetLogFileData(), rThreadController, spFilesystem); - TFilesystemFileFeedbackWrapper dstFileWrapper(fileDst, spFeedbackHandler, GetContext().GetLogFileData(), rThreadController, spFilesystem); + TFilesystemFileFeedbackWrapperPtr spSrcFileWrapper(std::make_shared(fileSrc, spFeedbackHandler, GetContext().GetLogFileData(), rThreadController, spFilesystem)); + TFilesystemFileFeedbackWrapperPtr spDstFileWrapper(std::make_shared(fileDst, spFeedbackHandler, GetContext().GetLogFileData(), rThreadController, spFilesystem)); bool bSkip = false; - TSubTaskBase::ESubOperationResult eResult = OpenSrcAndDstFilesFB(srcFileWrapper, dstFileWrapper, pData, bSkip); + TSubTaskBase::ESubOperationResult eResult = OpenSrcAndDstFilesFB(*spSrcFileWrapper, *spDstFileWrapper, pData, bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) return eResult; else if(bSkip) @@ -367,256 +366,26 @@ // establish count of data to read TBufferSizes::EBufferType eBufferIndex = GetBufferIndex(pData->tBufferSizes, pData->spSrcFile); - m_tSubTaskStats.SetCurrentBufferIndex(eBufferIndex); + m_spSubTaskStats->SetCurrentBufferIndex(eBufferIndex); // determine buffer size to use for the operation DWORD dwCurrentBufferSize = RoundUp(pData->tBufferSizes.GetSizeByType(eBufferIndex), IFilesystemFile::MaxSectorSize); // resume copying from the position after the last processed mark; the proper value should be set // by OpenSrcAndDstFilesFB() - that includes the no-buffering setting if required. - unsigned long long ullNextReadPos = m_tSubTaskStats.GetCurrentItemProcessedSize(); + unsigned long long ullNextReadPos = m_spSubTaskStats->GetCurrentItemProcessedSize(); - TOverlappedReaderWriter tReaderWriter(m_spLog->GetLogFileData(), pData->spMemoryPool, ullNextReadPos, dwCurrentBufferSize); + TOverlappedReaderWriterFB tReaderWriter(spSrcFileWrapper, pData->spSrcFile, spDstFileWrapper, m_spSubTaskStats, m_spLog->GetLogFileData(), + pData->spMemoryPool, ullNextReadPos, dwCurrentBufferSize); - // read data from file to buffer - // NOTE: order is critical here: - // - write finished is first, so that all the data that were already queued to be written, will be written and accounted for (in stats) - // - kill request is second, so that we can stop processing as soon as all the data is written to destination location; - // that also means that we don't want to queue reads or writes anymore - all the data that were read until now, will be lost - // - write possible - we're prioritizing write queuing here to empty buffers as soon as possible - // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data - enum - { - eKillThread, eWriteFinished, eWriteFailed, eWritePossible, eReadFailed, eReadPossible, eHandleCount - }; - std::array arrHandles = { - rThreadController.GetKillThreadHandle(), - tReaderWriter.GetWriter()->GetEventWriteFinishedHandle(), - tReaderWriter.GetWriter()->GetEventWriteFailedHandle(), - tReaderWriter.GetWriter()->GetEventWritePossibleHandle(), - tReaderWriter.GetReader()->GetEventReadFailedHandle(), - tReaderWriter.GetReader()->GetEventReadPossibleHandle() - }; + eResult = tReaderWriter.Start(rThreadController.GetKillThreadHandle(), pData->bProcessed); - bool bStopProcessing = false; - while(!bStopProcessing) - { - DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); - switch(dwResult) - { - case STATUS_USER_APC: - break; - - case WAIT_OBJECT_0 + eKillThread: - { - // log - LOG_INFO(m_spLog) << L"Kill request while main copying file " << pData->spSrcFile->GetFullFilePath().ToString() << - L" -> " << pData->pathDstFile.ToString(); - - eResult = TSubTaskBase::eSubResult_KillRequest; - bStopProcessing = true; - break; - } - - case WAIT_OBJECT_0 + eReadPossible: - { - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetReader()->GetEmptyBuffer(); - if (!pBuffer) - throw TCoreException(eErr_InternalProblem, L"Read was possible, but no buffer is available", LOCATION); - - eResult = srcFileWrapper.ReadFileFB(*pBuffer, bSkip); - if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - - AdjustProcessedSizeForSkip(pData->spSrcFile); - - pData->bProcessed = false; - bStopProcessing = true; - } - break; - } - case WAIT_OBJECT_0 + eReadFailed: - { - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetReader()->GetFailedReadBuffer(); - if (!pBuffer) - throw TCoreException(eErr_InternalProblem, L"Cannot retrieve failed read buffer", LOCATION); - - // read error encountered - handle it - eResult = HandleReadError(spFeedbackHandler, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); - if(eResult == TSubTaskBase::eSubResult_Retry) - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, true); - else if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - - AdjustProcessedSizeForSkip(pData->spSrcFile); - - pData->bProcessed = false; - bStopProcessing = true; - } - - break; - } - case WAIT_OBJECT_0 + eWritePossible: - { - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetWriter()->GetWriteBuffer(); - if (!pBuffer) - throw TCoreException(eErr_InternalProblem, L"Write was possible, but no buffer is available", LOCATION); - - eResult = dstFileWrapper.WriteFileFB(*pBuffer, bSkip); - if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - - AdjustProcessedSizeForSkip(pData->spSrcFile); - - pData->bProcessed = false; - bStopProcessing = true; - } - - break; - } - - case WAIT_OBJECT_0 + eWriteFailed: - { - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetWriter()->GetFailedWriteBuffer(); - if (!pBuffer) - throw TCoreException(eErr_InternalProblem, L"Failed to retrieve write failed buffer", LOCATION); - - eResult = HandleWriteError(spFeedbackHandler, *pBuffer, pData->pathDstFile, bSkip); - if(eResult == TSubTaskBase::eSubResult_Retry) - tReaderWriter.GetWriter()->AddFailedWriteBuffer(pBuffer); - else if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - - AdjustProcessedSizeForSkip(pData->spSrcFile); - - pData->bProcessed = false; - bStopProcessing = true; - } - - break; - } - - case WAIT_OBJECT_0 + eWriteFinished: - { - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetWriter()->GetFinishedBuffer(); - if (!pBuffer) - throw TCoreException(eErr_InternalProblem, L"Write finished was possible, but no buffer is available", LOCATION); - - if(pBuffer->IsLastPart()) - { - eResult = dstFileWrapper.FinalizeFileFB(*pBuffer, bSkip); - if (eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - bStopProcessing = true; - break; - } - else if (bSkip) - { - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - - AdjustProcessedSizeForSkip(pData->spSrcFile); - - pData->bProcessed = false; - bStopProcessing = true; - break; - } - } - - file_size_t fsWritten = pBuffer->GetRealDataSize(); - - // in case we read past the original eof, try to get new file size from filesystem - AdjustProcessedSize(fsWritten, pData->spSrcFile, fileSrc); - - // stop iterating through file - bStopProcessing = pBuffer->IsLastPart(); - if(bStopProcessing) - { - tReaderWriter.GetWriter()->MarkAsFinalized(pBuffer); - - // this is the end of copying of src file - in case it is smaller than expected fix the stats so that difference is accounted for - AdjustFinalSize(pData->spSrcFile, fileSrc); - - pData->bProcessed = true; - m_tSubTaskStats.ResetCurrentItemProcessedSize(); - } - - tReaderWriter.GetReader()->AddEmptyBuffer(pBuffer, false); - - break; - } - - default: - throw TCoreException(eErr_UnhandledCase, L"Unknown result from async waiting function", LOCATION); - } - } - - tReaderWriter.WaitForMissingBuffersAndResetState(rThreadController.GetKillThreadHandle()); - return eResult; } - void TSubTaskCopyMove::AdjustProcessedSize(file_size_t fsWritten, const TFileInfoPtr& spSrcFileInfo, const IFilesystemFilePtr& spSrcFile) - { - // in case we read past the original eof, try to get new file size from filesystem - if (m_tSubTaskStats.WillAdjustProcessedSizeExceedTotalSize(0, fsWritten)) - { - file_size_t fsNewSize = spSrcFile->GetFileSize(); - if (fsNewSize == spSrcFileInfo->GetLength64()) - throw TCoreException(eErr_InternalProblem, L"Read more data from file than it really contained. Possible destination file corruption.", LOCATION); - - m_tSubTaskStats.AdjustTotalSize(spSrcFileInfo->GetLength64(), fsNewSize); - spSrcFileInfo->SetLength64(m_tSubTaskStats.GetCurrentItemTotalSize()); - } - - m_tSubTaskStats.AdjustProcessedSize(0, fsWritten); - } - - void TSubTaskCopyMove::AdjustFinalSize(const TFileInfoPtr& spSrcFileInfo, const IFilesystemFilePtr& spSrcFile) - { - unsigned long long ullCITotalSize = m_tSubTaskStats.GetCurrentItemTotalSize(); - unsigned long long ullCIProcessedSize = m_tSubTaskStats.GetCurrentItemProcessedSize(); - if (ullCIProcessedSize < ullCITotalSize) - { - file_size_t fsNewSize = spSrcFile->GetFileSize(); - if (fsNewSize == spSrcFileInfo->GetLength64()) - throw TCoreException(eErr_InternalProblem, L"Read less data from file than it really contained. Possible destination file corruption.", LOCATION); - - if (fsNewSize != ullCIProcessedSize) - throw TCoreException(eErr_InternalProblem, L"Updated file size still does not match the count of data read. Possible destination file corruption.", LOCATION); - - m_tSubTaskStats.AdjustTotalSize(ullCITotalSize, fsNewSize); - spSrcFileInfo->SetLength64(fsNewSize); - } - } - void TSubTaskCopyMove::AdjustProcessedSizeForSkip(const TFileInfoPtr& spSrcFileInfo) { - m_tSubTaskStats.AdjustProcessedSize(m_tSubTaskStats.GetCurrentItemProcessedSize(), spSrcFileInfo->GetLength64()); + m_spSubTaskStats->AdjustProcessedSize(m_spSubTaskStats->GetCurrentItemProcessedSize(), spSrcFileInfo->GetLength64()); } TSubTaskCopyMove::ESubOperationResult TSubTaskCopyMove::OpenSrcAndDstFilesFB(TFilesystemFileFeedbackWrapper& rSrcFile, TFilesystemFileFeedbackWrapper& rDstFile, @@ -627,7 +396,7 @@ bSkip = false; - unsigned long long ullProcessedSize = m_tSubTaskStats.GetCurrentItemProcessedSize(); + unsigned long long ullProcessedSize = m_spSubTaskStats->GetCurrentItemProcessedSize(); // first open the source file and handle any failures TSubTaskCopyMove::ESubOperationResult eResult = rSrcFile.OpenSourceFileFB(); @@ -652,7 +421,7 @@ file_size_t fsOldSize = pData->spSrcFile->GetLength64(); if(fsNewSize != fsOldSize) { - m_tSubTaskStats.AdjustTotalSize(fsOldSize, fsNewSize); + m_spSubTaskStats->AdjustTotalSize(fsOldSize, fsNewSize); pData->spSrcFile->SetLength64(fsNewSize); } @@ -662,7 +431,7 @@ // try to resume if possible bool bResumeSucceeded = false; - if (m_tSubTaskStats.CanCurrentItemSilentResume()) + if (m_spSubTaskStats->CanCurrentItemSilentResume()) { bool bContinue = true; TFileInfoPtr spDstFileInfo(std::make_shared()); @@ -739,7 +508,7 @@ throw TCoreException(eErr_InternalProblem, L"File position to move to is placed after the end of file", LOCATION); // adjust the stats for the difference between what was already processed and what will now be considered processed - m_tSubTaskStats.AdjustProcessedSize(ullProcessedSize, fsMoveTo); + m_spSubTaskStats->AdjustProcessedSize(ullProcessedSize, fsMoveTo); // if the destination file already exists - truncate it to the current file position if(!bDstFileFreshlyCreated) @@ -757,7 +526,7 @@ // at this point user already decided that he want to write data into destination file; // so if we're to resume copying after this point, we don't have to ask user for overwriting existing file - m_tSubTaskStats.SetCurrentItemSilentResume(true); + m_spSubTaskStats->SetCurrentItemSilentResume(true); return eResult; } @@ -792,90 +561,14 @@ return false; // buffer did not need adjusting } - TSubTaskBase::ESubOperationResult TSubTaskCopyMove::HandleReadError(const IFeedbackHandlerPtr& spFeedbackHandler, - TOverlappedDataBuffer& rBuffer, - const TSmartPath& pathFile, - bool& bSkip) - { - DWORD dwLastError = rBuffer.GetErrorCode(); - - bSkip = false; - - // log - TString strFormat = _T("Error %errno while requesting read of %count bytes from source file %path (CustomCopyFileFB)"); - strFormat.Replace(_T("%errno"), boost::lexical_cast(dwLastError).c_str()); - strFormat.Replace(_T("%count"), boost::lexical_cast(rBuffer.GetRequestedDataSize()).c_str()); - strFormat.Replace(_T("%path"), pathFile.ToString()); - LOG_ERROR(m_spLog) << strFormat.c_str(); - - TFeedbackResult frResult = spFeedbackHandler->FileError(pathFile.ToWString(), TString(), EFileError::eReadError, dwLastError); - switch(frResult.GetResult()) - { - case EFeedbackResult::eResult_Cancel: - return TSubTaskBase::eSubResult_CancelRequest; - - case EFeedbackResult::eResult_Retry: - return TSubTaskBase::eSubResult_Retry; - - case EFeedbackResult::eResult_Pause: - return TSubTaskBase::eSubResult_PauseRequest; - - case EFeedbackResult::eResult_Skip: - bSkip = true; - return TSubTaskBase::eSubResult_Continue; - - default: - BOOST_ASSERT(FALSE); // unknown result - throw TCoreException(eErr_UnhandledCase, L"Unknown feedback result", LOCATION); - } - } - - TSubTaskBase::ESubOperationResult TSubTaskCopyMove::HandleWriteError(const IFeedbackHandlerPtr& spFeedbackHandler, - TOverlappedDataBuffer& rBuffer, - const TSmartPath& pathFile, - bool& bSkip) - { - DWORD dwLastError = rBuffer.GetErrorCode(); - - bSkip = false; - - // log - TString strFormat = _T("Error %errno while trying to write %count bytes to destination file %path (CustomCopyFileFB)"); - strFormat.Replace(_T("%errno"), boost::lexical_cast(rBuffer.GetErrorCode()).c_str()); - strFormat.Replace(_T("%count"), boost::lexical_cast(rBuffer.GetBytesTransferred()).c_str()); - strFormat.Replace(_T("%path"), pathFile.ToString()); - LOG_ERROR(m_spLog) << strFormat.c_str(); - - TFeedbackResult frResult = spFeedbackHandler->FileError(pathFile.ToWString(), TString(), EFileError::eWriteError, dwLastError); - switch (frResult.GetResult()) - { - case EFeedbackResult::eResult_Cancel: - return TSubTaskBase::eSubResult_CancelRequest; - - case EFeedbackResult::eResult_Retry: - return TSubTaskBase::eSubResult_Retry; - - case EFeedbackResult::eResult_Pause: - return TSubTaskBase::eSubResult_PauseRequest; - - case EFeedbackResult::eResult_Skip: - bSkip = true; - return TSubTaskBase::eSubResult_Continue; - - default: - BOOST_ASSERT(FALSE); // unknown result - throw TCoreException(eErr_UnhandledCase, L"Unknown feedback result", LOCATION); - } - } - void TSubTaskCopyMove::Store(const ISerializerPtr& spSerializer) const { ISerializerContainerPtr spContainer = spSerializer->GetContainer(_T("subtask_copymove")); InitColumns(spContainer); - ISerializerRowData& rRow = spContainer->GetRow(0, m_tSubTaskStats.WasAdded()); + ISerializerRowData& rRow = spContainer->GetRow(0, m_spSubTaskStats->WasAdded()); - m_tSubTaskStats.Store(rRow); + m_spSubTaskStats->Store(rRow); } void TSubTaskCopyMove::Load(const ISerializerPtr& spSerializer) @@ -886,7 +579,7 @@ ISerializerRowReaderPtr spRowReader = spContainer->GetRowReader(); if(spRowReader->Next()) - m_tSubTaskStats.Load(spRowReader); + m_spSubTaskStats->Load(spRowReader); } void TSubTaskCopyMove::InitColumns(const ISerializerContainerPtr& spContainer) const Index: src/libchcore/TSubTaskCopyMove.h =================================================================== diff -u -N -rb6a48931b8155a01d871d050f52d915abb2df8ca -r734408890246965d47e6bbf2c2978371269dd1fd --- src/libchcore/TSubTaskCopyMove.h (.../TSubTaskCopyMove.h) (revision b6a48931b8155a01d871d050f52d915abb2df8ca) +++ src/libchcore/TSubTaskCopyMove.h (.../TSubTaskCopyMove.h) (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -25,7 +25,6 @@ #include "libchcore.h" #include "TSubTaskBase.h" -#include "CommonDataTypes.h" #include "TBufferSizes.h" #include "IFilesystemFile.h" #include "../liblogger/TLogger.h" @@ -68,19 +67,12 @@ ESubOperationResult OpenSrcAndDstFilesFB(TFilesystemFileFeedbackWrapper& rSrcFile, TFilesystemFileFeedbackWrapper& rDstFile, CUSTOM_COPY_PARAMS* pData, bool& bSkip); - ESubOperationResult HandleReadError(const IFeedbackHandlerPtr& spFeedbackHandler, TOverlappedDataBuffer& rBuffer, - const TSmartPath& pathFile, bool& bSkip); - ESubOperationResult HandleWriteError(const IFeedbackHandlerPtr& spFeedbackHandler, TOverlappedDataBuffer& rBuffer, - const TSmartPath& pathFile, bool& bSkip); - - void AdjustProcessedSize(file_size_t fsWritten, const TFileInfoPtr& spSrcFileInfo, const IFilesystemFilePtr& spSrcFile); - void AdjustFinalSize(const TFileInfoPtr& spSrcFileInfo, const IFilesystemFilePtr& spSrcFile); void AdjustProcessedSizeForSkip(const TFileInfoPtr& spSrcFileInfo); private: #pragma warning(push) #pragma warning(disable: 4251) - TSubTaskStatsInfo m_tSubTaskStats; + TSubTaskStatsInfoPtr m_spSubTaskStats; logger::TLoggerPtr m_spLog; #pragma warning(pop) }; Index: src/libchcore/TSubTaskStatsInfo.h =================================================================== diff -u -N -r8a2ff3b2b71b45fb525e030167e62f316cb32869 -r734408890246965d47e6bbf2c2978371269dd1fd --- src/libchcore/TSubTaskStatsInfo.h (.../TSubTaskStatsInfo.h) (revision 8a2ff3b2b71b45fb525e030167e62f316cb32869) +++ src/libchcore/TSubTaskStatsInfo.h (.../TSubTaskStatsInfo.h) (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -184,6 +184,8 @@ friend class TSubTaskProcessingGuard; }; + + using TSubTaskStatsInfoPtr = std::shared_ptr; } #endif Index: src/libchcore/libchcore.vc140.vcxproj =================================================================== diff -u -N -r685d0da3259dd94327ee8d644a88c155585b8249 -r734408890246965d47e6bbf2c2978371269dd1fd --- src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision 685d0da3259dd94327ee8d644a88c155585b8249) +++ src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -511,8 +511,10 @@ - + + + @@ -814,8 +816,10 @@ - + + + Index: src/libchcore/libchcore.vc140.vcxproj.filters =================================================================== diff -u -N -r685d0da3259dd94327ee8d644a88c155585b8249 -r734408890246965d47e6bbf2c2978371269dd1fd --- src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision 685d0da3259dd94327ee8d644a88c155585b8249) +++ src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision 734408890246965d47e6bbf2c2978371269dd1fd) @@ -90,6 +90,18 @@ {64486dcc-0c2a-468c-9498-c4ff0c2183af} + + {2e089b8f-c085-4bce-a342-64c2611a717a} + + + {301c07b0-8112-4a01-9e47-9b32f6637be8} + + + {cc825cb7-bb97-45eb-86ed-ac031e1ca9e6} + + + {712c0e1a-c6e3-4aab-8322-a9eea42e6d81} + @@ -374,9 +386,6 @@ Source Files\Task Config - - Source Files\Tools\Data Buffer - Source Files\Feedback @@ -461,42 +470,51 @@ Source Files\Tools - - Source Files\Tools\Data Buffer - Source Files\Tools\Data Buffer - - Source Files\Tools\Data Buffer + + Tests + + Source Files\Tools\Data Buffer\ReaderWriter + - Source Files\Tools\Data Buffer + Source Files\Tools\Data Buffer\ReaderWriter - - Source Files\Tools\Data Buffer + + Source Files\Tools\Data Buffer\Buffers + + Source Files\Tools\Data Buffer\Buffers + + + Source Files\Tools\Data Buffer\Buffers + - Source Files\Tools\Data Buffer\Queues + Source Files\Tools\Data Buffer\Queues\Simple - Source Files\Tools\Data Buffer\Queues + Source Files\Tools\Data Buffer\Queues\Simple - - Source Files\Tools\Data Buffer\Queues + + Source Files\Tools\Data Buffer\Queues\Simple - - Source Files\Tools\Data Buffer\Queues - - Source Files\Tools\Data Buffer\Queues + Source Files\Tools\Data Buffer\Queues\Complex - - Tests + + Source Files\Tools\Data Buffer\Queues\Complex - - Source Files\Tools\Data Buffer\Queues + + Source Files\Tools\Data Buffer\ReaderWriter + + Source Files\Tools\Data Buffer\ReaderWriter + + + Source Files\Tools\Data Buffer\ReaderWriter + @@ -775,9 +793,6 @@ Source Files\Task Config - - Source Files\Tools\Data Buffer - Tests @@ -862,36 +877,9 @@ Source Files\Library files - - Source Files\Tools\Data Buffer - Source Files\Tools\Data Buffer - - Source Files\Tools\Data Buffer - - - Source Files\Tools\Data Buffer - - - Source Files\Tools\Data Buffer - - - Source Files\Tools\Data Buffer\Queues - - - Source Files\Tools\Data Buffer\Queues - - - Source Files\Tools\Data Buffer\Queues - - - Source Files\Tools\Data Buffer\Queues - - - Source Files\Tools\Data Buffer\Queues - Tests\DataBuffer @@ -928,5 +916,41 @@ Tests\DataBuffer + + Source Files\Tools\Data Buffer\ReaderWriter + + + Source Files\Tools\Data Buffer\ReaderWriter + + + Source Files\Tools\Data Buffer\Buffers + + + Source Files\Tools\Data Buffer\Buffers + + + Source Files\Tools\Data Buffer\Buffers + + + Source Files\Tools\Data Buffer\Queues\Simple + + + Source Files\Tools\Data Buffer\Queues\Simple + + + Source Files\Tools\Data Buffer\Queues\Complex + + + Source Files\Tools\Data Buffer\Queues\Complex + + + Source Files\Tools\Data Buffer\ReaderWriter + + + Source Files\Tools\Data Buffer\ReaderWriter + + + Source Files\Tools\Data Buffer\ReaderWriter + \ No newline at end of file