Index: src/libchcore/OverlappedCallbacks.cpp =================================================================== diff -u -N -rd23ed007b8142c6faf6d8cad4a421ac243ef0146 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/OverlappedCallbacks.cpp (.../OverlappedCallbacks.cpp) (revision d23ed007b8142c6faf6d8cad4a421ac243ef0146) +++ src/libchcore/OverlappedCallbacks.cpp (.../OverlappedCallbacks.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -27,8 +27,6 @@ { VOID CALLBACK OverlappedReadCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) { - _ASSERTE(dwNumberOfBytesTransfered == lpOverlapped->InternalHigh); - TOverlappedDataBuffer* pBuffer = (TOverlappedDataBuffer*)lpOverlapped; TOverlappedReaderWriter* pQueue = (TOverlappedReaderWriter*)pBuffer->GetParam(); @@ -45,18 +43,20 @@ pBuffer->SetRealDataSize(dwNumberOfBytesTransfered); pBuffer->SetLastPart(bEof); + // NOTE: updating the bytes transferred as system does not update lpOverlapped with correct value + // in case of error (e.g end-of-file error triggers the difference and dwNumberOfBytesTransfered contains more up-to-date information) + pBuffer->SetBytesTransferred(dwNumberOfBytesTransfered); pQueue->AddFullBuffer(pBuffer); } VOID CALLBACK OverlappedWriteCompleted(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) { - _ASSERTE(dwNumberOfBytesTransfered == lpOverlapped->InternalHigh); dwNumberOfBytesTransfered; - TOverlappedDataBuffer* pBuffer = (TOverlappedDataBuffer*)lpOverlapped; TOverlappedReaderWriter* pQueue = (TOverlappedReaderWriter*)pBuffer->GetParam(); pBuffer->SetErrorCode(dwErrorCode); + pBuffer->SetBytesTransferred(dwNumberOfBytesTransfered); pQueue->AddFinishedBuffer(pBuffer); } Index: src/libchcore/TLocalFilesystemFile.cpp =================================================================== diff -u -N -rd23ed007b8142c6faf6d8cad4a421ac243ef0146 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/TLocalFilesystemFile.cpp (.../TLocalFilesystemFile.cpp) (revision d23ed007b8142c6faf6d8cad4a421ac243ef0146) +++ src/libchcore/TLocalFilesystemFile.cpp (.../TLocalFilesystemFile.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -27,7 +27,7 @@ #include "TFileException.h" #include "TFileInfo.h" #include "StreamingHelpers.h" -#include "TOverlappedDataBufferQueue.h" +#include "TOverlappedMemoryPool.h" #include "OverlappedCallbacks.h" namespace chcore @@ -187,12 +187,12 @@ { LOG_TRACE(m_spLog) << L"Requesting read of " << rBuffer.GetRequestedDataSize() << L" bytes at position " << rBuffer.GetFilePosition() << - L"; buffer-order: " << rBuffer.GetBufferOrder() << + L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); if (!IsOpen()) { - LOG_ERROR(m_spLog) << L"Read request failed - file not open" << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_ERROR(m_spLog) << L"Read request failed - file not open" << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); throw TFileException(eErr_FileNotOpen, ERROR_INVALID_HANDLE, m_pathFile, L"Cannot read from closed file", LOCATION); } @@ -202,12 +202,12 @@ switch (dwLastError) { case ERROR_IO_PENDING: - LOG_TRACE(m_spLog) << L"Read requested and is pending" << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_TRACE(m_spLog) << L"Read requested and is pending" << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); return; case ERROR_HANDLE_EOF: { - LOG_TRACE(m_spLog) << L"Read request marked as EOF" << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_TRACE(m_spLog) << L"Read request marked as EOF" << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); rBuffer.SetBytesTransferred(0); rBuffer.SetStatusCode(0); @@ -221,26 +221,26 @@ default: { - LOG_ERROR(m_spLog) << L"Read request failed with error " << dwLastError << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_ERROR(m_spLog) << L"Read request failed with error " << dwLastError << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); throw TFileException(eErr_CannotReadFile, dwLastError, m_pathFile, L"Error reading data from file", LOCATION); } } } else - LOG_TRACE(m_spLog) << L"Read request succeeded" << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_TRACE(m_spLog) << L"Read request succeeded" << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); } void TLocalFilesystemFile::WriteFile(TOverlappedDataBuffer& rBuffer) { LOG_TRACE(m_spLog) << L"Requesting writing of " << rBuffer.GetRealDataSize() << L" bytes at position " << rBuffer.GetFilePosition() << - L"; buffer-order: " << rBuffer.GetBufferOrder() << + L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); if (!IsOpen()) { - LOG_ERROR(m_spLog) << L"Write request failed - file not open" << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_ERROR(m_spLog) << L"Write request failed - file not open" << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); throw TFileException(eErr_FileNotOpen, ERROR_INVALID_HANDLE, m_pathFile, L"Cannot write to closed file", LOCATION); } @@ -249,33 +249,33 @@ if (m_bNoBuffering && rBuffer.IsLastPart()) { dwToWrite = RoundUp(dwToWrite, MaxSectorSize); - LOG_TRACE(m_spLog) << L"Writing last part of file in no-buffering mode. Rounding up last write to " << dwToWrite << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_TRACE(m_spLog) << L"Writing last part of file in no-buffering mode. Rounding up last write to " << dwToWrite << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); } if (!::WriteFileEx(m_hFile, rBuffer.GetBufferPtr(), dwToWrite, &rBuffer, OverlappedWriteCompleted)) { DWORD dwLastError = GetLastError(); if (dwLastError != ERROR_IO_PENDING) { - LOG_ERROR(m_spLog) << L"Write request failed with error " << dwLastError << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_ERROR(m_spLog) << L"Write request failed with error " << dwLastError << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); throw TFileException(eErr_CannotWriteFile, dwLastError, m_pathFile, L"Error while writing to file", LOCATION); } - LOG_TRACE(m_spLog) << L"Write requested and is pending" << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_TRACE(m_spLog) << L"Write requested and is pending" << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); } else - LOG_TRACE(m_spLog) << L"Write request succeeded" << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_TRACE(m_spLog) << L"Write request succeeded" << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); } void TLocalFilesystemFile::FinalizeFile(TOverlappedDataBuffer& rBuffer) { LOG_TRACE(m_spLog) << L"Finalizing file" << - L"; buffer-order: " << rBuffer.GetBufferOrder() << + L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); if (!IsOpen()) { - LOG_ERROR(m_spLog) << L"Cannot finalize file - file not open" << L"; buffer-order: " << rBuffer.GetBufferOrder() << GetFileInfoForLog(m_bNoBuffering); + LOG_ERROR(m_spLog) << L"Cannot finalize file - file not open" << L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); throw TFileException(eErr_FileNotOpen, ERROR_INVALID_HANDLE, m_pathFile, L"Cannot write to closed file", LOCATION); } @@ -291,7 +291,7 @@ LOG_TRACE(m_spLog) << L"File need truncating - really written " << dwReallyWritten << L", should write " << dwToWrite << L". Truncating file to " << fsNewFileSize << - L"; buffer-order: " << rBuffer.GetBufferOrder() << + L"; buffer-order: " << rBuffer.GetFilePosition() << GetFileInfoForLog(m_bNoBuffering); Truncate(fsNewFileSize); Index: src/libchcore/TOrderedBufferQueue.cpp =================================================================== diff -u -N --- src/libchcore/TOrderedBufferQueue.cpp (revision 0) +++ src/libchcore/TOrderedBufferQueue.cpp (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -0,0 +1,35 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TOrderedBufferQueue.h" +#include "TOverlappedDataBuffer.h" +#include "TCoreException.h" + +namespace chcore +{ + bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) const + { + if(!pBufferA) + throw TCoreException(eErr_InvalidArgument, L"pBufferA", LOCATION); + if(!pBufferB) + throw TCoreException(eErr_InvalidArgument, L"pBufferB", LOCATION); + + return pBufferA->GetFilePosition() < pBufferB->GetFilePosition(); + } +} Index: src/libchcore/TOrderedBufferQueue.h =================================================================== diff -u -N --- src/libchcore/TOrderedBufferQueue.h (revision 0) +++ src/libchcore/TOrderedBufferQueue.h (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -0,0 +1,49 @@ +// ============================================================================ +// Copyright (C) 2001-2016 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TORDEREDBUFFERQUEUE_H__ +#define __TORDEREDBUFFERQUEUE_H__ + +#include "libchcore.h" +#include + +namespace chcore +{ + class TOverlappedDataBuffer; + + struct CompareBufferPositions + { + bool operator()(const TOverlappedDataBuffer* rBufferA, const TOverlappedDataBuffer* rBufferB) const; + }; + + class TOrderedBufferQueue : public std::set + { + public: + TOverlappedDataBuffer* pop_front() + { + if(empty()) + throw std::runtime_error("Bounds exceeded in ordered buffer queue"); + + TOverlappedDataBuffer* pBuffer = *begin(); + erase(begin()); + return pBuffer; + } + }; +} + +#endif Index: src/libchcore/TOverlappedDataBuffer.h =================================================================== diff -u -N -rd23ed007b8142c6faf6d8cad4a421ac243ef0146 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/TOverlappedDataBuffer.h (.../TOverlappedDataBuffer.h) (revision d23ed007b8142c6faf6d8cad4a421ac243ef0146) +++ src/libchcore/TOverlappedDataBuffer.h (.../TOverlappedDataBuffer.h) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -56,9 +56,6 @@ void SetLastPart(bool bLastPart) { m_bLastPart = bLastPart; } bool IsLastPart() const { return m_bLastPart; } - unsigned long long GetBufferOrder() const { return m_ullBufferOrder; } - void SetBufferOrder(unsigned long long ullOrder) { m_ullBufferOrder = ullOrder; } - DWORD GetErrorCode() const { return m_dwErrorCode; } void SetErrorCode(DWORD dwErrorCode) { m_dwErrorCode = dwErrorCode; } @@ -90,7 +87,6 @@ DWORD m_dwRealDataSize = 0; // data size as reported by read operation DWORD m_dwErrorCode = 0; // win32 error code bool m_bLastPart = false; // marks the last part of the file - unsigned long long m_ullBufferOrder = 0; // marks the order of this buffer void* m_pParam = nullptr; // pointer to the queue where this object resides }; Index: src/libchcore/TOverlappedMemoryPool.cpp =================================================================== diff -u -N --- src/libchcore/TOverlappedMemoryPool.cpp (revision 0) +++ src/libchcore/TOverlappedMemoryPool.cpp (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -0,0 +1,183 @@ +// ============================================================================ +// Copyright (C) 2001-2015 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#include "stdafx.h" +#include "TOverlappedMemoryPool.h" +#include "TOverlappedDataBuffer.h" +#include "TCoreException.h" +#include "ErrorCodes.h" +#include + +#define STATUS_END_OF_FILE 0xc0000011 + +namespace chcore +{ + TOverlappedMemoryPool::TOverlappedMemoryPool() : + m_eventHasBuffers(true, false), + m_eventAllBuffersAccountedFor(true, true) + { + } + + TOverlappedMemoryPool::TOverlappedMemoryPool(size_t stCount, size_t stBufferSize) : + TOverlappedMemoryPool() + { + ReinitializeBuffers(stCount, stBufferSize); + } + + TOverlappedMemoryPool::~TOverlappedMemoryPool() + { + } + + TOverlappedDataBuffer* TOverlappedMemoryPool::GetBuffer() + { + if (!m_dequeBuffers.empty()) + { + TOverlappedDataBuffer* pBuffer = m_dequeBuffers.front(); + m_dequeBuffers.pop_front(); + + UpdateHasBuffers(); + UpdateAllBuffersAccountedFor(); + + return pBuffer; + } + + return nullptr; + } + + bool TOverlappedMemoryPool::AreAllBuffersAccountedFor() const + { + return m_dequeBuffers.size() == m_listAllBuffers.size(); + } + + void TOverlappedMemoryPool::AddBuffer(TOverlappedDataBuffer* pBuffer) + { + if (!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + m_dequeBuffers.push_back(pBuffer); + UpdateHasBuffers(); + UpdateAllBuffersAccountedFor(); + } + + void TOverlappedMemoryPool::UpdateAllBuffersAccountedFor() + { + if (AreAllBuffersAccountedFor()) + m_eventAllBuffersAccountedFor.SetEvent(); + else + m_eventAllBuffersAccountedFor.ResetEvent(); + } + + void TOverlappedMemoryPool::UpdateHasBuffers() + { + if(!m_dequeBuffers.empty()) + m_eventHasBuffers.SetEvent(); + else + m_eventHasBuffers.ResetEvent(); + } + + void TOverlappedMemoryPool::ReinitializeBuffers(size_t stCount, size_t stBufferSize) + { + // sanity check - if any of the buffers are still in use, we can't change the sizes + if (m_listAllBuffers.size() != m_dequeBuffers.size()) + throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); + if (stBufferSize == 0) + throw TCoreException(eErr_InvalidArgument, L"stBufferSize", LOCATION); + + if (stBufferSize != GetSingleBufferSize()) + { + // buffer sizes increased - clear current buffers and proceed with creating new ones + m_listAllBuffers.clear(); + m_dequeBuffers.clear(); + } + else if (stCount == m_listAllBuffers.size()) + return; // nothing really changed + else if (stCount > m_listAllBuffers.size()) + stCount -= m_listAllBuffers.size(); // allocate only the missing buffers + else if (stCount < m_listAllBuffers.size()) + { + // there are too many buffers - reduce + m_dequeBuffers.clear(); + + size_t stCountToRemove = m_listAllBuffers.size() - stCount; + + m_listAllBuffers.erase(m_listAllBuffers.begin(), m_listAllBuffers.begin() + stCountToRemove); + for (const auto& upElement : m_listAllBuffers) + { + m_dequeBuffers.push_back(upElement.get()); + } + + UpdateHasBuffers(); + UpdateAllBuffersAccountedFor(); + return; + } + + // allocate buffers + while (stCount--) + { + auto upBuffer = std::make_unique(stBufferSize, nullptr); + m_dequeBuffers.push_back(upBuffer.get()); + m_listAllBuffers.push_back(std::move(upBuffer)); + } + + UpdateHasBuffers(); + UpdateAllBuffersAccountedFor(); + } + + size_t TOverlappedMemoryPool::GetTotalBufferCount() const + { + return m_listAllBuffers.size(); + } + + size_t TOverlappedMemoryPool::GetAvailableBufferCount() const + { + return m_dequeBuffers.size(); + } + + size_t TOverlappedMemoryPool::GetSingleBufferSize() const + { + if (m_listAllBuffers.empty()) + return 0; + + return (*m_listAllBuffers.begin())->GetBufferSize(); + } + + void TOverlappedMemoryPool::WaitForMissingBuffers(HANDLE hKillEvent) const + { + enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; + std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; + + bool bExit = false; + while (!bExit) + { + DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); + switch (dwResult) + { + case STATUS_USER_APC: + break; + + case WAIT_OBJECT_0 + eAllBuffersReturned: + bExit = true; + break; + + case WAIT_OBJECT_0 + eKillThread: + bExit = true; + break; + } + } + } +} Index: src/libchcore/TOverlappedDataBufferQueue.cpp =================================================================== diff -u -N --- src/libchcore/TOverlappedDataBufferQueue.cpp (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) +++ src/libchcore/TOverlappedDataBufferQueue.cpp (revision 0) @@ -1,183 +0,0 @@ -// ============================================================================ -// Copyright (C) 2001-2015 by Jozef Starosczyk -// ixen@copyhandler.com -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU Library General Public License -// (version 2) as published by the Free Software Foundation; -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU Library General Public -// License along with this program; if not, write to the -// Free Software Foundation, Inc., -// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -// ============================================================================ -#include "stdafx.h" -#include "TOverlappedDataBufferQueue.h" -#include "TOverlappedDataBuffer.h" -#include "TCoreException.h" -#include "ErrorCodes.h" -#include - -#define STATUS_END_OF_FILE 0xc0000011 - -namespace chcore -{ - TOverlappedDataBufferQueue::TOverlappedDataBufferQueue() : - m_eventHasBuffers(true, false), - m_eventAllBuffersAccountedFor(true, true) - { - } - - TOverlappedDataBufferQueue::TOverlappedDataBufferQueue(size_t stCount, size_t stBufferSize) : - TOverlappedDataBufferQueue() - { - ReinitializeBuffers(stCount, stBufferSize); - } - - TOverlappedDataBufferQueue::~TOverlappedDataBufferQueue() - { - } - - TOverlappedDataBuffer* TOverlappedDataBufferQueue::GetBuffer() - { - if (!m_dequeBuffers.empty()) - { - TOverlappedDataBuffer* pBuffer = m_dequeBuffers.front(); - m_dequeBuffers.pop_front(); - - UpdateHasBuffers(); - UpdateAllBuffersAccountedFor(); - - return pBuffer; - } - - return nullptr; - } - - bool TOverlappedDataBufferQueue::AreAllBuffersAccountedFor() const - { - return m_dequeBuffers.size() == m_listAllBuffers.size(); - } - - void TOverlappedDataBufferQueue::AddBuffer(TOverlappedDataBuffer* pBuffer) - { - if (!pBuffer) - throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - - m_dequeBuffers.push_back(pBuffer); - UpdateHasBuffers(); - UpdateAllBuffersAccountedFor(); - } - - void TOverlappedDataBufferQueue::UpdateAllBuffersAccountedFor() - { - if (AreAllBuffersAccountedFor()) - m_eventAllBuffersAccountedFor.SetEvent(); - else - m_eventAllBuffersAccountedFor.ResetEvent(); - } - - void TOverlappedDataBufferQueue::UpdateHasBuffers() - { - if(!m_dequeBuffers.empty()) - m_eventHasBuffers.SetEvent(); - else - m_eventHasBuffers.ResetEvent(); - } - - void TOverlappedDataBufferQueue::ReinitializeBuffers(size_t stCount, size_t stBufferSize) - { - // sanity check - if any of the buffers are still in use, we can't change the sizes - if (m_listAllBuffers.size() != m_dequeBuffers.size()) - throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); - if (stBufferSize == 0) - throw TCoreException(eErr_InvalidArgument, L"stBufferSize", LOCATION); - - if (stBufferSize != GetSingleBufferSize()) - { - // buffer sizes increased - clear current buffers and proceed with creating new ones - m_listAllBuffers.clear(); - m_dequeBuffers.clear(); - } - else if (stCount == m_listAllBuffers.size()) - return; // nothing really changed - else if (stCount > m_listAllBuffers.size()) - stCount -= m_listAllBuffers.size(); // allocate only the missing buffers - else if (stCount < m_listAllBuffers.size()) - { - // there are too many buffers - reduce - m_dequeBuffers.clear(); - - size_t stCountToRemove = m_listAllBuffers.size() - stCount; - - m_listAllBuffers.erase(m_listAllBuffers.begin(), m_listAllBuffers.begin() + stCountToRemove); - for (const auto& upElement : m_listAllBuffers) - { - m_dequeBuffers.push_back(upElement.get()); - } - - UpdateHasBuffers(); - UpdateAllBuffersAccountedFor(); - return; - } - - // allocate buffers - while (stCount--) - { - auto upBuffer = std::make_unique(stBufferSize, nullptr); - m_dequeBuffers.push_back(upBuffer.get()); - m_listAllBuffers.push_back(std::move(upBuffer)); - } - - UpdateHasBuffers(); - UpdateAllBuffersAccountedFor(); - } - - size_t TOverlappedDataBufferQueue::GetTotalBufferCount() const - { - return m_listAllBuffers.size(); - } - - size_t TOverlappedDataBufferQueue::GetAvailableBufferCount() const - { - return m_dequeBuffers.size(); - } - - size_t TOverlappedDataBufferQueue::GetSingleBufferSize() const - { - if (m_listAllBuffers.empty()) - return 0; - - return (*m_listAllBuffers.begin())->GetBufferSize(); - } - - void TOverlappedDataBufferQueue::WaitForMissingBuffers(HANDLE hKillEvent) const - { - enum { eKillThread = 0, eAllBuffersReturned, eHandleCount }; - std::array arrHandles = { hKillEvent, m_eventAllBuffersAccountedFor.Handle() }; - - bool bExit = false; - while (!bExit) - { - DWORD dwResult = WaitForMultipleObjectsEx(eHandleCount, arrHandles.data(), false, INFINITE, true); - switch (dwResult) - { - case STATUS_USER_APC: - break; - - case WAIT_OBJECT_0 + eAllBuffersReturned: - bExit = true; - break; - - case WAIT_OBJECT_0 + eKillThread: - bExit = true; - break; - } - } - } -} Index: src/libchcore/TOverlappedMemoryPool.h =================================================================== diff -u -N --- src/libchcore/TOverlappedMemoryPool.h (revision 0) +++ src/libchcore/TOverlappedMemoryPool.h (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -0,0 +1,73 @@ +// ============================================================================ +// Copyright (C) 2001-2014 by Jozef Starosczyk +// ixen@copyhandler.com +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU Library General Public License +// (version 2) as published by the Free Software Foundation; +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// ============================================================================ +#ifndef __TOVERLAPPEDDATABUFFERQUEUE_H__ +#define __TOVERLAPPEDDATABUFFERQUEUE_H__ + +#include +#include "TEvent.h" + +namespace chcore +{ + class TOverlappedDataBuffer; + + class TOverlappedMemoryPool + { + public: + TOverlappedMemoryPool(); + TOverlappedMemoryPool(size_t stCount, size_t stBufferSize); + TOverlappedMemoryPool(const TOverlappedMemoryPool&) = delete; + ~TOverlappedMemoryPool(); + + TOverlappedMemoryPool& operator=(const TOverlappedMemoryPool&) = delete; + + void ReinitializeBuffers(size_t stCount, size_t stBufferSize); + size_t GetTotalBufferCount() const; + size_t GetAvailableBufferCount() const; + size_t GetSingleBufferSize() const; + + // buffer management + void AddBuffer(TOverlappedDataBuffer* pBuffer); + TOverlappedDataBuffer* GetBuffer(); + + // event access + HANDLE GetEventHasBuffers() const { return m_eventHasBuffers.Handle(); } + bool HasBuffers() const { return !m_dequeBuffers.empty(); } + + HANDLE GetEventAllBuffersAccountedFor() const { return m_eventAllBuffersAccountedFor.Handle(); } + bool AreAllBuffersAccountedFor() const; + + void WaitForMissingBuffers(HANDLE hKillEvent) const; + + private: + void UpdateAllBuffersAccountedFor(); + void UpdateHasBuffers(); + + private: + std::vector> m_listAllBuffers; + + std::deque m_dequeBuffers; + + TEvent m_eventHasBuffers; + TEvent m_eventAllBuffersAccountedFor; + }; + + using TOverlappedMemoryPoolPtr = std::shared_ptr; +} + +#endif Index: src/libchcore/TOverlappedDataBufferQueue.h =================================================================== diff -u -N --- src/libchcore/TOverlappedDataBufferQueue.h (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) +++ src/libchcore/TOverlappedDataBufferQueue.h (revision 0) @@ -1,72 +0,0 @@ -// ============================================================================ -// Copyright (C) 2001-2014 by Jozef Starosczyk -// ixen@copyhandler.com -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU Library General Public License -// (version 2) as published by the Free Software Foundation; -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU Library General Public -// License along with this program; if not, write to the -// Free Software Foundation, Inc., -// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -// ============================================================================ -#ifndef __TOVERLAPPEDDATABUFFERQUEUE_H__ -#define __TOVERLAPPEDDATABUFFERQUEUE_H__ - -#include -#include "TEvent.h" - -namespace chcore -{ - class TOverlappedDataBuffer; - - class TOverlappedDataBufferQueue - { - public: - TOverlappedDataBufferQueue(); - TOverlappedDataBufferQueue(size_t stCount, size_t stBufferSize); - TOverlappedDataBufferQueue(const TOverlappedDataBufferQueue&) = delete; - ~TOverlappedDataBufferQueue(); - - TOverlappedDataBufferQueue& operator=(const TOverlappedDataBufferQueue&) = delete; - - void ReinitializeBuffers(size_t stCount, size_t stBufferSize); - size_t GetTotalBufferCount() const; - size_t GetAvailableBufferCount() const; - size_t GetSingleBufferSize() const; - - // buffer management - void AddBuffer(TOverlappedDataBuffer* pBuffer); - TOverlappedDataBuffer* GetBuffer(); - - bool AreAllBuffersAccountedFor() const; - - // event access - HANDLE GetEventHasBuffers() const { return m_eventHasBuffers.Handle(); } - HANDLE GetEventAllBuffersAccountedFor() const { return m_eventAllBuffersAccountedFor.Handle(); } - - void WaitForMissingBuffers(HANDLE hKillEvent) const; - - private: - void UpdateAllBuffersAccountedFor(); - void UpdateHasBuffers(); - - private: - std::vector> m_listAllBuffers; - - std::deque m_dequeBuffers; - - TEvent m_eventHasBuffers; - TEvent m_eventAllBuffersAccountedFor; - }; - - using TOverlappedDataBufferQueuePtr = std::shared_ptr; -} - -#endif Index: src/libchcore/TOverlappedReaderWriter.cpp =================================================================== diff -u -N -rd23ed007b8142c6faf6d8cad4a421ac243ef0146 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision d23ed007b8142c6faf6d8cad4a421ac243ef0146) +++ src/libchcore/TOverlappedReaderWriter.cpp (.../TOverlappedReaderWriter.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -25,23 +25,23 @@ namespace chcore { - bool CompareBufferPositions::operator()(const TOverlappedDataBuffer* pBufferA, const TOverlappedDataBuffer* pBufferB) - { - return pBufferA->GetBufferOrder() < pBufferB->GetBufferOrder(); - } - - TOverlappedReaderWriter::TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedDataBufferQueuePtr& spBuffers) : + TOverlappedReaderWriter::TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedMemoryPoolPtr& spMemoryPool, + file_size_t ullFilePos, DWORD dwChunkSize) : m_spLog(logger::MakeLogger(spLogFileData, L"DataBuffer")), - m_spBuffers(spBuffers), + m_spMemoryPool(spMemoryPool), + m_eventReadPossible(true, true), m_eventWritePossible(true, false), m_eventWriteFinished(true, false), m_eventAllBuffersAccountedFor(true, true), m_bDataSourceFinished(false), m_bDataWritingFinished(false), - m_ullNextReadBufferOrder(0), + m_dwDataChunkSize(dwChunkSize), + m_ullNextReadBufferOrder(ullFilePos), m_ullNextWriteBufferOrder(0), m_ullNextFinishedBufferOrder(0) { + if(!spMemoryPool) + throw TCoreException(eErr_InvalidArgument, L"spMemoryPool", LOCATION); } TOverlappedReaderWriter::~TOverlappedReaderWriter() @@ -50,36 +50,73 @@ TOverlappedDataBuffer* TOverlappedReaderWriter::GetEmptyBuffer() { - TOverlappedDataBuffer* pBuffer = m_spBuffers->GetBuffer(); - if(pBuffer) + TOverlappedDataBuffer* pBuffer = nullptr; + + // return buffers to re-read if exists + if(!m_setEmptyBuffers.empty()) + pBuffer = m_setEmptyBuffers.pop_front(); + else { - pBuffer->SetParam(this); - pBuffer->SetBufferOrder(m_ullNextReadBufferOrder++); + // get empty buffer and initialize + pBuffer = m_spMemoryPool->GetBuffer(); + if(pBuffer) + { + pBuffer->SetParam(this); + pBuffer->InitForRead(m_ullNextReadBufferOrder, m_dwDataChunkSize); - m_eventAllBuffersAccountedFor.ResetEvent(); + m_ullNextReadBufferOrder += m_dwDataChunkSize; + } } + // reset the accounted-for event only if we managed to get the pointer, otherwise nothing is changing + if(pBuffer) + m_eventAllBuffersAccountedFor.ResetEvent(); + + UpdateReadPossibleEvent(); // update read-possible always - if we're getting null with read-possible event set (which we should not), we need to reset it + return pBuffer; } - void TOverlappedReaderWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + void TOverlappedReaderWriter::AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer) { if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - LOG_TRACE(m_spLog) << L"Queuing buffer as empty; buffer-order: " << pBuffer->GetBufferOrder(); + LOG_TRACE(m_spLog) << L"Queuing buffer for re-read; buffer-order: " << pBuffer->GetFilePosition(); - pBuffer->SetParam(nullptr); - m_spBuffers->AddBuffer(pBuffer); + m_setEmptyBuffers.insert(pBuffer); + + m_eventReadPossible.SetEvent(); UpdateAllBuffersAccountedFor(); } + void TOverlappedReaderWriter::AddEmptyBuffer(TOverlappedDataBuffer* pBuffer) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Releasing empty buffer; buffer-order: " << pBuffer->GetFilePosition(); + + m_spMemoryPool->AddBuffer(pBuffer); + + UpdateReadPossibleEvent(); + UpdateAllBuffersAccountedFor(); + } + + void TOverlappedReaderWriter::UpdateReadPossibleEvent() + { + if(!m_setEmptyBuffers.empty() || (!m_bDataSourceFinished && m_spMemoryPool->HasBuffers())) + m_eventReadPossible.SetEvent(); + else + m_eventReadPossible.ResetEvent(); + } + TOverlappedDataBuffer* TOverlappedReaderWriter::GetFullBuffer() { if (!m_setFullBuffers.empty()) { TOverlappedDataBuffer* pBuffer = *m_setFullBuffers.begin(); - if (pBuffer->GetBufferOrder() != m_ullNextWriteBufferOrder) + if (pBuffer->GetFilePosition() != m_ullNextWriteBufferOrder) return nullptr; m_setFullBuffers.erase(m_setFullBuffers.begin()); @@ -90,7 +127,7 @@ if(pBuffer->IsLastPart()) m_bDataWritingFinished = true; - ++m_ullNextWriteBufferOrder; + m_ullNextWriteBufferOrder += m_dwDataChunkSize; } UpdateWritePossibleEvent(); @@ -107,15 +144,15 @@ if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetBufferOrder() << + LOG_TRACE(m_spLog) << L"Queuing buffer as full; buffer-order: " << pBuffer->GetFilePosition() << L", requested-data-size: " << pBuffer->GetRequestedDataSize() << L", real-data-size: " << pBuffer->GetRealDataSize() << L", file-position: " << pBuffer->GetFilePosition() << L", error-code: " << pBuffer->GetErrorCode() << L", status-code: " << pBuffer->GetStatusCode() << L", is-last-part: " << pBuffer->IsLastPart(); - std::pair pairInsertInfo = m_setFullBuffers.insert(pBuffer); + auto pairInsertInfo = m_setFullBuffers.insert(pBuffer); if (!pairInsertInfo.second) throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); @@ -126,14 +163,41 @@ UpdateAllBuffersAccountedFor(); } + void TOverlappedReaderWriter::AddFailedFullBuffer(TOverlappedDataBuffer* pBuffer) + { + if(!pBuffer) + throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); + + LOG_TRACE(m_spLog) << L"Queuing buffer as full (failed); buffer-order: " << pBuffer->GetFilePosition() << + L", requested-data-size: " << pBuffer->GetRequestedDataSize() << + L", real-data-size: " << pBuffer->GetRealDataSize() << + L", file-position: " << pBuffer->GetFilePosition() << + L", error-code: " << pBuffer->GetErrorCode() << + L", status-code: " << pBuffer->GetStatusCode() << + L", is-last-part: " << pBuffer->IsLastPart(); + + // overwrite error code (to avoid treating the buffer as failed read) + pBuffer->SetErrorCode(ERROR_SUCCESS); + + auto pairInsertInfo = m_setFullBuffers.insert(pBuffer); + if(!pairInsertInfo.second) + throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); + + if(pBuffer->IsLastPart()) + m_bDataSourceFinished = true; + + UpdateWritePossibleEvent(); + UpdateAllBuffersAccountedFor(); + } + void TOverlappedReaderWriter::UpdateWritePossibleEvent() { if (m_bDataWritingFinished || m_setFullBuffers.empty()) m_eventWritePossible.ResetEvent(); else { TOverlappedDataBuffer* pFirstBuffer = *m_setFullBuffers.begin(); - if (pFirstBuffer->GetBufferOrder() == m_ullNextWriteBufferOrder) + if (pFirstBuffer->GetFilePosition() == m_ullNextWriteBufferOrder) m_eventWritePossible.SetEvent(); else m_eventWritePossible.ResetEvent(); @@ -145,7 +209,7 @@ if (!m_setFinishedBuffers.empty()) { TOverlappedDataBuffer* pBuffer = *m_setFinishedBuffers.begin(); - if (pBuffer->GetBufferOrder() != m_ullNextFinishedBufferOrder) + if (pBuffer->GetFilePosition() != m_ullNextFinishedBufferOrder) return nullptr; m_setFinishedBuffers.erase(m_setFinishedBuffers.begin()); @@ -165,7 +229,7 @@ throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); // allow next finished buffer to be processed - ++m_ullNextFinishedBufferOrder; + m_ullNextFinishedBufferOrder += m_dwDataChunkSize; UpdateWriteFinishedEvent(); } @@ -174,15 +238,15 @@ if (!pBuffer) throw TCoreException(eErr_InvalidPointer, L"pBuffer", LOCATION); - LOG_TRACE(m_spLog) << L"Queuing buffer as finished; buffer-order: " << pBuffer->GetBufferOrder() << + LOG_TRACE(m_spLog) << L"Queuing buffer as finished; buffer-order: " << pBuffer->GetFilePosition() << L", requested-data-size: " << pBuffer->GetRequestedDataSize() << L", real-data-size: " << pBuffer->GetRealDataSize() << L", file-position: " << pBuffer->GetFilePosition() << L", error-code: " << pBuffer->GetErrorCode() << L", status-code: " << pBuffer->GetStatusCode() << L", is-last-part: " << pBuffer->IsLastPart(); - std::pair pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); + auto pairInsertInfo = m_setFinishedBuffers.insert(pBuffer); if (!pairInsertInfo.second) throw TCoreException(eErr_InvalidOverlappedPosition, L"Tried to re-insert same buffer into queue", LOCATION); @@ -197,7 +261,7 @@ else { TOverlappedDataBuffer* pFirstBuffer = *m_setFinishedBuffers.begin(); - if (pFirstBuffer->GetBufferOrder() == m_ullNextFinishedBufferOrder) + if (pFirstBuffer->GetFilePosition() == m_ullNextFinishedBufferOrder) m_eventWriteFinished.SetEvent(); else m_eventWriteFinished.ResetEvent(); @@ -206,8 +270,8 @@ void TOverlappedReaderWriter::UpdateAllBuffersAccountedFor() { - size_t stCurrentBuffers = m_spBuffers->GetAvailableBufferCount() + m_setFullBuffers.size() + m_setFinishedBuffers.size(); - if (stCurrentBuffers == m_spBuffers->GetTotalBufferCount()) + size_t stCurrentBuffers = m_spMemoryPool->GetAvailableBufferCount() + m_setFullBuffers.size() + m_setFinishedBuffers.size() + m_setEmptyBuffers.size(); + if (stCurrentBuffers == m_spMemoryPool->GetTotalBufferCount()) m_eventAllBuffersAccountedFor.SetEvent(); else m_eventAllBuffersAccountedFor.ResetEvent(); @@ -217,7 +281,7 @@ { CleanupBuffers(); - if (!m_spBuffers->AreAllBuffersAccountedFor()) + if (!m_spMemoryPool->AreAllBuffersAccountedFor()) throw TCoreException(eErr_InternalProblem, L"Some buffers are still in use", LOCATION); m_bDataSourceFinished = false; @@ -241,7 +305,7 @@ { if ((*iterCurrent)->IsLastPart()) { - m_spBuffers->AddBuffer(*iterCurrent); + m_spMemoryPool->AddBuffer(*iterCurrent); iterCurrent = m_setFullBuffers.erase(iterCurrent); } else @@ -274,7 +338,7 @@ } } - auto funcAdd = [&](TOverlappedDataBuffer* pBuffer) { m_spBuffers->AddBuffer(pBuffer); }; + auto funcAdd = [&](TOverlappedDataBuffer* pBuffer) { m_spMemoryPool->AddBuffer(pBuffer); }; std::for_each(m_setFullBuffers.begin(), m_setFullBuffers.end(), funcAdd); std::for_each(m_setFinishedBuffers.begin(), m_setFinishedBuffers.end(), funcAdd); Index: src/libchcore/TOverlappedReaderWriter.h =================================================================== diff -u -N -rd23ed007b8142c6faf6d8cad4a421ac243ef0146 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/TOverlappedReaderWriter.h (.../TOverlappedReaderWriter.h) (revision d23ed007b8142c6faf6d8cad4a421ac243ef0146) +++ src/libchcore/TOverlappedReaderWriter.h (.../TOverlappedReaderWriter.h) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -22,31 +22,29 @@ #include "TEvent.h" #include "../liblogger/TLogFileData.h" #include "../liblogger/TLogger.h" -#include "TOverlappedDataBufferQueue.h" +#include "TOverlappedMemoryPool.h" +#include "TOrderedBufferQueue.h" +#include "IFilesystemFile.h" namespace chcore { - class TOverlappedDataBuffer; - - struct CompareBufferPositions - { - bool operator()(const TOverlappedDataBuffer* rBufferA, const TOverlappedDataBuffer* rBufferB); - }; - class TOverlappedReaderWriter { public: - explicit TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedDataBufferQueuePtr& spBuffers); + explicit TOverlappedReaderWriter(const logger::TLogFileDataPtr& spLogFileData, const TOverlappedMemoryPoolPtr& spBuffers, + file_size_t ullFilePos, DWORD dwChunkSize); TOverlappedReaderWriter(const TOverlappedReaderWriter&) = delete; ~TOverlappedReaderWriter(); TOverlappedReaderWriter& operator=(const TOverlappedReaderWriter&) = delete; // buffer management + void AddFailedReadBuffer(TOverlappedDataBuffer* pBuffer); void AddEmptyBuffer(TOverlappedDataBuffer* pBuffer); TOverlappedDataBuffer* GetEmptyBuffer(); void AddFullBuffer(TOverlappedDataBuffer* pBuffer); + void AddFailedFullBuffer(TOverlappedDataBuffer* pBuffer); TOverlappedDataBuffer* GetFullBuffer(); void AddFinishedBuffer(TOverlappedDataBuffer* pBuffer); @@ -61,7 +59,7 @@ bool IsDataWritingFinished() const { return m_bDataWritingFinished; } // event access - HANDLE GetEventReadPossibleHandle() const { return m_spBuffers->GetEventHasBuffers(); } + HANDLE GetEventReadPossibleHandle() const { return m_eventReadPossible.Handle(); } HANDLE GetEventWritePossibleHandle() const { return m_eventWritePossible.Handle(); } HANDLE GetEventWriteFinishedHandle() const { return m_eventWriteFinished.Handle(); } HANDLE GetEventAllBuffersAccountedFor() const { return m_eventAllBuffersAccountedFor.Handle(); } @@ -70,28 +68,30 @@ private: void CleanupBuffers(); + void UpdateReadPossibleEvent(); void UpdateWritePossibleEvent(); void UpdateWriteFinishedEvent(); void UpdateAllBuffersAccountedFor(); private: logger::TLoggerPtr m_spLog; - TOverlappedDataBufferQueuePtr m_spBuffers; + TOverlappedMemoryPoolPtr m_spMemoryPool; - using FullBuffersSet = std::set < TOverlappedDataBuffer*, CompareBufferPositions >; - FullBuffersSet m_setFullBuffers; + TOrderedBufferQueue m_setEmptyBuffers; // initialized empty buffers + TOrderedBufferQueue m_setFullBuffers; + TOrderedBufferQueue m_setFinishedBuffers; - using FinishedBuffersSet = std::set < TOverlappedDataBuffer*, CompareBufferPositions >; - FinishedBuffersSet m_setFinishedBuffers; + bool m_bDataSourceFinished = false; // input file was already read to the end + bool m_bDataWritingFinished = false; // output file was already written to the end - bool m_bDataSourceFinished; // input file was already read to the end - bool m_bDataWritingFinished; // output file was already written to the end + DWORD m_dwDataChunkSize = 0; - unsigned long long m_ullNextReadBufferOrder; // next order id for read buffers - unsigned long long m_ullNextWriteBufferOrder; // next order id to be processed when writing - unsigned long long m_ullNextFinishedBufferOrder; // next order id to be processed when finishing writing + unsigned long long m_ullNextReadBufferOrder = 0; // next order id for read buffers + unsigned long long m_ullNextWriteBufferOrder = 0; // next order id to be processed when writing + unsigned long long m_ullNextFinishedBufferOrder = 0; // next order id to be processed when finishing writing + TEvent m_eventReadPossible; TEvent m_eventWritePossible; TEvent m_eventWriteFinished; TEvent m_eventAllBuffersAccountedFor; Index: src/libchcore/TSubTaskCopyMove.cpp =================================================================== diff -u -N -rb89aea376d35ce4b0d6506f7d04dba73830d9268 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) +++ src/libchcore/TSubTaskCopyMove.cpp (.../TSubTaskCopyMove.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -37,7 +37,7 @@ #include "TPathContainer.h" #include "TScopedRunningTimeTracker.h" #include "TFeedbackHandlerWrapper.h" -#include "TOverlappedDataBufferQueue.h" +#include "TOverlappedMemoryPool.h" #include "TOverlappedDataBuffer.h" #include "RoundingFunctions.h" #include @@ -53,15 +53,15 @@ struct CUSTOM_COPY_PARAMS { CUSTOM_COPY_PARAMS() : - spBuffer(std::make_shared()) + spMemoryPool(std::make_shared()) { } TFileInfoPtr spSrcFile; // CFileInfo - src file TSmartPath pathDstFile; // dest path with filename TBufferSizes tBufferSizes; - TOverlappedDataBufferQueuePtr spBuffer; // buffer handling + TOverlappedMemoryPoolPtr spMemoryPool; // buffer handling bool bOnlyCreate = false; // flag from configuration - skips real copying - only create bool bProcessed = false; // has the element been processed ? (false if skipped) }; @@ -150,7 +150,7 @@ // remove changes in buffer sizes to avoid re-creation later rCfgTracker.RemoveModificationSet(TOptionsSet() % eTO_DefaultBufferSize % eTO_OneDiskBufferSize % eTO_TwoDisksBufferSize % eTO_CDBufferSize % eTO_LANBufferSize % eTO_UseOnlyDefaultBuffer % eTO_BufferQueueDepth); - AdjustBufferIfNeeded(ccp.spBuffer, ccp.tBufferSizes, true); + AdjustBufferIfNeeded(ccp.spMemoryPool, ccp.tBufferSizes, true); bool bIgnoreFolders = GetTaskPropValue(rConfig); bool bForceDirectories = GetTaskPropValue(rConfig); @@ -359,20 +359,24 @@ else if(bSkip) return TSubTaskBase::eSubResult_Continue; - // let the buffer queue know that we change the data source - TOverlappedReaderWriter tReaderWriter(m_spLog->GetLogFileData(), pData->spBuffer); - // recreate buffer if needed - AdjustBufferIfNeeded(pData->spBuffer, pData->tBufferSizes); + AdjustBufferIfNeeded(pData->spMemoryPool, pData->tBufferSizes); ATLTRACE(_T("CustomCopyFile: %s\n"), pData->spSrcFile->GetFullFilePath().ToString()); // establish count of data to read TBufferSizes::EBufferType eBufferIndex = GetBufferIndex(pData->tBufferSizes, pData->spSrcFile); m_tSubTaskStats.SetCurrentBufferIndex(eBufferIndex); - DWORD dwToRead = RoundUp(pData->tBufferSizes.GetSizeByType(eBufferIndex), IFilesystemFile::MaxSectorSize); + // determine buffer size to use for the operation + DWORD dwCurrentBufferSize = RoundUp(pData->tBufferSizes.GetSizeByType(eBufferIndex), IFilesystemFile::MaxSectorSize); + // resume copying from the position after the last processed mark; the proper value should be set + // by OpenSrcAndDstFilesFB() - that includes the no-buffering setting if required. + unsigned long long ullNextReadPos = m_tSubTaskStats.GetCurrentItemProcessedSize(); + + TOverlappedReaderWriter tReaderWriter(m_spLog->GetLogFileData(), pData->spMemoryPool, ullNextReadPos, dwCurrentBufferSize); + // read data from file to buffer // NOTE: order is critical here: // - write finished is first, so that all the data that were already queued to be written, will be written and accounted for (in stats) @@ -388,10 +392,6 @@ tReaderWriter.GetEventReadPossibleHandle() }; - // resume copying from the position after the last processed mark; the proper value should be set - // by OpenSrcAndDstFilesFB() - that includes the no-buffering setting if required. - unsigned long long ullNextReadPos = m_tSubTaskStats.GetCurrentItemProcessedSize(); - bool bStopProcessing = false; while(!bStopProcessing) { @@ -421,9 +421,6 @@ if (!pBuffer) throw TCoreException(eErr_InternalProblem, L"Read was possible, but no buffer is available", LOCATION); - pBuffer->InitForRead(ullNextReadPos, dwToRead); - ullNextReadPos += dwToRead; - eResult = tFileFBWrapper.ReadFileFB(fileSrc, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); if(eResult != TSubTaskBase::eSubResult_Continue) { @@ -453,24 +450,7 @@ // read error encountered - handle it eResult = HandleReadError(spFeedbackHandler, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); if(eResult == TSubTaskBase::eSubResult_Retry) - { - // re-request read of the same data - eResult = tFileFBWrapper.ReadFileFB(fileSrc, *pBuffer, pData->spSrcFile->GetFullFilePath(), bSkip); - if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.AddEmptyBuffer(pBuffer); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.AddEmptyBuffer(pBuffer); - - AdjustProcessedSizeForSkip(pData->spSrcFile); - - pData->bProcessed = false; - bStopProcessing = true; - } - } + tReaderWriter.AddFailedReadBuffer(pBuffer); else if(eResult != TSubTaskBase::eSubResult_Continue) { tReaderWriter.AddEmptyBuffer(pBuffer); @@ -520,23 +500,7 @@ { eResult = HandleWriteError(spFeedbackHandler, *pBuffer, pData->pathDstFile, bSkip); if(eResult == TSubTaskBase::eSubResult_Retry) - { - eResult = tFileFBWrapper.WriteFileFB(fileDst, *pBuffer, pData->pathDstFile, bSkip); - if(eResult != TSubTaskBase::eSubResult_Continue) - { - tReaderWriter.AddEmptyBuffer(pBuffer); - bStopProcessing = true; - } - else if(bSkip) - { - tReaderWriter.AddEmptyBuffer(pBuffer); - - AdjustProcessedSizeForSkip(pData->spSrcFile); - - pData->bProcessed = false; - bStopProcessing = true; - } - } + tReaderWriter.AddFailedFullBuffer(pBuffer); else if(eResult != TSubTaskBase::eSubResult_Continue) { tReaderWriter.AddEmptyBuffer(pBuffer); @@ -788,7 +752,7 @@ return eResult; } - bool TSubTaskCopyMove::AdjustBufferIfNeeded(const TOverlappedDataBufferQueuePtr& spBuffer, TBufferSizes& rBufferSizes, bool bForce) + bool TSubTaskCopyMove::AdjustBufferIfNeeded(const TOverlappedMemoryPoolPtr& spBuffer, TBufferSizes& rBufferSizes, bool bForce) { const TConfig& rConfig = GetContext().GetConfig(); TTaskConfigTracker& rCfgTracker = GetContext().GetCfgTracker(); Index: src/libchcore/TSubTaskCopyMove.h =================================================================== diff -u -N -rb89aea376d35ce4b0d6506f7d04dba73830d9268 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/TSubTaskCopyMove.h (.../TSubTaskCopyMove.h) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) +++ src/libchcore/TSubTaskCopyMove.h (.../TSubTaskCopyMove.h) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -29,7 +29,7 @@ #include "TBufferSizes.h" #include "IFilesystemFile.h" #include "../liblogger/TLogger.h" -#include "TOverlappedDataBufferQueue.h" +#include "TOverlappedMemoryPool.h" namespace chcore { @@ -61,7 +61,7 @@ private: TBufferSizes::EBufferType GetBufferIndex(const TBufferSizes& rBufferSizes, const TFileInfoPtr& spFileInfo); - bool AdjustBufferIfNeeded(const TOverlappedDataBufferQueuePtr& spBuffer, TBufferSizes& rBufferSizes, bool bForce = false); + bool AdjustBufferIfNeeded(const TOverlappedMemoryPoolPtr& spBuffer, TBufferSizes& rBufferSizes, bool bForce = false); ESubOperationResult CustomCopyFileFB(const IFeedbackHandlerPtr& spFeedbackHandler, CUSTOM_COPY_PARAMS* pData); Index: src/libchcore/Tests/TOverlappedDataBufferTests.cpp =================================================================== diff -u -N -rd23ed007b8142c6faf6d8cad4a421ac243ef0146 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/Tests/TOverlappedDataBufferTests.cpp (.../TOverlappedDataBufferTests.cpp) (revision d23ed007b8142c6faf6d8cad4a421ac243ef0146) +++ src/libchcore/Tests/TOverlappedDataBufferTests.cpp (.../TOverlappedDataBufferTests.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -28,11 +28,10 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(32768, &queue); - EXPECT_EQ(0, buffer.GetBufferOrder()); EXPECT_NE(nullptr, buffer.GetBufferPtr()); EXPECT_EQ(32768, buffer.GetBufferSize()); EXPECT_EQ(0, buffer.GetBytesTransferred()); @@ -48,8 +47,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(32768, &queue); buffer.ReinitializeBuffer(16384); @@ -62,8 +61,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.ReinitializeBuffer(32768); @@ -76,8 +75,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRequestedDataSize(123); @@ -89,8 +88,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRealDataSize(123); @@ -102,34 +101,21 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetLastPart(true); EXPECT_TRUE(buffer.IsLastPart()); } -TEST(TOverlappedDataBufferTests, SetBufferOrder_GetBufferOrder) -{ - logger::TLogFileDataPtr spLogData(std::make_shared()); - - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); - TOverlappedDataBuffer buffer(16384, &queue); - - buffer.SetBufferOrder(123); - - EXPECT_EQ(123, buffer.GetBufferOrder()); -} - TEST(TOverlappedDataBufferTests, SetErrorCode_GetErrorCode) { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetErrorCode(123); @@ -141,8 +127,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetStatusCode(123); @@ -154,8 +140,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetBytesTransferred(123); @@ -167,8 +153,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetFilePosition(123); @@ -181,8 +167,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRequestedDataSize(123); @@ -208,8 +194,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRequestedDataSize(123); @@ -231,8 +217,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.SetRequestedDataSize(123); @@ -259,8 +245,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.InitForRead(0, 1024); @@ -280,8 +266,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.InitForRead(0, 1024); @@ -301,8 +287,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter queue(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter queue(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer buffer(16384, &queue); buffer.InitForRead(0, 1024); Index: src/libchcore/Tests/TOverlappedReaderWriterTests.cpp =================================================================== diff -u -N -rb89aea376d35ce4b0d6506f7d04dba73830d9268 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/Tests/TOverlappedReaderWriterTests.cpp (.../TOverlappedReaderWriterTests.cpp) (revision b89aea376d35ce4b0d6506f7d04dba73830d9268) +++ src/libchcore/Tests/TOverlappedReaderWriterTests.cpp (.../TOverlappedReaderWriterTests.cpp) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -25,8 +25,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared()); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared()); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_EQ(nullptr, tReaderWriter.GetEmptyBuffer()); EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); @@ -51,8 +51,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_NE(nullptr, tReaderWriter.GetEmptyBuffer()); EXPECT_EQ(nullptr, tReaderWriter.GetFullBuffer()); @@ -74,8 +74,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; EXPECT_EQ(3, spBuffers->GetTotalBufferCount()); @@ -91,8 +91,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); tReaderWriter.GetEmptyBuffer(); @@ -103,8 +103,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_THROW(spBuffers->ReinitializeBuffers(3, 0), TCoreException); } @@ -113,8 +113,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); spBuffers->ReinitializeBuffers(3, 32768); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; @@ -131,8 +131,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); spBuffers->ReinitializeBuffers(3, 65536); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; @@ -149,8 +149,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 65536)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 65536)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); spBuffers->ReinitializeBuffers(3, 32768); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; @@ -167,8 +167,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); spBuffers->ReinitializeBuffers(5, 32768); EXPECT_EQ(5, spBuffers->GetTotalBufferCount()); @@ -187,8 +187,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(5, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(5, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); spBuffers->ReinitializeBuffers(3, 32768); EXPECT_EQ(3, spBuffers->GetTotalBufferCount()); @@ -206,8 +206,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_SIGNALED(tReaderWriter.GetEventReadPossibleHandle()); @@ -227,8 +227,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; @@ -248,8 +248,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_THROW(tReaderWriter.AddEmptyBuffer(nullptr), TCoreException); } @@ -259,8 +259,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); tReaderWriter.AddFullBuffer(pBuffer); @@ -274,8 +274,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; tReaderWriter.AddFullBuffer(pBuffers[1]); @@ -292,8 +292,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; pBuffers[1]->SetLastPart(true); @@ -309,8 +309,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; pBuffers[2]->SetLastPart(true); @@ -334,8 +334,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_THROW(tReaderWriter.AddFullBuffer(nullptr), TCoreException); } @@ -344,8 +344,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); pBuffer->InitForRead(0, 1280); @@ -360,19 +360,19 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; pBuffers[0]->InitForRead(0, 1000); pBuffers[0]->SetBytesTransferred(1000); pBuffers[0]->SetStatusCode(0); - pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->InitForRead(1000, 1200); pBuffers[1]->SetBytesTransferred(1200); pBuffers[1]->SetStatusCode(0); - pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->InitForRead(2200, 1400); pBuffers[2]->SetBytesTransferred(800); pBuffers[2]->SetStatusCode(0); pBuffers[2]->SetLastPart(true); @@ -394,19 +394,19 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; pBuffers[0]->InitForRead(0, 1000); pBuffers[0]->SetBytesTransferred(1000); pBuffers[0]->SetStatusCode(0); - pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->InitForRead(1000, 1200); pBuffers[1]->SetBytesTransferred(1200); pBuffers[1]->SetStatusCode(0); - pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->InitForRead(2200, 1400); pBuffers[2]->SetBytesTransferred(800); pBuffers[2]->SetStatusCode(0); pBuffers[2]->SetLastPart(true); @@ -423,19 +423,19 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; - pBuffers[0]->InitForRead(0, 1000); - pBuffers[0]->SetBytesTransferred(1000); + pBuffers[0]->InitForRead(0, 4096); + pBuffers[0]->SetBytesTransferred(4096); pBuffers[0]->SetStatusCode(0); - pBuffers[1]->InitForRead(0, 1200); - pBuffers[1]->SetBytesTransferred(1200); + pBuffers[1]->InitForRead(4096, 4096); + pBuffers[1]->SetBytesTransferred(4096); pBuffers[1]->SetStatusCode(0); - pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->InitForRead(8192, 4096); pBuffers[2]->SetBytesTransferred(800); pBuffers[2]->SetStatusCode(0); pBuffers[2]->SetLastPart(true); @@ -464,19 +464,19 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; pBuffers[0]->InitForRead(0, 1000); pBuffers[0]->SetBytesTransferred(1000); pBuffers[0]->SetStatusCode(0); - pBuffers[1]->InitForRead(0, 1200); + pBuffers[1]->InitForRead(1000, 1200); pBuffers[1]->SetBytesTransferred(1200); pBuffers[1]->SetStatusCode(0); - pBuffers[2]->InitForRead(0, 1400); + pBuffers[2]->InitForRead(2200, 1400); pBuffers[2]->SetBytesTransferred(800); pBuffers[2]->SetStatusCode(0); pBuffers[2]->SetLastPart(true); @@ -495,8 +495,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); EXPECT_THROW(tReaderWriter.AddFinishedBuffer(nullptr), TCoreException); } @@ -505,8 +505,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffer = tReaderWriter.GetEmptyBuffer(); tReaderWriter.AddFinishedBuffer(pBuffer); EXPECT_THROW(tReaderWriter.AddFinishedBuffer(pBuffer), TCoreException); @@ -517,8 +517,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); TOverlappedDataBuffer* pBuffers[3] = { tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer(), tReaderWriter.GetEmptyBuffer() }; pBuffers[0]->SetLastPart(true); @@ -537,8 +537,8 @@ { logger::TLogFileDataPtr spLogData(std::make_shared()); - TOverlappedDataBufferQueuePtr spBuffers(std::make_shared(3, 32768)); - TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers); + TOverlappedMemoryPoolPtr spBuffers(std::make_shared(3, 32768)); + TOverlappedReaderWriter tReaderWriter(spLogData, spBuffers, 0, 4096); tReaderWriter.GetEmptyBuffer(); Index: src/libchcore/libchcore.vc140.vcxproj =================================================================== diff -u -N -rd23ed007b8142c6faf6d8cad4a421ac243ef0146 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision d23ed007b8142c6faf6d8cad4a421ac243ef0146) +++ src/libchcore/libchcore.vc140.vcxproj (.../libchcore.vc140.vcxproj) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -507,7 +507,8 @@ - + + @@ -749,7 +750,8 @@ - + + Index: src/libchcore/libchcore.vc140.vcxproj.filters =================================================================== diff -u -N -rd23ed007b8142c6faf6d8cad4a421ac243ef0146 -r1506d51ff1c0a5d156dab398051efc0c87473e81 --- src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision d23ed007b8142c6faf6d8cad4a421ac243ef0146) +++ src/libchcore/libchcore.vc140.vcxproj.filters (.../libchcore.vc140.vcxproj.filters) (revision 1506d51ff1c0a5d156dab398051efc0c87473e81) @@ -353,9 +353,6 @@ Source Files\Tools\Data Buffer - - Source Files\Tools\Data Buffer - Source Files\Tools @@ -467,6 +464,12 @@ Source Files\Tools\Data Buffer + + Source Files\Tools\Data Buffer + + + Source Files\Tools\Data Buffer + @@ -739,9 +742,6 @@ Source Files\Tools\Data Buffer - - Source Files\Tools\Data Buffer - Source Files\Tools @@ -853,5 +853,11 @@ Source Files\Tools\Data Buffer + + Source Files\Tools\Data Buffer + + + Source Files\Tools\Data Buffer + \ No newline at end of file