Index: src/libchcore/ErrorCodes.h =================================================================== diff -u -r6103ac74583f2136b821dc67515ed8469abd8155 -re4005a958c9412d890eeff1e8087c8298aa7bcf7 --- src/libchcore/ErrorCodes.h (.../ErrorCodes.h) (revision 6103ac74583f2136b821dc67515ed8469abd8155) +++ src/libchcore/ErrorCodes.h (.../ErrorCodes.h) (revision e4005a958c9412d890eeff1e8087c8298aa7bcf7) @@ -78,6 +78,7 @@ eErr_CannotDeleteFile = 3002, eErr_CannotReadFile = 3003, eErr_CannotWriteFile = 3004, + eErr_InvalidOverlappedPosition = 3005, // Task handling errors (4000+) eErr_MissingTaskSerializationPath = 4000, Index: src/libchcore/TLocalFilesystem.cpp =================================================================== diff -u -rb75259a9158d8b46d747e20ab4960dd002adb4b1 -re4005a958c9412d890eeff1e8087c8298aa7bcf7 --- src/libchcore/TLocalFilesystem.cpp (.../TLocalFilesystem.cpp) (revision b75259a9158d8b46d747e20ab4960dd002adb4b1) +++ src/libchcore/TLocalFilesystem.cpp (.../TLocalFilesystem.cpp) (revision e4005a958c9412d890eeff1e8087c8298aa7bcf7) @@ -491,7 +491,7 @@ if (!IsOpen()) THROW_CORE_EXCEPTION(eErr_InternalProblem); - DWORD dwToWrite = boost::numeric_cast(rBuffer.GetBytesTransferred()); + DWORD dwToWrite = boost::numeric_cast(rBuffer.GetRealDataSize()); if (m_bNoBuffering && rBuffer.IsLastPart()) dwToWrite = RoundUp(dwToWrite, MaxSectorSize); Index: src/libchcore/TOverlappedDataBufferQueue.cpp =================================================================== diff -u -refe016ef1d0cb0cf1ba379dbe3693e35f6a2361e -re4005a958c9412d890eeff1e8087c8298aa7bcf7 --- src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision efe016ef1d0cb0cf1ba379dbe3693e35f6a2361e) +++ src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision e4005a958c9412d890eeff1e8087c8298aa7bcf7) @@ -21,6 +21,8 @@ #include "TOverlappedDataBuffer.h" #include "TCoreException.h" #include "ErrorCodes.h" +#include +#include BEGIN_CHCORE_NAMESPACE @@ -33,9 +35,11 @@ m_eventReadPossible(true, false), m_eventWritePossible(true, false), m_eventWriteFinished(true, false), + m_eventAllBuffersAccountedFor(true, true), m_stBufferSize(0), m_ullNextExpectedWritePosition(0), m_bDataSourceFinished(false), + m_bDataWritingFinished(false), m_ullNextReadBufferOrder(0), m_ullNextWriteBufferOrder(0), m_ullNextFinishedBufferOrder(0) @@ -46,9 +50,11 @@ m_eventReadPossible(true, false), m_eventWritePossible(true, false), m_eventWriteFinished(true, false), + m_eventAllBuffersAccountedFor(true, false), m_stBufferSize(0), m_ullNextExpectedWritePosition(0), m_bDataSourceFinished(false), + m_bDataWritingFinished(false), m_ullNextReadBufferOrder(0), m_ullNextWriteBufferOrder(0), m_ullNextFinishedBufferOrder(0) @@ -70,6 +76,7 @@ pBuffer->SetBufferOrder(m_ullNextReadBufferOrder++); UpdateReadPossibleEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); return pBuffer; } @@ -84,6 +91,7 @@ m_listEmptyBuffers.push_back(pBuffer); UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); } void TOverlappedDataBufferQueue::UpdateReadPossibleEvent() @@ -108,6 +116,7 @@ ++m_ullNextWriteBufferOrder; UpdateWritePossibleEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); return pBuffer; } @@ -124,22 +133,24 @@ // then it can be treated as an empty buffer if (pBuffer->IsLastPart() && m_bDataSourceFinished && pBuffer->GetBytesTransferred() == 0) { - // not using AddEmptyBuffer() as we there is no need for changing the signals (they are already set correctly) - m_listEmptyBuffers.push_back(pBuffer); + AddEmptyBuffer(pBuffer); return; } - m_setFullBuffers.insert(pBuffer); + std::pair pairInsertInfo = m_setFullBuffers.insert(pBuffer); + if (!pairInsertInfo.second) + THROW_CORE_EXCEPTION(eErr_InvalidOverlappedPosition); if(pBuffer->IsLastPart()) m_bDataSourceFinished = true; UpdateWritePossibleEvent(); + UpdateAllBuffersAccountedFor(); } void TOverlappedDataBufferQueue::UpdateWritePossibleEvent() { - if (m_setFullBuffers.empty()) + if (m_bDataWritingFinished || m_setFullBuffers.empty()) m_eventWritePossible.ResetEvent(); else { @@ -163,7 +174,12 @@ ++m_ullNextFinishedBufferOrder; + // if this is the last part - mark that writing is finished, so that no other buffer will be written + if (pBuffer->IsLastPart()) + m_bDataWritingFinished = true; + UpdateWriteFinishedEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); return pBuffer; } @@ -176,9 +192,12 @@ if (!pBuffer) THROW_CORE_EXCEPTION(eErr_InvalidPointer); - m_setFinishedBuffers.insert(pBuffer); + std::pair pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); + if (!pairInsertInfo.second) + THROW_CORE_EXCEPTION(eErr_InvalidOverlappedPosition); UpdateWriteFinishedEvent(); + UpdateAllBuffersAccountedFor(); } void TOverlappedDataBufferQueue::UpdateWriteFinishedEvent() @@ -195,6 +214,15 @@ } } +void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() +{ + size_t stCurrentBuffers = m_listEmptyBuffers.size() + m_setFullBuffers.size() + m_setFinishedBuffers.size(); + if (stCurrentBuffers == m_listAllBuffers.size()) + m_eventAllBuffersAccountedFor.SetEvent(); + else + m_eventAllBuffersAccountedFor.ResetEvent(); +} + void TOverlappedDataBufferQueue::ReinitializeBuffers(size_t stCount, size_t stBufferSize) { // sanity check - if any of the buffers are still in use, we can't change the sizes @@ -227,6 +255,7 @@ } UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); return; } @@ -240,6 +269,7 @@ m_stBufferSize = stCount; UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); } void TOverlappedDataBufferQueue::DataSourceChanged() @@ -250,11 +280,12 @@ THROW_CORE_EXCEPTION(eErr_InternalProblem); m_bDataSourceFinished = false; + m_bDataWritingFinished = false; m_ullNextReadBufferOrder = 0; m_ullNextWriteBufferOrder = 0; m_ullNextFinishedBufferOrder = 0; - m_eventReadPossible.SetEvent(); + UpdateReadPossibleEvent(); m_eventWritePossible.ResetEvent(); m_eventWriteFinished.ResetEvent(); } @@ -279,4 +310,30 @@ } } +void TOverlappedDataBufferQueue::WaitForMissingBuffers(HANDLE hKillEvent) +{ + enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; + std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; + + bool bExit = false; + while (!bExit) + { + DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + switch (dwResult) + { + case STATUS_USER_APC: + ATLTRACE(_T("STATUS_USER_APC while waiting for missing buffers\n")); + break; + + case WAIT_OBJECT_0 + eAllBuffersReturned: + bExit = true; + break; + + case WAIT_OBJECT_0 + eKillThread: + bExit = true; + break; + } + } +} + END_CHCORE_NAMESPACE Index: src/libchcore/TOverlappedDataBufferQueue.h =================================================================== diff -u -r9312d2ac24e7963495b234adb8b9628076b16023 -re4005a958c9412d890eeff1e8087c8298aa7bcf7 --- src/libchcore/TOverlappedDataBufferQueue.h (.../TOverlappedDataBufferQueue.h) (revision 9312d2ac24e7963495b234adb8b9628076b16023) +++ src/libchcore/TOverlappedDataBufferQueue.h (.../TOverlappedDataBufferQueue.h) (revision e4005a958c9412d890eeff1e8087c8298aa7bcf7) @@ -56,25 +56,35 @@ void DataSourceChanged(); // event access - HANDLE GetEventReadPossibleHandle() { return m_eventReadPossible.Handle(); } - HANDLE GetEventWritePossibleHandle() { return m_eventWritePossible.Handle(); } - HANDLE GetEventWriteFinishedHandle() { return m_eventWriteFinished.Handle(); } + HANDLE GetEventReadPossibleHandle() const { return m_eventReadPossible.Handle(); } + HANDLE GetEventWritePossibleHandle() const { return m_eventWritePossible.Handle(); } + HANDLE GetEventWriteFinishedHandle() const { return m_eventWriteFinished.Handle(); } + HANDLE GetEventAllBuffersAccountedFor() const { return m_eventAllBuffersAccountedFor.Handle(); } + void WaitForMissingBuffers(HANDLE hKillEvent); + private: void CleanupBuffers(); void UpdateReadPossibleEvent(); void UpdateWritePossibleEvent(); void UpdateWriteFinishedEvent(); + void UpdateAllBuffersAccountedFor(); private: std::deque> m_listAllBuffers; size_t m_stBufferSize; std::list m_listEmptyBuffers; - std::set m_setFullBuffers; - std::set m_setFinishedBuffers; + using FullBuffersSet = std::set < TOverlappedDataBuffer*, CompareBufferPositions > ; + FullBuffersSet m_setFullBuffers; + + using FinishedBuffersSet = std::set < TOverlappedDataBuffer*, CompareBufferPositions > ; + FinishedBuffersSet m_setFinishedBuffers; + bool m_bDataSourceFinished; // input file was already read to the end + bool m_bDataWritingFinished; // output file was already written to the end + unsigned long long m_ullNextExpectedWritePosition; // current write file pointer unsigned long long m_ullNextReadBufferOrder; // next order id for read buffers unsigned long long m_ullNextWriteBufferOrder; // next order id to be processed when writing @@ -83,6 +93,7 @@ TEvent m_eventReadPossible; TEvent m_eventWritePossible; TEvent m_eventWriteFinished; + TEvent m_eventAllBuffersAccountedFor; }; END_CHCORE_NAMESPACE Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -refe016ef1d0cb0cf1ba379dbe3693e35f6a2361e -re4005a958c9412d890eeff1e8087c8298aa7bcf7 --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision efe016ef1d0cb0cf1ba379dbe3693e35f6a2361e) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision e4005a958c9412d890eeff1e8087c8298aa7bcf7) @@ -508,6 +508,8 @@ pData->bProcessed = true; m_tSubTaskStats.SetCurrentItemProcessedSize(0); + pData->dbBuffer.WaitForMissingBuffers(rThreadController.GetKillThreadHandle()); + return TSubTaskBase::eSubResult_Continue; } Index: src/libchcore/Tests/TOverlappedDataBufferQueueTests.cpp =================================================================== diff -u -refe016ef1d0cb0cf1ba379dbe3693e35f6a2361e -re4005a958c9412d890eeff1e8087c8298aa7bcf7 --- src/libchcore/Tests/TOverlappedDataBufferQueueTests.cpp (.../TOverlappedDataBufferQueueTests.cpp) (revision efe016ef1d0cb0cf1ba379dbe3693e35f6a2361e) +++ src/libchcore/Tests/TOverlappedDataBufferQueueTests.cpp (.../TOverlappedDataBufferQueueTests.cpp) (revision e4005a958c9412d890eeff1e8087c8298aa7bcf7) @@ -6,6 +6,19 @@ using namespace chcore; +#define EXPECT_TIMEOUT(handle)\ + {\ + DWORD dwResult = WaitForSingleObject(handle, 0); \ + EXPECT_EQ(WAIT_TIMEOUT, dwResult); \ + } + +#define EXPECT_SIGNALED(handle)\ + {\ + DWORD dwResult = WaitForSingleObject(handle, 0); \ + EXPECT_EQ(WAIT_OBJECT_0 + 0, dwResult); \ + } + + TEST(TOverlappedDataBufferQueueTests, DefaultConstructor_SanityTest) { TOverlappedDataBufferQueue queue; @@ -50,6 +63,7 @@ EXPECT_EQ(WAIT_TIMEOUT, dwResult); } +/////////////////////////////////////////////////////////////////////////////////////////////////// TEST(TOverlappedDataBufferQueueTests, GetEmptyBuffer) { TOverlappedDataBufferQueue queue(3, 32768); @@ -94,19 +108,79 @@ EXPECT_EQ(WAIT_OBJECT_0 + 0, dwResult); } +/////////////////////////////////////////////////////////////////////////////////////////////////// TEST(TOverlappedDataBufferQueueTests, GetFullBuffer_AddFullBuffer) { TOverlappedDataBufferQueue queue(3, 32768); + TOverlappedDataBuffer* pBuffer = queue.GetEmptyBuffer(); - TOverlappedDataBuffer* pBuffers[3] = { queue.GetEmptyBuffer(), queue.GetEmptyBuffer(), queue.GetEmptyBuffer() }; - - TOverlappedDataBuffer* pBuffer = pBuffers[0]; - - pBuffer->InitForRead(0, 1230); + pBuffer->InitForRead(0, 1280); pBuffer->SetBytesTransferred(1230); pBuffer->SetStatusCode(0); queue.AddFullBuffer(pBuffer); DWORD dwResult = WaitForSingleObject(queue.GetEventWritePossibleHandle(), 0); EXPECT_EQ(WAIT_OBJECT_0 + 0, dwResult); + + pBuffer = queue.GetFullBuffer(); + + dwResult = WaitForSingleObject(queue.GetEventWritePossibleHandle(), 0); + EXPECT_EQ(WAIT_TIMEOUT, dwResult); } + +TEST(TOverlappedDataBufferQueueTests, GetFullBuffer_AddFullBuffer_OutOfOrder) +{ + TOverlappedDataBufferQueue queue(3, 32768); + TOverlappedDataBuffer* pBuffers[3] = { queue.GetEmptyBuffer(), queue.GetEmptyBuffer(), queue.GetEmptyBuffer() }; + + pBuffers[0]->InitForRead(0, 1000); + pBuffers[0]->SetBytesTransferred(1000); + pBuffers[0]->SetStatusCode(0); + + pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->SetBytesTransferred(1200); + pBuffers[1]->SetStatusCode(0); + + pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->SetBytesTransferred(800); + pBuffers[2]->SetStatusCode(0); + pBuffers[2]->SetLastPart(true); + + EXPECT_TIMEOUT(queue.GetEventWritePossibleHandle()); + + queue.AddFullBuffer(pBuffers[1]); + EXPECT_TIMEOUT(queue.GetEventWritePossibleHandle()); + + queue.AddFullBuffer(pBuffers[2]); + EXPECT_TIMEOUT(queue.GetEventWritePossibleHandle()); + + queue.AddFullBuffer(pBuffers[0]); + EXPECT_SIGNALED(queue.GetEventWritePossibleHandle()); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +TEST(TOverlappedDataBufferQueueTests, GetFinishedBuffer_AddFinishedBuffer_OutOfOrder) +{ + TOverlappedDataBufferQueue queue(3, 32768); + TOverlappedDataBuffer* pBuffers[3] = { queue.GetEmptyBuffer(), queue.GetEmptyBuffer(), queue.GetEmptyBuffer() }; + + pBuffers[0]->InitForRead(0, 1000); + pBuffers[0]->SetBytesTransferred(1000); + pBuffers[0]->SetStatusCode(0); + + pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->SetBytesTransferred(1200); + pBuffers[1]->SetStatusCode(0); + + pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->SetBytesTransferred(800); + pBuffers[2]->SetStatusCode(0); + pBuffers[2]->SetLastPart(true); + + queue.AddFinishedBuffer(pBuffers[1]); + EXPECT_TIMEOUT(queue.GetEventWriteFinishedHandle()); + queue.AddFinishedBuffer(pBuffers[2]); + EXPECT_TIMEOUT(queue.GetEventWriteFinishedHandle()); + queue.AddFinishedBuffer(pBuffers[0]); + EXPECT_SIGNALED(queue.GetEventWriteFinishedHandle()); +} Index: src/libchcore/libchcore.vc120.vcxproj =================================================================== diff -u -refe016ef1d0cb0cf1ba379dbe3693e35f6a2361e -re4005a958c9412d890eeff1e8087c8298aa7bcf7 --- src/libchcore/libchcore.vc120.vcxproj (.../libchcore.vc120.vcxproj) (revision efe016ef1d0cb0cf1ba379dbe3693e35f6a2361e) +++ src/libchcore/libchcore.vc120.vcxproj (.../libchcore.vc120.vcxproj) (revision e4005a958c9412d890eeff1e8087c8298aa7bcf7) @@ -687,8 +687,18 @@ true true - - + + true + true + true + true + + + true + true + true + true +