Index: src/libchcore/TTaskManager.cpp =================================================================== diff -u -N -ra5f396da5ed5ffb3fcd9fdf22afb5a7fd07e1ab8 -rb1ecc12ba4c1f2a7b4acd6e82fc4193535e55ff0 --- src/libchcore/TTaskManager.cpp (.../TTaskManager.cpp) (revision a5f396da5ed5ffb3fcd9fdf22afb5a7fd07e1ab8) +++ src/libchcore/TTaskManager.cpp (.../TTaskManager.cpp) (revision b1ecc12ba4c1f2a7b4acd6e82fc4193535e55ff0) @@ -28,14 +28,20 @@ #include "TTaskManagerStatsSnapshot.h" #include "TCoreException.h" #include "ErrorCodes.h" +#include "TTaskInfo.h" +#include +#include +#include +#include BEGIN_CHCORE_NAMESPACE //////////////////////////////////////////////////////////////////////////////// // TTaskManager members -TTaskManager::TTaskManager() : +TTaskManager::TTaskManager(const ITaskManagerSerializerPtr& spSerializer) : m_piFeedbackFactory(NULL), - m_stNextSessionUniqueID(NO_TASK_SESSION_UNIQUE_ID + 1) + m_stNextTaskID(NoTaskID + 1), + m_spSerializer(spSerializer) { } @@ -53,14 +59,21 @@ TTaskPtr TTaskManager::CreateTask(const TTaskDefinition& tTaskDefinition) { - TTaskPtr spTask = CreateEmptyTask(); - if(spTask) - { - spTask->SetTaskDefinition(tTaskDefinition); - Add(spTask); - spTask->Store(); - } + TString strUuid = GetUuid(); + TSmartPath pathTaskSerializer = CreateTaskSerializePath(strUuid); + TSmartPath pathTaskLog = CreateTaskLogPath(strUuid); + IFeedbackHandler* piHandler = CreateNewFeedbackHandler(); + ITaskSerializerPtr spSerializer = m_spSerializer->CreateTaskSerializer(pathTaskSerializer); + + TTaskPtr spTask(new TTask(spSerializer, piHandler)); + spTask->SetLogPath(pathTaskLog); + spTask->SetTaskDefinition(tTaskDefinition); + + Add(spTask); + + spTask->Store(); + return spTask; } @@ -73,165 +86,136 @@ return CreateTask(tTaskDefinition); } -TTaskPtr TTaskManager::CreateEmptyTask() -{ - BOOST_ASSERT(m_piFeedbackFactory); - if(!m_piFeedbackFactory) - return TTaskPtr(); - - IFeedbackHandler* piHandler = m_piFeedbackFactory->Create(); - if(!piHandler) - return TTaskPtr(); - - BOOST_ASSERT(m_stNextSessionUniqueID != NO_TASK_SESSION_UNIQUE_ID); - TTaskPtr spTask(new TTask(piHandler, m_stNextSessionUniqueID++)); - - // NO_TASK_SESSION_UNIQUE_ID is a special value so it should not be used to identify tasks - if(m_stNextSessionUniqueID == NO_TASK_SESSION_UNIQUE_ID) - ++m_stNextSessionUniqueID; - - return spTask; -} - size_t TTaskManager::GetSize() const { boost::shared_lock lock(m_lock); - return m_vTasks.size(); + return m_tTasks.GetCount(); } TTaskPtr TTaskManager::GetAt(size_t nIndex) const { boost::shared_lock lock(m_lock); - - _ASSERTE(nIndex >= 0 && nIndex < m_vTasks.size()); - if(nIndex >= m_vTasks.size()) - THROW_CORE_EXCEPTION(eErr_InvalidArgument); - - return m_vTasks.at(nIndex); + const TTaskInfoEntry& rInfo = m_tTasks.GetAt(nIndex); + return rInfo.GetTask(); } -TTaskPtr TTaskManager::GetTaskBySessionUniqueID(size_t stSessionUniqueID) const +TTaskPtr TTaskManager::GetTaskByTaskID(taskid_t tTaskID) const { - if(stSessionUniqueID == NO_TASK_SESSION_UNIQUE_ID) + if(tTaskID == NoTaskID) return TTaskPtr(); - TTaskPtr spFoundTask; + TTaskInfoEntry tEntry; boost::shared_lock lock(m_lock); - BOOST_FOREACH(const TTaskPtr& spTask, m_vTasks) - { - if(spTask->GetSessionUniqueID() == stSessionUniqueID) - { - spFoundTask = spTask; - break; - } - } - return spFoundTask; + if(!m_tTasks.GetByTaskID(tTaskID, tEntry)) + return TTaskPtr(); + + return tEntry.GetTask(); } -size_t TTaskManager::Add(const TTaskPtr& spNewTask) +void TTaskManager::Add(const TTaskPtr& spNewTask) { if(!spNewTask) THROW_CORE_EXCEPTION(eErr_InvalidArgument); boost::unique_lock lock(m_lock); - // here we know load succeeded - spNewTask->SetTaskDirectory(m_pathTasksDir); - m_vTasks.push_back(spNewTask); + int iOrder = 1; + if(!m_tTasks.IsEmpty()) + { + const TTaskInfoEntry& rEntry = m_tTasks.GetAt(m_tTasks.GetCount() - 1); + iOrder = rEntry.GetOrder() + 1; + } - spNewTask->OnRegisterTask(); + m_tTasks.Add(m_stNextTaskID++, spNewTask->GetSerializerPath(), iOrder, spNewTask); - return m_vTasks.size() - 1; + spNewTask->OnRegisterTask(); } -void TTaskManager::RemoveAt(size_t stIndex, size_t stCount) +void TTaskManager::ClearBeforeExit() { - boost::unique_lock lock(m_lock); + StopAllTasks(); - _ASSERTE(stIndex >= m_vTasks.size() || stIndex + stCount > m_vTasks.size()); - if(stIndex >= m_vTasks.size() || stIndex + stCount > m_vTasks.size()) - THROW_CORE_EXCEPTION(eErr_InvalidArgument); + // ensure everything is stored so that we can resume processing in the future + Store(); - for(std::vector::iterator iterTask = m_vTasks.begin() + stIndex; iterTask != m_vTasks.begin() + stIndex + stCount; ++iterTask) + // now remove all tasks without serializing anymore (prevent accidental + // serialization) { - TTaskPtr& spTask = *iterTask; - - // kill task if needed - spTask->KillThread(); - - spTask->OnUnregisterTask(); + boost::unique_lock lock(m_lock); + m_tTasks.Clear(); + m_tTasks.ClearModifications(); } - - // remove elements from array - m_vTasks.erase(m_vTasks.begin() + stIndex, m_vTasks.begin() + stIndex + stCount); } -void TTaskManager::RemoveAll() -{ - boost::unique_lock lock(m_lock); - - StopAllTasksNL(); - - m_vTasks.clear(); -} - void TTaskManager::RemoveAllFinished() { - std::vector vTasksToRemove; + std::vector vTasksToRemove; // separate scope for locking { boost::unique_lock lock(m_lock); - size_t stIndex = m_vTasks.size(); + size_t stIndex = m_tTasks.GetCount(); while(stIndex--) { - TTaskPtr spTask = m_vTasks.at(stIndex); + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); // delete only when the thread is finished if((spTask->GetTaskState() == eTaskState_Finished || spTask->GetTaskState() == eTaskState_Cancelled)) { + spTask->KillThread(); + spTask->OnUnregisterTask(); - vTasksToRemove.push_back(spTask); - m_vTasks.erase(m_vTasks.begin() + stIndex); + vTasksToRemove.push_back(rEntry.GetTaskPath()); + m_tTasks.RemoveAt(stIndex); } } } - BOOST_FOREACH(TTaskPtr& spTask, vTasksToRemove) + BOOST_FOREACH(TSmartPath& spTaskPath, vTasksToRemove) { // delete associated files - spTask->DeleteProgress(); + DeleteFile(spTaskPath.ToString()); } } void TTaskManager::RemoveFinished(const TTaskPtr& spSelTask) { - boost::unique_lock lock(m_lock); + std::vector vTasksToRemove; - // this might be optimized by copying tasks to a local table in critical section, and then deleting progress files outside of the critical section - for(std::vector::iterator iterTask = m_vTasks.begin(); iterTask != m_vTasks.end(); ++iterTask) + // separate scope for locking { - TTaskPtr& spTask = *iterTask; + boost::unique_lock lock(m_lock); - if(spTask == spSelTask && (spTask->GetTaskState() == eTaskState_Finished || spTask->GetTaskState() == eTaskState_Cancelled)) + size_t stIndex = m_tTasks.GetCount(); + while(stIndex--) { - // kill task if needed - spTask->KillThread(); + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); - spTask->OnUnregisterTask(); + // delete only when the thread is finished + if(spTask == spSelTask && (spTask->GetTaskState() == eTaskState_Finished || spTask->GetTaskState() == eTaskState_Cancelled)) + { + spTask->KillThread(); - // delete associated files - spTask->DeleteProgress(); + spTask->OnUnregisterTask(); - m_vTasks.erase(iterTask); - - return; + vTasksToRemove.push_back(rEntry.GetTaskPath()); + m_tTasks.RemoveAt(stIndex); + break; + } } } + + BOOST_FOREACH(TSmartPath& spTaskPath, vTasksToRemove) + { + // delete associated files + DeleteFile(spTaskPath.ToString()); + } } void TTaskManager::StopAllTasks() @@ -252,8 +236,11 @@ if(stTasksToRun > 0) { - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + // turn on some thread - find something with wait state if(spTask->GetTaskState() == eTaskState_Waiting) { @@ -265,105 +252,46 @@ } } -void TTaskManager::SaveData() -{ - boost::shared_lock lock(m_lock); - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) - { - spTask->Store(); - } -} - -void TTaskManager::LoadDataProgress() -{ - if(m_pathTasksDir.IsEmpty()) - THROW_CORE_EXCEPTION(eErr_MissingTaskSerializationPath); - - TTaskPtr spTask; - TSmartPath pathFound; - WIN32_FIND_DATA wfd; - bool bExceptionEncountered = false; - - const size_t stMaxMsgSize = 4096; - boost::shared_array spMsgBuffer(new wchar_t[stMaxMsgSize]); - spMsgBuffer[0] = _T('\0'); - - // find all CH Task files - TSmartPath pathToFind = m_pathTasksDir + PathFromString(_T("*.cht")); - - HANDLE hFind = ::FindFirstFile(pathToFind.ToString(), &wfd); - BOOL bContinue = TRUE; - while(hFind != INVALID_HANDLE_VALUE && bContinue) - { - pathFound = m_pathTasksDir + PathFromString(wfd.cFileName); - // load data - spTask = CreateEmptyTask(); - try - { - spTask->Load(pathFound); - - // add read task to array - Add(spTask); - } - catch(icpf::exception& e) - { - e.get_info(spMsgBuffer.get(), stMaxMsgSize); - bExceptionEncountered = true; - } - catch(std::exception& e) - { - _tcsncpy_s(spMsgBuffer.get(), stMaxMsgSize, CA2CT(e.what()), _TRUNCATE); - bExceptionEncountered = true; - } - - if(bExceptionEncountered) - { - TString strFmt = _T("Cannot load task data: %path (reason: %reason)"); - strFmt.Replace(_T("%path"), pathFound.ToString()); - strFmt.Replace(_T("%reason"), spMsgBuffer.get()); - - LOG_ERROR(strFmt); - - bExceptionEncountered = false; - } - bContinue = ::FindNextFile(hFind, &wfd); - } - - ::FindClose(hFind); -} - void TTaskManager::TasksBeginProcessing() { boost::shared_lock lock(m_lock); - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); spTask->BeginProcessing(); } } void TTaskManager::TasksPauseProcessing() { boost::shared_lock lock(m_lock); - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); spTask->PauseProcessing(); } } void TTaskManager::TasksResumeProcessing() { boost::shared_lock lock(m_lock); - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); spTask->ResumeProcessing(); } } void TTaskManager::TasksRestartProcessing() { boost::shared_lock lock(m_lock); - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); spTask->RestartProcessing(); } } @@ -372,8 +300,10 @@ { boost::shared_lock lock(m_lock); bool bChanged=false; - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); if(spTask->RetryProcessing()) bChanged = true; } @@ -384,8 +314,10 @@ void TTaskManager::TasksCancelProcessing() { boost::shared_lock lock(m_lock); - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); spTask->CancelProcessing(); } } @@ -399,8 +331,11 @@ else { boost::shared_lock lock(m_lock); - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + ETaskCurrentState eState = spTask->GetTaskState(); bFlag = (eState == eTaskState_Finished || eState == eTaskState_Cancelled || eState == eTaskState_Paused || eState == eTaskState_Error); @@ -412,12 +347,6 @@ return bFlag; } -void TTaskManager::SetTasksDir(const TSmartPath& pathDir) -{ - boost::unique_lock lock(m_lock); - m_pathTasksDir = pathDir; -} - void TTaskManager::GetStatsSnapshot(TTaskManagerStatsSnapshotPtr& spSnapshot) const { if(!spSnapshot) @@ -428,10 +357,13 @@ boost::shared_lock lock(m_lock); size_t stRunningTasks = 0; - BOOST_FOREACH(const TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + const TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); TTaskStatsSnapshotPtr spStats(new TTaskStatsSnapshot); spTask->GetStatsSnapshot(spStats); + spStats->SetTaskID(rEntry.GetTaskID()); if(spStats->IsTaskRunning() && spStats->GetTaskState()) ++stRunningTasks; @@ -450,8 +382,10 @@ size_t stRunningTasks = 0; - BOOST_FOREACH(const TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + const TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); if(spTask->IsRunning() && spTask->GetTaskState() == eTaskState_Processing) ++stRunningTasks; } @@ -462,16 +396,114 @@ void TTaskManager::StopAllTasksNL() { // kill all unfinished tasks - send kill request - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); spTask->RequestStopThread(); } // wait for finishing - BOOST_FOREACH(TTaskPtr& spTask, m_vTasks) + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); spTask->KillThread(); } } +IFeedbackHandler* TTaskManager::CreateNewFeedbackHandler() +{ + BOOST_ASSERT(m_piFeedbackFactory); + if(!m_piFeedbackFactory) + return NULL; + + IFeedbackHandler* piHandler = m_piFeedbackFactory->Create(); + + return piHandler; +} + +void TTaskManager::Store() +{ + // store this container information + TTaskInfoContainer tDataDiff; + { + boost::shared_lock lock(m_lock); + m_tTasks.GetDiffAndResetModifications(tDataDiff); + } + + try + { + m_spSerializer->Store(tDataDiff); + } + catch(const std::exception&) + { + boost::unique_lock lock(m_lock); + m_tTasks.RestoreModifications(tDataDiff); + + throw; + } + + // trigger storing tasks + boost::shared_lock lock(m_lock); + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + TTaskPtr spTask = rEntry.GetTask(); + + spTask->Store(); + } + +} + +void TTaskManager::Load() +{ + boost::unique_lock lock(m_lock); + + if(!m_tTasks.IsEmpty()) + THROW_CORE_EXCEPTION(eErr_InternalProblem); + + m_spSerializer->Load(m_tTasks); + + // clear all modifications of freshly loaded tasks (in case serializer does + // not reset the modification state) + m_tTasks.ClearModifications(); + + // load tasks + for(size_t stIndex = 0; stIndex < m_tTasks.GetCount(); ++stIndex) + { + TTaskInfoEntry& rEntry = m_tTasks.GetAt(stIndex); + + if(!rEntry.GetTask()) + { + IFeedbackHandler* piHandler = CreateNewFeedbackHandler(); + ITaskSerializerPtr spSerializer = m_spSerializer->CreateTaskSerializer(rEntry.GetTaskPath()); + + TTaskPtr spTask(new TTask(spSerializer, piHandler)); + spTask->Load(); + + rEntry.SetTask(spTask); + } + } +} + +TString TTaskManager::GetUuid() +{ + boost::uuids::random_generator gen; + boost::uuids::uuid u = gen(); + return boost::lexical_cast(u).c_str(); +} + +TSmartPath TTaskManager::CreateTaskLogPath(const TString& strUuid) const +{ + TSmartPath pathLog = m_pathLogDir + PathFromString(TString(_T("Task-")) + strUuid + _T(".log")); + return pathLog; +} + +chcore::TSmartPath TTaskManager::CreateTaskSerializePath(const TString& strUuid) const +{ + TSmartPath pathLog = m_pathLogDir + PathFromString(TString(_T("Task-")) + strUuid + _T(".sqlite")); + return pathLog; +} + END_CHCORE_NAMESPACE