Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -N -rdc1988138aa8e37ce585fed1caa25294781578ac -rfdf4929dc7df1376ed439b7271765f1a4ca31de6 --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision dc1988138aa8e37ce585fed1caa25294781578ac) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision fdf4929dc7df1376ed439b7271765f1a4ca31de6) @@ -39,6 +39,7 @@ #include "TFileInfoArray.h" #include "SerializationHelpers.h" #include "TBinarySerializer.h" +#include "TDataBuffer.h" BEGIN_CHCORE_NAMESPACE @@ -136,7 +137,8 @@ TFileInfoPtr spSrcFile; // CFileInfo - src file TSmartPath pathDstFile; // dest path with filename - TDataBuffer dbBuffer; // buffer handling + TBufferSizes tBufferSizes; + TDataBufferManager dbBuffer; // buffer handling bool bOnlyCreate; // flag from configuration - skips real copying - only create bool bProcessed; // has the element been processed ? (false if skipped) }; @@ -199,7 +201,7 @@ // remove changes in buffer sizes to avoid re-creation later rCfgTracker.RemoveModificationSet(TOptionsSet() % eTO_DefaultBufferSize % eTO_OneDiskBufferSize % eTO_TwoDisksBufferSize % eTO_CDBufferSize % eTO_LANBufferSize % eTO_UseOnlyDefaultBuffer); - RecreateBufferIfNeeded(ccp.dbBuffer, true); + AdjustBufferIfNeeded(ccp.dbBuffer, ccp.tBufferSizes); // log TString strFormat; @@ -289,9 +291,6 @@ m_tSubTaskStats.SetProcessedCount(stIndex); m_tSubTaskStats.SetCurrentPath(TString()); - // delete buffer - it's not needed - ccp.dbBuffer.Delete(); - // log rLog.logi(_T("Finished processing in ProcessFiles")); @@ -303,8 +302,11 @@ m_tSubTaskStats.GetSnapshot(rStats); } -int TSubTaskCopyMove::GetBufferIndex(const TFileInfoPtr& spFileInfo) +TBufferSizes::EBufferType TSubTaskCopyMove::GetBufferIndex(const TBufferSizes& rBufferSizes, const TFileInfoPtr& spFileInfo) { + if(rBufferSizes.IsOnlyDefault()) + return TBufferSizes::eBuffer_Default; + if(!spFileInfo) THROW_CORE_EXCEPTION(eErr_InvalidArgument); @@ -359,10 +361,13 @@ return TSubTaskBase::eSubResult_Continue; // copying - unsigned long ulToRead = 0; + std::list listDataBuffers; + std::list listEmptyBuffers; + + size_t stToRead = 0; unsigned long ulRead = 0; unsigned long ulWritten = 0; - int iBufferIndex = 0; + TBufferSizes::EBufferType eBufferIndex = TBufferSizes::eBuffer_Default; bool bLastPart = false; do @@ -379,36 +384,66 @@ } // recreate buffer if needed - RecreateBufferIfNeeded(pData->dbBuffer, false); + AdjustBufferIfNeeded(pData->dbBuffer, pData->tBufferSizes); // establish count of data to read - if(GetTaskPropValue(rTaskDefinition.GetConfiguration())) - iBufferIndex = TBufferSizes::eBuffer_Default; - else - iBufferIndex = GetBufferIndex(pData->spSrcFile); - // new stats - m_tSubTaskStats.SetCurrentBufferIndex(iBufferIndex); + eBufferIndex = GetBufferIndex(pData->tBufferSizes, pData->spSrcFile); + m_tSubTaskStats.SetCurrentBufferIndex(eBufferIndex); - ulToRead = bNoBuffer ? ROUNDUP(pData->dbBuffer.GetSizes().GetSizeByType((TBufferSizes::EBufferType)iBufferIndex), MAXSECTORSIZE) : pData->dbBuffer.GetSizes().GetSizeByType((TBufferSizes::EBufferType)iBufferIndex); + stToRead = RoundUp((size_t)pData->tBufferSizes.GetSizeByType(eBufferIndex), pData->dbBuffer.GetSimpleBufferSize()); + size_t stBuffersToRead = stToRead / pData->dbBuffer.GetSimpleBufferSize(); // read data from file to buffer - eResult = ReadFileFB(fileSrc, pData->dbBuffer, ulToRead, ulRead, pData->spSrcFile->GetFullFilePath(), bSkip); - if(eResult != TSubTaskBase::eSubResult_Continue) - return eResult; - else if(bSkip) + for(size_t stIndex = 0; stIndex < stBuffersToRead; ++stIndex) { - // new stats - m_tSubTaskStats.IncreaseProcessedSize(pData->spSrcFile->GetLength64() - m_tProgressInfo.GetCurrentFileProcessedSize()); + // get new simple buffer + TSimpleDataBufferPtr spBuffer; + if(listEmptyBuffers.empty()) + { + spBuffer.reset(new TSimpleDataBuffer); + if(pData->dbBuffer.GetFreeBuffer(*spBuffer.get())) + listEmptyBuffers.push_back(spBuffer); + else + { + if(listDataBuffers.empty()) + THROW_CORE_EXCEPTION(eErr_InternalProblem); + break; + } + } - pData->bProcessed = false; - return TSubTaskBase::eSubResult_Continue; - } + spBuffer = listEmptyBuffers.back(); + listEmptyBuffers.pop_back(); - bLastPart = (ulToRead != ulRead); + eResult = ReadFileFB(fileSrc, *spBuffer.get(), boost::numeric_cast(pData->dbBuffer.GetSimpleBufferSize()), ulRead, pData->spSrcFile->GetFullFilePath(), bSkip); + if(eResult != TSubTaskBase::eSubResult_Continue) + return eResult; + else if(bSkip) + { + // new stats + m_tSubTaskStats.IncreaseProcessedSize(pData->spSrcFile->GetLength64() - m_tProgressInfo.GetCurrentFileProcessedSize()); - if(ulRead > 0) + pData->bProcessed = false; + return TSubTaskBase::eSubResult_Continue; + } + + spBuffer->SetDataSize(ulRead); + + if(ulRead > 0) + listDataBuffers.push_back(spBuffer); + else + listEmptyBuffers.push_back(spBuffer); + + bLastPart = (pData->dbBuffer.GetSimpleBufferSize() != ulRead); + if(bLastPart) + break; + } + + while(!listDataBuffers.empty()) { - eResult = WriteFileExFB(fileDst, pData->dbBuffer, ulRead, ulWritten, pData->pathDstFile, bSkip, bNoBuffer); + TSimpleDataBufferPtr spBuffer = listDataBuffers.front(); + listDataBuffers.pop_front(); + + eResult = WriteFileExFB(fileDst, *spBuffer.get(), boost::numeric_cast(spBuffer->GetDataSize()), ulWritten, pData->pathDstFile, bSkip, bNoBuffer); if(eResult != TSubTaskBase::eSubResult_Continue) return eResult; else if(bSkip) @@ -420,13 +455,15 @@ return TSubTaskBase::eSubResult_Continue; } + listEmptyBuffers.push_back(spBuffer); + // increase count of processed data m_tProgressInfo.IncreaseCurrentFileProcessedSize(ulWritten); // new stats m_tSubTaskStats.IncreaseProcessedSize(ulWritten); } } - while(ulRead != 0 && !bLastPart); + while(!bLastPart); pData->bProcessed = true; m_tProgressInfo.SetCurrentFileProcessedSize(0); @@ -562,42 +599,55 @@ return eResult; } -void TSubTaskCopyMove::RecreateBufferIfNeeded(TDataBuffer& rBuffer, bool bInitialCreate) +bool TSubTaskCopyMove::AdjustBufferIfNeeded(chcore::TDataBufferManager& rBuffer, TBufferSizes& rBufferSizes) { TTaskConfigTracker& rCfgTracker = GetContext().GetCfgTracker(); TTaskDefinition& rTaskDefinition = GetContext().GetTaskDefinition(); icpf::log_file& rLog = GetContext().GetLog(); - if(bInitialCreate || (rCfgTracker.IsModified() && rCfgTracker.IsModified(TOptionsSet() % eTO_DefaultBufferSize % eTO_OneDiskBufferSize % eTO_TwoDisksBufferSize % eTO_CDBufferSize % eTO_LANBufferSize % eTO_UseOnlyDefaultBuffer, true))) + if(!rBuffer.IsInitialized() || (rCfgTracker.IsModified() && rCfgTracker.IsModified(TOptionsSet() % eTO_DefaultBufferSize % eTO_OneDiskBufferSize % eTO_TwoDisksBufferSize % eTO_CDBufferSize % eTO_LANBufferSize % eTO_UseOnlyDefaultBuffer, true))) { - TBufferSizes bs; - bs.SetOnlyDefault(GetTaskPropValue(rTaskDefinition.GetConfiguration())); - bs.SetDefaultSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); - bs.SetOneDiskSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); - bs.SetTwoDisksSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); - bs.SetCDSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); - bs.SetLANSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); + rBufferSizes.SetOnlyDefault(GetTaskPropValue(rTaskDefinition.GetConfiguration())); + rBufferSizes.SetDefaultSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); + rBufferSizes.SetOneDiskSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); + rBufferSizes.SetTwoDisksSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); + rBufferSizes.SetCDSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); + rBufferSizes.SetLANSize(GetTaskPropValue(rTaskDefinition.GetConfiguration())); // log - const TBufferSizes& rbs1 = rBuffer.GetSizes(); - TString strFormat; - strFormat = _T("Changing buffer size from [Def:%defsize, One:%onesize, Two:%twosize, CD:%cdsize, LAN:%lansize] to [Def:%defsize2, One:%onesize2, Two:%twosize2, CD:%cdsize2, LAN:%lansize2]"); + strFormat = _T("Changing buffer size to [Def:%defsize2, One:%onesize2, Two:%twosize2, CD:%cdsize2, LAN:%lansize2]"); - strFormat.Replace(_T("%defsize"), boost::lexical_cast(rbs1.GetDefaultSize()).c_str()); - strFormat.Replace(_T("%onesize"), boost::lexical_cast(rbs1.GetOneDiskSize()).c_str()); - strFormat.Replace(_T("%twosize"), boost::lexical_cast(rbs1.GetTwoDisksSize()).c_str()); - strFormat.Replace(_T("%cdsize"), boost::lexical_cast(rbs1.GetCDSize()).c_str()); - strFormat.Replace(_T("%lansize"), boost::lexical_cast(rbs1.GetLANSize()).c_str()); - strFormat.Replace(_T("%defsize2"), boost::lexical_cast(bs.GetDefaultSize()).c_str()); - strFormat.Replace(_T("%onesize2"), boost::lexical_cast(bs.GetOneDiskSize()).c_str()); - strFormat.Replace(_T("%twosize2"), boost::lexical_cast(bs.GetTwoDisksSize()).c_str()); - strFormat.Replace(_T("%cdsize2"), boost::lexical_cast(bs.GetCDSize()).c_str()); - strFormat.Replace(_T("%lansize2"), boost::lexical_cast(bs.GetLANSize()).c_str()); + strFormat.Replace(_T("%defsize2"), boost::lexical_cast(rBufferSizes.GetDefaultSize()).c_str()); + strFormat.Replace(_T("%onesize2"), boost::lexical_cast(rBufferSizes.GetOneDiskSize()).c_str()); + strFormat.Replace(_T("%twosize2"), boost::lexical_cast(rBufferSizes.GetTwoDisksSize()).c_str()); + strFormat.Replace(_T("%cdsize2"), boost::lexical_cast(rBufferSizes.GetCDSize()).c_str()); + strFormat.Replace(_T("%lansize2"), boost::lexical_cast(rBufferSizes.GetLANSize()).c_str()); rLog.logi(strFormat); - rBuffer.Create(bs); + + if(!rBuffer.IsInitialized()) + { + size_t stMaxSize = rBufferSizes.GetMaxSize(); + size_t stPageSize = GetTaskPropValue(rTaskDefinition.GetConfiguration()); + size_t stChunkSize = GetTaskPropValue(rTaskDefinition.GetConfiguration()); + + chcore::TDataBufferManager::CheckBufferConfig(stMaxSize, stPageSize, stChunkSize); + + rBuffer.Initialize(stMaxSize, stPageSize, stChunkSize); + } + else + { + size_t stMaxSize = rBufferSizes.GetMaxSize(); + rBuffer.CheckResizeSize(stMaxSize); + + rBuffer.ChangeMaxMemorySize(stMaxSize); + } + + return true; } + else + return false; } TSubTaskBase::ESubOperationResult TSubTaskCopyMove::OpenSourceFileFB(TLocalFilesystemFile& fileSrc, const TSmartPath& spPathToOpen, bool bNoBuffering) @@ -944,7 +994,7 @@ return TSubTaskBase::eSubResult_Continue; } -TSubTaskBase::ESubOperationResult TSubTaskCopyMove::ReadFileFB(TLocalFilesystemFile& file, TDataBuffer& rBuffer, DWORD dwToRead, DWORD& rdwBytesRead, const TSmartPath& pathFile, bool& bSkip) +TSubTaskBase::ESubOperationResult TSubTaskCopyMove::ReadFileFB(TLocalFilesystemFile& file, chcore::TSimpleDataBuffer& rBuffer, DWORD dwToRead, DWORD& rdwBytesRead, const TSmartPath& pathFile, bool& bSkip) { IFeedbackHandler* piFeedbackHandler = GetContext().GetFeedbackHandler(); icpf::log_file& rLog = GetContext().GetLog(); @@ -995,7 +1045,7 @@ return TSubTaskBase::eSubResult_Continue; } -TSubTaskBase::ESubOperationResult TSubTaskCopyMove::WriteFileFB(TLocalFilesystemFile& file, TDataBuffer& rBuffer, DWORD dwToWrite, DWORD& rdwBytesWritten, const TSmartPath& pathFile, bool& bSkip) +TSubTaskBase::ESubOperationResult TSubTaskCopyMove::WriteFileFB(TLocalFilesystemFile& file, chcore::TSimpleDataBuffer& rBuffer, DWORD dwToWrite, DWORD& rdwBytesWritten, const TSmartPath& pathFile, bool& bSkip) { IFeedbackHandler* piFeedbackHandler = GetContext().GetFeedbackHandler(); icpf::log_file& rLog = GetContext().GetLog(); @@ -1047,7 +1097,7 @@ return TSubTaskBase::eSubResult_Continue; } -TSubTaskBase::ESubOperationResult TSubTaskCopyMove::WriteFileExFB(TLocalFilesystemFile& file, TDataBuffer& rBuffer, DWORD dwToWrite, DWORD& rdwBytesWritten, const TSmartPath& pathFile, bool& bSkip, bool bNoBuffer) +TSubTaskBase::ESubOperationResult TSubTaskCopyMove::WriteFileExFB(TLocalFilesystemFile& file, chcore::TSimpleDataBuffer& rBuffer, DWORD dwToWrite, DWORD& rdwBytesWritten, const TSmartPath& pathFile, bool& bSkip, bool bNoBuffer) { TString strFormat; TSubTaskBase::ESubOperationResult eResult = TSubTaskBase::eSubResult_Continue;