Index: src/libchcore/TBufferList.h =================================================================== diff -u -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TBufferList.h (.../TBufferList.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/TBufferList.h (.../TBufferList.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -23,6 +23,7 @@ #include #include "TCoreException.h" #include +#include "TSharedCountMT.h" namespace chcore { @@ -31,7 +32,8 @@ class TBufferList { public: - TBufferList() + TBufferList() : + m_spCount(std::make_shared>()) { } @@ -44,9 +46,8 @@ boost::unique_lock lock(m_mutex); m_queueBuffers.push_front(pBuffer); + m_spCount->Increase(); } - - m_notifier(); } TOverlappedDataBuffer* Pop() @@ -61,10 +62,9 @@ pBuffer = m_queueBuffers.front(); m_queueBuffers.pop_front(); + m_spCount->Decrease(); } - m_notifier(); - return pBuffer; } @@ -76,22 +76,18 @@ bRemoved = !m_queueBuffers.empty(); m_queueBuffers.clear(); + m_spCount->SetValue(0); } - - if(bRemoved) - m_notifier(); } size_t GetCount() const { - boost::shared_lock lock(m_mutex); - return m_queueBuffers.size(); + return m_spCount->GetValue(); } bool IsEmpty() const { - boost::shared_lock lock(m_mutex); - return m_queueBuffers.empty(); + return m_spCount->GetValue() == 0; } void SetExpectedBuffersCount(size_t stExpectedBuffers) // thread-unsafe by design @@ -103,21 +99,20 @@ bool AreAllBuffersAccountedFor() const { boost::shared_lock lock(m_mutex); - return m_stExpectedBuffers == m_queueBuffers.size(); + return m_stExpectedBuffers == m_spCount->GetValue(); } - boost::signals2::signal& GetNotifier() + TSharedCountMTPtr GetSharedCount() { - return m_notifier; + return m_spCount; } private: mutable boost::shared_mutex m_mutex; + TSharedCountMTPtr m_spCount; size_t m_stExpectedBuffers = 0; // count of buffers there should be in m_queueBuffers when no buffer is in use std::deque m_queueBuffers; - - boost::signals2::signal m_notifier; }; using TBufferListPtr = std::shared_ptr; Index: src/libchcore/TEventCounter.h =================================================================== diff -u -r5127141ac49a45db27f748dfb659d31f2e4983c4 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TEventCounter.h (.../TEventCounter.h) (revision 5127141ac49a45db27f748dfb659d31f2e4983c4) +++ src/libchcore/TEventCounter.h (.../TEventCounter.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -35,7 +35,7 @@ public: explicit TEventCounter(T initialValue = 0) : m_event(true, false), - m_tCounter(initialValue), + m_spCounter(std::make_shared>(initialValue)), m_tMaxUsed(initialValue) { UpdateEvent(); @@ -46,23 +46,18 @@ void Increase() { - ++m_tCounter; - if(m_tCounter > m_tMaxUsed) - m_tMaxUsed = m_tCounter; + m_spCounter->Increase(); + if(m_spCounter->GetValue() > m_tMaxUsed) + m_tMaxUsed = m_spCounter->GetValue(); UpdateEvent(); } void Decrease() { - --m_tCounter; + m_spCounter->Decrease(); UpdateEvent(); } - T GetCounter() const - { - return m_tCounter; - } - T GetMaxUsed() const { return m_tMaxUsed; @@ -73,16 +68,21 @@ return m_event.Handle(); } + TSharedCountPtr GetSharedCount() + { + return m_spCounter; + } + private: void UpdateEvent() { - bool bIsEqual = (m_tCounter == CompareValue); + bool bIsEqual = (m_spCounter->GetValue() == CompareValue); m_event.SetEvent(EventMode == EEventCounterMode::eSetIfEqual ? bIsEqual : !bIsEqual); } private: TEvent m_event; - T m_tCounter; + TSharedCountPtr m_spCounter; T m_tMaxUsed; }; } Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -26,6 +26,7 @@ namespace chcore { TOrderedBufferQueue::TOrderedBufferQueue(const TBufferListPtr& spEmptyBuffers, unsigned long long ullExpectedPosition) : + m_spBuffersCount(std::make_shared>()), m_spEmptyBuffers(spEmptyBuffers), m_eventHasBuffers(true, false), m_eventHasError(true, false), @@ -48,12 +49,14 @@ if(pBuffer->HasError()) throw TCoreException(eErr_InvalidArgument, L"Cannot push buffer with error", LOCATION); - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); auto pairInsert = m_setBuffers.insert(pBuffer); if (!pairInsert.second) throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION); + m_spBuffersCount->Increase(); + if(pBuffer->IsLastPart()) m_bDataSourceFinished = true; @@ -73,14 +76,16 @@ TOverlappedDataBuffer* TOrderedBufferQueue::Pop() { - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!InternalHasPoppableBuffer()) return nullptr; TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin(); m_setBuffers.erase(m_setBuffers.begin()); + m_spBuffersCount->Decrease(); + m_ullExpectedBufferPosition += pBuffer->GetRequestedDataSize(); UpdateHasBuffers(); @@ -90,7 +95,7 @@ TOverlappedDataBuffer* TOrderedBufferQueue::PopError() { - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!m_pFirstErrorBuffer) return nullptr; @@ -104,7 +109,7 @@ const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const { - boost::shared_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!m_setBuffers.empty()) return *m_setBuffers.begin(); @@ -113,19 +118,19 @@ size_t TOrderedBufferQueue::GetCount() const { - boost::shared_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); return m_setBuffers.size() + (m_pFirstErrorBuffer ? 1 : 0); } bool TOrderedBufferQueue::IsEmpty() const { - boost::shared_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); return m_setBuffers.empty(); } bool TOrderedBufferQueue::HasPoppableBuffer() const { - boost::shared_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); return InternalHasPoppableBuffer(); } @@ -156,13 +161,14 @@ void TOrderedBufferQueue::ClearBuffers() { - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); for(TOverlappedDataBuffer* pBuffer : m_setBuffers) { m_spEmptyBuffers->Push(pBuffer); } m_setBuffers.clear(); + m_spBuffersCount->SetValue(0); if(m_pFirstErrorBuffer) { @@ -177,16 +183,7 @@ void TOrderedBufferQueue::UpdateHasBuffers() { - if(InternalHasPoppableBuffer()) - { - m_eventHasBuffers.SetEvent(); - m_notifier(true); - } - else - { - m_eventHasBuffers.ResetEvent(); - m_notifier(false); - } + m_eventHasBuffers.SetEvent(InternalHasPoppableBuffer()); } void TOrderedBufferQueue::UpdateHasErrors() @@ -212,18 +209,18 @@ m_eventHasReadingFinished.SetEvent(bFullSequence); } - boost::signals2::signal& TOrderedBufferQueue::GetNotifier() - { - return m_notifier; - } - void TOrderedBufferQueue::UpdateProcessingRange(unsigned long long ullNewPosition) { - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!m_setBuffers.empty()) throw TCoreException(eErr_InvalidData, L"Cannot update processing range when processing already started", LOCATION); m_ullExpectedBufferPosition = ullNewPosition; } + + TSharedCountMTPtr TOrderedBufferQueue::GetSharedCount() + { + return m_spBuffersCount; + } } Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -24,7 +24,7 @@ #include "TOverlappedDataBuffer.h" #include "TBufferList.h" #include "TCoreException.h" -#include +#include namespace chcore { @@ -57,10 +57,10 @@ void ClearBuffers(); - boost::signals2::signal& GetNotifier(); - void UpdateProcessingRange(unsigned long long ullNewPosition); + TSharedCountMTPtr GetSharedCount(); + private: void UpdateHasBuffers(); void UpdateHasErrors(); @@ -71,10 +71,11 @@ private: using BufferCollection = std::set; BufferCollection m_setBuffers; + TSharedCountMTPtr m_spBuffersCount; TBufferListPtr m_spEmptyBuffers; - mutable boost::shared_mutex m_mutex; + mutable boost::recursive_mutex m_mutex; TOverlappedDataBuffer* m_pFirstErrorBuffer = nullptr; unsigned long long m_ullErrorPosition = NoPosition; @@ -85,8 +86,6 @@ unsigned long long m_ullExpectedBufferPosition = 0; bool m_bDataSourceFinished = false; - - boost::signals2::signal m_notifier; }; template @@ -97,7 +96,7 @@ if(!pBuffer->HasError()) throw TCoreException(eErr_InvalidArgument, L"Cannot push successful buffer to failed queue", LOCATION); - boost::unique_lock lock(m_mutex); + boost::unique_lock lock(m_mutex); if(!m_pFirstErrorBuffer && m_ullErrorPosition == NoPosition) { Index: src/libchcore/TOverlappedReader.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TOverlappedReader.cpp (.../TOverlappedReader.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -24,19 +24,24 @@ namespace chcore { - TOverlappedReader::TOverlappedReader(const logger::TLogFileDataPtr& spLogFileData, const TBufferListPtr& spEmptyBuffers, + TOverlappedReader::TOverlappedReader(const logger::TLogFileDataPtr& spLogFileData, + const TBufferListPtr& spEmptyBuffers, const TOverlappedProcessorRangePtr& spDataRange, - DWORD dwChunkSize) : + DWORD dwChunkSize, + size_t stMaxOtfBuffers, size_t stMaxReadAheadBuffers, + TSharedCountPtr spOtfBuffersCount) : m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), - m_tInputBuffers(spEmptyBuffers, spDataRange ? spDataRange->GetResumePosition() : 0, dwChunkSize), - m_spFullBuffers(std::make_shared(spEmptyBuffers, spDataRange ? spDataRange->GetResumePosition() : 0)) + m_spFullBuffers(std::make_shared(spEmptyBuffers, spDataRange ? spDataRange->GetResumePosition() : 0)), + m_tInputBuffers(spEmptyBuffers, spDataRange ? spDataRange->GetResumePosition() : 0, dwChunkSize, stMaxOtfBuffers, stMaxReadAheadBuffers, spOtfBuffersCount, m_spFullBuffers->GetSharedCount()) { if(!spLogFileData) throw TCoreException(eErr_InvalidArgument, L"spLogFileData is NULL", LOCATION); if(!spEmptyBuffers) throw TCoreException(eErr_InvalidArgument, L"spMemoryPool", LOCATION); if(!spDataRange) throw TCoreException(eErr_InvalidArgument, L"spDataRange is NULL", LOCATION); + if(!spOtfBuffersCount) + throw TCoreException(eErr_InvalidArgument, L"spOtfBuffersCount is NULL", LOCATION); m_dataRangeChanged = spDataRange->GetNotifier().connect(boost::bind(&TOverlappedReader::UpdateProcessingRange, this, _1)); } Index: src/libchcore/TOverlappedReader.h =================================================================== diff -u -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/TOverlappedReader.h (.../TOverlappedReader.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -33,7 +33,9 @@ explicit TOverlappedReader(const logger::TLogFileDataPtr& spLogFileData, const TBufferListPtr& spEmptyBuffers, const TOverlappedProcessorRangePtr& spDataRange, - DWORD dwChunkSize); + DWORD dwChunkSize, + size_t stMaxOtfBuffers, size_t stMaxReadAheadBuffers, + TSharedCountPtr spOtfBuffersCount); TOverlappedReader(const TOverlappedReader&) = delete; ~TOverlappedReader(); @@ -68,8 +70,8 @@ logger::TLoggerPtr m_spLog; // queues - TReadBufferQueueWrapper m_tInputBuffers; TOrderedBufferQueuePtr m_spFullBuffers; // buffers with data + TReadBufferQueueWrapper m_tInputBuffers; boost::signals2::connection m_dataRangeChanged; }; Index: src/libchcore/TOverlappedReaderFB.cpp =================================================================== diff -u -r38031c379667959206abe69507d8a4e3b040dba6 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision 38031c379667959206abe69507d8a4e3b040dba6) +++ src/libchcore/TOverlappedReaderFB.cpp (.../TOverlappedReaderFB.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -34,13 +34,14 @@ const TBufferListPtr& spEmptyBuffers, const TOverlappedProcessorRangePtr& spDataRange, DWORD dwChunkSize, + size_t stMaxOtfBuffers, size_t stMaxReadAheadBuffers, bool bNoBuffering, bool bProtectReadOnlyFiles) : - m_spReader(std::make_shared(spLogFileData, spEmptyBuffers, spDataRange, dwChunkSize)), + m_counterOnTheFly(), + m_spReader(std::make_shared(spLogFileData, spEmptyBuffers, spDataRange, dwChunkSize, stMaxOtfBuffers, stMaxReadAheadBuffers, m_counterOnTheFly.GetSharedCount())), m_eventReadingFinished(true, false), m_eventProcessingFinished(true, false), m_eventLocalKill(true, false), - m_counterOnTheFly(), m_spFilesystem(spFilesystem), m_spSrcFileInfo(spSrcFileInfo), m_spSrcFile(), @@ -56,6 +57,12 @@ throw TCoreException(eErr_InvalidArgument, L"spStats is NULL", LOCATION); if(!spSrcFileInfo) throw TCoreException(eErr_InvalidArgument, L"spSrcFileInfo is NULL", LOCATION); + if(!spLogFileData) + throw TCoreException(eErr_InvalidArgument, L"spLogFileData is NULL", LOCATION); + if(!spEmptyBuffers) + throw TCoreException(eErr_InvalidArgument, L"spEmptyBuffers is NULL", LOCATION); + if(!spDataRange) + throw TCoreException(eErr_InvalidArgument, L"spDataRange is NULL", LOCATION); IFilesystemFilePtr fileSrc = m_spFilesystem->CreateFileObject(IFilesystemFile::eMode_Read, m_spSrcFileInfo->GetFullFilePath(), bNoBuffering, bProtectReadOnlyFiles); m_spSrcFile = std::make_shared(fileSrc, spFeedbackHandler, spLogFileData, rThreadController, spFilesystem); Index: src/libchcore/TOverlappedReaderFB.h =================================================================== diff -u -rbaad6054459abaaf69cbfd1ebad8783085160d99 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision baad6054459abaaf69cbfd1ebad8783085160d99) +++ src/libchcore/TOverlappedReaderFB.h (.../TOverlappedReaderFB.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -40,6 +40,7 @@ const TBufferListPtr& spEmptyBuffers, const TOverlappedProcessorRangePtr& spDataRange, DWORD dwChunkSize, + size_t stMaxOtfBuffers, size_t stMaxReadAheadBuffers, bool bNoBuffering, bool bProtectReadOnlyFiles); TOverlappedReaderFB(const TOverlappedReaderFB& rSrc) = delete; @@ -69,13 +70,13 @@ void ClearQueues(); private: + TEventCounter m_counterOnTheFly; + TOverlappedReaderPtr m_spReader; TEvent m_eventReadingFinished; TEvent m_eventProcessingFinished; TEvent m_eventLocalKill; - TEventCounter m_counterOnTheFly; - IFilesystemPtr m_spFilesystem; TFileInfoPtr m_spSrcFileInfo; TFilesystemFileFeedbackWrapperPtr m_spSrcFile; Index: src/libchcore/TOverlappedReaderWriterFB.cpp =================================================================== diff -u -rbaad6054459abaaf69cbfd1ebad8783085160d99 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision baad6054459abaaf69cbfd1ebad8783085160d99) +++ src/libchcore/TOverlappedReaderWriterFB.cpp (.../TOverlappedReaderWriterFB.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -47,11 +47,9 @@ m_rThreadController(rThreadController), m_spRange(std::make_shared(ullResumePosition)), m_spMemoryPool(spMemoryPool), - m_spReader(std::make_shared(spFilesystem, spFeedbackHandler, rThreadController, spStats, spSrcFileInfo, spLogFileData, spMemoryPool->GetBufferList(), m_spRange, dwChunkSize, bNoBuffering, bProtectReadOnlyFiles)), - m_spWriter(std::make_shared(spFilesystem, spFeedbackHandler, rThreadController, spStats, spSrcFileInfo, pathDst, spLogFileData, m_spReader->GetFinishedQueue(), m_spRange, spMemoryPool->GetBufferList(), bOnlyCreate, bNoBuffering, bProtectReadOnlyFiles, bUpdateFileAttributesAndTimes)) + m_spReader(std::make_shared(spFilesystem, spFeedbackHandler, rThreadController, spStats, spSrcFileInfo, spLogFileData, spMemoryPool ? spMemoryPool->GetBufferList() : TBufferListPtr(), m_spRange, dwChunkSize, MaxOtfBuffers, MaxReadAheadBuffers, bNoBuffering, bProtectReadOnlyFiles)), + m_spWriter(std::make_shared(spFilesystem, spFeedbackHandler, rThreadController, spStats, spSrcFileInfo, pathDst, spLogFileData, m_spReader->GetFinishedQueue(), m_spRange, spMemoryPool ? spMemoryPool->GetBufferList() : TBufferListPtr(), MaxOtfBuffers, bOnlyCreate, bNoBuffering, bProtectReadOnlyFiles, bUpdateFileAttributesAndTimes)) { - if(!spMemoryPool) - throw TCoreException(eErr_InvalidArgument, L"spMemoryPool", LOCATION); } TOverlappedReaderWriterFB::~TOverlappedReaderWriterFB() Index: src/libchcore/TOverlappedReaderWriterFB.h =================================================================== diff -u -rbaad6054459abaaf69cbfd1ebad8783085160d99 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedReaderWriterFB.h (.../TOverlappedReaderWriterFB.h) (revision baad6054459abaaf69cbfd1ebad8783085160d99) +++ src/libchcore/TOverlappedReaderWriterFB.h (.../TOverlappedReaderWriterFB.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -32,6 +32,10 @@ class TOverlappedReaderWriterFB { public: + static const size_t MaxOtfBuffers = 2; + static const size_t MaxReadAheadBuffers = 50; + + public: explicit TOverlappedReaderWriterFB(const IFilesystemPtr& spFilesystem, const IFeedbackHandlerPtr& spFeedbackHandler, TWorkerThreadController& rThreadController, Index: src/libchcore/TOverlappedWriter.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TOverlappedWriter.cpp (.../TOverlappedWriter.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -25,10 +25,10 @@ namespace chcore { TOverlappedWriter::TOverlappedWriter(const logger::TLogFileDataPtr& spLogFileData, const TOrderedBufferQueuePtr& spBuffersToWrite, - const TOverlappedProcessorRangePtr& spRange, const TBufferListPtr& spEmptyBuffers) : + const TOverlappedProcessorRangePtr& spRange, const TBufferListPtr& spEmptyBuffers, size_t stMaxOtfBuffers, TSharedCountPtr spOtfBuffersCount) : m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), m_spEmptyBuffers(spEmptyBuffers), - m_tBuffersToWrite(spBuffersToWrite), + m_tBuffersToWrite(spBuffersToWrite, stMaxOtfBuffers, spOtfBuffersCount), m_tFinishedBuffers(spEmptyBuffers, spRange != nullptr ? spRange->GetResumePosition() : 0) { if(!spLogFileData) @@ -39,12 +39,15 @@ throw TCoreException(eErr_InvalidArgument, L"spEmptyBuffers is NULL", LOCATION); if(!spRange) throw TCoreException(eErr_InvalidArgument, L"spRange is NULL", LOCATION); + if(!spOtfBuffersCount) + throw TCoreException(eErr_InvalidArgument, L"spOtfBuffersCount is NULL", LOCATION); m_dataRangeChanged = spRange->GetNotifier().connect(boost::bind(&TOverlappedWriter::UpdateProcessingRange, this, _1)); } TOverlappedWriter::~TOverlappedWriter() { + m_dataRangeChanged.disconnect(); } void TOverlappedWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) Index: src/libchcore/TOverlappedWriter.h =================================================================== diff -u -rb051cbac8dac8c448507aa7c64753af9cf793af5 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision b051cbac8dac8c448507aa7c64753af9cf793af5) +++ src/libchcore/TOverlappedWriter.h (.../TOverlappedWriter.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -32,7 +32,7 @@ { public: explicit TOverlappedWriter(const logger::TLogFileDataPtr& spLogFileData, const TOrderedBufferQueuePtr& spBuffersToWrite, - const TOverlappedProcessorRangePtr& spRange, const TBufferListPtr& spEmptyBuffers); + const TOverlappedProcessorRangePtr& spRange, const TBufferListPtr& spEmptyBuffers, size_t stMaxOtfBuffers, TSharedCountPtr spOtfBuffersCount); TOverlappedWriter(const TOverlappedWriter&) = delete; ~TOverlappedWriter(); Index: src/libchcore/TOverlappedWriterFB.cpp =================================================================== diff -u -rbaad6054459abaaf69cbfd1ebad8783085160d99 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision baad6054459abaaf69cbfd1ebad8783085160d99) +++ src/libchcore/TOverlappedWriterFB.cpp (.../TOverlappedWriterFB.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -38,11 +38,13 @@ const TOrderedBufferQueuePtr& spBuffersToWrite, const TOverlappedProcessorRangePtr& spRange, const TBufferListPtr& spEmptyBuffers, + size_t stMaxOtfBuffers, bool bOnlyCreate, bool bNoBuffering, bool bProtectReadOnlyFiles, bool bUpdateFileAttributesAndTimes) : - m_spWriter(std::make_shared(spLogFileData, spBuffersToWrite, spRange, spEmptyBuffers)), + m_counterOnTheFly(), + m_spWriter(std::make_shared(spLogFileData, spBuffersToWrite, spRange, spEmptyBuffers, stMaxOtfBuffers, m_counterOnTheFly.GetSharedCount())), m_spStats(spStats), m_spSrcFileInfo(spSrcFileInfo), m_spDataRange(spRange), @@ -51,7 +53,6 @@ m_eventProcessingFinished(true, false), m_eventWritingFinished(true, false), m_eventLocalKill(true, false), - m_counterOnTheFly(), m_rThreadController(rThreadController), m_spLog(logger::MakeLogger(spLogFileData, L"File-Writer")) { @@ -67,6 +68,10 @@ throw TCoreException(eErr_InvalidArgument, L"spEmptyBuffers is NULL", LOCATION); if(!spRange) throw TCoreException(eErr_InvalidArgument, L"spRange is NULL", LOCATION); + if(!spLogFileData) + throw TCoreException(eErr_InvalidArgument, L"spLogFileData is NULL", LOCATION); + if(!spBuffersToWrite) + throw TCoreException(eErr_InvalidArgument, L"spBuffersToWrite is NULL", LOCATION); IFilesystemFilePtr fileDst = spFilesystem->CreateFileObject(IFilesystemFile::eMode_Write, pathDst, bNoBuffering, bProtectReadOnlyFiles); m_spDstFile = std::make_shared(fileDst, spFeedbackHandler, spLogFileData, rThreadController, spFilesystem); Index: src/libchcore/TOverlappedWriterFB.h =================================================================== diff -u -rbaad6054459abaaf69cbfd1ebad8783085160d99 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision baad6054459abaaf69cbfd1ebad8783085160d99) +++ src/libchcore/TOverlappedWriterFB.h (.../TOverlappedWriterFB.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -40,6 +40,7 @@ const TOrderedBufferQueuePtr& spBuffersToWrite, const TOverlappedProcessorRangePtr& spRange, const TBufferListPtr& spEmptyBuffers, + size_t stMaxOtfBuffers, bool bOnlyCreate, bool bNoBuffering, bool bProtectReadOnlyFiles, @@ -55,7 +56,6 @@ void StartThreaded(); TSubTaskBase::ESubOperationResult StopThreaded(); - bool WereAttributesAndTimesSet() const; HANDLE GetEventWritingFinishedHandle() const; HANDLE GetEventProcessingFinishedHandle() const; @@ -75,6 +75,8 @@ void UpdateCurrentItemStatsFromFileSize(bool bFileWritingFinished); private: + TEventCounter m_counterOnTheFly; + TOverlappedWriterPtr m_spWriter; TFilesystemFileFeedbackWrapperPtr m_spDstFile; TSubTaskStatsInfoPtr m_spStats; @@ -87,8 +89,6 @@ TEvent m_eventWritingFinished; TEvent m_eventLocalKill; - TEventCounter m_counterOnTheFly; - TWorkerThreadController& m_rThreadController; TSubTaskBase::ESubOperationResult m_eThreadResult = TSubTaskBase::eSubResult_Continue; Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -23,25 +23,46 @@ namespace chcore { - TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spEmptyBuffers, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : + TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spEmptyBuffers, + unsigned long long ullNextReadPosition, DWORD dwChunkSize, + size_t stMaxOtfBuffers, size_t stMaxReadAheadBuffers, + TSharedCountPtr spOtfBuffersCount, TSharedCountMTPtr spCurrentReadAheadBuffers) : m_spEmptyBuffers(spEmptyBuffers), - m_eventHasBuffers(true, false), m_ullNextReadPosition(ullNextReadPosition), - m_dwChunkSize(dwChunkSize) + m_dwChunkSize(dwChunkSize), + m_stMaxOtfBuffers(stMaxOtfBuffers), + m_stMaxReadAheadBuffers(stMaxReadAheadBuffers), + m_spOtfBuffersCount(spOtfBuffersCount), + m_spCurrentReadAheadBuffers(spCurrentReadAheadBuffers), + m_eventHasBuffers(true, false) { if(!spEmptyBuffers) throw TCoreException(eErr_InvalidArgument, L"spEmptyBuffers is NULL", LOCATION); + if(!spOtfBuffersCount) + throw TCoreException(eErr_InvalidArgument, L"spOtfBuffersCount is NULL", LOCATION); + if(!spCurrentReadAheadBuffers) + throw TCoreException(eErr_InvalidArgument, L"spCurrentReadAheadBuffers is NULL", LOCATION); if(dwChunkSize == 0) throw TCoreException(eErr_InvalidArgument, L"dwChunkSize cannot be 0", LOCATION); + if(stMaxOtfBuffers == 0) + throw TCoreException(eErr_InvalidArgument, L"stMaxOtfBuffers cannot be 0", LOCATION); + if(stMaxReadAheadBuffers == 0) + throw TCoreException(eErr_InvalidArgument, L"stMaxReadAheadBuffers cannot be 0", LOCATION); - m_emptyBuffersQueueConnector = m_spEmptyBuffers->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); + m_emptyBuffersQueueConnector = m_spEmptyBuffers->GetSharedCount()->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); + m_currentReadAheadConnector = m_spCurrentReadAheadBuffers->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); + m_retryBuffersConnector = m_tRetryBuffers.GetSharedCount()->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); + m_otfBuffersConnector = m_spOtfBuffersCount->GetNotifier().connect(boost::bind(&TReadBufferQueueWrapper::UpdateHasBuffers, this)); UpdateHasBuffers(); } TReadBufferQueueWrapper::~TReadBufferQueueWrapper() { m_emptyBuffersQueueConnector.disconnect(); + m_currentReadAheadConnector.disconnect(); + m_retryBuffersConnector.disconnect(); + m_otfBuffersConnector.disconnect(); } void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) @@ -63,8 +84,6 @@ } else m_tRetryBuffers.Push(pBuffer); - - UpdateHasBuffers(); } void TReadBufferQueueWrapper::PushEmpty(TOverlappedDataBuffer* pBuffer) @@ -73,8 +92,6 @@ throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); m_spEmptyBuffers->Push(pBuffer); - - //UpdateHasBuffers(); // already updated using notifier } TOverlappedDataBuffer* TReadBufferQueueWrapper::Pop() @@ -94,23 +111,21 @@ } } - if(pBuffer) - UpdateHasBuffers(); - return pBuffer; } bool TReadBufferQueueWrapper::IsBufferReady() const { - if(IsDataSourceFinished()) - return !m_tRetryBuffers.IsEmpty(); + if(m_spOtfBuffersCount->GetValue() >= m_stMaxOtfBuffers) + return false; - return !m_tRetryBuffers.IsEmpty() || !m_spEmptyBuffers->IsEmpty(); - } + if(m_spCurrentReadAheadBuffers->GetValue() >= m_stMaxReadAheadBuffers) + return false; - size_t TReadBufferQueueWrapper::GetCount() const - { - return m_tRetryBuffers.GetCount(); + if(!m_tRetryBuffers.IsEmpty()) + return true; + + return !IsDataSourceFinished() && !m_spEmptyBuffers->IsEmpty(); } void TReadBufferQueueWrapper::SetDataSourceFinished(TOverlappedDataBuffer* pBuffer) Index: src/libchcore/TReadBufferQueueWrapper.h =================================================================== diff -u -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -33,16 +33,17 @@ static const unsigned long long NoPosition = 0xffffffffffffffff; public: - TReadBufferQueueWrapper(const TBufferListPtr& spEmptyBuffers, unsigned long long ullNextReadPosition, DWORD dwChunkSize); + TReadBufferQueueWrapper(const TBufferListPtr& spEmptyBuffers, + unsigned long long ullNextReadPosition, DWORD dwChunkSize, + size_t stMaxOtfBuffers, size_t stMaxReadAheadBuffers, + TSharedCountPtr spOtfBuffersCount, TSharedCountMTPtr spCurrentReadAheadBuffers); ~TReadBufferQueueWrapper(); void Push(TOverlappedDataBuffer* pBuffer); void PushEmpty(TOverlappedDataBuffer* pBuffer); TOverlappedDataBuffer* Pop(); - size_t GetCount() const; - void SetDataSourceFinished(TOverlappedDataBuffer* pBuffer); bool IsDataSourceFinished() const; @@ -56,17 +57,34 @@ void UpdateHasBuffers(); private: + // external buffers TBufferListPtr m_spEmptyBuffers; // external queue of buffers to use boost::signals2::connection m_emptyBuffersQueueConnector; + // retry buffers TSimpleOrderedBufferQueue m_tRetryBuffers; // internal queue of claimed buffers + boost::signals2::connection m_retryBuffersConnector; - TEvent m_eventHasBuffers; - + // input unsigned long long m_ullNextReadPosition = 0; // next position for read buffers DWORD m_dwChunkSize = 0; + // config + size_t m_stMaxOtfBuffers = 0; + size_t m_stMaxReadAheadBuffers = 0; + + // internal state unsigned long long m_ullDataSourceFinishedPos = NoPosition; + + // external state + TSharedCountPtr m_spOtfBuffersCount; + boost::signals2::connection m_otfBuffersConnector; + + TSharedCountMTPtr m_spCurrentReadAheadBuffers; + boost::signals2::connection m_currentReadAheadConnector; + + // events + TEvent m_eventHasBuffers; }; } Index: src/libchcore/TSharedCount.h =================================================================== diff -u --- src/libchcore/TSharedCount.h (revision 0) +++ src/libchcore/TSharedCount.h (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -0,0 +1,99 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TSHAREDCOUNT_H__ +#define __TSHAREDCOUNT_H__ + +#include + +namespace chcore +{ + template + class TSharedCount + { + public: + explicit TSharedCount(T initialValue = 0) : m_tCounter(initialValue) + { + } + + TSharedCount(const TSharedCount& rSrc) = delete; + + // assignment operator + TSharedCount& operator=(const TSharedCount& rSrc) = delete; + + // conversion from/to T + operator T() const + { + return GetValue(); + } + + TSharedCount& operator=(T newValue) + { + SetValue(newValue); + return *this; + } + + // get/set value + T GetValue() const + { + return m_tCounter; + } + + void SetValue(T newValue) + { + m_tCounter = newValue; + m_notifier(); + } + + void Increase() + { + Increase(1); + } + + void Decrease() + { + Decrease(1); + } + + void Increase(T addValue) + { + m_tCounter += addValue; + m_notifier(); + } + + void Decrease(T addValue) + { + m_tCounter -= addValue; + m_notifier(); + } + + boost::signals2::signal& GetNotifier() + { + return m_notifier; + } + + private: + T m_tCounter = 0; + boost::signals2::signal m_notifier; + }; + + template + using TSharedCountPtr = std::shared_ptr>; +} + +#endif Index: src/libchcore/TSharedCountMT.h =================================================================== diff -u --- src/libchcore/TSharedCountMT.h (revision 0) +++ src/libchcore/TSharedCountMT.h (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -0,0 +1,113 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TSHAREDCOUNTMT_H__ +#define __TSHAREDCOUNTMT_H__ + +#include +#include +#include + +namespace chcore +{ + template + class TSharedCountMT + { + public: + explicit TSharedCountMT(T initialValue = 0) : m_tCounter(initialValue) + { + } + + TSharedCountMT(const TSharedCountMT& rSrc) = delete; + + // assignment operator + TSharedCountMT& operator=(const TSharedCountMT& rSrc) = delete; + + // conversion from/to T + operator T() const + { + return GetValue(); + } + + TSharedCountMT& operator=(T newValue) + { + SetValue(newValue); + return *this; + } + + // get/set value + T GetValue() const + { + boost::unique_lock lock(m_lock); + return m_tCounter; + } + + void SetValue(T newValue) + { + { + boost::unique_lock lock(m_lock); + m_tCounter = newValue; + } + m_notifier(); + } + + // increment/decrement + void Increase() + { + Increase(1); + } + + void Decrease() + { + Decrease(1); + } + + void Increase(T addValue) + { + { + boost::unique_lock lock(m_lock); + m_tCounter += addValue; + } + m_notifier(); + } + + void Decrease(T addValue) + { + { + boost::unique_lock lock(m_lock); + m_tCounter -= addValue; + } + m_notifier(); + } + + boost::signals2::signal& GetNotifier() + { + return m_notifier; + } + + private: + T m_tCounter = 0; + boost::signals2::signal m_notifier; + mutable boost::mutex m_lock; + }; + + template + using TSharedCountMTPtr = std::shared_ptr>; +} + +#endif Index: src/libchcore/TSimpleOrderedBufferQueue.h =================================================================== diff -u -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TSimpleOrderedBufferQueue.h (.../TSimpleOrderedBufferQueue.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/TSimpleOrderedBufferQueue.h (.../TSimpleOrderedBufferQueue.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -23,18 +23,24 @@ #include "TCoreException.h" #include "TOverlappedDataBuffer.h" #include "TBufferList.h" +#include "TSharedCount.h" namespace chcore { class TSimpleOrderedBufferQueue : private std::set { public: + TSimpleOrderedBufferQueue() : m_spCount(std::make_shared>()) + { + } + void Push(TOverlappedDataBuffer* pBuffer) { if(!pBuffer) throw TCoreException(eErr_InvalidArgument, L"pBuffer is NULL", LOCATION); if(!insert(pBuffer).second) throw TCoreException(eErr_InvalidArgument, L"Buffer already exists in the collection", LOCATION); + m_spCount->Increase(); } TOverlappedDataBuffer* Pop() @@ -44,6 +50,8 @@ TOverlappedDataBuffer* pBuffer = *begin(); erase(begin()); + m_spCount->Decrease(); + return pBuffer; } @@ -64,6 +72,8 @@ spBuffers->Push(pBuffer); } clear(); + + m_spCount->SetValue(0); } bool IsEmpty() const @@ -73,8 +83,16 @@ size_t GetCount() const { - return size(); + return m_spCount->GetValue(); } + + TSharedCountPtr GetSharedCount() + { + return m_spCount; + } + + private: + TSharedCountPtr m_spCount; }; } Index: src/libchcore/TWriteBufferQueueWrapper.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/TWriteBufferQueueWrapper.cpp (.../TWriteBufferQueueWrapper.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -23,21 +23,29 @@ namespace chcore { - TWriteBufferQueueWrapper::TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue) : + TWriteBufferQueueWrapper::TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue, size_t stMaxOtfBuffers, TSharedCountPtr spOtfBuffersCount) : m_spDataQueue(spQueue), + m_stMaxOtfBuffers(stMaxOtfBuffers), + m_spOtfBuffersCount(spOtfBuffersCount), m_eventHasBuffers(true, false) { if (!spQueue) throw TCoreException(eErr_InvalidArgument, L"spQueue is NULL", LOCATION); + if (!spOtfBuffersCount) + throw TCoreException(eErr_InvalidArgument, L"spOtfBuffersCount is NULL", LOCATION); + if (stMaxOtfBuffers == 0) + throw TCoreException(eErr_InvalidArgument, L"stMaxOtfBuffers cannot be 0", LOCATION); UpdateHasBuffers(); - m_dataQueueConnector = m_spDataQueue->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this, _1)); + m_dataQueueConnector = m_spDataQueue->GetSharedCount()->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this)); + m_retryBuffersConnector = m_tRetryBuffers.GetSharedCount()->GetNotifier().connect(boost::bind(&TWriteBufferQueueWrapper::UpdateHasBuffers, this)); } TWriteBufferQueueWrapper::~TWriteBufferQueueWrapper() { m_dataQueueConnector.disconnect(); + m_retryBuffersConnector.disconnect(); } void TWriteBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) @@ -63,18 +71,11 @@ TOverlappedDataBuffer* TWriteBufferQueueWrapper::InternalPop() { - const TOverlappedDataBuffer* pClaimedQueueBuffer = m_tRetryBuffers.Peek(); - if (!pClaimedQueueBuffer) - return m_spDataQueue->Pop(); + TOverlappedDataBuffer* pBuffer = m_tRetryBuffers.Pop(); + if(!pBuffer) + pBuffer = m_spDataQueue->Pop(); - const TOverlappedDataBuffer* pDataQueueBuffer = m_spDataQueue->Peek(); - if (!pDataQueueBuffer) - return m_tRetryBuffers.Pop(); - - if (pClaimedQueueBuffer->GetFilePosition() < pDataQueueBuffer->GetFilePosition()) - return m_tRetryBuffers.Pop(); - else - return m_spDataQueue->Pop(); + return pBuffer; } size_t TWriteBufferQueueWrapper::GetCount() const @@ -93,14 +94,19 @@ m_tRetryBuffers.ClearBuffers(spEmptyBuffers); } - void TWriteBufferQueueWrapper::UpdateHasBuffers(bool bDataQueueHasPoppableBuffer) + bool TWriteBufferQueueWrapper::HasBuffersToProcess() const { - bool bIsReady = bDataQueueHasPoppableBuffer || !m_tRetryBuffers.IsEmpty(); - m_eventHasBuffers.SetEvent(bIsReady); + if(m_spOtfBuffersCount->GetValue() >= m_stMaxOtfBuffers) + return false; + + if(!m_tRetryBuffers.IsEmpty()) + return true; + + return m_spDataQueue->HasPoppableBuffer(); } void TWriteBufferQueueWrapper::UpdateHasBuffers() { - UpdateHasBuffers(m_spDataQueue->HasPoppableBuffer()); + m_eventHasBuffers.SetEvent(HasBuffersToProcess()); } } Index: src/libchcore/TWriteBufferQueueWrapper.h =================================================================== diff -u -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/TWriteBufferQueueWrapper.h (.../TWriteBufferQueueWrapper.h) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -31,7 +31,7 @@ class TWriteBufferQueueWrapper { public: - explicit TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue); + TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue, size_t stMaxOtfBuffers, TSharedCountPtr spOtfBuffersCount); ~TWriteBufferQueueWrapper(); void Push(TOverlappedDataBuffer* pBuffer); @@ -43,17 +43,27 @@ void ClearBuffers(const TBufferListPtr& spEmptyBuffers); private: - void UpdateHasBuffers(bool bDataQueueHasPoppableBuffer); + bool HasBuffersToProcess() const; void UpdateHasBuffers(); TOverlappedDataBuffer* InternalPop(); private: + // input buffers TOrderedBufferQueuePtr m_spDataQueue; // external queue of buffers to use boost::signals2::connection m_dataQueueConnector; + // retry buffers TSimpleOrderedBufferQueue m_tRetryBuffers; // internal queue of claimed buffers + boost::signals2::connection m_retryBuffersConnector; + // config + size_t m_stMaxOtfBuffers = 0; + + // external state + TSharedCountPtr m_spOtfBuffersCount; + + // event TEvent m_eventHasBuffers; }; } Index: src/libchcore/Tests/TOverlappedReaderTests.cpp =================================================================== diff -u -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/Tests/TOverlappedReaderTests.cpp (.../TOverlappedReaderTests.cpp) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/Tests/TOverlappedReaderTests.cpp (.../TOverlappedReaderTests.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -16,7 +16,8 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); EXPECT_EQ(nullptr, tReader.GetEmptyBuffer()); EXPECT_EQ(nullptr, tReader.GetFailedReadBuffer()); @@ -36,7 +37,8 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); EXPECT_SIGNALED(tReader.GetEventReadPossibleHandle()); EXPECT_TIMEOUT(tReader.GetEventReadFailedHandle()); @@ -57,7 +59,8 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); EXPECT_SIGNALED(tReader.GetEventReadPossibleHandle()); @@ -79,7 +82,8 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); TOverlappedDataBuffer* pBuffers[ 3 ] = { tReader.GetEmptyBuffer(), tReader.GetEmptyBuffer(), tReader.GetEmptyBuffer() }; @@ -101,7 +105,8 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); EXPECT_THROW(tReader.AddEmptyBuffer(nullptr), TCoreException); EXPECT_THROW(tReader.AddRetryBuffer(nullptr), TCoreException); @@ -114,7 +119,9 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); + TOverlappedDataBuffer* pBuffer = tReader.GetEmptyBuffer(); tReader.AddFinishedReadBuffer(pBuffer); @@ -127,7 +134,9 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); + TOverlappedDataBuffer* pBuffers[ 3 ] = { tReader.GetEmptyBuffer(), tReader.GetEmptyBuffer(), tReader.GetEmptyBuffer() }; tReader.AddFinishedReadBuffer(pBuffers[ 1 ]); @@ -146,7 +155,9 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); + TOverlappedDataBuffer* pBuffers[ 3 ] = { tReader.GetEmptyBuffer(), tReader.GetEmptyBuffer(), tReader.GetEmptyBuffer() }; pBuffers[ 1 ]->SetLastPart(true); @@ -164,7 +175,8 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); EXPECT_THROW(tReader.AddFinishedReadBuffer(nullptr), TCoreException); } @@ -175,7 +187,9 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedReader tReader(spLogData, spBuffers->GetBufferList(), spRange, 4096, 1, 1, spOtfBufferCount); + TOverlappedDataBuffer* pBuffer = tReader.GetEmptyBuffer(); pBuffer->InitForRead(0, 1280); Index: src/libchcore/Tests/TOverlappedWriterTests.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/Tests/TOverlappedWriterTests.cpp (.../TOverlappedWriterTests.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/Tests/TOverlappedWriterTests.cpp (.../TOverlappedWriterTests.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -13,7 +13,8 @@ TBufferListPtr spEmptyBuffers(std::make_shared()); TOrderedBufferQueuePtr spQueue(std::make_shared(spEmptyBuffers, 0)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedWriter tWriter(spLogData, spQueue, spRange, spEmptyBuffers); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedWriter tWriter(spLogData, spQueue, spRange, spEmptyBuffers, 1, spOtfBufferCount); EXPECT_EQ(nullptr, tWriter.GetWriteBuffer()); EXPECT_EQ(nullptr, tWriter.GetFailedWriteBuffer()); @@ -35,7 +36,8 @@ TBufferListPtr spEmptyBuffers(std::make_shared()); TOrderedBufferQueuePtr spQueue(std::make_shared(spEmptyBuffers, 0)); TOverlappedProcessorRangePtr spRange(std::make_shared(0)); - TOverlappedWriter tWriter(spLogData, spQueue, spRange, spEmptyBuffers); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TOverlappedWriter tWriter(spLogData, spQueue, spRange, spEmptyBuffers, 1, spOtfBufferCount); EXPECT_TIMEOUT(tWriter.GetEventWritePossibleHandle()); EXPECT_TIMEOUT(tWriter.GetEventWriteFailedHandle()); Index: src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (.../TReadBufferQueueWrapperTests.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (.../TReadBufferQueueWrapperTests.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -12,15 +12,17 @@ { TBufferListPtr spList(std::make_shared()); - EXPECT_THROW(TReadBufferQueueWrapper(spList, 0, 0), TCoreException); + EXPECT_THROW(TReadBufferQueueWrapper(spList, 0, 0, 0, 0, nullptr, nullptr), TCoreException); } TEST(TReadBufferQueueWrapperTests, Constructor) { TBufferListPtr spList(std::make_shared()); - TReadBufferQueueWrapper queue(spList, 0, 1024); - EXPECT_EQ(0, queue.GetCount()); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); EXPECT_TIMEOUT(queue.GetHasBuffersEvent()); EXPECT_EQ(false, queue.IsDataSourceFinished()); } @@ -29,8 +31,11 @@ { TBufferListPtr spList(std::make_shared()); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); + EXPECT_EQ(nullptr, queue.Pop()); } @@ -46,8 +51,11 @@ spList->Push(&buffer3); spList->Push(&buffer4); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); + EXPECT_EQ(&buffer4, queue.Pop()); EXPECT_EQ(0, buffer4.GetFilePosition()); EXPECT_EQ(1024, buffer4.GetRequestedDataSize()); @@ -70,8 +78,11 @@ TEST(TReadBufferQueueWrapperTests, PushPop_ClaimedBuffers) { TBufferListPtr spList(std::make_shared()); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); + TOverlappedDataBuffer buffer1(1024, nullptr); TOverlappedDataBuffer buffer2(1024, nullptr); TOverlappedDataBuffer buffer3(1024, nullptr); @@ -113,8 +124,11 @@ spList->Push(&buffer1); spList->Push(&buffer2); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); + TOverlappedDataBuffer buffer3(1024, nullptr); TOverlappedDataBuffer buffer4(1024, nullptr); queue.PushEmpty(&buffer3); @@ -156,7 +170,10 @@ spList->Push(&buffer1); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); queue.SetDataSourceFinished(&buffer1); EXPECT_EQ(true, queue.IsDataSourceFinished()); @@ -170,16 +187,22 @@ TOverlappedDataBuffer buffer1(1024, nullptr); spList->Push(&buffer1); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); + EXPECT_THROW(queue.SetDataSourceFinished(&buffer1), TCoreException); } TEST(TReadBufferQueueWrapperTests, PushPop_DataSourceFinished_CheckBufferMaintenance) { TBufferListPtr spList(std::make_shared()); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); + TOverlappedDataBuffer buffer1(1024, nullptr); buffer1.SetFilePosition(0); buffer1.SetLastPart(true); @@ -191,7 +214,6 @@ queue.SetDataSourceFinished(&buffer1); - EXPECT_EQ(2, queue.GetCount()); EXPECT_EQ(&buffer1, queue.Pop()); EXPECT_EQ(0, spList->GetCount()); @@ -201,22 +223,23 @@ TEST(TReadBufferQueueWrapperTests, PushPop_DataSourceFinished_ValidPushAfterFinished) { TBufferListPtr spList(std::make_shared()); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); + TOverlappedDataBuffer buffer1(1024, nullptr); buffer1.SetLastPart(true); queue.Push(&buffer1); queue.SetDataSourceFinished(&buffer1); - EXPECT_EQ(1, queue.GetCount()); EXPECT_EQ(0, spList->GetCount()); TOverlappedDataBuffer buffer2(1024, nullptr); buffer2.SetLastPart(true); queue.Push(&buffer2); - EXPECT_EQ(1, queue.GetCount()); EXPECT_EQ(&buffer1, queue.Pop()); EXPECT_EQ(1, spList->GetCount()); EXPECT_EQ(&buffer2, spList->Pop()); @@ -225,16 +248,18 @@ TEST(TReadBufferQueueWrapperTests, PushPop_DataSourceFinished_InvalidPushAfterFinished) { TBufferListPtr spList(std::make_shared()); - TReadBufferQueueWrapper queue(spList, 0, 1024); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TSharedCountMTPtr spReadAheadBufferCount(std::make_shared>()); + TReadBufferQueueWrapper queue(spList, 0, 1024, 1, 1, spOtfBufferCount, spReadAheadBufferCount); + TOverlappedDataBuffer buffer1(1024, nullptr); buffer1.SetLastPart(true); buffer1.SetFilePosition(0); queue.Push(&buffer1); queue.SetDataSourceFinished(&buffer1); - EXPECT_EQ(1, queue.GetCount()); EXPECT_EQ(&buffer1, queue.Pop()); EXPECT_EQ(0, spList->GetCount()); Index: src/libchcore/Tests/TWriteBufferQueueWrapperTests.cpp =================================================================== diff -u -r518d1a3484cde91ff17c071211df5e40cb3487e3 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/Tests/TWriteBufferQueueWrapperTests.cpp (.../TWriteBufferQueueWrapperTests.cpp) (revision 518d1a3484cde91ff17c071211df5e40cb3487e3) +++ src/libchcore/Tests/TWriteBufferQueueWrapperTests.cpp (.../TWriteBufferQueueWrapperTests.cpp) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -10,15 +10,16 @@ TEST(TWriteBufferQueueWrapperTests, ConstructorWithNullParam) { - EXPECT_THROW(TWriteBufferQueueWrapper(nullptr), TCoreException); + EXPECT_THROW(TWriteBufferQueueWrapper(nullptr, 0, nullptr), TCoreException); } TEST(TWriteBufferQueueWrapperTests, Constructor) { TBufferListPtr spEmptyBuffers(std::make_shared()); TOrderedBufferQueuePtr spQueue(std::make_shared(spEmptyBuffers, 0)); - TWriteBufferQueueWrapper queue(spQueue); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TWriteBufferQueueWrapper queue(spQueue, 1, spOtfBufferCount); EXPECT_EQ(0, queue.GetCount()); EXPECT_TIMEOUT(queue.GetHasBuffersEvent()); } @@ -28,7 +29,8 @@ TBufferListPtr spEmptyBuffers(std::make_shared()); TOrderedBufferQueuePtr spQueue(std::make_shared(spEmptyBuffers, 0)); - TWriteBufferQueueWrapper queue(spQueue); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TWriteBufferQueueWrapper queue(spQueue, 1, spOtfBufferCount); EXPECT_EQ(nullptr, queue.Pop()); } @@ -56,7 +58,8 @@ spQueue->Push(&buffer3); spQueue->Push(&buffer4); - TWriteBufferQueueWrapper queue(spQueue); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TWriteBufferQueueWrapper queue(spQueue, 1, spOtfBufferCount); EXPECT_EQ(&buffer1, queue.Pop()); EXPECT_EQ(0, buffer1.GetFilePosition()); @@ -82,7 +85,8 @@ TBufferListPtr spEmptyBuffers(std::make_shared()); TOrderedBufferQueuePtr spQueue(std::make_shared(spEmptyBuffers, 0)); - TWriteBufferQueueWrapper queue(spQueue); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TWriteBufferQueueWrapper queue(spQueue, 1, spOtfBufferCount); TOverlappedDataBuffer buffer1(1024, nullptr); buffer1.SetFilePosition(0); @@ -141,7 +145,8 @@ spQueue->Push(&buffer1); spQueue->Push(&buffer2); - TWriteBufferQueueWrapper queue(spQueue); + TSharedCountPtr spOtfBufferCount(std::make_shared>()); + TWriteBufferQueueWrapper queue(spQueue, 1, spOtfBufferCount); TOverlappedDataBuffer buffer3(1024, nullptr); buffer3.SetFilePosition(2000); @@ -153,16 +158,6 @@ queue.Push(&buffer4); EXPECT_SIGNALED(queue.GetHasBuffersEvent()); - EXPECT_EQ(&buffer1, queue.Pop()); - EXPECT_EQ(0, buffer1.GetFilePosition()); - EXPECT_EQ(1000, buffer1.GetRequestedDataSize()); - - EXPECT_SIGNALED(queue.GetHasBuffersEvent()); - EXPECT_EQ(&buffer2, queue.Pop()); - EXPECT_EQ(1000, buffer2.GetFilePosition()); - EXPECT_EQ(1000, buffer2.GetRequestedDataSize()); - - EXPECT_SIGNALED(queue.GetHasBuffersEvent()); EXPECT_EQ(&buffer3, queue.Pop()); EXPECT_EQ(2000, buffer3.GetFilePosition()); EXPECT_EQ(1000, buffer3.GetRequestedDataSize()); @@ -172,6 +167,16 @@ EXPECT_EQ(3000, buffer4.GetFilePosition()); EXPECT_EQ(1000, buffer4.GetRequestedDataSize()); + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer1, queue.Pop()); + EXPECT_EQ(0, buffer1.GetFilePosition()); + EXPECT_EQ(1000, buffer1.GetRequestedDataSize()); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer2, queue.Pop()); + EXPECT_EQ(1000, buffer2.GetFilePosition()); + EXPECT_EQ(1000, buffer2.GetRequestedDataSize()); + EXPECT_TIMEOUT(queue.GetHasBuffersEvent()); EXPECT_EQ(nullptr, queue.Pop()); } Index: src/libchcore/libchcore.vc140.vcxproj =================================================================== diff -u -r71bfd29bfef00738456ab96f336feeb8a02c4df6 -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision 71bfd29bfef00738456ab96f336feeb8a02c4df6) +++ src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -523,7 +523,10 @@ + + + Index: src/libchcore/libchcore.vc140.vcxproj.filters =================================================================== diff -u -rc719644bb4360fcf7ccf6f1139bcae852bd6effd -rd99302fce795dbb5139659016a5da7948f141fb4 --- src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision c719644bb4360fcf7ccf6f1139bcae852bd6effd) +++ src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision d99302fce795dbb5139659016a5da7948f141fb4) @@ -564,6 +564,15 @@ Source Files\Tools\Threading + + Source Files\Tools\Threading + + + Source Files\Tools\Threading + + + Source Files\Tools\Threading +