Index: src/libchcore/ErrorCodes.h =================================================================== diff -u -ra4635addad389b9e117679437a3e1b64a739ea96 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/ErrorCodes.h (.../ErrorCodes.h) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/ErrorCodes.h (.../ErrorCodes.h) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -88,6 +88,7 @@ eErr_CannotRemoveDirectory = 3013, eErr_CannotFastMove = 3014, eErr_CannotGetFreeSpace = 3015, + eErr_CancelIoFailed = 3016, // Task handling errors (4000+) eErr_MissingTaskSerializationPath = 4000, Index: src/libchcore/IFilesystemFile.h =================================================================== diff -u -ra4635addad389b9e117679437a3e1b64a739ea96 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/IFilesystemFile.h (.../IFilesystemFile.h) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/IFilesystemFile.h (.../IFilesystemFile.h) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -49,6 +49,8 @@ virtual void WriteFile(TOverlappedDataBuffer& rBuffer) = 0; virtual void FinalizeFile(TOverlappedDataBuffer& rBuffer) = 0; + virtual void CancelIo() = 0; + virtual void Close() = 0; virtual bool IsOpen() const = 0; Index: src/libchcore/TFilesystemFileFeedbackWrapper.cpp =================================================================== diff -u -ra4635addad389b9e117679437a3e1b64a739ea96 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/TFilesystemFileFeedbackWrapper.cpp (.../TFilesystemFileFeedbackWrapper.cpp) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/TFilesystemFileFeedbackWrapper.cpp (.../TFilesystemFileFeedbackWrapper.cpp) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -296,6 +296,25 @@ return TSubTaskBase::eSubResult_Continue; } + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::CancelIo() + { + try + { + m_spFile->CancelIo(); + } + catch(const TFileException& e) + { + const size_t stMaxBuffer = 1024; + wchar_t szData[ stMaxBuffer ]; + e.GetDetailedErrorInfo(szData, stMaxBuffer); + + LOG_ERROR(m_spLog) << L"File error while cancelling io operations. Error message: " << szData; + return TSubTaskBase::eSubResult_Error; + } + + return TSubTaskBase::eSubResult_Continue; + } + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::HandleReadError(TOverlappedDataBuffer& rBuffer) { DWORD dwLastError = rBuffer.GetErrorCode(); @@ -418,7 +437,7 @@ return m_spFile->GetFilePath(); } - TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::GetFileSize(file_size_t& fsSize) const + TSubTaskBase::ESubOperationResult TFilesystemFileFeedbackWrapper::GetFileSize(file_size_t& fsSize, bool bSilent) const { bool bRetry = false; do @@ -442,6 +461,9 @@ strFormat.Replace(_T("%path"), m_spFile->GetFilePath().ToString()); LOG_ERROR(m_spLog) << strFormat.c_str(); + if(bSilent) + return TSubTaskBase::eSubResult_Error; + TFeedbackResult frResult = m_spFeedbackHandler->FileError(m_spFile->GetFilePath().ToWString(), TString(), EFileError::eRetrieveFileInfo, dwLastError); switch(frResult.GetResult()) { Index: src/libchcore/TFilesystemFileFeedbackWrapper.h =================================================================== diff -u -ra4635addad389b9e117679437a3e1b64a739ea96 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/TFilesystemFileFeedbackWrapper.h (.../TFilesystemFileFeedbackWrapper.h) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/TFilesystemFileFeedbackWrapper.h (.../TFilesystemFileFeedbackWrapper.h) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -46,13 +46,15 @@ TSubTaskBase::ESubOperationResult FinalizeFileFB(TOverlappedDataBuffer& rBuffer); + TSubTaskBase::ESubOperationResult CancelIo(); + TSubTaskBase::ESubOperationResult HandleReadError(TOverlappedDataBuffer& rBuffer); TSubTaskBase::ESubOperationResult HandleWriteError(TOverlappedDataBuffer& rBuffer); TSubTaskBase::ESubOperationResult IsFreshlyCreated(bool& bIsFreshlyCreated) const; TSmartPath GetFilePath() const; - TSubTaskBase::ESubOperationResult GetFileSize(file_size_t& fsSize) const; + TSubTaskBase::ESubOperationResult GetFileSize(file_size_t& fsSize, bool bSilent = false) const; file_size_t GetSeekPositionForResume(file_size_t fsLastAvailablePosition); bool IsOpen() const { return m_spFile->IsOpen(); } Index: src/libchcore/TLocalFilesystemFile.cpp =================================================================== diff -u -ra4635addad389b9e117679437a3e1b64a739ea96 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/TLocalFilesystemFile.cpp (.../TLocalFilesystemFile.cpp) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/TLocalFilesystemFile.cpp (.../TLocalFilesystemFile.cpp) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -325,6 +325,19 @@ } } + void TLocalFilesystemFile::CancelIo() + { + if(IsOpen()) + { + if(!::CancelIo(m_hFile)) + { + DWORD dwLastError = GetLastError(); + LOG_ERROR(m_spLog) << L"CancelIo request failed with error " << dwLastError << GetFileInfoForLog(m_bNoBuffering); + throw TFileException(eErr_CancelIoFailed, dwLastError, m_pathFile, L"Error while writing to file", LOCATION); + } + } + } + bool TLocalFilesystemFile::IsOpen() const { return m_hFile != INVALID_HANDLE_VALUE; Index: src/libchcore/TLocalFilesystemFile.h =================================================================== diff -u -ra4635addad389b9e117679437a3e1b64a739ea96 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/TLocalFilesystemFile.h (.../TLocalFilesystemFile.h) (revision a4635addad389b9e117679437a3e1b64a739ea96) +++ src/libchcore/TLocalFilesystemFile.h (.../TLocalFilesystemFile.h) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -34,23 +34,25 @@ public: virtual ~TLocalFilesystemFile(); - virtual void Truncate(file_size_t fsNewSize) override; + void Truncate(file_size_t fsNewSize) override; - virtual void ReadFile(TOverlappedDataBuffer& rBuffer) override; - virtual void WriteFile(TOverlappedDataBuffer& rBuffer) override; - virtual void FinalizeFile(TOverlappedDataBuffer& rBuffer) override; + void ReadFile(TOverlappedDataBuffer& rBuffer) override; + void WriteFile(TOverlappedDataBuffer& rBuffer) override; + void FinalizeFile(TOverlappedDataBuffer& rBuffer) override; - virtual bool IsOpen() const override; - virtual bool IsFreshlyCreated() override; + void CancelIo() override; - virtual file_size_t GetFileSize() override; - virtual void GetFileInfo(TFileInfo& tFileInfo) override; + bool IsOpen() const override; + bool IsFreshlyCreated() override; - virtual TSmartPath GetFilePath() const override; + file_size_t GetFileSize() override; + void GetFileInfo(TFileInfo& tFileInfo) override; - virtual void Close() override; - virtual file_size_t GetSeekPositionForResume(file_size_t fsLastAvailablePosition) override; + TSmartPath GetFilePath() const override; + void Close() override; + file_size_t GetSeekPositionForResume(file_size_t fsLastAvailablePosition) override; + private: TLocalFilesystemFile(EOpenMode eMode, const TSmartPath& pathFile, bool bNoBuffering, bool bProtectReadOnlyFiles, const logger::TLogFileDataPtr& spLogFileData); Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -39,6 +39,7 @@ m_spReader(std::make_shared(spLogFileData, spEmptyBuffers, spDataRange, dwChunkSize)), m_eventReadingFinished(true, false), m_eventProcessingFinished(true, false), + m_eventLocalKill(true, false), m_counterOnTheFly(), m_spFilesystem(spFilesystem), m_spSrcFileInfo(spSrcFileInfo), @@ -70,14 +71,6 @@ return eResult; } - TSubTaskBase::ESubOperationResult TOverlappedReaderFB::StopThreaded() - { - DWORD dwResult = WaitForSingleObjectEx(m_eventProcessingFinished.Handle(), INFINITE, FALSE); - _ASSERTE(dwResult == WAIT_OBJECT_0); dwResult; - - return m_eThreadResult; - } - TOrderedBufferQueuePtr TOverlappedReaderFB::GetFinishedQueue() const { return m_spReader->GetFinishedQueue(); @@ -98,10 +91,11 @@ // that also means that we don't want to queue reads or writes anymore - all the data that were read until now, will be lost // - write possible - we're prioritizing write queuing here to empty buffers as soon as possible // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data - enum { eKillThread, eReadFailed, eReadPossible, eDataSourceFinished }; + enum { eKillThread, eLocalKill, eReadFailed, eReadPossible, eDataSourceFinished }; std::vector vHandles = { m_rThreadController.GetKillThreadHandle(), + m_eventLocalKill.Handle(), m_spReader->GetEventReadFailedHandle(), m_spReader->GetEventReadPossibleHandle(), m_spReader->GetEventDataSourceFinishedHandle() @@ -120,6 +114,7 @@ break; case WAIT_OBJECT_0 + eKillThread: + case WAIT_OBJECT_0 + eLocalKill: m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; break; @@ -146,6 +141,8 @@ m_eThreadResult = TSubTaskBase::eSubResult_Error; } + m_spSrcFile->CancelIo(); + WaitForOnTheFlyBuffers(); ClearQueues(); @@ -155,6 +152,17 @@ m_eventReadingFinished.SetEvent(); } + TSubTaskBase::ESubOperationResult TOverlappedReaderFB::StopThreaded() + { + m_eventLocalKill.SetEvent(); + + DWORD dwResult = WaitForSingleObjectEx(m_eventProcessingFinished.Handle(), INFINITE, FALSE); + if(dwResult != WAIT_OBJECT_0) + throw TCoreException(eErr_InternalProblem, L"Failed to wait writer processing to finish", LOCATION); + + return m_eThreadResult; + } + void TOverlappedReaderFB::WaitForOnTheFlyBuffers() { bool bStop = false; Index: src/libchcore/TOverlappedReaderFB.h =================================================================== diff -u -rc175b6c9d5be6ee6ee0eae90abb167b9eaf2abc3 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision c175b6c9d5be6ee6ee0eae90abb167b9eaf2abc3) +++ src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -73,6 +73,7 @@ TOverlappedReaderPtr m_spReader; TEvent m_eventReadingFinished; TEvent m_eventProcessingFinished; + TEvent m_eventLocalKill; TEventCounter m_counterOnTheFly; Index: src/libchcore/TOverlappedWriterFB.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -47,6 +47,7 @@ m_bOnlyCreate(bOnlyCreate), m_eventProcessingFinished(true, false), m_eventWritingFinished(true, false), + m_eventLocalKill(true, false), m_counterOnTheFly(), m_rThreadController(rThreadController), m_spLog(logger::MakeLogger(spLogFileData, L"File-Writer")) @@ -142,8 +143,6 @@ m_spWriter->AddEmptyBuffer(pBuffer); return eResult; } - - m_spStats->ResetCurrentItemProcessedSize(); } m_spWriter->AddEmptyBuffer(pBuffer); @@ -295,18 +294,18 @@ m_eThreadResult = TSubTaskBase::eSubResult_Continue; - enum { eKillThread, eWriteFinished, eWriteFailed, eWritePossible, eNoBuffersOnTheFly }; + enum { eKillThread, eLocalKill, eWriteFinished, eWriteFailed, eWritePossible, eNoBuffersOnTheFly }; std::vector vHandles = { m_rThreadController.GetKillThreadHandle(), + m_eventLocalKill.Handle(), m_spWriter->GetEventWriteFinishedHandle(), m_spWriter->GetEventWriteFailedHandle(), m_spWriter->GetEventWritePossibleHandle(), eventNonSignaled.Handle() }; bool bWrittenLastBuffer = false; - try { while(!bWrittenLastBuffer && m_eThreadResult == TSubTaskBase::eSubResult_Continue) @@ -318,6 +317,7 @@ break; case WAIT_OBJECT_0 + eKillThread: + case WAIT_OBJECT_0 + eLocalKill: m_eThreadResult = TSubTaskBase::eSubResult_KillRequest; break; @@ -344,15 +344,46 @@ m_eThreadResult = TSubTaskBase::eSubResult_Error; } + m_spDstFile->CancelIo(); + WaitForOnTheFlyBuffers(); ClearBuffers(); + // update stats with the written bytes info + UpdateCurrentItemStatsFromFileSize(bWrittenLastBuffer); + LOG_DEBUG(m_spLog) << L"Writer stopping processing. Max on-the-fly requests: " << m_counterOnTheFly.GetMaxUsed(); if(m_eThreadResult == TSubTaskBase::eSubResult_Continue && bWrittenLastBuffer) m_eventWritingFinished.SetEvent(); } + void TOverlappedWriterFB::UpdateCurrentItemStatsFromFileSize(bool bFileWritingFinished) + { + if(bFileWritingFinished) + { + m_spStats->ResetCurrentItemProcessedSize(); + } + else + { + file_size_t fsDstFileSize = 0; + TSubTaskBase::ESubOperationResult eResult = m_spDstFile->GetFileSize(fsDstFileSize, true); + if(eResult == TSubTaskBase::eSubResult_Continue) + m_spStats->AdjustProcessedSize(m_spStats->GetCurrentItemProcessedSize(), fsDstFileSize); + } + } + + TSubTaskBase::ESubOperationResult TOverlappedWriterFB::StopThreaded() + { + m_eventLocalKill.SetEvent(); + + DWORD dwResult = WaitForSingleObjectEx(m_eventProcessingFinished.Handle(), INFINITE, FALSE); + if(dwResult != WAIT_OBJECT_0) + throw TCoreException(eErr_InternalProblem, L"Failed to wait writer processing to finish", LOCATION); + + return m_eThreadResult; + } + void TOverlappedWriterFB::WaitForOnTheFlyBuffers() { bool bStop = false; @@ -374,12 +405,4 @@ } while(!bStop); } - - TSubTaskBase::ESubOperationResult TOverlappedWriterFB::StopThreaded() - { - DWORD dwResult = WaitForSingleObjectEx(m_eventProcessingFinished.Handle(), INFINITE, FALSE); - _ASSERTE(dwResult == WAIT_OBJECT_0); dwResult; - - return m_eThreadResult; - } } Index: src/libchcore/TOverlappedWriterFB.h =================================================================== diff -u -rc175b6c9d5be6ee6ee0eae90abb167b9eaf2abc3 -r38031c379667959206abe69507d8a4e3b040dba6 --- src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision c175b6c9d5be6ee6ee0eae90abb167b9eaf2abc3) +++ src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision 38031c379667959206abe69507d8a4e3b040dba6) @@ -52,6 +52,7 @@ TSubTaskBase::ESubOperationResult Start(); void StartThreaded(); + TSubTaskBase::ESubOperationResult StopThreaded(); HANDLE GetEventWritingFinishedHandle() const; @@ -69,6 +70,8 @@ TSubTaskBase::ESubOperationResult OnWriteFailed(); TSubTaskBase::ESubOperationResult OnWriteFinished(bool& bStopProcessing); + void UpdateCurrentItemStatsFromFileSize(bool bFileWritingFinished); + private: TOverlappedWriterPtr m_spWriter; TFilesystemFileFeedbackWrapperPtr m_spDstFile; @@ -79,6 +82,7 @@ TEvent m_eventProcessingFinished; TEvent m_eventWritingFinished; + TEvent m_eventLocalKill; TEventCounter m_counterOnTheFly;