Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 -r71bc7ffbd5b707e2cbb78eb30677d82577d62ee1 --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 71bc7ffbd5b707e2cbb78eb30677d82577d62ee1) @@ -78,6 +78,22 @@ m_eventHasBuffers.ResetEvent(); } + std::vector TOrderedBufferQueue::GetUnneededLastParts() + { + auto iterFind = std::find_if(m_setBuffers.begin(), m_setBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return pBuffer->IsLastPart(); }); + if(iterFind == m_setBuffers.end() || ++iterFind == m_setBuffers.end()) + return std::vector(); + + auto iterInvalidParts = std::find_if(iterFind, m_setBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return !pBuffer->IsLastPart(); }); + if(iterInvalidParts != m_setBuffers.end()) + throw TCoreException(eErr_InvalidArgument, L"Found non-last-parts after last-part", LOCATION); + + std::vector vBuffers(iterFind, m_setBuffers.end()); + m_setBuffers.erase(iterFind, m_setBuffers.end()); + + return vBuffers; + } + size_t TOrderedBufferQueue::GetCount() const { return m_setBuffers.size(); Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 -r71bc7ffbd5b707e2cbb78eb30677d82577d62ee1 --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 71bc7ffbd5b707e2cbb78eb30677d82577d62ee1) @@ -40,6 +40,7 @@ const TOverlappedDataBuffer* const Peek() const; void Clear(); + std::vector GetUnneededLastParts(); size_t GetCount() const; bool IsEmpty() const; Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u -r6e4ac7776b68464371cd8522a2a8d79fbcab3b28 -r71bc7ffbd5b707e2cbb78eb30677d82577d62ee1 --- src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 6e4ac7776b68464371cd8522a2a8d79fbcab3b28) +++ src/libchcore/TReadBufferQueueWrapper.cpp (.../TReadBufferQueueWrapper.cpp) (revision 71bc7ffbd5b707e2cbb78eb30677d82577d62ee1) @@ -29,6 +29,11 @@ m_dwChunkSize(dwChunkSize), m_eventHasBuffers(true, false) { + if(!spUnorderedQueue) + throw TCoreException(eErr_InvalidArgument, L"spUnorderedQueue is NULL", LOCATION); + if(dwChunkSize == 0) + throw TCoreException(eErr_InvalidArgument, L"dwChunkSize cannot be 0", LOCATION); + UpdateHasBuffers(); } @@ -48,6 +53,18 @@ m_tClaimedQueue.Push(pBuffer); } } + else if(IsDataSourceFinished()) + { + if(!pBuffer->IsLastPart()) + { + if(pBuffer->GetFilePosition() > m_ullDataSourceFinishedPos) + throw TCoreException(eErr_InvalidArgument, L"Adding regular buffer after the queue was marked as finished", LOCATION); + + m_tClaimedQueue.Push(pBuffer); + } + else + m_spUnorderedQueue->Push(pBuffer); + } else m_tClaimedQueue.Push(pBuffer); @@ -94,35 +111,27 @@ return !m_tClaimedQueue.IsEmpty() || !m_spUnorderedQueue->IsEmpty(); } - void TReadBufferQueueWrapper::Clear() - { - m_spUnorderedQueue->Clear(); - m_ullNextReadPosition = 0; - m_dwChunkSize = 0; - m_ullDataSourceFinishedPos = NoPosition; - m_eventHasBuffers.ResetEvent(); - } - size_t TReadBufferQueueWrapper::GetCount() const { return m_tClaimedQueue.GetCount(); } - bool TReadBufferQueueWrapper::IsEmpty() const - { - return m_spUnorderedQueue->IsEmpty(); - } - void TReadBufferQueueWrapper::SetDataSourceFinished(TOverlappedDataBuffer* pBuffer) { - if(pBuffer->IsLastPart()) + if(!pBuffer->IsLastPart()) + throw TCoreException(eErr_InvalidArgument, L"Trying to set the end of data using unfinished buffer", LOCATION); + + if(pBuffer->GetFilePosition() < m_ullDataSourceFinishedPos) { - if(pBuffer->GetFilePosition() < m_ullDataSourceFinishedPos) + m_ullDataSourceFinishedPos = pBuffer->GetFilePosition(); + + std::vector vItems = m_tClaimedQueue.GetUnneededLastParts(); + for(TOverlappedDataBuffer* pBuffer : vItems) { - m_ullDataSourceFinishedPos = pBuffer->GetFilePosition(); - // #todo: release excessive claimed buffers - UpdateHasBuffers(); + m_spUnorderedQueue->Push(pBuffer); } + + UpdateHasBuffers(); } } Index: src/libchcore/TReadBufferQueueWrapper.h =================================================================== diff -u -rbef894e38e5c1486824787cf8c47a87a0828b228 -r71bc7ffbd5b707e2cbb78eb30677d82577d62ee1 --- src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision bef894e38e5c1486824787cf8c47a87a0828b228) +++ src/libchcore/TReadBufferQueueWrapper.h (.../TReadBufferQueueWrapper.h) (revision 71bc7ffbd5b707e2cbb78eb30677d82577d62ee1) @@ -38,10 +38,7 @@ void Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition); TOverlappedDataBuffer* Pop(); - void Clear(); - size_t GetCount() const; - bool IsEmpty() const; void SetDataSourceFinished(TOverlappedDataBuffer* pBuffer); bool IsDataSourceFinished() const; Index: src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp =================================================================== diff -u -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 -r71bc7ffbd5b707e2cbb78eb30677d82577d62ee1 --- src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (.../TReadBufferQueueWrapperTests.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) +++ src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (.../TReadBufferQueueWrapperTests.cpp) (revision 71bc7ffbd5b707e2cbb78eb30677d82577d62ee1) @@ -3,12 +3,242 @@ #include "gmock/gmock.h" #include "../TReadBufferQueueWrapper.h" #include +#include "../TCoreException.h" +#include "../GTestMacros.h" using namespace chcore; -TEST(TReadBufferQueueWrapperTests, DefaultTest) +TEST(TReadBufferQueueWrapperTests, ConstructorWithZeroChunkSize) { TBufferListPtr spList(std::make_shared()); - TReadBufferQueueWrapper queueWrapper(spList, 0, 0); + EXPECT_THROW(TReadBufferQueueWrapper(spList, 0, 0), TCoreException); } + +TEST(TReadBufferQueueWrapperTests, Constructor) +{ + TBufferListPtr spList(std::make_shared()); + + TReadBufferQueueWrapper queue(spList, 0, 1024); + EXPECT_EQ(0, queue.GetCount()); + EXPECT_TIMEOUT(queue.GetHasBuffersEvent()); + EXPECT_EQ(false, queue.IsDataSourceFinished()); +} + +TEST(TReadBufferQueueWrapperTests, Pop_EmptyQueue) +{ + TBufferListPtr spList(std::make_shared()); + + TReadBufferQueueWrapper queue(spList, 0, 1024); + + EXPECT_EQ(nullptr, queue.Pop()); +} + +TEST(TReadBufferQueueWrapperTests, Pop_FromBufferList) +{ + TBufferListPtr spList(std::make_shared()); + TOverlappedDataBuffer buffer1(1024, nullptr); + TOverlappedDataBuffer buffer2(1024, nullptr); + TOverlappedDataBuffer buffer3(1024, nullptr); + TOverlappedDataBuffer buffer4(1024, nullptr); + spList->Push(&buffer1); + spList->Push(&buffer2); + spList->Push(&buffer3); + spList->Push(&buffer4); + + TReadBufferQueueWrapper queue(spList, 0, 1024); + + EXPECT_EQ(&buffer4, queue.Pop()); + EXPECT_EQ(0, buffer4.GetFilePosition()); + EXPECT_EQ(1024, buffer4.GetRequestedDataSize()); + + EXPECT_EQ(&buffer3, queue.Pop()); + EXPECT_EQ(1024, buffer3.GetFilePosition()); + EXPECT_EQ(1024, buffer3.GetRequestedDataSize()); + + EXPECT_EQ(&buffer2, queue.Pop()); + EXPECT_EQ(2048, buffer2.GetFilePosition()); + EXPECT_EQ(1024, buffer2.GetRequestedDataSize()); + + EXPECT_EQ(&buffer1, queue.Pop()); + EXPECT_EQ(3072, buffer1.GetFilePosition()); + EXPECT_EQ(1024, buffer1.GetRequestedDataSize()); + + EXPECT_EQ(nullptr, queue.Pop()); +} + +TEST(TReadBufferQueueWrapperTests, PushPop_ClaimedBuffers) +{ + TBufferListPtr spList(std::make_shared()); + TReadBufferQueueWrapper queue(spList, 0, 1024); + + TOverlappedDataBuffer buffer1(1024, nullptr); + TOverlappedDataBuffer buffer2(1024, nullptr); + TOverlappedDataBuffer buffer3(1024, nullptr); + TOverlappedDataBuffer buffer4(1024, nullptr); + queue.Push(&buffer4, false); + queue.Push(&buffer3, false); + queue.Push(&buffer2, false); + queue.Push(&buffer1, false); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer4, queue.Pop()); + EXPECT_EQ(0, buffer4.GetFilePosition()); + EXPECT_EQ(1024, buffer4.GetRequestedDataSize()); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer3, queue.Pop()); + EXPECT_EQ(1024, buffer3.GetFilePosition()); + EXPECT_EQ(1024, buffer3.GetRequestedDataSize()); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer2, queue.Pop()); + EXPECT_EQ(2048, buffer2.GetFilePosition()); + EXPECT_EQ(1024, buffer2.GetRequestedDataSize()); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer1, queue.Pop()); + EXPECT_EQ(3072, buffer1.GetFilePosition()); + EXPECT_EQ(1024, buffer1.GetRequestedDataSize()); + + EXPECT_TIMEOUT(queue.GetHasBuffersEvent()); + EXPECT_EQ(nullptr, queue.Pop()); +} + +TEST(TReadBufferQueueWrapperTests, PushPop_MixedBuffers) +{ + TBufferListPtr spList(std::make_shared()); + TOverlappedDataBuffer buffer1(1024, nullptr); + TOverlappedDataBuffer buffer2(1024, nullptr); + spList->Push(&buffer1); + spList->Push(&buffer2); + + TReadBufferQueueWrapper queue(spList, 0, 1024); + + TOverlappedDataBuffer buffer3(1024, nullptr); + TOverlappedDataBuffer buffer4(1024, nullptr); + queue.Push(&buffer3, false); + queue.Push(&buffer4, false); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer3, queue.Pop()); + EXPECT_EQ(0, buffer3.GetFilePosition()); + EXPECT_EQ(1024, buffer3.GetRequestedDataSize()); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer4, queue.Pop()); + EXPECT_EQ(1024, buffer4.GetFilePosition()); + EXPECT_EQ(1024, buffer4.GetRequestedDataSize()); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer2, queue.Pop()); + EXPECT_EQ(2048, buffer2.GetFilePosition()); + EXPECT_EQ(1024, buffer2.GetRequestedDataSize()); + + EXPECT_SIGNALED(queue.GetHasBuffersEvent()); + EXPECT_EQ(&buffer1, queue.Pop()); + EXPECT_EQ(3072, buffer1.GetFilePosition()); + EXPECT_EQ(1024, buffer1.GetRequestedDataSize()); + + EXPECT_TIMEOUT(queue.GetHasBuffersEvent()); + EXPECT_EQ(nullptr, queue.Pop()); +} + +///////////////////////////////////////////////////////////////////////////////// +// data source finished mode + +TEST(TReadBufferQueueWrapperTests, PushPop_DataSourceFinished) +{ + TBufferListPtr spList(std::make_shared()); + + TOverlappedDataBuffer buffer1(1024, nullptr); + buffer1.SetLastPart(true); + + spList->Push(&buffer1); + + TReadBufferQueueWrapper queue(spList, 0, 1024); + queue.SetDataSourceFinished(&buffer1); + + EXPECT_EQ(true, queue.IsDataSourceFinished()); + EXPECT_EQ(nullptr, queue.Pop()); +} + +TEST(TReadBufferQueueWrapperTests, PushPop_DataSourceFinishedUsingInvalidBuffer) +{ + TBufferListPtr spList(std::make_shared()); + + TOverlappedDataBuffer buffer1(1024, nullptr); + spList->Push(&buffer1); + + TReadBufferQueueWrapper queue(spList, 0, 1024); + + EXPECT_THROW(queue.SetDataSourceFinished(&buffer1), TCoreException); +} + +TEST(TReadBufferQueueWrapperTests, PushPop_DataSourceFinished_CheckBufferMaintenance) +{ + TBufferListPtr spList(std::make_shared()); + TReadBufferQueueWrapper queue(spList, 0, 1024); + + TOverlappedDataBuffer buffer1(1024, nullptr); + buffer1.SetFilePosition(0); + buffer1.SetLastPart(true); + queue.Push(&buffer1, true); + TOverlappedDataBuffer buffer2(1024, nullptr); + buffer2.SetFilePosition(1024); + buffer2.SetLastPart(true); + queue.Push(&buffer2, true); + + queue.SetDataSourceFinished(&buffer1); + + EXPECT_EQ(1, queue.GetCount()); + EXPECT_EQ(&buffer1, queue.Pop()); + + EXPECT_EQ(1, spList->GetCount()); + EXPECT_EQ(&buffer2, spList->Pop()); +} + +TEST(TReadBufferQueueWrapperTests, PushPop_DataSourceFinished_ValidPushAfterFinished) +{ + TBufferListPtr spList(std::make_shared()); + TReadBufferQueueWrapper queue(spList, 0, 1024); + + TOverlappedDataBuffer buffer1(1024, nullptr); + buffer1.SetLastPart(true); + queue.Push(&buffer1, true); + + queue.SetDataSourceFinished(&buffer1); + + EXPECT_EQ(1, queue.GetCount()); + EXPECT_EQ(0, spList->GetCount()); + + TOverlappedDataBuffer buffer2(1024, nullptr); + buffer2.SetLastPart(true); + queue.Push(&buffer2, true); + + EXPECT_EQ(1, queue.GetCount()); + EXPECT_EQ(&buffer1, queue.Pop()); + EXPECT_EQ(1, spList->GetCount()); + EXPECT_EQ(&buffer2, spList->Pop()); +} + +TEST(TReadBufferQueueWrapperTests, PushPop_DataSourceFinished_InvalidPushAfterFinished) +{ + TBufferListPtr spList(std::make_shared()); + TReadBufferQueueWrapper queue(spList, 0, 1024); + + TOverlappedDataBuffer buffer1(1024, nullptr); + buffer1.SetLastPart(true); + buffer1.SetFilePosition(0); + queue.Push(&buffer1, true); + + queue.SetDataSourceFinished(&buffer1); + + EXPECT_EQ(1, queue.GetCount()); + EXPECT_EQ(&buffer1, queue.Pop()); + EXPECT_EQ(0, spList->GetCount()); + + TOverlappedDataBuffer buffer2(1024, nullptr); + buffer2.SetFilePosition(1000); + EXPECT_THROW(queue.Push(&buffer2, true), TCoreException); +}