/*************************************************************************** * Copyright (C) 2001-2014 by Jozef Starosczyk * * ixen@copyhandler.com * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License * * (version 2) as published by the Free Software Foundation; * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * ***************************************************************************/ #include "Stdafx.h" #include "TTask.h" #include #include "TSubTaskScanDirectory.h" #include "TSubTaskCopyMove.h" #include "TSubTaskDelete.h" #include #include #include "TFileInfo.h" #include "TSubTaskArray.h" #include "TTaskStatsSnapshot.h" #include "TCoreException.h" #include "ErrorCodes.h" #include "ISerializerRowData.h" #include "TStringSet.h" #include "SerializerTrace.h" #include "TScopedRunningTimeTracker.h" #include "TScopedRunningTimeTrackerPause.h" #include "TFeedbackHandlerWrapper.h" #include #include "TTaskConfigBufferSizes.h" #include "log.h" #include #include "TLocalFilesystem.h" namespace chcore { //////////////////////////////////////////////////////////////////////////// // TTask members TTask::TTask(const ISerializerPtr& spSerializer, const IFeedbackHandlerPtr& spFeedbackHandler) : m_log(), m_spInternalFeedbackHandler(spFeedbackHandler), m_spSrcPaths(new TBasePathDataContainer), m_bForce(false), m_bContinue(false), m_tSubTaskContext(m_tConfiguration, m_spSrcPaths, m_afFilters, m_cfgTracker, m_log, m_workerThread, std::make_shared()), m_tSubTasksArray(m_tSubTaskContext), m_spSerializer(spSerializer) { if(!spFeedbackHandler) throw TCoreException(eErr_InvalidArgument, L"spFeedbackHandler", LOCATION); if(!spSerializer) throw TCoreException(eErr_InvalidArgument, L"spSerializer", LOCATION); } TTask::~TTask() { KillThread(); } void TTask::SetTaskDefinition(const TTaskDefinition& rTaskDefinition) { m_tBaseData.SetDestinationPath(rTaskDefinition.GetDestinationPath()); m_tConfiguration = rTaskDefinition.GetConfiguration(); *m_spSrcPaths = rTaskDefinition.GetSourcePaths(); m_afFilters = rTaskDefinition.GetFilters(); m_tBaseData.SetTaskName(rTaskDefinition.GetTaskName()); m_tSubTasksArray.Init(rTaskDefinition.GetOperationPlan()); m_tSubTaskContext.SetOperationType(m_tSubTasksArray.GetOperationType()); m_tSubTaskContext.SetDestinationPath(m_tBaseData.GetDestinationPath()); } void TTask::OnRegisterTask() { } void TTask::OnUnregisterTask() { } void TTask::SetTaskState(ETaskCurrentState eTaskState) { // NOTE: we could check some transition rules here boost::unique_lock lock(m_lock); m_tBaseData.SetCurrentState(eTaskState); } ETaskCurrentState TTask::GetTaskState() const { boost::shared_lock lock(m_lock); return m_tBaseData.GetCurrentState(); } void TTask::SetBufferSizes(const TBufferSizes& bsSizes) { m_tConfiguration.DelayNotifications(); SetTaskPropBufferSizes(m_tConfiguration, bsSizes); m_tConfiguration.ResumeNotifications(); } void TTask::GetBufferSizes(TBufferSizes& bsSizes) { bsSizes = GetTaskPropBufferSizes(m_tConfiguration); } // thread void TTask::SetPriority(int nPriority) { SetTaskPropValue(m_tConfiguration, nPriority); } void TTask::Load() { using namespace chcore; bool bLogPathLoaded = false; bool bLoadFailed = false; const size_t stMaxSize = 1024; wchar_t szErr[stMaxSize]; try { boost::unique_lock lock(m_lock); ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("task")); m_tBaseData.Load(spContainer); bLogPathLoaded = true; spContainer = m_spSerializer->GetContainer(_T("base_paths")); m_spSrcPaths->Load(spContainer); spContainer = m_spSerializer->GetContainer(_T("scanned_files")); m_tSubTaskContext.GetFilesCache().Load(spContainer, m_spSrcPaths); spContainer = m_spSerializer->GetContainer(_T("task_config")); m_tConfiguration.Load(spContainer); spContainer = m_spSerializer->GetContainer(_T("filters")); m_afFilters.Load(spContainer); spContainer = m_spSerializer->GetContainer(_T("local_stats")); m_tLocalStats.Load(spContainer); spContainer = m_spSerializer->GetContainer(_T("feedback")); m_spInternalFeedbackHandler->Load(spContainer); m_tSubTasksArray.Load(m_spSerializer); // ensure copy-based context entries are properly updated after loading m_tSubTaskContext.SetDestinationPath(m_tBaseData.GetDestinationPath()); m_tSubTaskContext.SetOperationType(m_tSubTasksArray.GetOperationType()); } catch (const TBaseException& e) { SetTaskState(eTaskState_LoadError); bLoadFailed = true; _tcscpy_s(szErr, stMaxSize, _T("Task load error: ")); size_t stLen = _tcslen(szErr); e.GetDetailedErrorInfo(szErr + stLen, stMaxSize - stLen); } catch (const std::exception& e) { SetTaskState(eTaskState_LoadError); bLoadFailed = true; _snwprintf_s(szErr, stMaxSize, _TRUNCATE, _T("Task load error. %hs"), e.what()); } if (bLoadFailed) { try { if (bLogPathLoaded) GetLog().loge(szErr); } catch (const std::exception&) { } } } void TTask::Store(bool bForce) { if (GetTaskState() == eTaskState_LoadError) { DBTRACE0(_T("Task::Store() - not storing task as it was not loaded correctly\n")); return; } DBTRACE0(_T("###### Task::Store() - starting\n")); // ensure we're only running one serialization of this task at a time; // (this is usually called from the gui thread (on timer) and at specific points // of task processing; there were times where if(!bForce) { if (!m_mutexSerializer.try_lock()) { DBTRACE0(_T("###### Task::Store() - serialization already running. Skipping.\n")); return; } } else { if (!m_mutexSerializer.try_lock()) { DBTRACE0(_T("###### Task::Store() - waiting for serialization mutex...\n")); m_mutexSerializer.lock(); } } std::unique_lock locke(m_mutexSerializer, std::adopt_lock); TSimpleTimer timer(true); using namespace chcore; { boost::shared_lock lock(m_lock); ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("task")); m_tBaseData.Store(spContainer); // base paths spContainer = m_spSerializer->GetContainer(_T("base_paths")); m_spSrcPaths->Store(spContainer); spContainer = m_spSerializer->GetContainer(_T("scanned_files")); m_tSubTaskContext.GetFilesCache().Store(spContainer); spContainer = m_spSerializer->GetContainer(_T("task_config")); m_tConfiguration.Store(spContainer); spContainer = m_spSerializer->GetContainer(_T("filters")); m_afFilters.Store(spContainer); spContainer = m_spSerializer->GetContainer(_T("local_stats")); m_tLocalStats.Store(spContainer); spContainer = m_spSerializer->GetContainer(_T("feedback")); m_spInternalFeedbackHandler->Store(spContainer); m_tSubTasksArray.Store(m_spSerializer); } unsigned long long ullGatherTime = timer.Checkpoint(); ullGatherTime; m_spSerializer->Flush(); unsigned long long ullFlushTime = timer.Stop(); ullFlushTime; DBTRACE2(_T("###### Task::Store() - finished - gather: %I64u ms, flush: %I64u ms\n"), ullGatherTime, ullFlushTime); } void TTask::KillThread() { m_workerThread.StopThread(); } void TTask::BeginProcessing() { GetLog().logi(_T("Requested task to begin processing")); boost::unique_lock lock(m_lock); if (m_tBaseData.GetCurrentState() != eTaskState_LoadError) m_workerThread.StartThread(DelegateThreadProc, this, GetTaskPropValue(m_tConfiguration)); } void TTask::ResumeProcessing() { // the same as retry but less demanding if (GetTaskState() == eTaskState_Paused) { GetLog().logi(_T("Requested task resume")); SetTaskState(eTaskState_Processing); BeginProcessing(); } } bool TTask::RetryProcessing() { // retry used to auto-resume, after loading ETaskCurrentState eState = GetTaskState(); switch (eState) { case eTaskState_Paused: case eTaskState_Finished: case eTaskState_Cancelled: case eTaskState_LoadError: return false; case eTaskState_Processing: case eTaskState_Waiting: { if(IsRunning()) return false; //else go to default clause } default: BeginProcessing(); return true; } } void TTask::RestartProcessing() { GetLog().logi(_T("Requested task restart")); KillThread(); SetTaskState(eTaskState_None); m_spInternalFeedbackHandler->RestoreDefaults(); m_tSubTasksArray.ResetProgressAndStats(); m_tLocalStats.Clear(); m_spSrcPaths->ResetProcessingFlags(); m_tSubTaskContext.GetFilesCache().Clear(); Store(true); BeginProcessing(); } void TTask::RestoreFeedbackDefaults() { m_spInternalFeedbackHandler->RestoreDefaults(); } void TTask::PauseProcessing() { if (GetTaskState() != eTaskState_Finished && GetTaskState() != eTaskState_Cancelled) { GetLog().logi(_T("Requested task pause")); KillThread(); SetTaskState(eTaskState_Paused); } } void TTask::CancelProcessing() { // change to ST_CANCELLED if (GetTaskState() != eTaskState_Finished) { GetLog().logi(_T("Requested task cancel")); KillThread(); SetTaskState(eTaskState_Cancelled); } } void TTask::GetStatsSnapshot(TTaskStatsSnapshotPtr& spSnapshot) { if (!spSnapshot) throw TCoreException(eErr_InvalidArgument, L"spSnapshot", LOCATION); spSnapshot->Clear(); boost::shared_lock lock(m_lock); m_tSubTasksArray.GetStatsSnapshot(spSnapshot->GetSubTasksStats()); m_tLocalStats.GetSnapshot(spSnapshot); spSnapshot->SetTaskName(m_tBaseData.GetTaskName()); spSnapshot->SetThreadPriority(GetTaskPropValue(m_tConfiguration)); spSnapshot->SetDestinationPath(m_tBaseData.GetDestinationPath().ToString()); spSnapshot->SetFilters(m_afFilters); spSnapshot->SetTaskState(m_tBaseData.GetCurrentState()); spSnapshot->SetOperationType(m_tSubTasksArray.GetOperationType()); spSnapshot->SetIgnoreDirectories(GetTaskPropValue(m_tConfiguration)); spSnapshot->SetCreateEmptyFiles(GetTaskPropValue(m_tConfiguration)); spSnapshot->SetBufferCount(GetTaskPropValue(m_tConfiguration)); TSubTaskStatsSnapshotPtr spCurrentSubTask = spSnapshot->GetSubTasksStats().GetCurrentSubTaskSnapshot(); if(spCurrentSubTask) spSnapshot->SetSourcePath(spCurrentSubTask->GetCurrentPath()); else if(m_spSrcPaths->GetCount() > 0) { TBasePathDataPtr spBasePath = m_spSrcPaths->GetAt(0); if(spBasePath) spSnapshot->SetSourcePath(spBasePath->GetSrcPath().ToString()); } int iCurrentBufferIndex = spCurrentSubTask ? spCurrentSubTask->GetCurrentBufferIndex() : TBufferSizes::eBuffer_Default; switch (iCurrentBufferIndex) { case TBufferSizes::eBuffer_Default: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; case TBufferSizes::eBuffer_OneDisk: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; case TBufferSizes::eBuffer_TwoDisks: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; case TBufferSizes::eBuffer_CD: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; case TBufferSizes::eBuffer_LAN: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; default: throw TCoreException(eErr_UnhandledCase, L"Unknown buffer index", LOCATION); //BOOST_ASSERT(false); // assertions are dangerous here, because we're inside critical section // (and there could be conflict with Get(Mini)Snapshot called OnTimer in several places. } } bool TTask::CanBegin() { bool bRet = true; boost::unique_lock lock(m_lock); if (GetContinueFlagNL() || GetForceFlagNL()) { SetForceFlagNL(false); SetContinueFlagNL(false); } else bRet = false; return bRet; } void TTask::SetForceFlag(bool bFlag) { boost::unique_lock lock(m_lock); m_bForce = bFlag; } bool TTask::GetForceFlag() { boost::shared_lock lock(m_lock); return m_bForce; } void TTask::SetContinueFlag(bool bFlag) { boost::unique_lock lock(m_lock); m_bContinue = bFlag; } bool TTask::GetContinueFlag() { boost::shared_lock lock(m_lock); return m_bContinue; } ///////////////////////////////////////////////////////////////////////////////////////////////////////////// void TTask::SetForceFlagNL(bool bFlag) { m_bForce = bFlag; } bool TTask::GetForceFlagNL() { return m_bForce; } void TTask::SetContinueFlagNL(bool bFlag) { m_bContinue = bFlag; } bool TTask::GetContinueFlagNL() { return m_bContinue; } TSubTaskBase::ESubOperationResult TTask::CheckForWaitState() { // limiting operation count SetTaskState(eTaskState_Waiting); bool bContinue = false; while (!bContinue) { if (CanBegin()) { SetTaskState(eTaskState_Processing); bContinue = true; m_log.logi(_T("Finished waiting for begin permission")); // return; // skips sleep and kill flag checking } else Sleep(50); // not to make it too hard for processor if (m_workerThread.KillRequested()) { // log m_log.logi(_T("Kill request while waiting for begin permission (wait state)")); return TSubTaskBase::eSubResult_KillRequest; } } return TSubTaskBase::eSubResult_Continue; } DWORD WINAPI TTask::DelegateThreadProc(LPVOID pParam) { BOOST_ASSERT(pParam); if (!pParam) return 1; TTask* pTask = (TTask*)pParam; return pTask->ThrdProc(); } DWORD TTask::ThrdProc() { // start tracking time for this thread TScopedRunningTimeTracker tProcessingGuard(m_tLocalStats); TFeedbackHandlerWrapperPtr spFeedbackHandler(std::make_shared(m_spInternalFeedbackHandler, tProcessingGuard)); const size_t ExceptionBufferSize = 2048; std::unique_ptr upExceptionInfoBuffer(new wchar_t[ExceptionBufferSize]); try { TSubTaskBase::ESubOperationResult eResult = TSubTaskBase::eSubResult_Continue; // initialize log file m_log.init(m_tBaseData.GetLogPath().ToString(), 262144, chcore::log_file::level_debug, false, false); // start operation OnBeginOperation(); // enable configuration changes tracking m_tConfiguration.ConnectToNotifier(TTaskConfigTracker::NotificationProc, &m_cfgTracker); m_tConfiguration.ConnectToNotifier(TTask::OnCfgOptionChanged, this); // set thread options HANDLE hThread = GetCurrentThread(); ::SetThreadPriorityBoost(hThread, GetTaskPropValue(m_tConfiguration)); // initialize subtask array m_tSubTasksArray.InitBeforeExec(); // exec the estimation subtasks eResult = m_tSubTasksArray.Execute(spFeedbackHandler, true); // go into wait state only in case the preprocessing did not finish the operation already // (only fast move can do that right now) if (eResult == TSubTaskBase::eSubResult_Continue && !m_tSubTasksArray.AreAllBasePathsProcessed()) { TScopedRunningTimeTrackerPause scopedPause(tProcessingGuard); eResult = CheckForWaitState(); // operation limiting } if (eResult == TSubTaskBase::eSubResult_Continue) eResult = m_tSubTasksArray.Execute(spFeedbackHandler, false); // change status to finished if (eResult == TSubTaskBase::eSubResult_Continue) SetTaskState(eTaskState_Finished); // finishing processing // change task status switch (eResult) { case TSubTaskBase::eSubResult_Error: spFeedbackHandler->OperationError(); SetTaskState(eTaskState_Error); break; case TSubTaskBase::eSubResult_CancelRequest: SetTaskState(eTaskState_Cancelled); break; case TSubTaskBase::eSubResult_PauseRequest: SetTaskState(eTaskState_Paused); break; case TSubTaskBase::eSubResult_KillRequest: // the only operation if (GetTaskState() == eTaskState_Waiting) SetTaskState(eTaskState_Processing); break; case TSubTaskBase::eSubResult_Continue: spFeedbackHandler->OperationFinished(); SetTaskState(eTaskState_Finished); break; default: BOOST_ASSERT(false); throw TCoreException(eErr_UnhandledCase, L"Unknown feedback result", LOCATION); } // if the files cache is not completely read - clean it up if (!m_tSubTaskContext.GetFilesCache().IsComplete()) m_tSubTaskContext.GetFilesCache().Clear(); // scanning for files did not finish processing, so the content of the files cache are useless // save progress before killed Store(true); // reset flags SetContinueFlag(false); SetForceFlag(false); m_tConfiguration.DisconnectFromNotifier(TTaskConfigTracker::NotificationProc); m_tConfiguration.DisconnectFromNotifier(TTask::OnCfgOptionChanged); // and the real end OnEndOperation(); return 0; } catch(const TBaseException& e) { e.GetDetailedErrorInfo(upExceptionInfoBuffer.get(), ExceptionBufferSize); } catch(const std::exception& e) { swprintf_s(upExceptionInfoBuffer.get(), ExceptionBufferSize, L"%S", e.what()); } catch (...) { swprintf_s(upExceptionInfoBuffer.get(), ExceptionBufferSize, L"Unspecified error occurred"); } m_tConfiguration.DisconnectFromNotifier(TTaskConfigTracker::NotificationProc); m_tConfiguration.DisconnectFromNotifier(TTask::OnCfgOptionChanged); // log TString strMsg = TString(L"Caught exception in ThrdProc: ") + upExceptionInfoBuffer.get(); m_log.loge(strMsg.c_str()); // let others know some error happened spFeedbackHandler->OperationError(); SetTaskState(eTaskState_Error); SetContinueFlag(false); SetForceFlag(false); OnEndOperation(); return 1; } void TTask::OnBeginOperation() { CTime tm = CTime::GetCurrentTime(); TString strFormat = _T("\r\n# COPYING THREAD STARTED #\r\nBegan processing data (dd:mm:yyyy) %day.%month.%year at %hour:%minute.%second"); strFormat.Replace(_T("%year"), boost::lexical_cast(tm.GetYear()).c_str()); strFormat.Replace(_T("%month"), boost::lexical_cast(tm.GetMonth()).c_str()); strFormat.Replace(_T("%day"), boost::lexical_cast(tm.GetDay()).c_str()); strFormat.Replace(_T("%hour"), boost::lexical_cast(tm.GetHour()).c_str()); strFormat.Replace(_T("%minute"), boost::lexical_cast(tm.GetMinute()).c_str()); strFormat.Replace(_T("%second"), boost::lexical_cast(tm.GetSecond()).c_str()); m_log.logi(strFormat.c_str()); } void TTask::OnEndOperation() { CTime tm = CTime::GetCurrentTime(); TString strFormat = _T("Finished processing data (dd:mm:yyyy) %day.%month.%year at %hour:%minute.%second"); strFormat.Replace(_T("%year"), boost::lexical_cast(tm.GetYear()).c_str()); strFormat.Replace(_T("%month"), boost::lexical_cast(tm.GetMonth()).c_str()); strFormat.Replace(_T("%day"), boost::lexical_cast(tm.GetDay()).c_str()); strFormat.Replace(_T("%hour"), boost::lexical_cast(tm.GetHour()).c_str()); strFormat.Replace(_T("%minute"), boost::lexical_cast(tm.GetMinute()).c_str()); strFormat.Replace(_T("%second"), boost::lexical_cast(tm.GetSecond()).c_str()); m_log.logi(strFormat.c_str()); } void TTask::RequestStopThread() { m_workerThread.SignalThreadToStop(); } void TTask::OnCfgOptionChanged(const TStringSet& rsetChanges, void* pParam) { TTask* pTask = (TTask*)pParam; if (!pTask) throw TCoreException(eErr_InvalidArgument, L"pParam is null, task pointer not provided", LOCATION); if (rsetChanges.HasValue(TaskPropData::GetPropertyName())) { pTask->m_workerThread.ChangePriority(GetTaskPropValue(pTask->m_tConfiguration)); } } bool TTask::IsRunning() const { return m_tLocalStats.IsRunning(); } TSmartPath TTask::GetLogPath() const { return m_tBaseData.GetLogPath(); } TString TTask::GetTaskName() const { return m_tBaseData.GetTaskName(); } void TTask::SetLogPath(const TSmartPath& pathLog) { m_tBaseData.SetLogPath(pathLog); } ISerializerPtr TTask::GetSerializer() const { return m_spSerializer; } chcore::log_file& TTask::GetLog() { if (!m_log.is_initialized()) m_log.init(m_tBaseData.GetLogPath().ToString(), 262144, chcore::log_file::level_debug, false, false); return m_log; } }