Index: src/libchcore/TLocalFilesystemFile.cpp =================================================================== diff -u -r7fd37811dbce76d429b80e4703e88925982f5859 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/TLocalFilesystemFile.cpp (.../TLocalFilesystemFile.cpp) (revision 7fd37811dbce76d429b80e4703e88925982f5859) +++ src/libchcore/TLocalFilesystemFile.cpp (.../TLocalFilesystemFile.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -213,9 +213,8 @@ rBuffer.SetErrorCode(ERROR_SUCCESS); rBuffer.SetLastPart(true); - TOverlappedDataBufferQueue* pQueue = (TOverlappedDataBufferQueue*)rBuffer.GetParam(); + OverlappedReadCompleted(rBuffer.GetErrorCode(), 0, &rBuffer); - pQueue->AddFullBuffer(&rBuffer); // basically the same as OverlappedReadCompleted break; } Index: src/libchcore/TOverlappedDataBuffer.cpp =================================================================== diff -u -r7fd37811dbce76d429b80e4703e88925982f5859 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/TOverlappedDataBuffer.cpp (.../TOverlappedDataBuffer.cpp) (revision 7fd37811dbce76d429b80e4703e88925982f5859) +++ src/libchcore/TOverlappedDataBuffer.cpp (.../TOverlappedDataBuffer.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -30,9 +30,6 @@ TOverlappedDataBuffer::TOverlappedDataBuffer(size_t stBufferSize, void* pParam) : m_pParam(pParam) { - if (!m_pParam) - throw TCoreException(eErr_InvalidPointer, L"m_pParam", LOCATION); - // initialize OVERLAPPED members Internal = 0; InternalHigh = 0; Index: src/libchcore/TOverlappedDataBuffer.h =================================================================== diff -u -r7fd37811dbce76d429b80e4703e88925982f5859 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/TOverlappedDataBuffer.h (.../TOverlappedDataBuffer.h) (revision 7fd37811dbce76d429b80e4703e88925982f5859) +++ src/libchcore/TOverlappedDataBuffer.h (.../TOverlappedDataBuffer.h) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -78,6 +78,7 @@ void SetFilePosition(unsigned long long ullPosition) { OffsetHigh = (DWORD)(ullPosition >> 32); Offset = (DWORD)ullPosition; } void* GetParam() const { return m_pParam; } + void SetParam(void* pParam) { m_pParam = pParam; } // composite initialization void InitForRead(unsigned long long ullPosition, DWORD dwRequestedSize); Index: src/libchcore/TOverlappedDataBufferQueue.cpp =================================================================== diff -u -r7fd37811dbce76d429b80e4703e88925982f5859 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision 7fd37811dbce76d429b80e4703e88925982f5859) +++ src/libchcore/TOverlappedDataBufferQueue.cpp (.../TOverlappedDataBufferQueue.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -27,266 +27,73 @@ namespace chcore { - /////////////////////////////////////////////////////////////////////////////////// - // class TOverlappedDataBuffer - VOID CALLBACK OverlappedReadCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) + TOverlappedDataBufferQueue::TOverlappedDataBufferQueue() : + m_eventHasBuffers(true, false), + m_eventAllBuffersAccountedFor(true, true) { - _ASSERTE(dwNumberOfBytesTransfered == lpOverlapped->InternalHigh); - - TOverlappedDataBuffer* pBuffer = (TOverlappedDataBuffer*)lpOverlapped; - TOverlappedDataBufferQueue* pQueue = (TOverlappedDataBufferQueue*)pBuffer->GetParam(); - - // determine if this is the last packet - bool bEof = (dwErrorCode == ERROR_HANDLE_EOF || - pBuffer->GetStatusCode() == STATUS_END_OF_FILE || - (dwErrorCode == ERROR_SUCCESS && dwNumberOfBytesTransfered != pBuffer->GetRequestedDataSize())); - - // reset status code and error code if they pointed out to EOF - if(pBuffer->GetStatusCode() == STATUS_END_OF_FILE) - pBuffer->SetStatusCode(0); - - pBuffer->SetErrorCode(dwErrorCode == ERROR_HANDLE_EOF ? ERROR_SUCCESS : dwErrorCode); - - pBuffer->SetRealDataSize(dwNumberOfBytesTransfered); - pBuffer->SetLastPart(bEof); - - pQueue->AddFullBuffer(pBuffer); } - VOID CALLBACK OverlappedWriteCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) + TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(size_t stCount, size_t stBufferSize) : + TOverlappedDataBufferQueue() { - _ASSERTE(dwNumberOfBytesTransfered == lpOverlapped->InternalHigh); - - TOverlappedDataBuffer* pBuffer = (TOverlappedDataBuffer*)lpOverlapped; - TOverlappedDataBufferQueue* pQueue = (TOverlappedDataBufferQueue*)pBuffer->GetParam(); - - pBuffer->SetErrorCode(dwErrorCode); - - pQueue->AddFinishedBuffer(pBuffer); - } - - bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) - { - return pBufferA->GetBufferOrder() < pBufferB->GetBufferOrder(); - } - - TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(const logger::TLogFileDataPtr& spLogFileData) : - m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), - m_eventReadPossible(true, false), - m_eventWritePossible(true, false), - m_eventWriteFinished(true, false), - m_eventAllBuffersAccountedFor(true, true), - m_bDataSourceFinished(false), - m_bDataWritingFinished(false), - m_ullNextReadBufferOrder(0), - m_ullNextWriteBufferOrder(0), - m_ullNextFinishedBufferOrder(0) - { - } - - TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(const logger::TLogFileDataPtr& spLogFileData, size_t stCount, size_t stBufferSize) : - m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), - m_eventReadPossible(true, false), - m_eventWritePossible(true, false), - m_eventWriteFinished(true, false), - m_eventAllBuffersAccountedFor(true, false), - m_bDataSourceFinished(false), - m_bDataWritingFinished(false), - m_ullNextReadBufferOrder(0), - m_ullNextWriteBufferOrder(0), - m_ullNextFinishedBufferOrder(0) - { ReinitializeBuffers(stCount, stBufferSize); } TOverlappedDataBufferQueue::~TOverlappedDataBufferQueue() { } - TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetEmptyBuffer() + TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetBuffer() { - if (!m_listEmptyBuffers.empty()) + if (!m_dequeBuffers.empty()) { - TOverlappedDataBuffer* pBuffer = m_listEmptyBuffers.front(); - m_listEmptyBuffers.pop_front(); + TOverlappedDataBuffer* pBuffer = m_dequeBuffers.front(); + m_dequeBuffers.pop_front(); - pBuffer->SetBufferOrder(m_ullNextReadBufferOrder++); + UpdateHasBuffers(); + UpdateAllBuffersAccountedFor(); - UpdateReadPossibleEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); - return pBuffer; } return nullptr; } - void TOverlappedDataBufferQueue::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + bool TOverlappedDataBufferQueue::AreAllBuffersAccountedFor() const { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - LOG_TRACE(m_spLog) << L"Queuing buffer as empty; buffer-order: " << pBuffer->GetBufferOrder(); - - m_listEmptyBuffers.push_back(pBuffer); - UpdateReadPossibleEvent(); - UpdateAllBuffersAccountedFor(); + return m_dequeBuffers.size() == m_listAllBuffers.size(); } - void TOverlappedDataBufferQueue::UpdateReadPossibleEvent() + void TOverlappedDataBufferQueue::AddBuffer(TOverlappedDataBuffer* pBuffer) { - if (!m_listEmptyBuffers.empty() && !m_bDataSourceFinished) - m_eventReadPossible.SetEvent(); - else - m_eventReadPossible.ResetEvent(); - } - - TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetFullBuffer() - { - if (!m_setFullBuffers.empty()) - { - TOverlappedDataBuffer* pBuffer = *m_setFullBuffers.begin(); - if (pBuffer->GetBufferOrder() != m_ullNextWriteBufferOrder) - return nullptr; - - m_setFullBuffers.erase(m_setFullBuffers.begin()); - - if(pBuffer->GetErrorCode() == ERROR_SUCCESS) - { - // if this is the last part - mark that writing is finished, so that no other buffer will be written - if(pBuffer->IsLastPart()) - m_bDataWritingFinished = true; - - ++m_ullNextWriteBufferOrder; - } - - UpdateWritePossibleEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); - - return pBuffer; - } - - return nullptr; - } - - void TOverlappedDataBufferQueue::AddFullBuffer(TOverlappedDataBuffer* pBuffer) - { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetBufferOrder() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); - - std::pair pairInsertInfo = m_setFullBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); - - if (pBuffer->IsLastPart()) - { - m_bDataSourceFinished = true; - UpdateReadPossibleEvent(); - } - - UpdateWritePossibleEvent(); + m_dequeBuffers.push_back(pBuffer); + UpdateHasBuffers(); UpdateAllBuffersAccountedFor(); } - void TOverlappedDataBufferQueue::UpdateWritePossibleEvent() + void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() { - if (m_bDataWritingFinished || m_setFullBuffers.empty()) - m_eventWritePossible.ResetEvent(); + if (AreAllBuffersAccountedFor()) + m_eventAllBuffersAccountedFor.SetEvent(); else - { - TOverlappedDataBuffer* pFirstBuffer = *m_setFullBuffers.begin(); - if (pFirstBuffer->GetBufferOrder() == m_ullNextWriteBufferOrder) - m_eventWritePossible.SetEvent(); - else - m_eventWritePossible.ResetEvent(); - } - } - - TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetFinishedBuffer() - { - if (!m_setFinishedBuffers.empty()) - { - TOverlappedDataBuffer* pBuffer = *m_setFinishedBuffers.begin(); - if (pBuffer->GetBufferOrder() != m_ullNextFinishedBufferOrder) - return nullptr; - - m_setFinishedBuffers.erase(m_setFinishedBuffers.begin()); - - m_eventWriteFinished.ResetEvent(); // faster than UpdateWriteFinishedEvent() and the final effect should be the same m_eventAllBuffersAccountedFor.ResetEvent(); - - return pBuffer; - } - - return nullptr; } - void TOverlappedDataBufferQueue::MarkFinishedBufferAsComplete(TOverlappedDataBuffer* pBuffer) + void TOverlappedDataBufferQueue::UpdateHasBuffers() { - if(!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - // allow next finished buffer to be processed - ++m_ullNextFinishedBufferOrder; - UpdateWriteFinishedEvent(); - } - - void TOverlappedDataBufferQueue::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) - { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - LOG_TRACE(m_spLog) << L"Queuing buffer as finished; buffer-order: " << pBuffer->GetBufferOrder() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); - - std::pair pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); - - UpdateWriteFinishedEvent(); - UpdateAllBuffersAccountedFor(); - } - - void TOverlappedDataBufferQueue::UpdateWriteFinishedEvent() - { - if (m_setFinishedBuffers.empty()) - m_eventWriteFinished.ResetEvent(); + if(!m_dequeBuffers.empty()) + m_eventHasBuffers.SetEvent(); else - { - TOverlappedDataBuffer* pFirstBuffer = *m_setFinishedBuffers.begin(); - if (pFirstBuffer->GetBufferOrder() == m_ullNextFinishedBufferOrder) - m_eventWriteFinished.SetEvent(); - else - m_eventWriteFinished.ResetEvent(); - } + m_eventHasBuffers.ResetEvent(); } - void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() - { - size_t stCurrentBuffers = m_listEmptyBuffers.size() + m_setFullBuffers.size() + m_setFinishedBuffers.size(); - if (stCurrentBuffers == m_listAllBuffers.size()) - m_eventAllBuffersAccountedFor.SetEvent(); - else - m_eventAllBuffersAccountedFor.ResetEvent(); - } - void TOverlappedDataBufferQueue::ReinitializeBuffers(size_t stCount, size_t stBufferSize) { // sanity check - if any of the buffers are still in use, we can't change the sizes - if (m_listAllBuffers.size() != m_listEmptyBuffers.size()) + if (m_listAllBuffers.size() != m_dequeBuffers.size()) throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); if (stBufferSize == 0) throw TCoreException(eErr_InvalidArgument, L"stBufferSize", LOCATION); @@ -295,7 +102,7 @@ { // buffer sizes increased - clear current buffers and proceed with creating new ones m_listAllBuffers.clear(); - m_listEmptyBuffers.clear(); + m_dequeBuffers.clear(); } else if (stCount == m_listAllBuffers.size()) return; // nothing really changed @@ -304,30 +111,30 @@ else if (stCount < m_listAllBuffers.size()) { // there are too many buffers - reduce - m_listEmptyBuffers.clear(); + m_dequeBuffers.clear(); size_t stCountToRemove = m_listAllBuffers.size() - stCount; m_listAllBuffers.erase(m_listAllBuffers.begin(), m_listAllBuffers.begin() + stCountToRemove); for (const auto& upElement : m_listAllBuffers) { - m_listEmptyBuffers.push_back(upElement.get()); + m_dequeBuffers.push_back(upElement.get()); } - UpdateReadPossibleEvent(); + UpdateHasBuffers(); UpdateAllBuffersAccountedFor(); return; } // allocate buffers while (stCount--) { - auto upBuffer = std::make_unique(stBufferSize, this); - m_listEmptyBuffers.push_back(upBuffer.get()); + auto upBuffer = std::make_unique(stBufferSize, nullptr); + m_dequeBuffers.push_back(upBuffer.get()); m_listAllBuffers.push_back(std::move(upBuffer)); } - UpdateReadPossibleEvent(); + UpdateHasBuffers(); UpdateAllBuffersAccountedFor(); } @@ -336,6 +143,11 @@ return m_listAllBuffers.size(); } + size_t TOverlappedDataBufferQueue::GetAvailableBufferCount() const + { + return m_dequeBuffers.size(); + } + size_t TOverlappedDataBufferQueue::GetSingleBufferSize() const { if (m_listAllBuffers.empty()) @@ -344,46 +156,8 @@ return (*m_listAllBuffers.begin())->GetBufferSize(); } - void TOverlappedDataBufferQueue::DataSourceChanged() + void TOverlappedDataBufferQueue::WaitForMissingBuffers(HANDLE hKillEvent) const { - CleanupBuffers(); - - if (m_listAllBuffers.size() != m_listEmptyBuffers.size()) - throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); - - m_bDataSourceFinished = false; - m_bDataWritingFinished = false; - m_ullNextReadBufferOrder = 0; - m_ullNextWriteBufferOrder = 0; - m_ullNextFinishedBufferOrder = 0; - - UpdateReadPossibleEvent(); - m_eventWritePossible.ResetEvent(); - m_eventWriteFinished.ResetEvent(); - } - - void TOverlappedDataBufferQueue::CleanupBuffers() - { - // function sanitizes the buffer locations (empty/full/finished) - i.e. when there is full buffer that have no data, is marked eof and we are in the eof state - // then this buffer is really the empty one - if (m_bDataSourceFinished && !m_setFullBuffers.empty()) - { - auto iterCurrent = m_setFullBuffers.begin(); - while (iterCurrent != m_setFullBuffers.end()) - { - if ((*iterCurrent)->IsLastPart()) - { - m_listEmptyBuffers.push_back(*iterCurrent); - iterCurrent = m_setFullBuffers.erase(iterCurrent); - } - else - ++iterCurrent; - } - } - } - - void TOverlappedDataBufferQueue::WaitForMissingBuffersAndResetState(HANDLE hKillEvent) - { enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; @@ -405,11 +179,5 @@ break; } } - - std::copy(m_setFullBuffers.begin(), m_setFullBuffers.end(), std::back_inserter(m_listEmptyBuffers)); - std::copy(m_setFinishedBuffers.begin(), m_setFinishedBuffers.end(), std::back_inserter(m_listEmptyBuffers)); - - m_setFinishedBuffers.clear(); - m_setFullBuffers.clear(); } } Index: src/libchcore/TOverlappedDataBufferQueue.h =================================================================== diff -u -r3c209ebdc14ac0829468249805b7587880761f59 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/TOverlappedDataBufferQueue.h (.../TOverlappedDataBufferQueue.h) (revision 3c209ebdc14ac0829468249805b7587880761f59) +++ src/libchcore/TOverlappedDataBufferQueue.h (.../TOverlappedDataBufferQueue.h) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -21,88 +21,52 @@ #include #include "TEvent.h" -#include "IOverlappedDataBufferQueue.h" -#include "../liblogger/TLogFileData.h" -#include "../liblogger/TLogger.h" namespace chcore { class TOverlappedDataBuffer; - struct CompareBufferPositions + class TOverlappedDataBufferQueue { - bool operator()(const TOverlappedDataBuffer* rBufferA, const TOverlappedDataBuffer* rBufferB); - }; - - class TOverlappedDataBufferQueue : public IOverlappedDataBufferQueue - { public: - explicit TOverlappedDataBufferQueue(const logger::TLogFileDataPtr& spLogFileData); - TOverlappedDataBufferQueue(const logger::TLogFileDataPtr& spLogFileData, size_t stCount, size_t stBufferSize); + TOverlappedDataBufferQueue(); + TOverlappedDataBufferQueue(size_t stCount, size_t stBufferSize); + TOverlappedDataBufferQueue(const TOverlappedDataBufferQueue&) = delete; ~TOverlappedDataBufferQueue(); + TOverlappedDataBufferQueue& operator=(const TOverlappedDataBufferQueue&) = delete; + void ReinitializeBuffers(size_t stCount, size_t stBufferSize); size_t GetTotalBufferCount() const; + size_t GetAvailableBufferCount() const; size_t GetSingleBufferSize() const; // buffer management - virtual void AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) override; - virtual TOverlappedDataBuffer* GetEmptyBuffer() override; + void AddBuffer(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* GetBuffer(); - virtual void AddFullBuffer(TOverlappedDataBuffer* pBuffer) override; - virtual TOverlappedDataBuffer* GetFullBuffer() override; + bool AreAllBuffersAccountedFor() const; - virtual void AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) override; - virtual TOverlappedDataBuffer* GetFinishedBuffer() override; - virtual void MarkFinishedBufferAsComplete(TOverlappedDataBuffer* pBuffer) override; - - // data source change - void DataSourceChanged(); - - // processing info - bool IsDataSourceFinished() const { return m_bDataSourceFinished; } - bool IsDataWritingFinished() const { return m_bDataWritingFinished; } - // event access - HANDLE GetEventReadPossibleHandle() const { return m_eventReadPossible.Handle(); } - HANDLE GetEventWritePossibleHandle() const { return m_eventWritePossible.Handle(); } - HANDLE GetEventWriteFinishedHandle() const { return m_eventWriteFinished.Handle(); } + HANDLE GetEventHasBuffers() const { return m_eventHasBuffers.Handle(); } HANDLE GetEventAllBuffersAccountedFor() const { return m_eventAllBuffersAccountedFor.Handle(); } - void WaitForMissingBuffersAndResetState(HANDLE hKillEvent); + void WaitForMissingBuffers(HANDLE hKillEvent) const; private: - void CleanupBuffers(); - void UpdateReadPossibleEvent(); - void UpdateWritePossibleEvent(); - void UpdateWriteFinishedEvent(); void UpdateAllBuffersAccountedFor(); + void UpdateHasBuffers(); private: - logger::TLoggerPtr m_spLog; + std::vector> m_listAllBuffers; - std::deque> m_listAllBuffers; + std::deque m_dequeBuffers; - std::list m_listEmptyBuffers; - - using FullBuffersSet = std::set < TOverlappedDataBuffer*, CompareBufferPositions >; - FullBuffersSet m_setFullBuffers; - - using FinishedBuffersSet = std::set < TOverlappedDataBuffer*, CompareBufferPositions >; - FinishedBuffersSet m_setFinishedBuffers; - - bool m_bDataSourceFinished; // input file was already read to the end - bool m_bDataWritingFinished; // output file was already written to the end - - unsigned long long m_ullNextReadBufferOrder; // next order id for read buffers - unsigned long long m_ullNextWriteBufferOrder; // next order id to be processed when writing - unsigned long long m_ullNextFinishedBufferOrder; // next order id to be processed when finishing writing - - TEvent m_eventReadPossible; - TEvent m_eventWritePossible; - TEvent m_eventWriteFinished; + TEvent m_eventHasBuffers; TEvent m_eventAllBuffersAccountedFor; }; + + using TOverlappedDataBufferQueuePtr = std::shared_ptr; } #endif Index: src/libchcore/TOverlappedReaderWriter.cpp =================================================================== diff -u --- src/libchcore/TOverlappedReaderWriter.cpp (revision 0) +++ src/libchcore/TOverlappedReaderWriter.cpp (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -0,0 +1,325 @@ +// ============================================================================ +// Copyright (C) 2001-2015 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TOverlappedReaderWriter.h" +#include "TOverlappedDataBuffer.h" +#include "TCoreException.h" +#include "ErrorCodes.h" +#include + +#define STATUS_END_OF_FILE 0xc0000011 + +namespace chcore +{ + /////////////////////////////////////////////////////////////////////////////////// + // class TOverlappedDataBuffer + VOID CALLBACK OverlappedReadCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) + { + _ASSERTE(dwNumberOfBytesTransfered == lpOverlapped->InternalHigh); + + TOverlappedDataBuffer* pBuffer = (TOverlappedDataBuffer*)lpOverlapped; + TOverlappedReaderWriter* pQueue = (TOverlappedReaderWriter*)pBuffer->GetParam(); + + // determine if this is the last packet + bool bEof = (dwErrorCode == ERROR_HANDLE_EOF || + pBuffer->GetStatusCode() == STATUS_END_OF_FILE || + (dwErrorCode == ERROR_SUCCESS && dwNumberOfBytesTransfered != pBuffer->GetRequestedDataSize())); + + // reset status code and error code if they pointed out to EOF + if(pBuffer->GetStatusCode() == STATUS_END_OF_FILE) + pBuffer->SetStatusCode(0); + + pBuffer->SetErrorCode(dwErrorCode == ERROR_HANDLE_EOF ? ERROR_SUCCESS : dwErrorCode); + + pBuffer->SetRealDataSize(dwNumberOfBytesTransfered); + pBuffer->SetLastPart(bEof); + + pQueue->AddFullBuffer(pBuffer); + } + + VOID CALLBACK OverlappedWriteCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) + { + _ASSERTE(dwNumberOfBytesTransfered == lpOverlapped->InternalHigh); + + TOverlappedDataBuffer* pBuffer = (TOverlappedDataBuffer*)lpOverlapped; + TOverlappedReaderWriter* pQueue = (TOverlappedReaderWriter*)pBuffer->GetParam(); + + pBuffer->SetErrorCode(dwErrorCode); + + pQueue->AddFinishedBuffer(pBuffer); + } + + bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) + { + return pBufferA->GetBufferOrder() < pBufferB->GetBufferOrder(); + } + + TOverlappedReaderWriter::TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedDataBufferQueuePtr& spBuffers) : + m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), + m_spBuffers(spBuffers), + m_eventWritePossible(true, false), + m_eventWriteFinished(true, false), + m_eventAllBuffersAccountedFor(true, true), + m_bDataSourceFinished(false), + m_bDataWritingFinished(false), + m_ullNextReadBufferOrder(0), + m_ullNextWriteBufferOrder(0), + m_ullNextFinishedBufferOrder(0) + { + } + + TOverlappedReaderWriter::~TOverlappedReaderWriter() + { + } + + TOverlappedDataBuffer* TOverlappedReaderWriter::GetEmptyBuffer() + { + TOverlappedDataBuffer* pBuffer = m_spBuffers->GetBuffer(); + if(pBuffer) + { + pBuffer->SetParam(this); + pBuffer->SetBufferOrder(m_ullNextReadBufferOrder++); + + m_eventAllBuffersAccountedFor.ResetEvent(); + } + + return pBuffer; + } + + void TOverlappedReaderWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer as empty; buffer-order: " << pBuffer->GetBufferOrder(); + + pBuffer->SetParam(nullptr); + m_spBuffers->AddBuffer(pBuffer); + UpdateAllBuffersAccountedFor(); + } + + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFullBuffer() + { + if (!m_setFullBuffers.empty()) + { + TOverlappedDataBuffer* pBuffer = *m_setFullBuffers.begin(); + if (pBuffer->GetBufferOrder() != m_ullNextWriteBufferOrder) + return nullptr; + + m_setFullBuffers.erase(m_setFullBuffers.begin()); + + if(pBuffer->GetErrorCode() == ERROR_SUCCESS) + { + // if this is the last part - mark that writing is finished, so that no other buffer will be written + if(pBuffer->IsLastPart()) + m_bDataWritingFinished = true; + + ++m_ullNextWriteBufferOrder; + } + + UpdateWritePossibleEvent(); + m_eventAllBuffersAccountedFor.ResetEvent(); + + return pBuffer; + } + + return nullptr; + } + + void TOverlappedReaderWriter::AddFullBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetBufferOrder() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + + std::pair pairInsertInfo = m_setFullBuffers.insert(pBuffer); + if (!pairInsertInfo.second) + throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); + + if (pBuffer->IsLastPart()) + m_bDataSourceFinished = true; + + UpdateWritePossibleEvent(); + UpdateAllBuffersAccountedFor(); + } + + void TOverlappedReaderWriter::UpdateWritePossibleEvent() + { + if (m_bDataWritingFinished || m_setFullBuffers.empty()) + m_eventWritePossible.ResetEvent(); + else + { + TOverlappedDataBuffer* pFirstBuffer = *m_setFullBuffers.begin(); + if (pFirstBuffer->GetBufferOrder() == m_ullNextWriteBufferOrder) + m_eventWritePossible.SetEvent(); + else + m_eventWritePossible.ResetEvent(); + } + } + + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFinishedBuffer() + { + if (!m_setFinishedBuffers.empty()) + { + TOverlappedDataBuffer* pBuffer = *m_setFinishedBuffers.begin(); + if (pBuffer->GetBufferOrder() != m_ullNextFinishedBufferOrder) + return nullptr; + + m_setFinishedBuffers.erase(m_setFinishedBuffers.begin()); + + m_eventWriteFinished.ResetEvent(); // faster than UpdateWriteFinishedEvent() and the final effect should be the same + m_eventAllBuffersAccountedFor.ResetEvent(); + + return pBuffer; + } + + return nullptr; + } + + void TOverlappedReaderWriter::MarkFinishedBufferAsComplete(TOverlappedDataBuffer* pBuffer) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + // allow next finished buffer to be processed + ++m_ullNextFinishedBufferOrder; + UpdateWriteFinishedEvent(); + } + + void TOverlappedReaderWriter::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer as finished; buffer-order: " << pBuffer->GetBufferOrder() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + + std::pair pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); + if (!pairInsertInfo.second) + throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); + + UpdateWriteFinishedEvent(); + UpdateAllBuffersAccountedFor(); + } + + void TOverlappedReaderWriter::UpdateWriteFinishedEvent() + { + if (m_setFinishedBuffers.empty()) + m_eventWriteFinished.ResetEvent(); + else + { + TOverlappedDataBuffer* pFirstBuffer = *m_setFinishedBuffers.begin(); + if (pFirstBuffer->GetBufferOrder() == m_ullNextFinishedBufferOrder) + m_eventWriteFinished.SetEvent(); + else + m_eventWriteFinished.ResetEvent(); + } + } + + void TOverlappedReaderWriter::UpdateAllBuffersAccountedFor() + { + size_t stCurrentBuffers = m_spBuffers->GetAvailableBufferCount() + m_setFullBuffers.size() + m_setFinishedBuffers.size(); + if (stCurrentBuffers == m_spBuffers->GetTotalBufferCount()) + m_eventAllBuffersAccountedFor.SetEvent(); + else + m_eventAllBuffersAccountedFor.ResetEvent(); + } + + void TOverlappedReaderWriter::DataSourceChanged() + { + CleanupBuffers(); + + if (!m_spBuffers->AreAllBuffersAccountedFor()) + throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); + + m_bDataSourceFinished = false; + m_bDataWritingFinished = false; + m_ullNextReadBufferOrder = 0; + m_ullNextWriteBufferOrder = 0; + m_ullNextFinishedBufferOrder = 0; + + m_eventWritePossible.ResetEvent(); + m_eventWriteFinished.ResetEvent(); + } + + void TOverlappedReaderWriter::CleanupBuffers() + { + // function sanitizes the buffer locations (empty/full/finished) - i.e. when there is full buffer that have no data, is marked eof and we are in the eof state + // then this buffer is really the empty one + if (m_bDataSourceFinished && !m_setFullBuffers.empty()) + { + auto iterCurrent = m_setFullBuffers.begin(); + while (iterCurrent != m_setFullBuffers.end()) + { + if ((*iterCurrent)->IsLastPart()) + { + m_spBuffers->AddBuffer(*iterCurrent); + iterCurrent = m_setFullBuffers.erase(iterCurrent); + } + else + ++iterCurrent; + } + } + } + + void TOverlappedReaderWriter::WaitForMissingBuffersAndResetState(HANDLE hKillEvent) + { + enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; + std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; + + bool bExit = false; + while (!bExit) + { + DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + switch (dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0 + eAllBuffersReturned: + bExit = true; + break; + + case WAIT_OBJECT_0 + eKillThread: + bExit = true; + break; + } + } + + auto funcAdd = [&](TOverlappedDataBuffer* pBuffer) { m_spBuffers->AddBuffer(pBuffer); }; + + std::for_each(m_setFullBuffers.begin(), m_setFullBuffers.end(), funcAdd); + std::for_each(m_setFinishedBuffers.begin(), m_setFinishedBuffers.end(), funcAdd); + + m_setFinishedBuffers.clear(); + m_setFullBuffers.clear(); + } +} Fisheye: tag 3c209ebdc14ac0829468249805b7587880761f59 is not in file src/libchcore/TOverlappedReaderWriter.h Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -r7fd37811dbce76d429b80e4703e88925982f5859 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 7fd37811dbce76d429b80e4703e88925982f5859) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -46,20 +46,22 @@ #include "TFilesystemFeedbackWrapper.h" #include "TFilesystemFileFeedbackWrapper.h" #include "TDestinationPathProvider.h" +#include "TOverlappedReaderWriter.h" namespace chcore { struct CUSTOM_COPY_PARAMS { - explicit CUSTOM_COPY_PARAMS(const logger::TLogFileDataPtr& spFileData) : dbBuffer(spFileData) + CUSTOM_COPY_PARAMS() : + spBuffer(std::make_shared()) { } TFileInfoPtr spSrcFile; // CFileInfo - src file TSmartPath pathDstFile; // dest path with filename TBufferSizes tBufferSizes; - TOverlappedDataBufferQueue dbBuffer; // buffer handling + TOverlappedDataBufferQueuePtr spBuffer; // buffer handling bool bOnlyCreate = false; // flag from configuration - skips real copying - only create bool bProcessed = false; // has the element been processed ? (false if skipped) }; @@ -141,14 +143,14 @@ bool bCurrentFileSilentResume = m_tSubTaskStats.CanCurrentItemSilentResume(); // create a buffer of size m_nBufferSize - CUSTOM_COPY_PARAMS ccp(m_spLog->GetLogFileData()); + CUSTOM_COPY_PARAMS ccp; ccp.bProcessed = false; ccp.bOnlyCreate = GetTaskPropValue(rConfig); // remove changes in buffer sizes to avoid re-creation later rCfgTracker.RemoveModificationSet(TOptionsSet() % eTO_DefaultBufferSize % eTO_OneDiskBufferSize % eTO_TwoDisksBufferSize % eTO_CDBufferSize % eTO_LANBufferSize % eTO_UseOnlyDefaultBuffer % eTO_BufferQueueDepth); - AdjustBufferIfNeeded(ccp.dbBuffer, ccp.tBufferSizes, true); + AdjustBufferIfNeeded(ccp.spBuffer, ccp.tBufferSizes, true); bool bIgnoreFolders = GetTaskPropValue(rConfig); bool bForceDirectories = GetTaskPropValue(rConfig); @@ -358,10 +360,10 @@ return TSubTaskBase::eSubResult_Continue; // let the buffer queue know that we change the data source - pData->dbBuffer.DataSourceChanged(); + TOverlappedReaderWriter tReaderWriter(m_spLog->GetLogFileData(), pData->spBuffer); // recreate buffer if needed - AdjustBufferIfNeeded(pData->dbBuffer, pData->tBufferSizes); + AdjustBufferIfNeeded(pData->spBuffer, pData->tBufferSizes); ATLTRACE(_T("CustomCopyFile: %s\n"), pData->spSrcFile->GetFullFilePath().ToString()); @@ -380,10 +382,10 @@ // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data enum { eWriteFinished, eKillThread, eWritePossible, eReadPossible, eHandleCount }; std::array arrHandles = { - pData->dbBuffer.GetEventWriteFinishedHandle(), + tReaderWriter.GetEventWriteFinishedHandle(), rThreadController.GetKillThreadHandle(), - pData->dbBuffer.GetEventWritePossibleHandle(), - pData->dbBuffer.GetEventReadPossibleHandle() + tReaderWriter.GetEventWritePossibleHandle(), + tReaderWriter.GetEventReadPossibleHandle() }; // resume copying from the position after the last processed mark; the proper value should be set @@ -415,7 +417,7 @@ case WAIT_OBJECT_0 + eReadPossible: { - TOverlappedDataBuffer* pBuffer = pData->dbBuffer.GetEmptyBuffer(); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Read was possible, but no buffer is available", LOCATION); @@ -425,12 +427,12 @@ eResult = tFileFBWrapper.ReadFileFB(fileSrc, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); bStopProcessing = true; } else if(bSkip) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); AdjustProcessedSizeForSkip(pData->spSrcFile); @@ -441,7 +443,7 @@ } case WAIT_OBJECT_0 + eWritePossible: { - TOverlappedDataBuffer* pBuffer = pData->dbBuffer.GetFullBuffer(); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFullBuffer(); if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write was possible, but no buffer is available", LOCATION); @@ -456,12 +458,12 @@ eResult = tFileFBWrapper.ReadFileFB(fileSrc, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); bStopProcessing = true; } else if(bSkip) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); AdjustProcessedSizeForSkip(pData->spSrcFile); @@ -471,12 +473,12 @@ } else if(eResult != TSubTaskBase::eSubResult_Continue) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); bStopProcessing = true; } else if(bSkip) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); AdjustProcessedSizeForSkip(pData->spSrcFile); @@ -491,12 +493,12 @@ eResult = tFileFBWrapper.WriteFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); bStopProcessing = true; } else if(bSkip) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); AdjustProcessedSizeForSkip(pData->spSrcFile); @@ -510,7 +512,7 @@ case WAIT_OBJECT_0 + eWriteFinished: { - TOverlappedDataBuffer* pBuffer = pData->dbBuffer.GetFinishedBuffer(); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedBuffer(); if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write finished was possible, but no buffer is available", LOCATION); @@ -522,12 +524,12 @@ eResult = tFileFBWrapper.WriteFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); bStopProcessing = true; } else if(bSkip) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); AdjustProcessedSizeForSkip(pData->spSrcFile); @@ -537,12 +539,12 @@ } else if(eResult != TSubTaskBase::eSubResult_Continue) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); bStopProcessing = true; } else if(bSkip) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); AdjustProcessedSizeForSkip(pData->spSrcFile); @@ -555,12 +557,12 @@ eResult = tFileFBWrapper.FinalizeFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); if (eResult != TSubTaskBase::eSubResult_Continue) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); bStopProcessing = true; } else if (bSkip) { - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); AdjustProcessedSizeForSkip(pData->spSrcFile); @@ -577,8 +579,8 @@ // stop iterating through file bStopProcessing = pBuffer->IsLastPart(); - pData->dbBuffer.MarkFinishedBufferAsComplete(pBuffer); - pData->dbBuffer.AddEmptyBuffer(pBuffer); + tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer); if(bStopProcessing) { @@ -599,7 +601,7 @@ } } - pData->dbBuffer.WaitForMissingBuffersAndResetState(rThreadController.GetKillThreadHandle()); + tReaderWriter.WaitForMissingBuffersAndResetState(rThreadController.GetKillThreadHandle()); return eResult; } @@ -786,7 +788,7 @@ return eResult; } - bool TSubTaskCopyMove::AdjustBufferIfNeeded(TOverlappedDataBufferQueue& rBuffer, TBufferSizes& rBufferSizes, bool bForce) + bool TSubTaskCopyMove::AdjustBufferIfNeeded(const TOverlappedDataBufferQueuePtr& spBuffer, TBufferSizes& rBufferSizes, bool bForce) { const TConfig& rConfig = GetContext().GetConfig(); TTaskConfigTracker& rCfgTracker = GetContext().GetCfgTracker(); @@ -808,7 +810,7 @@ LOG_INFO(m_spLog) << strFormat.c_str(); - rBuffer.ReinitializeBuffers(rBufferSizes.GetBufferCount(), rBufferSizes.GetMaxSize()); + spBuffer->ReinitializeBuffers(rBufferSizes.GetBufferCount(), rBufferSizes.GetMaxSize()); return true; // buffer adjusted } Index: src/libchcore/TSubTaskCopyMove.h =================================================================== diff -u -r12b36349f6214befeace08efa9acc7e03be0d847 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/TSubTaskCopyMove.h (.../TSubTaskCopyMove.h) (revision 12b36349f6214befeace08efa9acc7e03be0d847) +++ src/libchcore/TSubTaskCopyMove.h (.../TSubTaskCopyMove.h) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -29,6 +29,7 @@ #include "TBufferSizes.h" #include "IFilesystemFile.h" #include "../liblogger/TLogger.h" +#include "TOverlappedDataBufferQueue.h" namespace chcore { @@ -38,8 +39,6 @@ class TDataBufferManager; class TSimpleDataBuffer; class TBufferSizes; - class TOverlappedDataBufferQueue; - class TOverlappedDataBuffer; class TFilesystemFileFeedbackWrapper; class LIBCHCORE_API TSubTaskCopyMove : public TSubTaskBase @@ -62,7 +61,7 @@ private: TBufferSizes::EBufferType GetBufferIndex(const TBufferSizes& rBufferSizes, const TFileInfoPtr& spFileInfo); - bool AdjustBufferIfNeeded(TOverlappedDataBufferQueue& rBuffer, TBufferSizes& rBufferSizes, bool bForce = false); + bool AdjustBufferIfNeeded(const TOverlappedDataBufferQueuePtr& spBuffer, TBufferSizes& rBufferSizes, bool bForce = false); ESubOperationResult CustomCopyFileFB(const IFeedbackHandlerPtr& spFeedbackHandler, CUSTOM_COPY_PARAMS* pData); Fisheye: Tag b89aea376d35ce4b0d6506f7d04dba73830d9268 refers to a dead (removed) revision in file `src/libchcore/Tests/TOverlappedDataBufferQueueTests.cpp'. Fisheye: No comparison available. Pass `N' to diff? Index: src/libchcore/Tests/TOverlappedDataBufferTests.cpp =================================================================== diff -u -r4716f17b8dc8af1322503a0fe09a24f31bd5324c -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/Tests/TOverlappedDataBufferTests.cpp (.../TOverlappedDataBufferTests.cpp) (revision 4716f17b8dc8af1322503a0fe09a24f31bd5324c) +++ src/libchcore/Tests/TOverlappedDataBufferTests.cpp (.../TOverlappedDataBufferTests.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -2,21 +2,33 @@ #include "gtest/gtest.h" #include "gmock/gmock.h" #include "../TOverlappedDataBuffer.h" -#include "../TCoreException.h" -#include "../TOverlappedDataBufferQueue.h" +#include "../TOverlappedReaderWriter.h" +#include "../../liblogger/TLogFileData.h" using namespace chcore; -TEST(TOverlappedDataBufferTests, Constructor_InvalidInput) +TEST(TOverlappedDataBufferTests, Constructor_NullParam) { - EXPECT_THROW(TOverlappedDataBuffer(0, nullptr), TCoreException); + TOverlappedDataBuffer buffer(0, nullptr); + + EXPECT_EQ(nullptr, buffer.GetParam()); } +TEST(TOverlappedDataBufferTests, SetParam_GetParam) +{ + int iTest = 5; + + TOverlappedDataBuffer buffer(0, &iTest); + + EXPECT_EQ(&iTest, buffer.GetParam()); +} + TEST(TOverlappedDataBufferTests, Constructor_SanityTest) { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(32768, &queue); EXPECT_EQ(0, buffer.GetBufferOrder()); @@ -35,7 +47,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(32768, &queue); buffer.ReinitializeBuffer(16384); @@ -48,7 +61,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.ReinitializeBuffer(32768); @@ -61,7 +75,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRequestedDataSize(123); @@ -73,7 +88,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRealDataSize(123); @@ -85,7 +101,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetLastPart(true); @@ -97,7 +114,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetBufferOrder(123); @@ -109,7 +127,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetErrorCode(123); @@ -121,7 +140,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetStatusCode(123); @@ -133,7 +153,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetBytesTransferred(123); @@ -145,7 +166,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetFilePosition(123); @@ -158,7 +180,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRequestedDataSize(123); @@ -184,7 +207,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRequestedDataSize(123); @@ -206,7 +230,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRequestedDataSize(123); @@ -233,7 +258,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.InitForRead(0, 1024); @@ -253,7 +279,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.InitForRead(0, 1024); @@ -273,7 +300,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueue queue(spLogData); + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers); TOverlappedDataBuffer buffer(16384, &queue); buffer.InitForRead(0, 1024); Index: src/libchcore/Tests/TOverlappedReaderWriterTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TOverlappedReaderWriterTests.cpp (revision 0) +++ src/libchcore/Tests/TOverlappedReaderWriterTests.cpp (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -0,0 +1,547 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TOverlappedReaderWriter.h" +#include "../TOverlappedDataBuffer.h" +#include "../TCoreException.h" +#include "../../liblogger/TLogFileData.h" + +using namespace chcore; + +#define EXPECT_TIMEOUT(handle)\ + {\ + DWORD dwResult = WaitForSingleObject(handle, 0); \ + EXPECT_EQ(WAIT_TIMEOUT, dwResult); \ + } + +#define EXPECT_SIGNALED(handle)\ + {\ + DWORD dwResult = WaitForSingleObject(handle, 0); \ + EXPECT_EQ(WAIT_OBJECT_0 + 0, dwResult); \ + } + + +TEST(TOverlappedReaderWriterTests, DefaultConstructor_SanityTest) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + EXPECT_EQ(nullptr, tReaderWriter.GetEmptyBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedBuffer()); + + EXPECT_NE(nullptr, tReaderWriter.GetEventReadPossibleHandle()); + EXPECT_NE(nullptr, tReaderWriter.GetEventWritePossibleHandle()); + EXPECT_NE(nullptr, tReaderWriter.GetEventWriteFinishedHandle()); + + EXPECT_TIMEOUT(tReaderWriter.GetEventReadPossibleHandle()); + EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); + EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); + + EXPECT_FALSE(tReaderWriter.IsDataSourceFinished()); + EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); + + EXPECT_EQ(0, spBuffers->GetTotalBufferCount()); + EXPECT_EQ(0, spBuffers->GetSingleBufferSize()); +} + +TEST(TOverlappedReaderWriterTests, AllocatingConstructor_SanityTest) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + EXPECT_NE(nullptr, tReaderWriter.GetEmptyBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedBuffer()); + + EXPECT_NE(nullptr, tReaderWriter.GetEventReadPossibleHandle()); + EXPECT_NE(nullptr, tReaderWriter.GetEventWritePossibleHandle()); + EXPECT_NE(nullptr, tReaderWriter.GetEventWriteFinishedHandle()); + + EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); + EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); + EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); + + EXPECT_FALSE(tReaderWriter.IsDataSourceFinished()); + EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); +} + +TEST(TOverlappedReaderWriterTests, AllocatingConstructor_CheckBufferSizes) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + EXPECT_EQ(3, spBuffers->GetTotalBufferCount()); + EXPECT_EQ(32768, spBuffers->GetSingleBufferSize()); + + EXPECT_EQ(32768, pBuffers[0]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[1]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[2]->GetBufferSize()); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +TEST(TOverlappedReaderWriterTests, ReinitializeBuffer_FailsWithBuffersInUse) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + tReaderWriter.GetEmptyBuffer(); + + EXPECT_THROW(spBuffers->ReinitializeBuffers(3, 65536), TCoreException); +} + +TEST(TOverlappedReaderWriterTests, ReinitializeBuffer_ZeroLengthBuffers) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + EXPECT_THROW(spBuffers->ReinitializeBuffers(3, 0), TCoreException); +} + +TEST(TOverlappedReaderWriterTests, ReinitializeBuffer_SameSizeSameCount) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + spBuffers->ReinitializeBuffers(3, 32768); + + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + EXPECT_EQ(3, spBuffers->GetTotalBufferCount()); + EXPECT_EQ(32768, spBuffers->GetSingleBufferSize()); + + EXPECT_EQ(32768, pBuffers[0]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[1]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[2]->GetBufferSize()); +} + +TEST(TOverlappedReaderWriterTests, ReinitializeBuffer_IncreaseSize) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + spBuffers->ReinitializeBuffers(3, 65536); + + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + EXPECT_EQ(3, spBuffers->GetTotalBufferCount()); + EXPECT_EQ(65536, spBuffers->GetSingleBufferSize()); + + EXPECT_EQ(65536, pBuffers[0]->GetBufferSize()); + EXPECT_EQ(65536, pBuffers[1]->GetBufferSize()); + EXPECT_EQ(65536, pBuffers[2]->GetBufferSize()); +} + +TEST(TOverlappedReaderWriterTests, ReinitializeBuffer_DecreaseSize) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 65536)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + spBuffers->ReinitializeBuffers(3, 32768); + + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + EXPECT_EQ(3, spBuffers->GetTotalBufferCount()); + EXPECT_EQ(32768, spBuffers->GetSingleBufferSize()); + + EXPECT_EQ(32768, pBuffers[0]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[1]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[2]->GetBufferSize()); +} + +TEST(TOverlappedReaderWriterTests, ReinitializeBuffer_IncreaseCount) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + spBuffers->ReinitializeBuffers(5, 32768); + + EXPECT_EQ(5, spBuffers->GetTotalBufferCount()); + EXPECT_EQ(32768, spBuffers->GetSingleBufferSize()); + + TOverlappedDataBuffer* pBuffers[5] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + EXPECT_EQ(32768, pBuffers[0]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[1]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[2]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[3]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[4]->GetBufferSize()); +} + +TEST(TOverlappedReaderWriterTests, ReinitializeBuffer_DecreaseCount) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(5, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + spBuffers->ReinitializeBuffers(3, 32768); + + EXPECT_EQ(3, spBuffers->GetTotalBufferCount()); + EXPECT_EQ(32768, spBuffers->GetSingleBufferSize()); + + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + EXPECT_EQ(32768, pBuffers[0]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[1]->GetBufferSize()); + EXPECT_EQ(32768, pBuffers[2]->GetBufferSize()); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +TEST(TOverlappedReaderWriterTests, GetEmptyBuffer) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); + + EXPECT_NE(nullptr, tReaderWriter.GetEmptyBuffer()); + EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); + + EXPECT_NE(nullptr, tReaderWriter.GetEmptyBuffer()); + EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); + + EXPECT_NE(nullptr, tReaderWriter.GetEmptyBuffer()); + EXPECT_TIMEOUT(tReaderWriter.GetEventReadPossibleHandle()); + + EXPECT_EQ(nullptr, tReaderWriter.GetEmptyBuffer()); +} + +TEST(TOverlappedReaderWriterTests, AddEmptyBuffer) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + EXPECT_TIMEOUT(tReaderWriter.GetEventReadPossibleHandle()); + + tReaderWriter.AddEmptyBuffer(pBuffers[0]); + EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); + + tReaderWriter.AddEmptyBuffer(pBuffers[1]); + EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); + + tReaderWriter.AddEmptyBuffer(pBuffers[2]); + EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); +} + +TEST(TOverlappedReaderWriterTests, AddEmptyBuffer_Null) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + EXPECT_THROW(tReaderWriter.AddEmptyBuffer(nullptr), TCoreException); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +TEST(TOverlappedReaderWriterTests, AddFullBuffer_GetFullBuffer) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); + + tReaderWriter.AddFullBuffer(pBuffer); + EXPECT_SIGNALED(tReaderWriter.GetEventWritePossibleHandle()); + + tReaderWriter.GetFullBuffer(); + EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); +} + +TEST(TOverlappedReaderWriterTests, GetFullBuffer_WrongOrder) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + tReaderWriter.AddFullBuffer(pBuffers[1]); + EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); + + tReaderWriter.AddFullBuffer(pBuffers[2]); + EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); + + tReaderWriter.AddFullBuffer(pBuffers[0]); + EXPECT_NE(nullptr, tReaderWriter.GetFullBuffer()); +} + +TEST(TOverlappedReaderWriterTests, AddFullBuffer_HandlingSrcEof) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + pBuffers[1]->SetLastPart(true); + + tReaderWriter.AddFullBuffer(pBuffers[0]); + EXPECT_FALSE(tReaderWriter.IsDataSourceFinished()); + + tReaderWriter.AddFullBuffer(pBuffers[1]); + EXPECT_TRUE(tReaderWriter.IsDataSourceFinished()); +} + +TEST(TOverlappedReaderWriterTests, AddFullBuffer_HandlingDstEof) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + pBuffers[2]->SetLastPart(true); + + tReaderWriter.AddFullBuffer(pBuffers[0]); + tReaderWriter.AddFullBuffer(pBuffers[1]); + tReaderWriter.AddFullBuffer(pBuffers[2]); + + tReaderWriter.GetFullBuffer(); + EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); + + tReaderWriter.GetFullBuffer(); + EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); + + // getting the last buffer (marked as eof) causes setting the data-writing-finished flag + tReaderWriter.GetFullBuffer(); + EXPECT_TRUE(tReaderWriter.IsDataWritingFinished()); +} + +TEST(TOverlappedReaderWriterTests, AddFullBuffer_Null) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + EXPECT_THROW(tReaderWriter.AddFullBuffer(nullptr), TCoreException); +} + +TEST(TOverlappedReaderWriterTests, AddFullBuffer_SameBufferTwice) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); + + pBuffer->InitForRead(0, 1280); + pBuffer->SetBytesTransferred(1230); + pBuffer->SetStatusCode(0); + + tReaderWriter.AddFullBuffer(pBuffer); + EXPECT_THROW(tReaderWriter.AddFullBuffer(pBuffer), TCoreException); +} + +TEST(TOverlappedReaderWriterTests, GetFullBuffer_AddFullBuffer_OutOfOrder) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + pBuffers[0]->InitForRead(0, 1000); + pBuffers[0]->SetBytesTransferred(1000); + pBuffers[0]->SetStatusCode(0); + + pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->SetBytesTransferred(1200); + pBuffers[1]->SetStatusCode(0); + + pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->SetBytesTransferred(800); + pBuffers[2]->SetStatusCode(0); + pBuffers[2]->SetLastPart(true); + + EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); + + tReaderWriter.AddFullBuffer(pBuffers[1]); + EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); + + tReaderWriter.AddFullBuffer(pBuffers[2]); + EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); + + tReaderWriter.AddFullBuffer(pBuffers[0]); + EXPECT_SIGNALED(tReaderWriter.GetEventWritePossibleHandle()); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +TEST(TOverlappedReaderWriterTests, AddFinishedBuffer_OutOfOrder_Signals) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + pBuffers[0]->InitForRead(0, 1000); + pBuffers[0]->SetBytesTransferred(1000); + pBuffers[0]->SetStatusCode(0); + + pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->SetBytesTransferred(1200); + pBuffers[1]->SetStatusCode(0); + + pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->SetBytesTransferred(800); + pBuffers[2]->SetStatusCode(0); + pBuffers[2]->SetLastPart(true); + + tReaderWriter.AddFinishedBuffer(pBuffers[1]); + EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); + tReaderWriter.AddFinishedBuffer(pBuffers[2]); + EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); + tReaderWriter.AddFinishedBuffer(pBuffers[0]); + EXPECT_SIGNALED(tReaderWriter.GetEventWriteFinishedHandle()); +} + +TEST(TOverlappedReaderWriterTests, GetFinishedBuffer_Signals) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + pBuffers[0]->InitForRead(0, 1000); + pBuffers[0]->SetBytesTransferred(1000); + pBuffers[0]->SetStatusCode(0); + + pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->SetBytesTransferred(1200); + pBuffers[1]->SetStatusCode(0); + + pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->SetBytesTransferred(800); + pBuffers[2]->SetStatusCode(0); + pBuffers[2]->SetLastPart(true); + + tReaderWriter.AddFinishedBuffer(pBuffers[1]); + tReaderWriter.AddFinishedBuffer(pBuffers[2]); + tReaderWriter.AddFinishedBuffer(pBuffers[0]); + + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedBuffer(); + EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); + tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); + EXPECT_SIGNALED(tReaderWriter.GetEventWriteFinishedHandle()); + + pBuffer = tReaderWriter.GetFinishedBuffer(); + EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); + tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); + EXPECT_SIGNALED(tReaderWriter.GetEventWriteFinishedHandle()); + + pBuffer = tReaderWriter.GetFinishedBuffer(); + EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); + tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); + EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); +} + +TEST(TOverlappedReaderWriterTests, GetFinishedBuffer_WrongOrder) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + pBuffers[0]->InitForRead(0, 1000); + pBuffers[0]->SetBytesTransferred(1000); + pBuffers[0]->SetStatusCode(0); + + pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->SetBytesTransferred(1200); + pBuffers[1]->SetStatusCode(0); + + pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->SetBytesTransferred(800); + pBuffers[2]->SetStatusCode(0); + pBuffers[2]->SetLastPart(true); + + tReaderWriter.AddFinishedBuffer(pBuffers[1]); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedBuffer()); + + tReaderWriter.AddFinishedBuffer(pBuffers[2]); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedBuffer()); + + tReaderWriter.AddFinishedBuffer(pBuffers[0]); + EXPECT_NE(nullptr, tReaderWriter.GetFinishedBuffer()); +} + +TEST(TOverlappedReaderWriterTests, AddFinishedBuffer_Null) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + EXPECT_THROW(tReaderWriter.AddFinishedBuffer(nullptr), TCoreException); +} + +TEST(TOverlappedReaderWriterTests, AddFinishedBuffer_SameBufferTwice) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); + tReaderWriter.AddFinishedBuffer(pBuffer); + EXPECT_THROW(tReaderWriter.AddFinishedBuffer(pBuffer), TCoreException); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +TEST(TOverlappedReaderWriterTests, DataSourceChanged_CleanupBuffers) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; + + pBuffers[0]->SetLastPart(true); + pBuffers[1]->SetLastPart(true); + pBuffers[2]->SetLastPart(true); + + tReaderWriter.AddFullBuffer(pBuffers[1]); + tReaderWriter.AddFullBuffer(pBuffers[2]); + tReaderWriter.AddFullBuffer(pBuffers[0]); + + // this tests if the buffers are properly cleaned up - if they're not, DataSourceChanged() throws an exception + EXPECT_NO_THROW(tReaderWriter.DataSourceChanged()); +} + +TEST(TOverlappedReaderWriterTests, DataSourceChanged_InvalidBufferCount) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + + TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + + tReaderWriter.GetEmptyBuffer(); + + // this tests if the buffers are properly cleaned up - if they're not, DataSourceChanged() throws an exception + EXPECT_THROW(tReaderWriter.DataSourceChanged(), TCoreException); +} Index: src/libchcore/libchcore.vc140.vcxproj =================================================================== diff -u -rf8fcbbd1d2321cf0c8be79526c449384af654e49 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision f8fcbbd1d2321cf0c8be79526c449384af654e49) +++ src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -508,6 +508,7 @@ + @@ -723,7 +724,7 @@ true true - + true true true @@ -749,6 +750,7 @@ + Index: src/libchcore/libchcore.vc140.vcxproj.filters =================================================================== diff -u -rf8fcbbd1d2321cf0c8be79526c449384af654e49 -rb89aea376d35ce4b0d6506f7d04dba73830d9268 --- src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision f8fcbbd1d2321cf0c8be79526c449384af654e49) +++ src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) @@ -464,6 +464,9 @@ Source Files\Tools + + Source Files\Tools\Data Buffer + @@ -748,9 +751,6 @@ Tests - - Tests - Source Files\Task Config @@ -847,5 +847,11 @@ Source Files\Library files + + Source Files\Tools\Data Buffer + + + Tests + \ No newline at end of file