Index: src/libchcore/TTaskManager.cpp =================================================================== diff -u -N -r2e384de25de613cb582a966df7d1cb9468f1c825 -re96806b7f8ff7ca7e9f4afbea603e6351a3dc3e3 --- src/libchcore/TTaskManager.cpp (.../TTaskManager.cpp) (revision 2e384de25de613cb582a966df7d1cb9468f1c825) +++ src/libchcore/TTaskManager.cpp (.../TTaskManager.cpp) (revision e96806b7f8ff7ca7e9f4afbea603e6351a3dc3e3) @@ -29,177 +29,143 @@ #include "SerializerTrace.h" #include "TFakeFileSerializer.h" -BEGIN_CHCORE_NAMESPACE - -//////////////////////////////////////////////////////////////////////////////// -// TTaskManager members -TTaskManager::TTaskManager(const ISerializerFactoryPtr& spSerializerFactory, - const IFeedbackHandlerFactoryPtr& spFeedbackHandlerFactory, - const TSmartPath& pathLogDir, - bool bForceRecreateSerializer) : - m_spSerializerFactory(spSerializerFactory), - m_spFeedbackFactory(spFeedbackHandlerFactory), - m_pathLogDir(pathLogDir) +namespace chcore { - if(!spFeedbackHandlerFactory || !spSerializerFactory) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - m_spSerializer = m_spSerializerFactory->CreateTaskManagerSerializer(bForceRecreateSerializer); -} - -TTaskManager::~TTaskManager() -{ - // NOTE: do not delete the feedback factory, since we are not responsible for releasing it -} - -TTaskPtr TTaskManager::CreateTask(const TTaskDefinition& tTaskDefinition) -{ - IFeedbackHandlerPtr spHandler = m_spFeedbackFactory->Create(); - ISerializerPtr spSerializer = m_spSerializerFactory->CreateTaskSerializer(tTaskDefinition.GetTaskName()); - - TTaskPtr spTask(new TTask(spSerializer, spHandler)); - spTask->SetLogPath(CreateTaskLogPath(tTaskDefinition.GetTaskName())); - spTask->SetTaskDefinition(tTaskDefinition); - - Add(spTask); - - spTask->Store(); - - return spTask; -} - -TTaskPtr TTaskManager::ImportTask(const TSmartPath& strTaskPath) -{ - // load task definition from the new location - TTaskDefinition tTaskDefinition; - tTaskDefinition.Load(strTaskPath); - - for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + //////////////////////////////////////////////////////////////////////////////// + // TTaskManager members + TTaskManager::TTaskManager(const ISerializerFactoryPtr& spSerializerFactory, + const IFeedbackHandlerFactoryPtr& spFeedbackHandlerFactory, + const TSmartPath& pathLogDir, + bool bForceRecreateSerializer) : + m_spSerializerFactory(spSerializerFactory), + m_spFeedbackFactory(spFeedbackHandlerFactory), + m_pathLogDir(pathLogDir) { - const TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); + if (!spFeedbackHandlerFactory || !spSerializerFactory) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + m_spSerializer = m_spSerializerFactory->CreateTaskManagerSerializer(bForceRecreateSerializer); + } - if (spTask->GetTaskName() == tTaskDefinition.GetTaskName()) - THROW_CORE_EXCEPTION(eErr_TaskAlreadyExists); + TTaskManager::~TTaskManager() + { + // NOTE: do not delete the feedback factory, since we are not responsible for releasing it } - return CreateTask(tTaskDefinition); -} + TTaskPtr TTaskManager::CreateTask(const TTaskDefinition& tTaskDefinition) + { + IFeedbackHandlerPtr spHandler = m_spFeedbackFactory->Create(); + ISerializerPtr spSerializer = m_spSerializerFactory->CreateTaskSerializer(tTaskDefinition.GetTaskName()); -size_t TTaskManager::GetSize() const -{ - boost::shared_lock lock(m_lock); - return m_tTasks.GetCount(); -} + TTaskPtr spTask(new TTask(spSerializer, spHandler)); + spTask->SetLogPath(CreateTaskLogPath(tTaskDefinition.GetTaskName())); + spTask->SetTaskDefinition(tTaskDefinition); -TTaskPtr TTaskManager::GetAt(size_t nIndex) const -{ - boost::shared_lock lock(m_lock); - const TTaskInfoEntry& rInfo = m_tTasks.GetAt(nIndex); - return rInfo.GetTask(); -} + Add(spTask); -TTaskPtr TTaskManager::GetTaskByTaskID(taskid_t tTaskID) const -{ - if(tTaskID == NoTaskID) - return TTaskPtr(); + spTask->Store(); - TTaskInfoEntry tEntry; + return spTask; + } - boost::shared_lock lock(m_lock); + TTaskPtr TTaskManager::ImportTask(const TSmartPath& strTaskPath) + { + // load task definition from the new location + TTaskDefinition tTaskDefinition; + tTaskDefinition.Load(strTaskPath); - if(!m_tTasks.GetByTaskID(tTaskID, tEntry)) - return TTaskPtr(); + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + const TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); - return tEntry.GetTask(); -} + if (spTask->GetTaskName() == tTaskDefinition.GetTaskName()) + THROW_CORE_EXCEPTION(eErr_TaskAlreadyExists); + } -void TTaskManager::Add(const TTaskPtr& spNewTask) -{ - if(!spNewTask) - THROW_CORE_EXCEPTION(eErr_InvalidArgument); + return CreateTask(tTaskDefinition); + } - boost::unique_lock lock(m_lock); + size_t TTaskManager::GetSize() const + { + boost::shared_lock lock(m_lock); + return m_tTasks.GetCount(); + } - int iOrder = 1; - if(!m_tTasks.IsEmpty()) + TTaskPtr TTaskManager::GetAt(size_t nIndex) const { - const TTaskInfoEntry& rEntry = m_tTasks.GetAt(m_tTasks.GetCount() - 1); - iOrder = rEntry.GetOrder() + 1; + boost::shared_lock lock(m_lock); + const TTaskInfoEntry& rInfo = m_tTasks.GetAt(nIndex); + return rInfo.GetTask(); } - m_tTasks.Add(spNewTask->GetSerializer()->GetLocation(), iOrder, spNewTask); + TTaskPtr TTaskManager::GetTaskByTaskID(taskid_t tTaskID) const + { + if (tTaskID == NoTaskID) + return TTaskPtr(); - spNewTask->OnRegisterTask(); -} + TTaskInfoEntry tEntry; -void TTaskManager::ClearBeforeExit() -{ - StopAllTasks(); + boost::shared_lock lock(m_lock); - // ensure everything is stored so that we can resume processing in the future - Store(); + if (!m_tTasks.GetByTaskID(tTaskID, tEntry)) + return TTaskPtr(); - // now remove all tasks without serializing anymore (prevent accidental - // serialization) - { - boost::unique_lock lock(m_lock); - m_tTasks.Clear(); - m_tTasks.ClearModifications(); + return tEntry.GetTask(); } -} -void TTaskManager::RemoveAllFinished() -{ - // separate scope for locking + void TTaskManager::Add(const TTaskPtr& spNewTask) { + if (!spNewTask) + THROW_CORE_EXCEPTION(eErr_InvalidArgument); + boost::unique_lock lock(m_lock); - size_t stIndex = m_tTasks.GetCount(); - while(stIndex--) + int iOrder = 1; + if (!m_tTasks.IsEmpty()) { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); + const TTaskInfoEntry& rEntry = m_tTasks.GetAt(m_tTasks.GetCount() - 1); + iOrder = rEntry.GetOrder() + 1; + } - // delete only when the thread is finished - ETaskCurrentState eState = spTask->GetTaskState(); + m_tTasks.Add(spNewTask->GetSerializer()->GetLocation(), iOrder, spNewTask); - if((eState == eTaskState_Finished || eState == eTaskState_Cancelled || eState == eTaskState_LoadError)) - { - spTask->KillThread(); + spNewTask->OnRegisterTask(); + } - spTask->OnUnregisterTask(); + void TTaskManager::ClearBeforeExit() + { + StopAllTasks(); - m_tObsoleteFiles.DeleteObsoleteFile(spTask->GetSerializer()->GetLocation()); - m_tObsoleteFiles.DeleteObsoleteFile(spTask->GetLogPath()); + // ensure everything is stored so that we can resume processing in the future + Store(); - m_tTasks.RemoveAt(stIndex); - } + // now remove all tasks without serializing anymore (prevent accidental + // serialization) + { + boost::unique_lock lock(m_lock); + m_tTasks.Clear(); + m_tTasks.ClearModifications(); } } -} -void TTaskManager::RemoveFinished(const TTaskPtr& spSelTask) -{ - // separate scope for locking + void TTaskManager::RemoveAllFinished() { - boost::unique_lock lock(m_lock); - - size_t stIndex = m_tTasks.GetCount(); - while(stIndex--) + // separate scope for locking { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); + boost::unique_lock lock(m_lock); - // delete only when the thread is finished - if(spTask == spSelTask) + size_t stIndex = m_tTasks.GetCount(); + while (stIndex--) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + + // delete only when the thread is finished ETaskCurrentState eState = spTask->GetTaskState(); - if(eState == eTaskState_Finished || eState == eTaskState_Cancelled || eState == eTaskState_LoadError) + if ((eState == eTaskState_Finished || eState == eTaskState_Cancelled || eState == eTaskState_LoadError)) { spTask->KillThread(); @@ -210,331 +176,364 @@ m_tTasks.RemoveAt(stIndex); } - break; } } } -} -void TTaskManager::StopAllTasks() -{ - boost::unique_lock lock(m_lock); + void TTaskManager::RemoveFinished(const TTaskPtr& spSelTask) + { + // separate scope for locking + { + boost::unique_lock lock(m_lock); - StopAllTasksNL(); -} + size_t stIndex = m_tTasks.GetCount(); + while (stIndex--) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); -void TTaskManager::ResumeWaitingTasks(size_t stMaxRunningTasks) -{ - size_t stRunningCount = GetCountOfRunningTasks(); + // delete only when the thread is finished + if (spTask == spSelTask) + { + ETaskCurrentState eState = spTask->GetTaskState(); - boost::shared_lock lock(m_lock); + if (eState == eTaskState_Finished || eState == eTaskState_Cancelled || eState == eTaskState_LoadError) + { + spTask->KillThread(); - size_t stTasksToRun = stMaxRunningTasks == 0 ? std::numeric_limits::max() : stMaxRunningTasks; - stTasksToRun -= stRunningCount; + spTask->OnUnregisterTask(); - if(stTasksToRun > 0) - { - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) - { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); + m_tObsoleteFiles.DeleteObsoleteFile(spTask->GetSerializer()->GetLocation()); + m_tObsoleteFiles.DeleteObsoleteFile(spTask->GetLogPath()); - // turn on some thread - find something with wait state - if(spTask->GetTaskState() == eTaskState_Waiting) - { - spTask->SetContinueFlagNL(true); - if(--stTasksToRun == 0) + m_tTasks.RemoveAt(stIndex); + } break; + } } } } -} -void TTaskManager::TasksBeginProcessing() -{ - boost::shared_lock lock(m_lock); - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + void TTaskManager::StopAllTasks() { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spTask->BeginProcessing(); + boost::unique_lock lock(m_lock); + + StopAllTasksNL(); } -} -void TTaskManager::TasksPauseProcessing() -{ - boost::shared_lock lock(m_lock); - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + void TTaskManager::ResumeWaitingTasks(size_t stMaxRunningTasks) { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spTask->PauseProcessing(); + size_t stRunningCount = GetCountOfRunningTasks(); + + boost::shared_lock lock(m_lock); + + size_t stTasksToRun = stMaxRunningTasks == 0 ? std::numeric_limits::max() : stMaxRunningTasks; + stTasksToRun -= stRunningCount; + + if (stTasksToRun > 0) + { + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + + // turn on some thread - find something with wait state + if (spTask->GetTaskState() == eTaskState_Waiting) + { + spTask->SetContinueFlagNL(true); + if (--stTasksToRun == 0) + break; + } + } + } } -} -void TTaskManager::TasksResumeProcessing() -{ - boost::shared_lock lock(m_lock); - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + void TTaskManager::TasksBeginProcessing() { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spTask->ResumeProcessing(); + boost::shared_lock lock(m_lock); + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + spTask->BeginProcessing(); + } } -} -void TTaskManager::TasksRestartProcessing() -{ - boost::shared_lock lock(m_lock); - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + void TTaskManager::TasksPauseProcessing() { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spTask->RestartProcessing(); + boost::shared_lock lock(m_lock); + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + spTask->PauseProcessing(); + } } -} -bool TTaskManager::TasksRetryProcessing() -{ - boost::shared_lock lock(m_lock); - bool bChanged=false; - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + void TTaskManager::TasksResumeProcessing() { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - if(spTask->RetryProcessing()) - bChanged = true; + boost::shared_lock lock(m_lock); + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + spTask->ResumeProcessing(); + } } - return bChanged; -} - -void TTaskManager::TasksCancelProcessing() -{ - boost::shared_lock lock(m_lock); - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + void TTaskManager::TasksRestartProcessing() { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spTask->CancelProcessing(); + boost::shared_lock lock(m_lock); + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + spTask->RestartProcessing(); + } } -} -bool TTaskManager::AreAllFinished() -{ - bool bFlag=true; - - if(GetCountOfRunningTasks() != 0) - bFlag = false; - else + bool TTaskManager::TasksRetryProcessing() { boost::shared_lock lock(m_lock); - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + bool bChanged = false; + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) + if (!spTask) THROW_CORE_EXCEPTION(eErr_InvalidPointer); + if (spTask->RetryProcessing()) + bChanged = true; + } - ETaskCurrentState eState = spTask->GetTaskState(); - bFlag = (eState == eTaskState_Finished || eState == eTaskState_Cancelled || eState == eTaskState_Paused || eState == eTaskState_Error || eState == eTaskState_LoadError); + return bChanged; + } - if(!bFlag) - break; + void TTaskManager::TasksCancelProcessing() + { + boost::shared_lock lock(m_lock); + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + spTask->CancelProcessing(); } } - return bFlag; -} + bool TTaskManager::AreAllFinished() + { + bool bFlag = true; -void TTaskManager::GetStatsSnapshot(TTaskManagerStatsSnapshotPtr& spSnapshot) const -{ - if(!spSnapshot) - THROW_CORE_EXCEPTION(eErr_InvalidArgument); + if (GetCountOfRunningTasks() != 0) + bFlag = false; + else + { + boost::shared_lock lock(m_lock); + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spSnapshot->Clear(); + ETaskCurrentState eState = spTask->GetTaskState(); + bFlag = (eState == eTaskState_Finished || eState == eTaskState_Cancelled || eState == eTaskState_Paused || eState == eTaskState_Error || eState == eTaskState_LoadError); - boost::shared_lock lock(m_lock); + if (!bFlag) + break; + } + } - size_t stRunningTasks = 0; - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + return bFlag; + } + + void TTaskManager::GetStatsSnapshot(TTaskManagerStatsSnapshotPtr& spSnapshot) const { - const TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); + if (!spSnapshot) + THROW_CORE_EXCEPTION(eErr_InvalidArgument); - TTaskStatsSnapshotPtr spStats(new TTaskStatsSnapshot); - spTask->GetStatsSnapshot(spStats); - spStats->SetTaskID(rEntry.GetObjectID()); + spSnapshot->Clear(); - if(spStats->IsTaskRunning() && spStats->GetTaskState()) - ++stRunningTasks; + boost::shared_lock lock(m_lock); - spSnapshot->AddTaskStats(spStats); - } + size_t stRunningTasks = 0; + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + const TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spSnapshot->SetRunningTasks(stRunningTasks); -} + TTaskStatsSnapshotPtr spStats(new TTaskStatsSnapshot); + spTask->GetStatsSnapshot(spStats); + spStats->SetTaskID(rEntry.GetObjectID()); -size_t TTaskManager::GetCountOfRunningTasks() const -{ - boost::shared_lock lock(m_lock); + if (spStats->IsTaskRunning() && spStats->GetTaskState()) + ++stRunningTasks; - TTaskStatsSnapshot tTaskStats; + spSnapshot->AddTaskStats(spStats); + } - size_t stRunningTasks = 0; + spSnapshot->SetRunningTasks(stRunningTasks); + } - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + size_t TTaskManager::GetCountOfRunningTasks() const { - const TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); + boost::shared_lock lock(m_lock); - if(spTask->IsRunning() && spTask->GetTaskState() == eTaskState_Processing) - ++stRunningTasks; - } + TTaskStatsSnapshot tTaskStats; - return stRunningTasks; -} + size_t stRunningTasks = 0; -void TTaskManager::StopAllTasksNL() -{ - // kill all unfinished tasks - send kill request - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) - { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spTask->RequestStopThread(); - } + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + const TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); - // wait for finishing - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) - { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - TTaskPtr spTask = rEntry.GetTask(); - if(!spTask) - THROW_CORE_EXCEPTION(eErr_InvalidPointer); - spTask->KillThread(); + if (spTask->IsRunning() && spTask->GetTaskState() == eTaskState_Processing) + ++stRunningTasks; + } + + return stRunningTasks; } -} -void TTaskManager::Store() -{ - TSimpleTimer timer(true); - - // store this container information + void TTaskManager::StopAllTasksNL() { - ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("tasks")); + // kill all unfinished tasks - send kill request + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + spTask->RequestStopThread(); + } - boost::shared_lock lock(m_lock); - m_tTasks.Store(spContainer); - - for(size_t stIndex = 0; stIndex != m_tTasks.GetCount(); ++stIndex) + // wait for finishing + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { - TTaskPtr spTask = m_tTasks.GetAt(stIndex).GetTask(); - spTask->Store(); + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + if (!spTask) + THROW_CORE_EXCEPTION(eErr_InvalidPointer); + spTask->KillThread(); } } - // store obsolete info + void TTaskManager::Store() { - ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("obsolete_tasks")); + TSimpleTimer timer(true); - boost::shared_lock lock(m_lock); - m_tObsoleteFiles.Store(spContainer); - } + // store this container information + { + ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("tasks")); - unsigned long long ullGatherTime = timer.Checkpoint(); ullGatherTime; + boost::shared_lock lock(m_lock); + m_tTasks.Store(spContainer); - m_spSerializer->Flush(); + for (size_t stIndex = 0; stIndex != m_tTasks.GetCount(); ++stIndex) + { + TTaskPtr spTask = m_tTasks.GetAt(stIndex).GetTask(); + spTask->Store(); + } + } - unsigned long long ullFlushTime = timer.Stop(); ullFlushTime; - DBTRACE2(_T("TaskManager::Store() - gather: %I64u ms, flush: %I64u ms\n"), ullGatherTime, ullFlushTime); -} + // store obsolete info + { + ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("obsolete_tasks")); -void TTaskManager::Load() -{ - // load list of tasks (without loading tasks themselves) - { - boost::unique_lock lock(m_lock); + boost::shared_lock lock(m_lock); + m_tObsoleteFiles.Store(spContainer); + } - if(!m_tTasks.IsEmpty()) - THROW_CORE_EXCEPTION(eErr_InternalProblem); + unsigned long long ullGatherTime = timer.Checkpoint(); ullGatherTime; - ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("tasks")); - m_tTasks.Load(spContainer); + m_spSerializer->Flush(); + + unsigned long long ullFlushTime = timer.Stop(); ullFlushTime; + DBTRACE2(_T("TaskManager::Store() - gather: %I64u ms, flush: %I64u ms\n"), ullGatherTime, ullFlushTime); } - // load list of task files to delete + void TTaskManager::Load() { - boost::unique_lock lock(m_lock); + // load list of tasks (without loading tasks themselves) + { + boost::unique_lock lock(m_lock); - ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("obsolete_tasks")); - m_tObsoleteFiles.Load(spContainer); // loader also tries to delete files - } + if (!m_tTasks.IsEmpty()) + THROW_CORE_EXCEPTION(eErr_InternalProblem); - // retrieve information about tasks to load - std::vector > vObjects; - { - boost::shared_lock lock(m_lock); + ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("tasks")); + m_tTasks.Load(spContainer); + } - for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + // load list of task files to delete { - TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); - if(!rEntry.GetTask()) - vObjects.push_back(std::make_pair(rEntry.GetObjectID(), rEntry.GetTaskSerializeLocation())); + boost::unique_lock lock(m_lock); + + ISerializerContainerPtr spContainer = m_spSerializer->GetContainer(_T("obsolete_tasks")); + m_tObsoleteFiles.Load(spContainer); // loader also tries to delete files } - } - for(const auto& rInfo : vObjects) - { - IFeedbackHandlerPtr spHandler = m_spFeedbackFactory->Create(); - ISerializerPtr spSerializer; - - try + // retrieve information about tasks to load + std::vector > vObjects; { - spSerializer = m_spSerializerFactory->CreateTaskSerializer(rInfo.second.ToWString()); + boost::shared_lock lock(m_lock); + + for (size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + if (!rEntry.GetTask()) + vObjects.push_back(std::make_pair(rEntry.GetObjectID(), rEntry.GetTaskSerializeLocation())); + } } - catch (const std::exception&) + + for (const auto& rInfo : vObjects) { - // ignore the exception - } + IFeedbackHandlerPtr spHandler = m_spFeedbackFactory->Create(); + ISerializerPtr spSerializer; - if (!spSerializer) - spSerializer = boost::make_shared(rInfo.second); + try + { + spSerializer = m_spSerializerFactory->CreateTaskSerializer(rInfo.second.ToWString()); + } + catch (const std::exception&) + { + // ignore the exception + } - TTaskPtr spTask(new TTask(spSerializer, spHandler)); - spTask->Load(); + if (!spSerializer) + spSerializer = boost::make_shared(rInfo.second); - boost::unique_lock lock(m_lock); + TTaskPtr spTask(new TTask(spSerializer, spHandler)); + spTask->Load(); - TTaskInfoEntry& rInfoEntry = m_tTasks.GetAtOid(rInfo.first); - rInfoEntry.SetTask(spTask); + boost::unique_lock lock(m_lock); + + TTaskInfoEntry& rInfoEntry = m_tTasks.GetAtOid(rInfo.first); + rInfoEntry.SetTask(spTask); + } } -} -TSmartPath TTaskManager::CreateTaskLogPath(const TString& strTaskUuid) const -{ - TSmartPath pathLog = m_pathLogDir + PathFromWString(TString(_T("Task-")) + strTaskUuid + _T(".log")); - return pathLog; + TSmartPath TTaskManager::CreateTaskLogPath(const TString& strTaskUuid) const + { + TSmartPath pathLog = m_pathLogDir + PathFromWString(TString(_T("Task-")) + strTaskUuid + _T(".log")); + return pathLog; + } } - -END_CHCORE_NAMESPACE