/*************************************************************************** * Copyright (C) 2001-2014 by Jozef Starosczyk * * ixen@copyhandler.com * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License * * (version 2) as published by the Free Software Foundation; * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * ***************************************************************************/ #include "Stdafx.h" #include "TTask.h" #include "TSubTaskScanDirectory.h" #include "TSubTaskCopyMove.h" #include #include "TSubTaskArray.h" #include "TTaskStatsSnapshot.h" #include "TScopedRunningTimeTracker.h" #include "TScopedRunningTimeTrackerPause.h" #include "TTaskConfigBufferSizes.h" #include #include "TLocalFilesystem.h" #include "TTaskConfigVerifier.h" #include "../liblogger/TAsyncMultiLogger.h" #include using namespace chcore; using namespace string; using namespace serializer; namespace chengine { //////////////////////////////////////////////////////////////////////////// // TTask members TTask::TTask(const ISerializerPtr& spSerializer, const IFeedbackHandlerPtr& spFeedbackHandler, const TTaskDefinition& rTaskDefinition, const logger::TLogFileDataPtr& spLogFileData) : m_spSerializer(spSerializer), m_spFeedbackManager(std::make_shared(spFeedbackHandler)), m_spLog(logger::MakeLogger(spLogFileData, L"Task")), m_spSrcPaths(new TBasePathDataContainer), m_tSubTaskContext(m_tConfiguration, m_spSrcPaths, m_afFilters, m_cfgTracker, spLogFileData, m_workerThread, std::make_shared(spLogFileData), m_spFeedbackManager), m_tSubTasksArray(m_tSubTaskContext), m_bForce(false), m_bContinue(false) { if(!spFeedbackHandler) throw TCoreException(eErr_InvalidArgument, L"spFeedbackHandler", LOCATION); if(!spSerializer) throw TCoreException(eErr_InvalidArgument, L"spSerializer", LOCATION); SetTaskDefinition(rTaskDefinition); } TTask::TTask(const ISerializerPtr& spSerializer, const IFeedbackHandlerPtr& spFeedbackHandler, const logger::TLogFileDataPtr& spLogFileData) : m_spSerializer(spSerializer), m_spFeedbackManager(std::make_shared(spFeedbackHandler)), m_spLog(logger::MakeLogger(spLogFileData, L"Task")), m_spSrcPaths(new TBasePathDataContainer), m_tSubTaskContext(m_tConfiguration, m_spSrcPaths, m_afFilters, m_cfgTracker, m_spLog->GetLogFileData(), m_workerThread, std::make_shared(m_spLog->GetLogFileData()), m_spFeedbackManager), m_tSubTasksArray(m_tSubTaskContext), m_bForce(false), m_bContinue(false) { if(!spSerializer) throw TCoreException(eErr_InvalidArgument, L"spSerializer", LOCATION); } TTask::~TTask() { try { KillThread(); } catch (const std::exception&) { } } void TTask::SetTaskDefinition(const TTaskDefinition& rTaskDefinition) { m_tBaseData.SetDestinationPath(rTaskDefinition.GetDestinationPath()); m_tConfiguration = rTaskDefinition.GetConfiguration(); *m_spSrcPaths = rTaskDefinition.GetSourcePaths(); m_afFilters = rTaskDefinition.GetFilters(); m_spFeedbackManager->SetRules(rTaskDefinition.GetFeedbackRules()); m_tBaseData.SetTaskName(rTaskDefinition.GetTaskName()); m_tSubTasksArray.Init(rTaskDefinition.GetOperationPlan()); m_tSubTaskContext.SetOperationType(m_tSubTasksArray.GetOperationType()); m_tSubTaskContext.SetDestinationPath(m_tBaseData.GetDestinationPath()); } void TTask::OnRegisterTask() { } void TTask::OnUnregisterTask() { } void TTask::SetTaskState(ETaskCurrentState eTaskState) { // NOTE: we could check some transition rules here boost::unique_lock lock(m_lock); m_tBaseData.SetCurrentState(eTaskState); } ETaskCurrentState TTask::GetTaskState() const { boost::shared_lock lock(m_lock); return m_tBaseData.GetCurrentState(); } void TTask::SetBufferSizes(const TBufferSizes& bsSizes) { m_tConfiguration.DelayNotifications(); SetTaskPropBufferSizes(m_tConfiguration, bsSizes); m_tConfiguration.ResumeNotifications(); } void TTask::GetBufferSizes(TBufferSizes& bsSizes) { bsSizes = GetTaskPropBufferSizes(m_tConfiguration); } // thread void TTask::SetPriority(int nPriority) { SetTaskPropValue(m_tConfiguration, nPriority); } void TTask::Load() { bool bLoadFailed = false; const size_t stMaxSize = 1024; wchar_t szErr[stMaxSize]; try { boost::unique_lock lock(m_lock); ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("task")); m_tBaseData.Load(spContainer); spContainer = m_spSerializer->GetContainer(_T("base_paths")); m_spSrcPaths->Load(spContainer); spContainer = m_spSerializer->GetContainer(_T("scanned_files")); m_tSubTaskContext.GetFilesCache().Load(spContainer, m_spSrcPaths); spContainer = m_spSerializer->GetContainer(_T("task_config")); m_tConfiguration.Load(spContainer); spContainer = m_spSerializer->GetContainer(_T("filters")); m_afFilters.Load(spContainer); m_spFeedbackManager->Load(m_spSerializer); spContainer = m_spSerializer->GetContainer(_T("local_stats")); m_tLocalStats.Load(spContainer); // ensure copy-based context entries are properly updated after loading m_tSubTaskContext.SetDestinationPath(m_tBaseData.GetDestinationPath()); m_tSubTaskContext.SetOperationType(m_tSubTasksArray.GetOperationType()); m_tSubTasksArray.Load(m_spSerializer); } catch (const TBaseException& e) { SetTaskState(eTaskState_LoadError); bLoadFailed = true; _tcscpy_s(szErr, stMaxSize, _T("Task load error: ")); size_t stLen = _tcslen(szErr); e.GetDetailedErrorInfo(szErr + stLen, stMaxSize - stLen); } catch (const std::exception& e) { SetTaskState(eTaskState_LoadError); bLoadFailed = true; _snwprintf_s(szErr, stMaxSize, _TRUNCATE, _T("Task load error. %hs"), e.what()); } if (bLoadFailed) { try { LOG_ERROR(m_spLog) << szErr; } catch (const std::exception&) { } } } TTaskPtr TTask::Load(const ISerializerPtr& spSerializer, const IFeedbackHandlerPtr& spFeedbackHandler, const logger::TLogFileDataPtr& spLogFileData) { TTaskPtr spTask = std::shared_ptr(new TTask(spSerializer, spFeedbackHandler, spLogFileData)); spTask->Load(); return spTask; } void TTask::Store(bool bForce) { if (GetTaskState() == eTaskState_LoadError) { LOG_DEBUG(m_spLog) << L"Task::Store() - not storing task as it was not loaded correctly"; return; } LOG_DEBUG(m_spLog) << L"Task::Store() - starting"; // ensure we're only running one serialization of this task at a time; // (this is usually called from the gui thread (on timer) and at specific points // of task processing; there were times where if(!bForce) { if (!m_mutexSerializer.try_lock()) { LOG_DEBUG(m_spLog) << L"Task::Store() - serialization already running. Skipping."; return; } } else { if (!m_mutexSerializer.try_lock()) { LOG_DEBUG(m_spLog) << L"Task::Store() - waiting for serialization mutex..."; m_mutexSerializer.lock(); } } std::unique_lock locke(m_mutexSerializer, std::adopt_lock); TSimpleTimer timer(true); using namespace chengine; { boost::shared_lock lock(m_lock); ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("task")); m_tBaseData.Store(spContainer); // base paths spContainer = m_spSerializer->GetContainer(_T("base_paths")); m_spSrcPaths->Store(spContainer); spContainer = m_spSerializer->GetContainer(_T("scanned_files")); m_tSubTaskContext.GetFilesCache().Store(spContainer); spContainer = m_spSerializer->GetContainer(_T("task_config")); m_tConfiguration.Store(spContainer); spContainer = m_spSerializer->GetContainer(_T("filters")); m_afFilters.Store(spContainer); m_spFeedbackManager->Store(m_spSerializer); spContainer = m_spSerializer->GetContainer(_T("local_stats")); m_tLocalStats.Store(spContainer); m_tSubTasksArray.Store(m_spSerializer); } unsigned long long ullGatherTime = timer.Checkpoint(); ullGatherTime; m_spSerializer->Flush(); unsigned long long ullFlushTime = timer.Stop(); ullFlushTime; LOG_DEBUG(m_spLog) << L"Task::Store() - finished - gather: " << ullGatherTime << L" ms, flush: " << ullFlushTime << L" ms"; } void TTask::KillThread() { m_workerThread.StopThread(); } void TTask::BeginProcessing() { LOG_INFO(m_spLog) << _T("Requested task to begin processing"); boost::unique_lock lock(m_lock); if (m_tBaseData.GetCurrentState() != eTaskState_LoadError) m_workerThread.StartThread(DelegateThreadProc, this, GetTaskPropValue(m_tConfiguration)); } void TTask::ResumeProcessing() { // the same as retry but less demanding if (GetTaskState() == eTaskState_Paused) { LOG_INFO(m_spLog) << _T("Requested task resume"); SetTaskState(eTaskState_Processing); BeginProcessing(); } } bool TTask::RetryProcessing() { // retry used to auto-resume, after loading ETaskCurrentState eState = GetTaskState(); switch (eState) { case eTaskState_Paused: case eTaskState_Finished: case eTaskState_Cancelled: case eTaskState_LoadError: return false; case eTaskState_Processing: case eTaskState_Waiting: { if(IsRunning()) return false; //else go to default clause } default: BeginProcessing(); return true; } } void TTask::RestartProcessing() { LOG_INFO(m_spLog) << _T("Requested task restart"); KillThread(); SetTaskState(eTaskState_None); m_spFeedbackManager->RestoreDefaults(); m_tSubTasksArray.ResetProgressAndStats(); m_tLocalStats.Clear(); m_spSrcPaths->ResetProcessingFlags(); m_tSubTaskContext.GetFilesCache().Clear(); Store(true); BeginProcessing(); } void TTask::RestoreFeedbackDefaults() { m_spFeedbackManager->RestoreDefaults(); } void TTask::PauseProcessing() { if (GetTaskState() != eTaskState_Finished && GetTaskState() != eTaskState_Cancelled) { LOG_INFO(m_spLog) << _T("Requested task pause"); KillThread(); SetTaskState(eTaskState_Paused); } } void TTask::CancelProcessing() { // change to ST_CANCELLED if (GetTaskState() != eTaskState_Finished) { LOG_INFO(m_spLog) << _T("Requested task cancel"); KillThread(); SetTaskState(eTaskState_Cancelled); } } void TTask::GetStatsSnapshot(TTaskStatsSnapshotPtr& spSnapshot) { if (!spSnapshot) throw TCoreException(eErr_InvalidArgument, L"spSnapshot", LOCATION); spSnapshot->Clear(); boost::shared_lock lock(m_lock); m_tSubTasksArray.GetStatsSnapshot(spSnapshot->GetSubTasksStats()); m_tLocalStats.GetSnapshot(spSnapshot); spSnapshot->SetTaskName(m_tBaseData.GetTaskName()); spSnapshot->SetThreadPriority(GetTaskPropValue(m_tConfiguration)); spSnapshot->SetDestinationPath(m_tBaseData.GetDestinationPath().ToString()); spSnapshot->SetFilters(m_afFilters); spSnapshot->SetFeedbackRules(m_spFeedbackManager->GetRules()); spSnapshot->SetTaskState(m_tBaseData.GetCurrentState()); spSnapshot->SetOperationType(m_tSubTasksArray.GetOperationType()); spSnapshot->SetIgnoreDirectories(GetTaskPropValue(m_tConfiguration)); spSnapshot->SetCreateEmptyFiles(GetTaskPropValue(m_tConfiguration)); spSnapshot->SetBufferCount(GetTaskPropValue(m_tConfiguration)); TSubTaskStatsSnapshotPtr spCurrentSubTask = spSnapshot->GetSubTasksStats().GetCurrentSubTaskSnapshot(); if(spCurrentSubTask) spSnapshot->SetSourcePath(spCurrentSubTask->GetCurrentPath()); else if(m_spSrcPaths->GetCount() > 0) { TBasePathDataPtr spBasePath = m_spSrcPaths->GetAt(0); if(spBasePath) spSnapshot->SetSourcePath(spBasePath->GetSrcPath().ToString()); } int iCurrentBufferIndex = spCurrentSubTask ? spCurrentSubTask->GetCurrentBufferIndex() : TBufferSizes::eBuffer_Default; switch (iCurrentBufferIndex) { case TBufferSizes::eBuffer_Default: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; case TBufferSizes::eBuffer_OneDisk: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; case TBufferSizes::eBuffer_TwoDisks: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; case TBufferSizes::eBuffer_CD: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; case TBufferSizes::eBuffer_LAN: spSnapshot->SetCurrentBufferSize(GetTaskPropValue(m_tConfiguration)); break; default: throw TCoreException(eErr_UnhandledCase, L"Unknown buffer index", LOCATION); //BOOST_ASSERT(false); // assertions are dangerous here, because we're inside critical section // (and there could be conflict with Get(Mini)Snapshot called OnTimer in several places. } } bool TTask::CanBegin() { bool bRet = true; boost::unique_lock lock(m_lock); if (GetContinueFlagNL() || GetForceFlagNL()) { SetForceFlagNL(false); SetContinueFlagNL(false); } else bRet = false; return bRet; } void TTask::SetForceFlag(bool bFlag) { boost::unique_lock lock(m_lock); m_bForce = bFlag; } bool TTask::GetForceFlag() { boost::shared_lock lock(m_lock); return m_bForce; } void TTask::SetContinueFlag(bool bFlag) { boost::unique_lock lock(m_lock); m_bContinue = bFlag; } bool TTask::GetContinueFlag() { boost::shared_lock lock(m_lock); return m_bContinue; } ///////////////////////////////////////////////////////////////////////////////////////////////////////////// void TTask::SetForceFlagNL(bool bFlag) { m_bForce = bFlag; } bool TTask::GetForceFlagNL() { return m_bForce; } void TTask::SetContinueFlagNL(bool bFlag) { m_bContinue = bFlag; } bool TTask::GetContinueFlagNL() { return m_bContinue; } TSubTaskBase::ESubOperationResult TTask::CheckForWaitState() { // limiting operation count SetTaskState(eTaskState_Waiting); bool bContinue = false; while (!bContinue) { if (CanBegin()) { SetTaskState(eTaskState_Processing); bContinue = true; LOG_INFO(m_spLog) << _T("Finished waiting for begin permission"); // return; // skips sleep and kill flag checking } else Sleep(50); // not to make it too hard for processor if (m_workerThread.KillRequested()) { // log LOG_INFO(m_spLog) << _T("Kill request while waiting for begin permission (wait state)"); return TSubTaskBase::eSubResult_KillRequest; } } return TSubTaskBase::eSubResult_Continue; } DWORD WINAPI TTask::DelegateThreadProc(LPVOID pParam) { BOOST_ASSERT(pParam); if (!pParam) return 1; TTask* pTask = (TTask*)pParam; return pTask->ThrdProc(); } DWORD TTask::ThrdProc() { // start tracking time for this thread TScopedRunningTimeTracker tProcessingGuard(m_tLocalStats); // set time tracker and ensure it is unset on scope exit m_spFeedbackManager->SetTimeTracker(&tProcessingGuard); #pragma warning(push) #pragma warning(disable: 4459) BOOST_SCOPE_EXIT(&m_spFeedbackManager) { m_spFeedbackManager->SetTimeTracker(nullptr); } BOOST_SCOPE_EXIT_END #pragma warning(pop) const size_t ExceptionBufferSize = 2048; std::unique_ptr upExceptionInfoBuffer(new wchar_t[ExceptionBufferSize]); try { // start operation OnBeginOperation(); // enable configuration changes tracking m_tConfiguration.ConnectToNotifier(TTaskConfigTracker::NotificationProc, &m_cfgTracker); m_tConfiguration.ConnectToNotifier(TTask::OnCfgOptionChanged, this); // verify configuration is valid TTaskConfigVerifier::VerifyAndUpdate(m_tConfiguration, m_spLog); // set thread options HANDLE hThread = GetCurrentThread(); ::SetThreadPriorityBoost(hThread, GetTaskPropValue(m_tConfiguration)); // initialize subtask array m_tSubTasksArray.InitBeforeExec(); // exec the estimation subtasks TSubTaskBase::ESubOperationResult eResult = m_tSubTasksArray.Execute(true); // go into wait state only in case the preprocessing did not finish the operation already // (only fast move can do that right now) if (eResult == TSubTaskBase::eSubResult_Continue && !m_tSubTasksArray.AreAllBasePathsProcessed()) { TScopedRunningTimeTrackerPause scopedPause(tProcessingGuard); eResult = CheckForWaitState(); // operation limiting } if (eResult == TSubTaskBase::eSubResult_Continue) eResult = m_tSubTasksArray.Execute(false); // change status to finished if (eResult == TSubTaskBase::eSubResult_Continue) SetTaskState(eTaskState_Finished); // finishing processing // change task status switch (eResult) { case TSubTaskBase::eSubResult_Error: m_spFeedbackManager->OperationEvent(eOperationEvent_Error); SetTaskState(eTaskState_Error); break; case TSubTaskBase::eSubResult_CancelRequest: SetTaskState(eTaskState_Cancelled); break; case TSubTaskBase::eSubResult_PauseRequest: SetTaskState(eTaskState_Paused); break; case TSubTaskBase::eSubResult_KillRequest: // the only operation if (GetTaskState() == eTaskState_Waiting) SetTaskState(eTaskState_Processing); break; case TSubTaskBase::eSubResult_Continue: m_spFeedbackManager->OperationEvent(eOperationEvent_Finished); SetTaskState(eTaskState_Finished); break; default: BOOST_ASSERT(false); throw TCoreException(eErr_UnhandledCase, L"Unknown feedback result", LOCATION); } // if the files cache is not completely read - clean it up if (!m_tSubTaskContext.GetFilesCache().IsComplete()) m_tSubTaskContext.GetFilesCache().Clear(); // scanning for files did not finish processing, so the content of the files cache are useless // save progress before killed Store(true); // reset flags SetContinueFlag(false); SetForceFlag(false); m_tConfiguration.DisconnectFromNotifier(TTaskConfigTracker::NotificationProc); m_tConfiguration.DisconnectFromNotifier(TTask::OnCfgOptionChanged); // and the real end OnEndOperation(); return 0; } catch(const TBaseException& e) { e.GetDetailedErrorInfo(upExceptionInfoBuffer.get(), ExceptionBufferSize); } catch(const std::exception& e) { swprintf_s(upExceptionInfoBuffer.get(), ExceptionBufferSize, L"%S", e.what()); } catch (...) { swprintf_s(upExceptionInfoBuffer.get(), ExceptionBufferSize, L"Unspecified error occurred"); } m_tConfiguration.DisconnectFromNotifier(TTaskConfigTracker::NotificationProc); m_tConfiguration.DisconnectFromNotifier(TTask::OnCfgOptionChanged); // log TString strMsg = TString(L"Caught exception in ThrdProc: ") + upExceptionInfoBuffer.get(); LOG_ERROR(m_spLog) << strMsg.c_str(); // let others know some error happened m_spFeedbackManager->OperationEvent(eOperationEvent_Error); SetTaskState(eTaskState_Error); SetContinueFlag(false); SetForceFlag(false); OnEndOperation(); return 1; } void TTask::OnBeginOperation() { LOG_INFO(m_spLog) << _T("Processing thread started"); } void TTask::OnEndOperation() { LOG_INFO(m_spLog) << _T("Finished processing data"); } void TTask::RequestStopThread() { m_workerThread.SignalThreadToStop(); } void TTask::OnCfgOptionChanged(const TStringSet& rsetChanges, void* pParam) { TTask* pTask = (TTask*)pParam; if (!pTask) throw TCoreException(eErr_InvalidArgument, L"pParam is null, task pointer not provided", LOCATION); if (rsetChanges.HasValue(TaskPropData::GetPropertyName())) { pTask->m_workerThread.ChangePriority(GetTaskPropValue(pTask->m_tConfiguration)); } } bool TTask::IsRunning() const { return m_tLocalStats.IsRunning(); } TSmartPath TTask::GetLogPath() const { std::wstring wstrPath = m_spLog->GetLogFilePath(); return PathFromString(wstrPath.c_str()); } void TTask::GetLogPaths(logger::TLoggerPaths& rLogPaths) const { logger::TLogFileDataPtr spFileData = m_spLog->GetLogFileData(); spFileData->GetAllLogPaths(rLogPaths); } TString TTask::GetTaskName() const { return m_tBaseData.GetTaskName(); } ISerializerPtr TTask::GetSerializer() const { return m_spSerializer; } }