Index: src/libchcore/OverlappedCallbacks.cpp =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/OverlappedCallbacks.cpp (.../OverlappedCallbacks.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/OverlappedCallbacks.cpp (.../OverlappedCallbacks.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -47,7 +47,7 @@ // in case of error (e.g end-of-file error triggers the difference and dwNumberOfBytesTransfered contains more up-to-date information) pBuffer->SetBytesTransferred(dwNumberOfBytesTransfered); - pQueue->AddFullBuffer(pBuffer); + pQueue->AddFinishedReadBuffer(pBuffer); } VOID CALLBACK OverlappedWriteCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) @@ -58,6 +58,6 @@ pBuffer->SetErrorCode(dwErrorCode); pBuffer->SetBytesTransferred(dwNumberOfBytesTransfered); - pQueue->AddFinishedBuffer(pBuffer); + pQueue->AddFinishedWriteBuffer(pBuffer); } } Index: src/libchcore/TBufferList.cpp =================================================================== diff -u --- src/libchcore/TBufferList.cpp (revision 0) +++ src/libchcore/TBufferList.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,62 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TBufferList.h" +#include "TCoreException.h" + +namespace chcore +{ + TBufferList::TBufferList() + { + } + + void TBufferList::Push(TOverlappedDataBuffer* pBuffer) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidArgument, L"pBuffer", LOCATION); + + m_listBuffers.push_front(pBuffer); + } + + TOverlappedDataBuffer* TBufferList::Pop() + { + if(m_listBuffers.empty()) + return nullptr; + + TOverlappedDataBuffer* pBuffer = m_listBuffers.front(); + m_listBuffers.pop_front(); + + return pBuffer; + } + + void TBufferList::Clear() + { + m_listBuffers.clear(); + } + + size_t TBufferList::GetCount() const + { + return m_listBuffers.size(); + } + + bool TBufferList::IsEmpty() const + { + return m_listBuffers.empty(); + } +} Index: src/libchcore/TBufferList.h =================================================================== diff -u --- src/libchcore/TBufferList.h (revision 0) +++ src/libchcore/TBufferList.h (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,46 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TUNORDEREDBUFFERQUEUE_H__ +#define __TUNORDEREDBUFFERQUEUE_H__ + +namespace chcore +{ + class TOverlappedDataBuffer; + + class TBufferList + { + public: + TBufferList(); + + void Push(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* Pop(); + + void Clear(); + + size_t GetCount() const; + bool IsEmpty() const; + + private: + std::list m_listBuffers; + }; + + using TBufferListPtr = std::shared_ptr; +} + +#endif Index: src/libchcore/TFailedBufferQueue.cpp =================================================================== diff -u --- src/libchcore/TFailedBufferQueue.cpp (revision 0) +++ src/libchcore/TFailedBufferQueue.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,97 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TFailedBufferQueue.h" +#include "TOverlappedDataBuffer.h" + +namespace chcore +{ + TFailedBufferQueue::TFailedBufferQueue() : + m_eventHasBuffers(true, false) + { + } + + TOverlappedDataBuffer* TFailedBufferQueue::Pop() + { + if(!IsBufferReady()) + return nullptr; + + TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin(); + m_setBuffers.erase(m_setBuffers.begin()); + + if(pBuffer->GetFilePosition() == m_ullErrorPosition) + { + // removed the element marked as 'error', calculate the new error position + m_ullErrorPosition = NoPosition; + for(TOverlappedDataBuffer* pBuf : m_setBuffers) + { + if(pBuf->HasError()) + { + m_ullErrorPosition = pBuf->GetFilePosition(); + break; + } + } + } + + UpdateHasBuffers(); + + return pBuffer; + } + + const TOverlappedDataBuffer* const TFailedBufferQueue::Peek() const + { + if(!m_setBuffers.empty()) + return *m_setBuffers.begin(); + return nullptr; + } + + bool TFailedBufferQueue::IsBufferReady() const + { + return !m_setBuffers.empty(); + } + + void TFailedBufferQueue::Clear() + { + m_setBuffers.clear(); + m_eventHasBuffers.ResetEvent(); + } + + size_t TFailedBufferQueue::GetCount() const + { + return m_setBuffers.size(); + } + + bool TFailedBufferQueue::IsEmpty() const + { + return m_setBuffers.empty(); + } + + HANDLE TFailedBufferQueue::GetHasBuffersEvent() const + { + return m_eventHasBuffers.Handle(); + } + + void TFailedBufferQueue::UpdateHasBuffers() + { + if(IsBufferReady()) + m_eventHasBuffers.SetEvent(); + else + m_eventHasBuffers.ResetEvent(); + } +} Index: src/libchcore/TFailedBufferQueue.h =================================================================== diff -u --- src/libchcore/TFailedBufferQueue.h (revision 0) +++ src/libchcore/TFailedBufferQueue.h (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,104 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TFAILEDBUFFERQUEUE_H__ +#define __TFAILEDBUFFERQUEUE_H__ + +#include +#include "TEvent.h" +#include "TOverlappedDataBuffer.h" + +namespace chcore +{ + class TFailedBufferQueue + { + public: + static const unsigned long long NoPosition = 0xffffffffffffffff; + + public: + TFailedBufferQueue(); + + template + void PushWithFallback(TOverlappedDataBuffer* pBuffer, T& rRetryQueue) + { + if(pBuffer->HasError()) + { + if(pBuffer->GetFilePosition() < m_ullErrorPosition) + { + // case: new buffer failed at even earlier position in file than the one that failed previously (should also work for numeric_limits::max()) + // - move existing buffers with errors to failed read buffers, add current one to full queue + m_ullErrorPosition = pBuffer->GetFilePosition(); + + BufferCollection newQueue; + + for(TOverlappedDataBuffer* pBuf : m_setBuffers) + { + if(pBuf->HasError()) + rRetryQueue.Push(pBuf, true); + else + newQueue.insert(pBuf); + } + + if(newQueue.size() != m_setBuffers.size()) + std::swap(m_setBuffers, newQueue); + } + else if(pBuffer->GetFilePosition() > m_ullErrorPosition) + { + // case: new buffer failed at position later than the one that failed before - add to failed buffers + // for retry + rRetryQueue.Push(pBuffer, true); + return; + } + //else -> case: we've received the same buffer that failed before; add to normal full queue for user to handle that + } + else if(m_ullErrorPosition == pBuffer->GetFilePosition()) + { + // case: adding correctly read buffer that previously failed to read; clear the error flag and add full buffer + m_ullErrorPosition = NoPosition; + } + + m_setBuffers.insert(pBuffer); + UpdateHasBuffers(); + } + + TOverlappedDataBuffer* Pop(); + const TOverlappedDataBuffer* const Peek() const; + + void Clear(); + + size_t GetCount() const; + bool IsEmpty() const; + + HANDLE GetHasBuffersEvent() const; + + private: + bool IsBufferReady() const; + void UpdateHasBuffers(); + + private: + using BufferCollection = std::set; + + BufferCollection m_setBuffers; + TEvent m_eventHasBuffers; + unsigned long long m_ullErrorPosition = NoPosition; + }; + + using TFailedBufferQueuePtr = std::shared_ptr; +} + +#endif Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/TOrderedBufferQueue.cpp (.../TOrderedBufferQueue.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -19,17 +19,81 @@ #include "stdafx.h" #include "TOrderedBufferQueue.h" #include "TOverlappedDataBuffer.h" -#include "TCoreException.h" namespace chcore { - bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) const + TOrderedBufferQueue::TOrderedBufferQueue() : + m_eventHasBuffers(true, false) { - if(!pBufferA) - throw TCoreException(eErr_InvalidArgument, L"pBufferA", LOCATION); - if(!pBufferB) - throw TCoreException(eErr_InvalidArgument, L"pBufferB", LOCATION); + } - return pBufferA->GetFilePosition() < pBufferB->GetFilePosition(); + TOrderedBufferQueue::TOrderedBufferQueue(unsigned long long ullExpectedPosition) : + m_eventHasBuffers(true, false), + m_ullExpectedBufferPosition(ullExpectedPosition) + { } + + void TOrderedBufferQueue::Push(TOverlappedDataBuffer* pBuffer) + { + m_setBuffers.insert(pBuffer); + UpdateHasBuffers(); + } + + TOverlappedDataBuffer* TOrderedBufferQueue::Pop() + { + if(!IsBufferReady()) + return nullptr; + + TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin(); + m_setBuffers.erase(m_setBuffers.begin()); + + if(!pBuffer->HasError()) + m_ullExpectedBufferPosition += pBuffer->GetRequestedDataSize(); + + UpdateHasBuffers(); + + return pBuffer; + } + + const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const + { + if(!m_setBuffers.empty()) + return *m_setBuffers.begin(); + return nullptr; + } + + bool TOrderedBufferQueue::IsBufferReady() const + { + return (!m_setBuffers.empty() && (m_ullExpectedBufferPosition == NoPosition || (*m_setBuffers.begin())->GetFilePosition() == m_ullExpectedBufferPosition)); + } + + void TOrderedBufferQueue::Clear() + { + m_setBuffers.clear(); + m_ullExpectedBufferPosition = NoPosition; + m_eventHasBuffers.ResetEvent(); + } + + size_t TOrderedBufferQueue::GetCount() const + { + return m_setBuffers.size(); + } + + bool TOrderedBufferQueue::IsEmpty() const + { + return m_setBuffers.empty(); + } + + HANDLE TOrderedBufferQueue::GetHasBuffersEvent() const + { + return m_eventHasBuffers.Handle(); + } + + void TOrderedBufferQueue::UpdateHasBuffers() + { + if(!m_setBuffers.empty() && (m_ullExpectedBufferPosition == NoPosition || Peek()->GetFilePosition() == m_ullExpectedBufferPosition)) + m_eventHasBuffers.SetEvent(); + else + m_eventHasBuffers.ResetEvent(); + } } Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/TOrderedBufferQueue.h (.../TOrderedBufferQueue.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -19,31 +19,45 @@ #ifndef __TORDEREDBUFFERQUEUE_H__ #define __TORDEREDBUFFERQUEUE_H__ -#include "libchcore.h" #include +#include "TEvent.h" +#include "TOverlappedDataBuffer.h" namespace chcore { - class TOverlappedDataBuffer; - - struct CompareBufferPositions + class TOrderedBufferQueue { - bool operator()(const TOverlappedDataBuffer* rBufferA, const TOverlappedDataBuffer* rBufferB) const; - }; + public: + static const unsigned long long NoPosition = 0xffffffffffffffff; - class TOrderedBufferQueue : public std::set - { public: - TOverlappedDataBuffer* pop_front() - { - if(empty()) - throw std::runtime_error("Bounds exceeded in ordered buffer queue"); + TOrderedBufferQueue(); + TOrderedBufferQueue(unsigned long long ullExpectedPosition); - TOverlappedDataBuffer* pBuffer = *begin(); - erase(begin()); - return pBuffer; - } + void Push(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* Pop(); + const TOverlappedDataBuffer* const Peek() const; + + void Clear(); + + size_t GetCount() const; + bool IsEmpty() const; + + HANDLE GetHasBuffersEvent() const; + + private: + bool IsBufferReady() const; + void UpdateHasBuffers(); + + private: + using BufferCollection = std::set; + + BufferCollection m_setBuffers; + TEvent m_eventHasBuffers; + unsigned long long m_ullExpectedBufferPosition = NoPosition; }; + + using TOrderedBufferQueuePtr = std::shared_ptr; } #endif Index: src/libchcore/TOverlappedDataBuffer.cpp =================================================================== diff -u -rb89aea376d35ce4b0d6506f7d04dba73830d9268 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOverlappedDataBuffer.cpp (.../TOverlappedDataBuffer.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) +++ src/libchcore/TOverlappedDataBuffer.cpp (.../TOverlappedDataBuffer.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -27,6 +27,16 @@ namespace chcore { + bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) const + { + if(!pBufferA) + throw TCoreException(eErr_InvalidArgument, L"pBufferA", LOCATION); + if(!pBufferB) + throw TCoreException(eErr_InvalidArgument, L"pBufferB", LOCATION); + + return pBufferA->GetFilePosition() < pBufferB->GetFilePosition(); + } + TOverlappedDataBuffer::TOverlappedDataBuffer(size_t stBufferSize, void* pParam) : m_pParam(pParam) { Index: src/libchcore/TOverlappedDataBuffer.h =================================================================== diff -u -re0588f4598dea526e0869360a0f5ee278e7902a0 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOverlappedDataBuffer.h (.../TOverlappedDataBuffer.h) (revision e0588f4598dea526e0869360a0f5ee278e7902a0) +++ src/libchcore/TOverlappedDataBuffer.h (.../TOverlappedDataBuffer.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -25,6 +25,13 @@ namespace chcore { + class TOverlappedDataBuffer; + + struct CompareBufferPositions + { + bool operator()(const TOverlappedDataBuffer* rBufferA, const TOverlappedDataBuffer* rBufferB) const; + }; + class TOverlappedDataBuffer : public OVERLAPPED { public: Index: src/libchcore/TOverlappedMemoryPool.cpp =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOverlappedMemoryPool.cpp (.../TOverlappedMemoryPool.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/TOverlappedMemoryPool.cpp (.../TOverlappedMemoryPool.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -21,14 +21,13 @@ #include "TOverlappedDataBuffer.h" #include "TCoreException.h" #include "ErrorCodes.h" -#include #define STATUS_END_OF_FILE 0xc0000011 namespace chcore { TOverlappedMemoryPool::TOverlappedMemoryPool() : - m_eventHasBuffers(true, false), + m_spQueueBuffers(std::make_shared()), m_eventAllBuffersAccountedFor(true, true) { } @@ -43,57 +42,10 @@ { } - TOverlappedDataBuffer* TOverlappedMemoryPool::GetBuffer() - { - if (!m_dequeBuffers.empty()) - { - TOverlappedDataBuffer* pBuffer = m_dequeBuffers.front(); - m_dequeBuffers.pop_front(); - - UpdateHasBuffers(); - UpdateAllBuffersAccountedFor(); - - return pBuffer; - } - - return nullptr; - } - - bool TOverlappedMemoryPool::AreAllBuffersAccountedFor() const - { - return m_dequeBuffers.size() == m_listAllBuffers.size(); - } - - void TOverlappedMemoryPool::AddBuffer(TOverlappedDataBuffer* pBuffer) - { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - m_dequeBuffers.push_back(pBuffer); - UpdateHasBuffers(); - UpdateAllBuffersAccountedFor(); - } - - void TOverlappedMemoryPool::UpdateAllBuffersAccountedFor() - { - if (AreAllBuffersAccountedFor()) - m_eventAllBuffersAccountedFor.SetEvent(); - else - m_eventAllBuffersAccountedFor.ResetEvent(); - } - - void TOverlappedMemoryPool::UpdateHasBuffers() - { - if(!m_dequeBuffers.empty()) - m_eventHasBuffers.SetEvent(); - else - m_eventHasBuffers.ResetEvent(); - } - void TOverlappedMemoryPool::ReinitializeBuffers(size_t stCount, size_t stBufferSize) { // sanity check - if any of the buffers are still in use, we can't change the sizes - if (m_listAllBuffers.size() != m_dequeBuffers.size()) + if (m_listAllBuffers.size() != m_spQueueBuffers->GetCount()) throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); if (stBufferSize == 0) throw TCoreException(eErr_InvalidArgument, L"stBufferSize", LOCATION); @@ -102,7 +54,7 @@ { // buffer sizes increased - clear current buffers and proceed with creating new ones m_listAllBuffers.clear(); - m_dequeBuffers.clear(); + m_spQueueBuffers->Clear(); } else if (stCount == m_listAllBuffers.size()) return; // nothing really changed @@ -111,31 +63,26 @@ else if (stCount < m_listAllBuffers.size()) { // there are too many buffers - reduce - m_dequeBuffers.clear(); + m_spQueueBuffers->Clear(); size_t stCountToRemove = m_listAllBuffers.size() - stCount; m_listAllBuffers.erase(m_listAllBuffers.begin(), m_listAllBuffers.begin() + stCountToRemove); for (const auto& upElement : m_listAllBuffers) { - m_dequeBuffers.push_back(upElement.get()); + m_spQueueBuffers->Push(upElement.get()); } - UpdateHasBuffers(); - UpdateAllBuffersAccountedFor(); return; } // allocate buffers while (stCount--) { auto upBuffer = std::make_unique(stBufferSize, nullptr); - m_dequeBuffers.push_back(upBuffer.get()); + m_spQueueBuffers->Push(upBuffer.get()); m_listAllBuffers.push_back(std::move(upBuffer)); } - - UpdateHasBuffers(); - UpdateAllBuffersAccountedFor(); } size_t TOverlappedMemoryPool::GetTotalBufferCount() const @@ -145,7 +92,7 @@ size_t TOverlappedMemoryPool::GetAvailableBufferCount() const { - return m_dequeBuffers.size(); + return m_spQueueBuffers->GetCount(); } size_t TOverlappedMemoryPool::GetSingleBufferSize() const @@ -156,28 +103,8 @@ return (*m_listAllBuffers.begin())->GetBufferSize(); } - void TOverlappedMemoryPool::WaitForMissingBuffers(HANDLE hKillEvent) const + TBufferListPtr TOverlappedMemoryPool::GetBufferList() const { - enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; - std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; - - bool bExit = false; - while (!bExit) - { - DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); - switch (dwResult) - { - case STATUS_USER_APC: - break; - - case WAIT_OBJECT_0 + eAllBuffersReturned: - bExit = true; - break; - - case WAIT_OBJECT_0 + eKillThread: - bExit = true; - break; - } - } + return m_spQueueBuffers; } } Index: src/libchcore/TOverlappedMemoryPool.h =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOverlappedMemoryPool.h (.../TOverlappedMemoryPool.h) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/TOverlappedMemoryPool.h (.../TOverlappedMemoryPool.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -19,8 +19,8 @@ #ifndef __TOVERLAPPEDDATABUFFERQUEUE_H__ #define __TOVERLAPPEDDATABUFFERQUEUE_H__ -#include #include "TEvent.h" +#include "TBufferList.h" namespace chcore { @@ -41,29 +41,13 @@ size_t GetAvailableBufferCount() const; size_t GetSingleBufferSize() const; - // buffer management - void AddBuffer(TOverlappedDataBuffer* pBuffer); - TOverlappedDataBuffer* GetBuffer(); + TBufferListPtr GetBufferList() const;; - // event access - HANDLE GetEventHasBuffers() const { return m_eventHasBuffers.Handle(); } - bool HasBuffers() const { return !m_dequeBuffers.empty(); } - - HANDLE GetEventAllBuffersAccountedFor() const { return m_eventAllBuffersAccountedFor.Handle(); } - bool AreAllBuffersAccountedFor() const; - - void WaitForMissingBuffers(HANDLE hKillEvent) const; - private: - void UpdateAllBuffersAccountedFor(); - void UpdateHasBuffers(); - - private: std::vector> m_listAllBuffers; - std::deque m_dequeBuffers; + TBufferListPtr m_spQueueBuffers; - TEvent m_eventHasBuffers; TEvent m_eventAllBuffersAccountedFor; }; Index: src/libchcore/TOverlappedReader.cpp =================================================================== diff -u --- src/libchcore/TOverlappedReader.cpp (revision 0) +++ src/libchcore/TOverlappedReader.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,104 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TOverlappedReader.h" +#include "TOverlappedDataBuffer.h" +#include "TCoreException.h" +#include "ErrorCodes.h" + +namespace chcore +{ + TOverlappedReader::TOverlappedReader(const logger::TLogFileDataPtr& spLogFileData, const TBufferListPtr& spEmptyBuffers, + unsigned long long ullFilePos, DWORD dwChunkSize) : + m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), + m_tEmptyBuffers(spEmptyBuffers, ullFilePos, dwChunkSize), + m_tFailedReadBuffers(), + m_spFullBuffers(std::make_shared(ullFilePos)) + { + if(!spEmptyBuffers) + throw TCoreException(eErr_InvalidArgument, L"spMemoryPool", LOCATION); + if(dwChunkSize == 0) + throw TCoreException(eErr_InvalidArgument, L"dwChunkSize", LOCATION); + } + + TOverlappedReader::~TOverlappedReader() + { + } + + TOverlappedDataBuffer* TOverlappedReader::GetEmptyBuffer() + { + return m_tEmptyBuffers.Pop(); + } + + void TOverlappedReader::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Releasing empty buffer; buffer-order: " << pBuffer->GetFilePosition(); + + m_tEmptyBuffers.Push(pBuffer, bKeepPosition); + } + + void TOverlappedReader::AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer for re-read; buffer-order: " << pBuffer->GetFilePosition(); + + m_tFailedReadBuffers.PushWithFallback(pBuffer, m_tEmptyBuffers); + } + + TOverlappedDataBuffer* TOverlappedReader::GetFailedReadBuffer() + { + return m_tFailedReadBuffers.Pop(); + } + + void TOverlappedReader::AddFullBuffer(TOverlappedDataBuffer* pBuffer) + { + LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + + if(pBuffer->IsLastPart()) + m_tEmptyBuffers.SetDataSourceFinished(pBuffer); + + m_spFullBuffers->Push(pBuffer); + } + + TOverlappedDataBuffer* TOverlappedReader::GetFullBuffer() + { + return m_spFullBuffers->Pop(); + } + + TOrderedBufferQueuePtr TOverlappedReader::GetFinishedQueue() const + { + return m_spFullBuffers; + } + + size_t TOverlappedReader::GetBufferCount() const + { + return m_tFailedReadBuffers.GetCount() + m_spFullBuffers->GetCount(); + } +} Index: src/libchcore/TOverlappedReader.h =================================================================== diff -u --- src/libchcore/TOverlappedReader.h (revision 0) +++ src/libchcore/TOverlappedReader.h (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,75 @@ +// ============================================================================ +// Copyright (C) 2001-2014 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TOVERLAPPEDREADER_H__ +#define __TOVERLAPPEDREADER_H__ + +#include "../liblogger/TLogFileData.h" +#include "../liblogger/TLogger.h" +#include "TOrderedBufferQueue.h" +#include "TReadBufferQueueWrapper.h" +#include "TFailedBufferQueue.h" + +namespace chcore +{ + class TOverlappedReader + { + private: + static const unsigned long long NoIoError = 0xffffffffffffffff; + + public: + explicit TOverlappedReader(const logger::TLogFileDataPtr& spLogFileData, const TBufferListPtr& spEmptyBuffers, + unsigned long long ullFilePos, DWORD dwChunkSize); + TOverlappedReader(const TOverlappedReader&) = delete; + ~TOverlappedReader(); + + TOverlappedReader& operator=(const TOverlappedReader&) = delete; + + // buffer management + void AddEmptyBuffer(TOverlappedDataBuffer* pBuffer, bool bKeepPosition); + TOverlappedDataBuffer* GetEmptyBuffer(); + + void AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* GetFailedReadBuffer(); + + void AddFullBuffer(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* GetFullBuffer(); + + TOrderedBufferQueuePtr GetFinishedQueue() const; + + // processing info + bool IsDataSourceFinished() const { return m_tEmptyBuffers.IsDataSourceFinished(); } + + // event access + HANDLE GetEventReadPossibleHandle() const { return m_tEmptyBuffers.GetHasBuffersEvent(); } + HANDLE GetEventReadFailedHandle() const { return m_tEmptyBuffers.GetHasBuffersEvent(); } + HANDLE GetEventReadFinishedHandle() const { return m_spFullBuffers->GetHasBuffersEvent(); } + + size_t GetBufferCount() const; + + private: + logger::TLoggerPtr m_spLog; + + // queues + TReadBufferQueueWrapper m_tEmptyBuffers; + TFailedBufferQueue m_tFailedReadBuffers; // initialized empty buffers + TOrderedBufferQueuePtr m_spFullBuffers; // buffers with data + }; +} + +#endif Index: src/libchcore/TOverlappedReaderWriter.cpp =================================================================== diff -u -re0588f4598dea526e0869360a0f5ee278e7902a0 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision e0588f4598dea526e0869360a0f5ee278e7902a0) +++ src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -26,19 +26,13 @@ namespace chcore { TOverlappedReaderWriter::TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedMemoryPoolPtr& spMemoryPool, - file_size_t ullFilePos, DWORD dwChunkSize) : + unsigned long long ullFilePos, DWORD dwChunkSize) : m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), m_spMemoryPool(spMemoryPool), - m_eventReadPossible(true, true), - m_eventWritePossible(true, false), - m_eventWriteFinished(true, false), - m_eventAllBuffersAccountedFor(true, true), - m_bDataSourceFinished(false), + m_tReader(spLogFileData, spMemoryPool->GetBufferList(), ullFilePos, dwChunkSize), + m_tWriter(spLogFileData, m_tReader.GetFinishedQueue(), ullFilePos), m_bDataWritingFinished(false), - m_dwDataChunkSize(dwChunkSize), - m_ullNextReadBufferOrder(ullFilePos), - m_ullNextWriteBufferOrder(ullFilePos), - m_ullNextFinishedBufferOrder(ullFilePos) + m_eventAllBuffersAccountedFor(true, true) { if(!spMemoryPool) throw TCoreException(eErr_InvalidArgument, L"spMemoryPool", LOCATION); @@ -50,156 +44,72 @@ TOverlappedDataBuffer* TOverlappedReaderWriter::GetEmptyBuffer() { - TOverlappedDataBuffer* pBuffer = nullptr; - - // return buffers to re-read if exists - if(!m_setEmptyBuffers.empty()) - pBuffer = m_setEmptyBuffers.pop_front(); - else + TOverlappedDataBuffer* pBuffer = m_tReader.GetEmptyBuffer(); + if(pBuffer) { - // get empty buffer and initialize - pBuffer = m_spMemoryPool->GetBuffer(); - if(pBuffer) - { - pBuffer->SetParam(this); - pBuffer->InitForRead(m_ullNextReadBufferOrder, m_dwDataChunkSize); - - m_ullNextReadBufferOrder += m_dwDataChunkSize; - } + pBuffer->SetParam(this); + UpdateAllBuffersAccountedFor(); } - // reset the accounted-for event only if we managed to get the pointer, otherwise nothing is changing - if(pBuffer) - m_eventAllBuffersAccountedFor.ResetEvent(); - - UpdateReadPossibleEvent(); // update read-possible always - if we're getting null with read-possible event set (which we should not), we need to reset it - return pBuffer; } - void TOverlappedReaderWriter::AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - LOG_TRACE(m_spLog) << L"Queuing buffer for re-read; buffer-order: " << pBuffer->GetFilePosition(); - - m_setEmptyBuffers.insert(pBuffer); - - m_eventReadPossible.SetEvent(); + m_tReader.AddEmptyBuffer(pBuffer, bKeepPosition); UpdateAllBuffersAccountedFor(); } - void TOverlappedReaderWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFailedReadBuffer() { - if(!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + TOverlappedDataBuffer* pBuffer = m_tReader.GetFailedReadBuffer(); - LOG_TRACE(m_spLog) << L"Releasing empty buffer; buffer-order: " << pBuffer->GetFilePosition(); + if(pBuffer) + UpdateAllBuffersAccountedFor(); - m_spMemoryPool->AddBuffer(pBuffer); - - UpdateReadPossibleEvent(); - UpdateAllBuffersAccountedFor(); + return pBuffer; } - void TOverlappedReaderWriter::UpdateReadPossibleEvent() + void TOverlappedReaderWriter::AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer) { - if(!m_setEmptyBuffers.empty() || (!m_bDataSourceFinished && m_spMemoryPool->HasBuffers())) - m_eventReadPossible.SetEvent(); - else - m_eventReadPossible.ResetEvent(); + m_tReader.AddFailedReadBuffer(pBuffer); + UpdateAllBuffersAccountedFor(); } - TOverlappedDataBuffer* TOverlappedReaderWriter::GetFullBuffer() + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFinishedReadBuffer() { - if (!m_setFullBuffers.empty()) + TOverlappedDataBuffer* pBuffer = m_tReader.GetFullBuffer(); + + if(pBuffer) { - TOverlappedDataBuffer* pBuffer = *m_setFullBuffers.begin(); - if (pBuffer->GetFilePosition() != m_ullNextWriteBufferOrder) - return nullptr; + if (pBuffer->IsLastPart()) + m_bDataWritingFinished = true; - m_setFullBuffers.erase(m_setFullBuffers.begin()); + pBuffer->SetParam(this); - if(!pBuffer->HasError()) - { - // if this is the last part - mark that writing is finished, so that no other buffer will be written - if(pBuffer->IsLastPart()) - m_bDataWritingFinished = true; - - m_ullNextWriteBufferOrder += m_dwDataChunkSize; - } - - UpdateWritePossibleEvent(); - m_eventAllBuffersAccountedFor.ResetEvent(); - - return pBuffer; + UpdateAllBuffersAccountedFor(); } - return nullptr; + return pBuffer; } - void TOverlappedReaderWriter::AddFullBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddFinishedReadBuffer(TOverlappedDataBuffer* pBuffer) { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + m_tReader.AddFullBuffer(pBuffer); - if(pBuffer->HasError()) - { - if(pBuffer->GetFilePosition() < m_ullReadErrorOrder) - { - // case: new buffer failed at even earlier position in file than the one that failed previously (should also work for numeric_limits::max()) - // - move existing buffers with errors to failed read buffers, add current one to full queue - m_ullReadErrorOrder = pBuffer->GetFilePosition(); + UpdateAllBuffersAccountedFor(); + } - TOrderedBufferQueue newQueue; + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFailedWriteBuffer() + { + TOverlappedDataBuffer* pBuffer = m_tWriter.GetFailedWriteBuffer(); - for(TOverlappedDataBuffer* pBuf : m_setFullBuffers) - { - if(pBuf->HasError()) - AddFailedReadBuffer(pBuf); - else - newQueue.insert(pBuf); - } - - if(newQueue.size() != m_setFullBuffers.size()) - std::swap(m_setFullBuffers, newQueue); - } - else if(pBuffer->GetFilePosition() > m_ullReadErrorOrder) - { - // case: new buffer failed at position later than the one that failed before - add to failed buffers - // for retry - AddFailedReadBuffer(pBuffer); - return; - } - //else -> case: we've received the same buffer that failed before; add to normal full queue for user to handle that - } - else if(m_ullReadErrorOrder == pBuffer->GetFilePosition()) - { - // case: adding correctly read buffer that previously failed to read; clear the error flag and add full buffer - m_ullReadErrorOrder = NoIoError; - } - - LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetFilePosition() << - L", requested-data-size: " << pBuffer->GetRequestedDataSize() << - L", real-data-size: " << pBuffer->GetRealDataSize() << - L", file-position: " << pBuffer->GetFilePosition() << - L", error-code: " << pBuffer->GetErrorCode() << - L", status-code: " << pBuffer->GetStatusCode() << - L", is-last-part: " << pBuffer->IsLastPart(); - - auto pairInsertInfo = m_setFullBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); - - if (pBuffer->IsLastPart()) - m_bDataSourceFinished = true; - - UpdateWritePossibleEvent(); UpdateAllBuffersAccountedFor(); + + return pBuffer; } - void TOverlappedReaderWriter::AddFailedFullBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddFailedWriteBuffer(TOverlappedDataBuffer* pBuffer) { if(!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); @@ -214,48 +124,18 @@ // overwrite error code (to avoid treating the buffer as failed read) pBuffer->SetErrorCode(ERROR_SUCCESS); + m_tWriter.AddFailedWriteBuffer(pBuffer); - auto pairInsertInfo = m_setFullBuffers.insert(pBuffer); - if(!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); - - if(pBuffer->IsLastPart()) - m_bDataSourceFinished = true; - - UpdateWritePossibleEvent(); UpdateAllBuffersAccountedFor(); } - void TOverlappedReaderWriter::UpdateWritePossibleEvent() + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFinishedWriteBuffer() { - if (m_bDataWritingFinished || m_setFullBuffers.empty()) - m_eventWritePossible.ResetEvent(); - else - { - TOverlappedDataBuffer* pFirstBuffer = *m_setFullBuffers.begin(); - if (pFirstBuffer->GetFilePosition() == m_ullNextWriteBufferOrder) - m_eventWritePossible.SetEvent(); - else - m_eventWritePossible.ResetEvent(); - } - } + TOverlappedDataBuffer* pBuffer = m_tWriter.GetFinishedBuffer(); - TOverlappedDataBuffer* TOverlappedReaderWriter::GetFinishedBuffer() - { - if (!m_setFinishedBuffers.empty()) - { - TOverlappedDataBuffer* pBuffer = *m_setFinishedBuffers.begin(); - if (pBuffer->GetFilePosition() != m_ullNextFinishedBufferOrder) - return nullptr; + if(!pBuffer) + UpdateAllBuffersAccountedFor(); - m_setFinishedBuffers.erase(m_setFinishedBuffers.begin()); - - m_eventWriteFinished.ResetEvent(); // faster than UpdateWriteFinishedEvent() and the final effect should be the same - m_eventAllBuffersAccountedFor.ResetEvent(); - - return pBuffer; - } - return nullptr; } @@ -265,11 +145,10 @@ throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); // allow next finished buffer to be processed - m_ullNextFinishedBufferOrder += m_dwDataChunkSize; - UpdateWriteFinishedEvent(); + //m_ullNextFinishedBufferOrder += m_dwDataChunkSize; } - void TOverlappedReaderWriter::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddFinishedWriteBuffer(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); @@ -282,74 +161,20 @@ L", status-code: " << pBuffer->GetStatusCode() << L", is-last-part: " << pBuffer->IsLastPart(); - auto pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); - if (!pairInsertInfo.second) - throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); + m_tWriter.AddFinishedBuffer(pBuffer); - UpdateWriteFinishedEvent(); UpdateAllBuffersAccountedFor(); } - void TOverlappedReaderWriter::UpdateWriteFinishedEvent() - { - if (m_setFinishedBuffers.empty()) - m_eventWriteFinished.ResetEvent(); - else - { - TOverlappedDataBuffer* pFirstBuffer = *m_setFinishedBuffers.begin(); - if (pFirstBuffer->GetFilePosition() == m_ullNextFinishedBufferOrder) - m_eventWriteFinished.SetEvent(); - else - m_eventWriteFinished.ResetEvent(); - } - } - void TOverlappedReaderWriter::UpdateAllBuffersAccountedFor() { - size_t stCurrentBuffers = m_spMemoryPool->GetAvailableBufferCount() + m_setFullBuffers.size() + m_setFinishedBuffers.size() + m_setEmptyBuffers.size(); + size_t stCurrentBuffers = m_spMemoryPool->GetAvailableBufferCount() + m_tReader.GetBufferCount() + m_tWriter.GetBufferCount(); if (stCurrentBuffers == m_spMemoryPool->GetTotalBufferCount()) m_eventAllBuffersAccountedFor.SetEvent(); else m_eventAllBuffersAccountedFor.ResetEvent(); } - void TOverlappedReaderWriter::DataSourceChanged() - { - CleanupBuffers(); - - if (!m_spMemoryPool->AreAllBuffersAccountedFor()) - throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); - - m_bDataSourceFinished = false; - m_bDataWritingFinished = false; - m_ullNextReadBufferOrder = 0; - m_ullNextWriteBufferOrder = 0; - m_ullNextFinishedBufferOrder = 0; - - m_eventWritePossible.ResetEvent(); - m_eventWriteFinished.ResetEvent(); - } - - void TOverlappedReaderWriter::CleanupBuffers() - { - // function sanitizes the buffer locations (empty/full/finished) - i.e. when there is full buffer that have no data, is marked eof and we are in the eof state - // then this buffer is really the empty one - if (m_bDataSourceFinished && !m_setFullBuffers.empty()) - { - auto iterCurrent = m_setFullBuffers.begin(); - while (iterCurrent != m_setFullBuffers.end()) - { - if ((*iterCurrent)->IsLastPart()) - { - m_spMemoryPool->AddBuffer(*iterCurrent); - iterCurrent = m_setFullBuffers.erase(iterCurrent); - } - else - ++iterCurrent; - } - } - } - void TOverlappedReaderWriter::WaitForMissingBuffersAndResetState(HANDLE hKillEvent) { enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; @@ -373,13 +198,14 @@ break; } } +/* auto funcAdd = [&](TOverlappedDataBuffer* pBuffer) { m_spMemoryPool->AddBuffer(pBuffer); }; - std::for_each(m_setFullBuffers.begin(), m_setFullBuffers.end(), funcAdd); - std::for_each(m_setFinishedBuffers.begin(), m_setFinishedBuffers.end(), funcAdd); + std::for_each(m_spFailedWriteBuffers->begin(), m_spFailedWriteBuffers->end(), funcAdd); + std::for_each(m_spFinishedBuffers.begin(), m_spFinishedBuffers.end(), funcAdd); - m_setFinishedBuffers.clear(); - m_setFullBuffers.clear(); + m_spFinishedBuffers.clear(); + m_spFailedWriteBuffers->clear();*/ } } Index: src/libchcore/TOverlappedReaderWriter.h =================================================================== diff -u -re0588f4598dea526e0869360a0f5ee278e7902a0 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TOverlappedReaderWriter.h (.../TOverlappedReaderWriter.h) (revision e0588f4598dea526e0869360a0f5ee278e7902a0) +++ src/libchcore/TOverlappedReaderWriter.h (.../TOverlappedReaderWriter.h) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -24,7 +24,8 @@ #include "../liblogger/TLogger.h" #include "TOverlappedMemoryPool.h" #include "TOrderedBufferQueue.h" -#include "IFilesystemFile.h" +#include "TOverlappedReader.h" +#include "TOverlappedWriter.h" namespace chcore { @@ -35,70 +36,59 @@ public: explicit TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedMemoryPoolPtr& spBuffers, - file_size_t ullFilePos, DWORD dwChunkSize); + unsigned long long ullFilePos, DWORD dwChunkSize); TOverlappedReaderWriter(const TOverlappedReaderWriter&) = delete; ~TOverlappedReaderWriter(); TOverlappedReaderWriter& operator=(const TOverlappedReaderWriter&) = delete; - // buffer management - void AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer); - void AddEmptyBuffer(TOverlappedDataBuffer* pBuffer); + // buffer management - reader TOverlappedDataBuffer* GetEmptyBuffer(); + void AddEmptyBuffer(TOverlappedDataBuffer* pBuffer, bool bKeepPosition); - void AddFullBuffer(TOverlappedDataBuffer* pBuffer); - void AddFailedFullBuffer(TOverlappedDataBuffer* pBuffer); - TOverlappedDataBuffer* GetFullBuffer(); + TOverlappedDataBuffer* GetFailedReadBuffer(); + void AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer); - void AddFinishedBuffer(TOverlappedDataBuffer* pBuffer); - TOverlappedDataBuffer* GetFinishedBuffer(); - void MarkFinishedBufferAsComplete(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* GetFinishedReadBuffer(); + void AddFinishedReadBuffer(TOverlappedDataBuffer* pBuffer); - // data source change - void DataSourceChanged(); + // buffer management - writer + TOverlappedDataBuffer* GetFailedWriteBuffer(); + void AddFailedWriteBuffer(TOverlappedDataBuffer* pBuffer); + void AddFinishedWriteBuffer(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* GetFinishedWriteBuffer(); + + void MarkFinishedBufferAsComplete(TOverlappedDataBuffer* pBuffer); + // processing info - bool IsDataSourceFinished() const { return m_bDataSourceFinished; } - bool IsDataWritingFinished() const { return m_bDataWritingFinished; } + bool IsDataSourceFinished() const { return m_tReader.IsDataSourceFinished(); } + //bool IsDataWritingFinished() const { return m_bDataWritingFinished; } // event access - HANDLE GetEventReadPossibleHandle() const { return m_eventReadPossible.Handle(); } - HANDLE GetEventWritePossibleHandle() const { return m_eventWritePossible.Handle(); } - HANDLE GetEventWriteFinishedHandle() const { return m_eventWriteFinished.Handle(); } + HANDLE GetEventReadPossibleHandle() const { return m_tReader.GetEventReadPossibleHandle(); } + HANDLE GetEventReadFailedHandle() const { return m_tReader.GetEventReadFailedHandle(); } + HANDLE GetEventWritePossibleHandle() const { return m_tReader.GetEventReadFinishedHandle(); } + + HANDLE GetEventWriteFailedHandle() const { return m_tWriter.GetEventWriteFailedHandle(); } + HANDLE GetEventWriteFinishedHandle() const { return m_tWriter.GetEventWriteFinishedHandle(); } + HANDLE GetEventAllBuffersAccountedFor() const { return m_eventAllBuffersAccountedFor.Handle(); } void WaitForMissingBuffersAndResetState(HANDLE hKillEvent); private: - void CleanupBuffers(); - void UpdateReadPossibleEvent(); - void UpdateWritePossibleEvent(); - void UpdateWriteFinishedEvent(); void UpdateAllBuffersAccountedFor(); private: logger::TLoggerPtr m_spLog; TOverlappedMemoryPoolPtr m_spMemoryPool; + TOverlappedReader m_tReader; + TOverlappedWriter m_tWriter; - TOrderedBufferQueue m_setEmptyBuffers; // initialized empty buffers - TOrderedBufferQueue m_setFullBuffers; - TOrderedBufferQueue m_setFinishedBuffers; - - bool m_bDataSourceFinished = false; // input file was already read to the end bool m_bDataWritingFinished = false; // output file was already written to the end - DWORD m_dwDataChunkSize = 0; - - unsigned long long m_ullNextReadBufferOrder = 0; // next order id for read buffers - unsigned long long m_ullReadErrorOrder = NoIoError; - - unsigned long long m_ullNextWriteBufferOrder = 0; // next order id to be processed when writing - unsigned long long m_ullNextFinishedBufferOrder = 0; // next order id to be processed when finishing writing - - TEvent m_eventReadPossible; - TEvent m_eventWritePossible; - TEvent m_eventWriteFinished; TEvent m_eventAllBuffersAccountedFor; }; } Index: src/libchcore/TOverlappedWriter.cpp =================================================================== diff -u --- src/libchcore/TOverlappedWriter.cpp (revision 0) +++ src/libchcore/TOverlappedWriter.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,92 @@ +// ============================================================================ +// Copyright (C) 2001-2015 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TOverlappedWriter.h" +#include "TOverlappedDataBuffer.h" +#include "TCoreException.h" +#include "ErrorCodes.h" + +namespace chcore +{ + TOverlappedWriter::TOverlappedWriter(const logger::TLogFileDataPtr& spLogFileData, const TOrderedBufferQueuePtr& spBuffersToWrite, + unsigned long long ullFilePos) : + m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), + m_tBuffersToWrite(spBuffersToWrite), + m_tFailedWriteBuffers(ullFilePos), + m_tFinishedBuffers(ullFilePos), + m_bDataWritingFinished(false) + { + if(!spBuffersToWrite) + throw TCoreException(eErr_InvalidArgument, L"spBuffersToWrite", LOCATION); + } + + TOverlappedWriter::~TOverlappedWriter() + { + } + + void TOverlappedWriter::AddFailedWriteBuffer(TOverlappedDataBuffer* pBuffer) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer as full (failed); buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + + // overwrite error code (to avoid treating the buffer as failed read) + pBuffer->SetErrorCode(ERROR_SUCCESS); + + m_tFailedWriteBuffers.Push(pBuffer); + } + + TOverlappedDataBuffer* TOverlappedWriter::GetFailedWriteBuffer() + { + return m_tFailedWriteBuffers.Pop(); + } + + TOverlappedDataBuffer* TOverlappedWriter::GetFinishedBuffer() + { + return m_tFinishedBuffers.Pop(); + } + + void TOverlappedWriter::AddFinishedBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer as finished; buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + + m_tFinishedBuffers.Push(pBuffer); + } + + size_t TOverlappedWriter::GetBufferCount() const + { + return m_tBuffersToWrite.GetCount() + m_tFailedWriteBuffers.GetCount() + m_tFinishedBuffers.GetCount(); + } +} Index: src/libchcore/TOverlappedWriter.h =================================================================== diff -u --- src/libchcore/TOverlappedWriter.h (revision 0) +++ src/libchcore/TOverlappedWriter.h (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,72 @@ +// ============================================================================ +// Copyright (C) 2001-2014 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TOVERLAPPEDWRITER_H__ +#define __TOVERLAPPEDWRITER_H__ + +#include "../liblogger/TLogFileData.h" +#include "../liblogger/TLogger.h" +#include "TOrderedBufferQueue.h" +#include "TFailedBufferQueue.h" +#include "TWriteBufferQueueWrapper.h" + +namespace chcore +{ + class TOverlappedWriter + { + private: + static const unsigned long long NoIoError = 0xffffffffffffffff; + + public: + explicit TOverlappedWriter(const logger::TLogFileDataPtr& spLogFileData, const TOrderedBufferQueuePtr& spBuffersToWrite, + unsigned long long ullFilePos); + TOverlappedWriter(const TOverlappedWriter&) = delete; + ~TOverlappedWriter(); + + TOverlappedWriter& operator=(const TOverlappedWriter&) = delete; + + // buffer management - writer + void AddFailedWriteBuffer(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* GetFailedWriteBuffer(); + + void AddFinishedBuffer(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* GetFinishedBuffer(); + + // processing info + bool IsDataWritingFinished() const { return m_bDataWritingFinished; } + + // event access + HANDLE GetEventWritePossibleHandle() const { return m_tBuffersToWrite.GetHasBuffersEvent(); } + HANDLE GetEventWriteFailedHandle() const { return m_tFailedWriteBuffers.GetHasBuffersEvent(); } + HANDLE GetEventWriteFinishedHandle() const { return m_tFinishedBuffers.GetHasBuffersEvent(); } + + size_t GetBufferCount() const; + + private: + logger::TLoggerPtr m_spLog; + + TWriteBufferQueueWrapper m_tBuffersToWrite; + + TOrderedBufferQueue m_tFailedWriteBuffers; + TOrderedBufferQueue m_tFinishedBuffers; + + bool m_bDataWritingFinished = false; // output file was already written to the end + }; +} + +#endif Index: src/libchcore/TReadBufferQueueWrapper.cpp =================================================================== diff -u --- src/libchcore/TReadBufferQueueWrapper.cpp (revision 0) +++ src/libchcore/TReadBufferQueueWrapper.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,137 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TReadBufferQueueWrapper.h" +#include "TOverlappedDataBuffer.h" + +namespace chcore +{ + TReadBufferQueueWrapper::TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize) : + m_spUnorderedQueue(spUnorderedQueue), + m_ullNextReadPosition(ullNextReadPosition), + m_dwChunkSize(dwChunkSize), + m_eventHasBuffers(true, false) + { + } + + void TReadBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition) + { + if(!bKeepPosition) + { + if(IsDataSourceFinished()) + m_spUnorderedQueue->Push(pBuffer); + else + { + pBuffer->InitForRead(m_ullNextReadPosition, m_dwChunkSize); + m_ullNextReadPosition += m_dwChunkSize; + m_tClaimedQueue.Push(pBuffer); + } + } + else + m_tClaimedQueue.Push(pBuffer); + + UpdateHasBuffers(); + } + + TOverlappedDataBuffer* TReadBufferQueueWrapper::Pop() + { + if(!IsBufferReady()) + return nullptr; + + // always return retry buffers first + TOverlappedDataBuffer* pBuffer = m_tClaimedQueue.Pop(); + if(!pBuffer) + { + pBuffer = m_spUnorderedQueue->Pop(); + if(pBuffer) + { + pBuffer->InitForRead(m_ullNextReadPosition, m_dwChunkSize); + m_ullNextReadPosition += m_dwChunkSize; + } + } + + if(pBuffer) + UpdateHasBuffers(); + + return pBuffer; + } + + bool TReadBufferQueueWrapper::IsBufferReady() const + { + if(IsDataSourceFinished()) + { + if(m_tClaimedQueue.IsEmpty()) + return false; + + const TOverlappedDataBuffer* const pFirstBuffer = m_tClaimedQueue.Peek(); + if(pFirstBuffer->GetFilePosition() <= m_ullDataSourceFinishedPos) + return true; + + return false; + } + else + return !m_tClaimedQueue.IsEmpty() || !m_spUnorderedQueue->IsEmpty(); + } + + void TReadBufferQueueWrapper::Clear() + { + m_spUnorderedQueue->Clear(); + m_ullNextReadPosition = 0; + m_dwChunkSize = 0; + m_ullDataSourceFinishedPos = NoPosition; + m_eventHasBuffers.ResetEvent(); + } + + size_t TReadBufferQueueWrapper::GetCount() const + { + return m_spUnorderedQueue->GetCount(); + } + + bool TReadBufferQueueWrapper::IsEmpty() const + { + return m_spUnorderedQueue->IsEmpty(); + } + + void TReadBufferQueueWrapper::SetDataSourceFinished(TOverlappedDataBuffer* pBuffer) + { + if(pBuffer->IsLastPart()) + { + if(pBuffer->GetFilePosition() < m_ullDataSourceFinishedPos) + { + m_ullDataSourceFinishedPos = pBuffer->GetFilePosition(); + // #todo: release excessive claimed buffers + } + } + } + + bool TReadBufferQueueWrapper::IsDataSourceFinished() const + { + return m_ullDataSourceFinishedPos != NoPosition; + } + + HANDLE TReadBufferQueueWrapper::GetHasBuffersEvent() const + { + return m_eventHasBuffers.Handle(); + } + + void TReadBufferQueueWrapper::UpdateHasBuffers() + { + m_eventHasBuffers.SetEvent(IsBufferReady()); + } +} Index: src/libchcore/TReadBufferQueueWrapper.h =================================================================== diff -u --- src/libchcore/TReadBufferQueueWrapper.h (revision 0) +++ src/libchcore/TReadBufferQueueWrapper.h (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,71 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TUNORDEREDBUFFERQUEUEWRAPPERWRAPPER_H__ +#define __TUNORDEREDBUFFERQUEUEWRAPPERWRAPPER_H__ + +#include "TEvent.h" +#include "TBufferList.h" +#include "TOrderedBufferQueue.h" + +namespace chcore +{ + class TOverlappedDataBuffer; + + class TReadBufferQueueWrapper + { + public: + static const unsigned long long NoPosition = 0xffffffffffffffff; + + public: + TReadBufferQueueWrapper(const TBufferListPtr& spUnorderedQueue, unsigned long long ullNextReadPosition, DWORD dwChunkSize); + + void Push(TOverlappedDataBuffer* pBuffer, bool bKeepPosition); + TOverlappedDataBuffer* Pop(); + + bool IsBufferReady() const; + + void Clear(); + + size_t GetCount() const; + bool IsEmpty() const; + + void SetDataSourceFinished(TOverlappedDataBuffer* pBuffer); + bool IsDataSourceFinished() const; + + HANDLE GetHasBuffersEvent() const; + + private: + void UpdateHasBuffers(); + + private: + TBufferListPtr m_spUnorderedQueue; // external queue of buffers to use + TOrderedBufferQueue m_tClaimedQueue; // internal queue of claimed buffers + + TEvent m_eventHasBuffers; + + unsigned long long m_ullNextReadPosition = 0; // next position for read buffers + DWORD m_dwChunkSize = 0; + + unsigned long long m_ullDataSourceFinishedPos = NoPosition; + }; + + using TUnorderedBufferQueueWrapperPtr = std::shared_ptr; +} + +#endif Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -re0588f4598dea526e0869360a0f5ee278e7902a0 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision e0588f4598dea526e0869360a0f5ee278e7902a0) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -384,11 +384,13 @@ // that also means that we don't want to queue reads or writes anymore - all the data that were read until now, will be lost // - write possible - we're prioritizing write queuing here to empty buffers as soon as possible // - read possible - lowest priority - if we don't have anything to write or finalize , then read another part of source data - enum { eWriteFinished, eKillThread, eWritePossible, eReadPossible, eHandleCount }; + enum { eWriteFinished, eKillThread, eWriteFailed, eWritePossible, eReadFailed, eReadPossible, eHandleCount }; std::array arrHandles = { - tReaderWriter.GetEventWriteFinishedHandle(), rThreadController.GetKillThreadHandle(), + tReaderWriter.GetEventWriteFinishedHandle(), + tReaderWriter.GetEventWriteFailedHandle(), tReaderWriter.GetEventWritePossibleHandle(), + tReaderWriter.GetEventReadFailedHandle(), tReaderWriter.GetEventReadPossibleHandle() }; @@ -424,12 +426,12 @@ eResult = tFileFBWrapper.ReadFileFB(fileSrc, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { - tReaderWriter.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer, false); bStopProcessing = true; } else if(bSkip) { - tReaderWriter.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer, false); AdjustProcessedSizeForSkip(pData->spSrcFile); @@ -438,122 +440,129 @@ } break; } + case WAIT_OBJECT_0 + eReadFailed: + { + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFailedReadBuffer(); + if (!pBuffer) + throw TCoreException(eErr_InternalProblem, L"Cannot retrieve failed read buffer", LOCATION); + + // read error encountered - handle it + eResult = HandleReadError(spFeedbackHandler, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); + if(eResult == TSubTaskBase::eSubResult_Retry) + tReaderWriter.AddEmptyBuffer(pBuffer, true); + else if(eResult != TSubTaskBase::eSubResult_Continue) + { + tReaderWriter.AddEmptyBuffer(pBuffer, false); + bStopProcessing = true; + } + else if(bSkip) + { + tReaderWriter.AddEmptyBuffer(pBuffer, false); + + AdjustProcessedSizeForSkip(pData->spSrcFile); + + pData->bProcessed = false; + bStopProcessing = true; + } + + break; + } case WAIT_OBJECT_0 + eWritePossible: { - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFullBuffer(); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedReadBuffer(); if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write was possible, but no buffer is available", LOCATION); // was there an error reported? - if(pBuffer->HasError()) + pBuffer->InitForWrite(); + + eResult = tFileFBWrapper.WriteFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); + if(eResult != TSubTaskBase::eSubResult_Continue) { - // read error encountered - handle it - eResult = HandleReadError(spFeedbackHandler, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); - if(eResult == TSubTaskBase::eSubResult_Retry) - tReaderWriter.AddFailedReadBuffer(pBuffer); - else if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.AddEmptyBuffer(pBuffer); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer, false); + bStopProcessing = true; + } + else if(bSkip) + { + tReaderWriter.AddEmptyBuffer(pBuffer, false); - AdjustProcessedSizeForSkip(pData->spSrcFile); + AdjustProcessedSizeForSkip(pData->spSrcFile); - pData->bProcessed = false; - bStopProcessing = true; - } + pData->bProcessed = false; + bStopProcessing = true; } - else - { - pBuffer->InitForWrite(); - eResult = tFileFBWrapper.WriteFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); - if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.AddEmptyBuffer(pBuffer); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.AddEmptyBuffer(pBuffer); + break; + } - AdjustProcessedSizeForSkip(pData->spSrcFile); + case WAIT_OBJECT_0 + eWriteFailed: + { + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFailedWriteBuffer(); + if (!pBuffer) + throw TCoreException(eErr_InternalProblem, L"Failed to retrieve write failed buffer", LOCATION); - pData->bProcessed = false; - bStopProcessing = true; - } + eResult = HandleWriteError(spFeedbackHandler, *pBuffer, pData->pathDstFile, bSkip); + if(eResult == TSubTaskBase::eSubResult_Retry) + tReaderWriter.AddFailedWriteBuffer(pBuffer); + else if(eResult != TSubTaskBase::eSubResult_Continue) + { + tReaderWriter.AddEmptyBuffer(pBuffer, false); + bStopProcessing = true; } + else if(bSkip) + { + tReaderWriter.AddEmptyBuffer(pBuffer, false); + AdjustProcessedSizeForSkip(pData->spSrcFile); + + pData->bProcessed = false; + bStopProcessing = true; + } + break; } case WAIT_OBJECT_0 + eWriteFinished: { - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedBuffer(); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedWriteBuffer(); if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Write finished was possible, but no buffer is available", LOCATION); - if(pBuffer->HasError()) + eResult = tFileFBWrapper.FinalizeFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); + if (eResult != TSubTaskBase::eSubResult_Continue) { - eResult = HandleWriteError(spFeedbackHandler, *pBuffer, pData->pathDstFile, bSkip); - if(eResult == TSubTaskBase::eSubResult_Retry) - tReaderWriter.AddFailedFullBuffer(pBuffer); - else if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.AddEmptyBuffer(pBuffer); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.AddEmptyBuffer(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer, false); + bStopProcessing = true; + } + else if (bSkip) + { + tReaderWriter.AddEmptyBuffer(pBuffer, false); - AdjustProcessedSizeForSkip(pData->spSrcFile); + AdjustProcessedSizeForSkip(pData->spSrcFile); - pData->bProcessed = false; - bStopProcessing = true; - } + pData->bProcessed = false; + bStopProcessing = true; } else { - eResult = tFileFBWrapper.FinalizeFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); - if (eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.AddEmptyBuffer(pBuffer); - bStopProcessing = true; - } - else if (bSkip) - { - tReaderWriter.AddEmptyBuffer(pBuffer); + file_size_t fsWritten = pBuffer->GetRealDataSize(); - AdjustProcessedSizeForSkip(pData->spSrcFile); + // in case we read past the original eof, try to get new file size from filesystem + AdjustProcessedSize(fsWritten, pData->spSrcFile, fileSrc); - pData->bProcessed = false; - bStopProcessing = true; - } - else - { - file_size_t fsWritten = pBuffer->GetRealDataSize(); + // stop iterating through file + bStopProcessing = pBuffer->IsLastPart(); - // in case we read past the original eof, try to get new file size from filesystem - AdjustProcessedSize(fsWritten, pData->spSrcFile, fileSrc); + tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); + tReaderWriter.AddEmptyBuffer(pBuffer, false); - // stop iterating through file - bStopProcessing = pBuffer->IsLastPart(); + if(bStopProcessing) + { + // this is the end of copying of src file - in case it is smaller than expected fix the stats so that difference is accounted for + AdjustFinalSize(pData->spSrcFile, fileSrc); - tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); - tReaderWriter.AddEmptyBuffer(pBuffer); - - if(bStopProcessing) - { - // this is the end of copying of src file - in case it is smaller than expected fix the stats so that difference is accounted for - AdjustFinalSize(pData->spSrcFile, fileSrc); - - pData->bProcessed = true; - m_tSubTaskStats.ResetCurrentItemProcessedSize(); - } + pData->bProcessed = true; + m_tSubTaskStats.ResetCurrentItemProcessedSize(); } } Index: src/libchcore/TWriteBufferQueueWrapper.cpp =================================================================== diff -u --- src/libchcore/TWriteBufferQueueWrapper.cpp (revision 0) +++ src/libchcore/TWriteBufferQueueWrapper.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,83 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TWriteBufferQueueWrapper.h" +#include "TOverlappedDataBuffer.h" + +namespace chcore +{ + TWriteBufferQueueWrapper::TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue) : + m_spDataQueue(spQueue), + m_eventHasBuffers(true, false) + { + } + + void TWriteBufferQueueWrapper::Push(TOverlappedDataBuffer* pBuffer) + { + m_tClaimedQueue.Push(pBuffer); + UpdateHasBuffers(); + } + + TOverlappedDataBuffer* TWriteBufferQueueWrapper::Pop() + { + TOverlappedDataBuffer* pBuffer = m_tClaimedQueue.Pop(); + if(!pBuffer) + pBuffer = m_spDataQueue->Pop(); + + if(pBuffer) + { + pBuffer->InitForWrite(); + UpdateHasBuffers(); + } + + return pBuffer; + } + + bool TWriteBufferQueueWrapper::IsBufferReady() const + { + return !m_tClaimedQueue.IsEmpty() || !m_spDataQueue->IsEmpty(); + } + + void TWriteBufferQueueWrapper::Clear() + { + m_spDataQueue->Clear(); + m_tClaimedQueue.Clear(); + m_eventHasBuffers.ResetEvent(); + } + + size_t TWriteBufferQueueWrapper::GetCount() const + { + return m_spDataQueue->GetCount(); + } + + bool TWriteBufferQueueWrapper::IsEmpty() const + { + return m_spDataQueue->IsEmpty(); + } + + HANDLE TWriteBufferQueueWrapper::GetHasBuffersEvent() const + { + return m_eventHasBuffers.Handle(); + } + + void TWriteBufferQueueWrapper::UpdateHasBuffers() + { + m_eventHasBuffers.SetEvent(IsBufferReady()); + } +} Index: src/libchcore/TWriteBufferQueueWrapper.h =================================================================== diff -u --- src/libchcore/TWriteBufferQueueWrapper.h (revision 0) +++ src/libchcore/TWriteBufferQueueWrapper.h (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,62 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TWRITEBUFFERQUEUEWRAPPERWRAPPER_H__ +#define __TWRITEBUFFERQUEUEWRAPPERWRAPPER_H__ + +#include "TEvent.h" +#include "TOrderedBufferQueue.h" + +namespace chcore +{ + class TOverlappedDataBuffer; + + class TWriteBufferQueueWrapper + { + public: + static const unsigned long long NoPosition = 0xffffffffffffffff; + + public: + TWriteBufferQueueWrapper(const TOrderedBufferQueuePtr& spQueue); + + void Push(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* Pop(); + + bool IsBufferReady() const; + + void Clear(); + + size_t GetCount() const; + bool IsEmpty() const; + + HANDLE GetHasBuffersEvent() const; + + private: + void UpdateHasBuffers(); + + private: + TOrderedBufferQueuePtr m_spDataQueue; // external queue of buffers to use + TOrderedBufferQueue m_tClaimedQueue; // internal queue of claimed buffers + + TEvent m_eventHasBuffers; + }; + + using TWriteBufferQueueWrapperPtr = std::shared_ptr; +} + +#endif Index: src/libchcore/Tests/OverlappedCallbacksTests.cpp =================================================================== diff -u --- src/libchcore/Tests/OverlappedCallbacksTests.cpp (revision 0) +++ src/libchcore/Tests/OverlappedCallbacksTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,11 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../OverlappedCallbacks.h" + +using namespace chcore; + +TEST(OverlappedCallbackTests, DefaultTest) +{ + +} Index: src/libchcore/Tests/TBufferListTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TBufferListTests.cpp (revision 0) +++ src/libchcore/Tests/TBufferListTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,11 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TBufferList.h" + +using namespace chcore; + +TEST(TBufferListTests, DefaultTest) +{ + TBufferList bufferList; +} Index: src/libchcore/Tests/TFailedBufferQueueTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TFailedBufferQueueTests.cpp (revision 0) +++ src/libchcore/Tests/TFailedBufferQueueTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,11 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TFailedBufferQueue.h" + +using namespace chcore; + +TEST(TFailedBufferQueueTests, DefaultTest) +{ + TFailedBufferQueue queue; +} Index: src/libchcore/Tests/TOrderedBufferQueueTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TOrderedBufferQueueTests.cpp (revision 0) +++ src/libchcore/Tests/TOrderedBufferQueueTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,11 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TOrderedBufferQueue.h" + +using namespace chcore; + +TEST(TOrderedBufferQueueTests, DefaultTest) +{ + TOrderedBufferQueue bufferList; +} Index: src/libchcore/Tests/TOverlappedDataBufferTests.cpp =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/Tests/TOverlappedDataBufferTests.cpp (.../TOverlappedDataBufferTests.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/Tests/TOverlappedDataBufferTests.cpp (.../TOverlappedDataBufferTests.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -259,7 +259,7 @@ EXPECT_EQ(ERROR_SUCCESS, buffer.GetErrorCode()); EXPECT_EQ(234, buffer.GetRealDataSize()); - EXPECT_EQ(queue.GetFullBuffer(), &buffer); + EXPECT_EQ(queue.GetFinishedReadBuffer(), &buffer); } TEST(TOverlappedDataBufferTests, OverlappedReadCompleted_Failure) @@ -280,7 +280,7 @@ EXPECT_EQ(ERROR_ACCESS_DENIED, buffer.GetErrorCode()); EXPECT_EQ(0, buffer.GetRealDataSize()); - EXPECT_EQ(queue.GetFullBuffer(), &buffer); + EXPECT_EQ(queue.GetFinishedReadBuffer(), &buffer); } TEST(TOverlappedDataBufferTests, OverlappedWriteCompleted_Success) @@ -300,5 +300,5 @@ OverlappedWriteCompleted(ERROR_SUCCESS, 234, &buffer); EXPECT_EQ(ERROR_SUCCESS, buffer.GetErrorCode()); - EXPECT_EQ(queue.GetFinishedBuffer(), &buffer); + EXPECT_EQ(queue.GetFinishedWriteBuffer(), &buffer); } Index: src/libchcore/Tests/TOverlappedMemoryPoolTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TOverlappedMemoryPoolTests.cpp (revision 0) +++ src/libchcore/Tests/TOverlappedMemoryPoolTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,11 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TOverlappedMemoryPool.h" + +using namespace chcore; + +TEST(TOverlappedMemoryPoolTests, DefaultTest) +{ + TOverlappedMemoryPool memoryPool; +} Index: src/libchcore/Tests/TOverlappedReaderTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TOverlappedReaderTests.cpp (revision 0) +++ src/libchcore/Tests/TOverlappedReaderTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,13 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TOverlappedReader.h" + +using namespace chcore; + +TEST(TOverlappedReaderTests, DefaultTest) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + TBufferListPtr spQueue(std::make_shared()); + TOverlappedReader reader(spLogData, spQueue, 0, 4096); +} Index: src/libchcore/Tests/TOverlappedReaderWriterTests.cpp =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/Tests/TOverlappedReaderWriterTests.cpp (.../TOverlappedReaderWriterTests.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/Tests/TOverlappedReaderWriterTests.cpp (.../TOverlappedReaderWriterTests.cpp) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -29,8 +29,8 @@ TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_EQ(nullptr, tReaderWriter.GetEmptyBuffer()); - EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); - EXPECT_EQ(nullptr, tReaderWriter.GetFinishedBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedReadBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedWriteBuffer()); EXPECT_NE(nullptr, tReaderWriter.GetEventReadPossibleHandle()); EXPECT_NE(nullptr, tReaderWriter.GetEventWritePossibleHandle()); @@ -41,7 +41,7 @@ EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); EXPECT_FALSE(tReaderWriter.IsDataSourceFinished()); - EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); +// EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); EXPECT_EQ(0, spBuffers->GetTotalBufferCount()); EXPECT_EQ(0, spBuffers->GetSingleBufferSize()); @@ -55,8 +55,8 @@ TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_NE(nullptr, tReaderWriter.GetEmptyBuffer()); - EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); - EXPECT_EQ(nullptr, tReaderWriter.GetFinishedBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedReadBuffer()); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedWriteBuffer()); EXPECT_NE(nullptr, tReaderWriter.GetEventReadPossibleHandle()); EXPECT_NE(nullptr, tReaderWriter.GetEventWritePossibleHandle()); @@ -67,7 +67,7 @@ EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); EXPECT_FALSE(tReaderWriter.IsDataSourceFinished()); - EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); +// EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); } TEST(TOverlappedReaderWriterTests, AllocatingConstructor_CheckBufferSizes) @@ -234,13 +234,13 @@ EXPECT_TIMEOUT(tReaderWriter.GetEventReadPossibleHandle()); - tReaderWriter.AddEmptyBuffer(pBuffers[0]); + tReaderWriter.AddEmptyBuffer(pBuffers[0], false); EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); - tReaderWriter.AddEmptyBuffer(pBuffers[1]); + tReaderWriter.AddEmptyBuffer(pBuffers[1], false); EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); - tReaderWriter.AddEmptyBuffer(pBuffers[2]); + tReaderWriter.AddEmptyBuffer(pBuffers[2], false); EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); } @@ -251,7 +251,7 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); - EXPECT_THROW(tReaderWriter.AddEmptyBuffer(nullptr), TCoreException); + EXPECT_THROW(tReaderWriter.AddEmptyBuffer(nullptr, false), TCoreException); } /////////////////////////////////////////////////////////////////////////////////////////////////// @@ -263,10 +263,10 @@ TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); - tReaderWriter.AddFullBuffer(pBuffer); + tReaderWriter.AddFinishedReadBuffer(pBuffer); EXPECT_SIGNALED(tReaderWriter.GetEventWritePossibleHandle()); - tReaderWriter.GetFullBuffer(); + tReaderWriter.GetFinishedReadBuffer(); EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); } @@ -278,14 +278,14 @@ TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; - tReaderWriter.AddFullBuffer(pBuffers[1]); - EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); + tReaderWriter.AddFinishedReadBuffer(pBuffers[1]); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedReadBuffer()); - tReaderWriter.AddFullBuffer(pBuffers[2]); - EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); + tReaderWriter.AddFinishedReadBuffer(pBuffers[2]); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedReadBuffer()); - tReaderWriter.AddFullBuffer(pBuffers[0]); - EXPECT_NE(nullptr, tReaderWriter.GetFullBuffer()); + tReaderWriter.AddFinishedReadBuffer(pBuffers[0]); + EXPECT_NE(nullptr, tReaderWriter.GetFinishedReadBuffer()); } TEST(TOverlappedReaderWriterTests, AddFullBuffer_HandlingSrcEof) @@ -298,10 +298,10 @@ pBuffers[1]->SetLastPart(true); - tReaderWriter.AddFullBuffer(pBuffers[0]); + tReaderWriter.AddFinishedReadBuffer(pBuffers[0]); EXPECT_FALSE(tReaderWriter.IsDataSourceFinished()); - tReaderWriter.AddFullBuffer(pBuffers[1]); + tReaderWriter.AddFinishedReadBuffer(pBuffers[1]); EXPECT_TRUE(tReaderWriter.IsDataSourceFinished()); } @@ -315,19 +315,19 @@ pBuffers[2]->SetLastPart(true); - tReaderWriter.AddFullBuffer(pBuffers[0]); - tReaderWriter.AddFullBuffer(pBuffers[1]); - tReaderWriter.AddFullBuffer(pBuffers[2]); + tReaderWriter.AddFinishedReadBuffer(pBuffers[0]); + tReaderWriter.AddFinishedReadBuffer(pBuffers[1]); + tReaderWriter.AddFinishedReadBuffer(pBuffers[2]); - tReaderWriter.GetFullBuffer(); - EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); + tReaderWriter.GetFinishedReadBuffer(); +// EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); - tReaderWriter.GetFullBuffer(); - EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); + tReaderWriter.GetFinishedReadBuffer(); +// EXPECT_FALSE(tReaderWriter.IsDataWritingFinished()); // getting the last buffer (marked as eof) causes setting the data-writing-finished flag - tReaderWriter.GetFullBuffer(); - EXPECT_TRUE(tReaderWriter.IsDataWritingFinished()); + tReaderWriter.GetFinishedReadBuffer(); +// EXPECT_TRUE(tReaderWriter.IsDataWritingFinished()); } TEST(TOverlappedReaderWriterTests, AddFullBuffer_Null) @@ -337,7 +337,7 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); - EXPECT_THROW(tReaderWriter.AddFullBuffer(nullptr), TCoreException); + EXPECT_THROW(tReaderWriter.AddFinishedReadBuffer(nullptr), TCoreException); } TEST(TOverlappedReaderWriterTests, AddFullBuffer_SameBufferTwice) @@ -352,8 +352,8 @@ pBuffer->SetBytesTransferred(1230); pBuffer->SetStatusCode(0); - tReaderWriter.AddFullBuffer(pBuffer); - EXPECT_THROW(tReaderWriter.AddFullBuffer(pBuffer), TCoreException); + tReaderWriter.AddFinishedReadBuffer(pBuffer); + EXPECT_THROW(tReaderWriter.AddFinishedReadBuffer(pBuffer), TCoreException); } TEST(TOverlappedReaderWriterTests, GetFullBuffer_AddFullBuffer_OutOfOrder) @@ -379,13 +379,13 @@ EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); - tReaderWriter.AddFullBuffer(pBuffers[1]); + tReaderWriter.AddFinishedReadBuffer(pBuffers[1]); EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); - tReaderWriter.AddFullBuffer(pBuffers[2]); + tReaderWriter.AddFinishedReadBuffer(pBuffers[2]); EXPECT_TIMEOUT(tReaderWriter.GetEventWritePossibleHandle()); - tReaderWriter.AddFullBuffer(pBuffers[0]); + tReaderWriter.AddFinishedReadBuffer(pBuffers[0]); EXPECT_SIGNALED(tReaderWriter.GetEventWritePossibleHandle()); } @@ -411,11 +411,11 @@ pBuffers[2]->SetStatusCode(0); pBuffers[2]->SetLastPart(true); - tReaderWriter.AddFinishedBuffer(pBuffers[1]); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[1]); EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); - tReaderWriter.AddFinishedBuffer(pBuffers[2]); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[2]); EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); - tReaderWriter.AddFinishedBuffer(pBuffers[0]); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[0]); EXPECT_SIGNALED(tReaderWriter.GetEventWriteFinishedHandle()); } @@ -440,21 +440,21 @@ pBuffers[2]->SetStatusCode(0); pBuffers[2]->SetLastPart(true); - tReaderWriter.AddFinishedBuffer(pBuffers[1]); - tReaderWriter.AddFinishedBuffer(pBuffers[2]); - tReaderWriter.AddFinishedBuffer(pBuffers[0]); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[1]); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[2]); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[0]); - TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedBuffer(); + TOverlappedDataBuffer* pBuffer = tReaderWriter.GetFinishedWriteBuffer(); EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); EXPECT_SIGNALED(tReaderWriter.GetEventWriteFinishedHandle()); - pBuffer = tReaderWriter.GetFinishedBuffer(); + pBuffer = tReaderWriter.GetFinishedWriteBuffer(); EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); EXPECT_SIGNALED(tReaderWriter.GetEventWriteFinishedHandle()); - pBuffer = tReaderWriter.GetFinishedBuffer(); + pBuffer = tReaderWriter.GetFinishedWriteBuffer(); EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); tReaderWriter.MarkFinishedBufferAsComplete(pBuffer); EXPECT_TIMEOUT(tReaderWriter.GetEventWriteFinishedHandle()); @@ -481,14 +481,14 @@ pBuffers[2]->SetStatusCode(0); pBuffers[2]->SetLastPart(true); - tReaderWriter.AddFinishedBuffer(pBuffers[1]); - EXPECT_EQ(nullptr, tReaderWriter.GetFinishedBuffer()); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[1]); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedWriteBuffer()); - tReaderWriter.AddFinishedBuffer(pBuffers[2]); - EXPECT_EQ(nullptr, tReaderWriter.GetFinishedBuffer()); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[2]); + EXPECT_EQ(nullptr, tReaderWriter.GetFinishedWriteBuffer()); - tReaderWriter.AddFinishedBuffer(pBuffers[0]); - EXPECT_NE(nullptr, tReaderWriter.GetFinishedBuffer()); + tReaderWriter.AddFinishedWriteBuffer(pBuffers[0]); + EXPECT_NE(nullptr, tReaderWriter.GetFinishedWriteBuffer()); } TEST(TOverlappedReaderWriterTests, AddFinishedBuffer_Null) @@ -498,7 +498,7 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); - EXPECT_THROW(tReaderWriter.AddFinishedBuffer(nullptr), TCoreException); + EXPECT_THROW(tReaderWriter.AddFinishedWriteBuffer(nullptr), TCoreException); } TEST(TOverlappedReaderWriterTests, AddFinishedBuffer_SameBufferTwice) @@ -508,40 +508,6 @@ TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); - tReaderWriter.AddFinishedBuffer(pBuffer); - EXPECT_THROW(tReaderWriter.AddFinishedBuffer(pBuffer), TCoreException); + tReaderWriter.AddFinishedWriteBuffer(pBuffer); + EXPECT_THROW(tReaderWriter.AddFinishedWriteBuffer(pBuffer), TCoreException); } - -/////////////////////////////////////////////////////////////////////////////////////////////////// -TEST(TOverlappedReaderWriterTests, DataSourceChanged_CleanupBuffers) -{ - logger::TLogFileDataPtr spLogData(std::make_shared()); - - TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); - TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; - - pBuffers[0]->SetLastPart(true); - pBuffers[1]->SetLastPart(true); - pBuffers[2]->SetLastPart(true); - - tReaderWriter.AddFullBuffer(pBuffers[1]); - tReaderWriter.AddFullBuffer(pBuffers[2]); - tReaderWriter.AddFullBuffer(pBuffers[0]); - - // this tests if the buffers are properly cleaned up - if they're not, DataSourceChanged() throws an exception - EXPECT_NO_THROW(tReaderWriter.DataSourceChanged()); -} - -TEST(TOverlappedReaderWriterTests, DataSourceChanged_InvalidBufferCount) -{ - logger::TLogFileDataPtr spLogData(std::make_shared()); - - TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); - - tReaderWriter.GetEmptyBuffer(); - - // this tests if the buffers are properly cleaned up - if they're not, DataSourceChanged() throws an exception - EXPECT_THROW(tReaderWriter.DataSourceChanged(), TCoreException); -} Index: src/libchcore/Tests/TOverlappedWriterTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TOverlappedWriterTests.cpp (revision 0) +++ src/libchcore/Tests/TOverlappedWriterTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,14 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TOverlappedWriter.h" + +using namespace chcore; + +TEST(TOverlappedWriterTests, DefaultTest) +{ + logger::TLogFileDataPtr spLogData(std::make_shared()); + TOrderedBufferQueuePtr spQueue(std::make_shared()); + + TOverlappedWriter writer(spLogData, spQueue, 0); +} Index: src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (revision 0) +++ src/libchcore/Tests/TReadBufferQueueWrapperTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,14 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TReadBufferQueueWrapper.h" +#include + +using namespace chcore; + +TEST(TReadBufferQueueWrapperTests, DefaultTest) +{ + TBufferListPtr spList(std::make_shared()); + + TReadBufferQueueWrapper queueWrapper(spList, 0, 0); +} Index: src/libchcore/Tests/TWriteBufferQueueWrapperTests.cpp =================================================================== diff -u --- src/libchcore/Tests/TWriteBufferQueueWrapperTests.cpp (revision 0) +++ src/libchcore/Tests/TWriteBufferQueueWrapperTests.cpp (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -0,0 +1,13 @@ +#include "stdafx.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "../TWriteBufferQueueWrapper.h" +#include + +using namespace chcore; + +TEST(TWriteBufferQueueWrapperTests, DefaultTest) +{ + TOrderedBufferQueuePtr spQueue(std::make_shared()); + TWriteBufferQueueWrapper queueWrapper(spQueue); +} Index: src/libchcore/libchcore.vc140.vcxproj =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -493,6 +493,7 @@ + @@ -509,7 +510,9 @@ + + @@ -579,10 +582,13 @@ + + + @@ -623,6 +629,18 @@ + + true + true + true + true + + + true + true + true + true + true true @@ -725,6 +743,30 @@ true true + + true + true + true + true + + + true + true + true + true + + + true + true + true + true + + + true + true + true + true + true true @@ -737,7 +779,26 @@ true true + + true + true + true + true + + + true + true + true + true + + + true + true + true + true + + @@ -752,7 +813,9 @@ + + @@ -817,6 +880,8 @@ + + @@ -856,6 +921,7 @@ Create Create + Index: src/libchcore/libchcore.vc140.vcxproj.filters =================================================================== diff -u -r1506d51ff1c0a5d156dab398051efc0c87473e81 -r3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9 --- src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) +++ src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision 3ccbdb8d3eac3427e6d3354854476e57fdc7ceb9) @@ -84,6 +84,12 @@ {0bd6a559-b992-4f1d-abe6-f8105a9d5f50} + + {94659973-1dd0-4e22-83c6-2fc26d3bf56e} + + + {64486dcc-0c2a-468c-9498-c4ff0c2183af} + @@ -350,9 +356,6 @@ Source Files\Tools - - Source Files\Tools\Data Buffer - Source Files\Tools @@ -464,12 +467,33 @@ Source Files\Tools\Data Buffer - + Source Files\Tools\Data Buffer - + Source Files\Tools\Data Buffer + + Source Files\Tools\Data Buffer + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + @@ -739,24 +763,15 @@ Source Files\Tools - - Source Files\Tools\Data Buffer - Source Files\Tools - - Tests - Source Files\Task Config Source Files\Task Config - - Tests - Source Files\Tools\Data Buffer @@ -847,17 +862,71 @@ Source Files\Tools\Data Buffer - - Tests - Source Files\Tools\Data Buffer - + Source Files\Tools\Data Buffer - + Source Files\Tools\Data Buffer + + Source Files\Tools\Data Buffer + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Source Files\Tools\Data Buffer\Queues + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + + + Tests\DataBuffer + \ No newline at end of file