Clone
ixen <ixen@copyhandler.com>
committed
on 26 Oct 16
Split ordered queue with position locking from simple version of ordered queue (CH-270).
ParallelizeReaderWriter + 4 more
src/libchcore/TOrderedBufferQueue.cpp (+0 -28)
6 6 //  it under the terms of the GNU Library General Public License
7 7 //  (version 2) as published by the Free Software Foundation;
8 8 //
9 9 //  This program is distributed in the hope that it will be useful,
10 10 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 12 //  GNU General Public License for more details.
13 13 //
14 14 //  You should have received a copy of the GNU Library General Public
15 15 //  License along with this program; if not, write to the
16 16 //  Free Software Foundation, Inc.,
17 17 //  59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
18 18 // ============================================================================
19 19 #include "stdafx.h"
20 20 #include "TOrderedBufferQueue.h"
21 21 #include "TOverlappedDataBuffer.h"
22 22 #include "TCoreException.h"
23 23
24 24 namespace chcore
25 25 {
// Default constructor: only the "has buffers" event is initialized explicitly.
// NOTE(review): the (true, false) arguments are presumably (manual-reset, initially
// non-signalled) - confirm against the event class used for m_eventHasBuffers.
// NOTE(review): m_ullExpectedBufferPosition is not set in this initializer list; its
// starting value must come from the class declaration - confirm in the header.
// NOTE(review): these lines carry a single line-number column in this diff listing,
// i.e. they appear to be lines the commit removed.
26           TOrderedBufferQueue::TOrderedBufferQueue() :
27                   m_eventHasBuffers(true, false)
28           {
29           }
30  
// Constructs a queue that expects its first buffer at the given position.
//   ullExpectedPosition - initial value of m_ullExpectedBufferPosition; IsBufferReady()
//                         compares the first queued buffer's file position against it
//                         (unless it equals NoPosition).
// NOTE(review): the (true, false) event arguments are presumably (manual-reset,
// initially non-signalled) - confirm against the event class.
31 26         TOrderedBufferQueue::TOrderedBufferQueue(unsigned long long ullExpectedPosition) :
32 27                 m_eventHasBuffers(true, false),
33 28                 m_ullExpectedBufferPosition(ullExpectedPosition)
34 29         {
35 30         }
36 31
// Adds a buffer to the queue and refreshes the "has buffers" state.
//   pBuffer - buffer to insert into m_setBuffers.
// Throws TCoreException(eErr_InvalidArgument) when the container refuses the insert
// because an equivalent element is already present.
// NOTE(review): pBuffer is not checked for nullptr here; whether that is safe depends
// on the container's comparator, which is not visible in this listing.
37 32         void TOrderedBufferQueue::Push(TOverlappedDataBuffer* pBuffer)
38 33         {
39 34                 auto pairInsert = m_setBuffers.insert(pBuffer);
40 35                 if (!pairInsert.second) // .second == false: nothing was inserted
41 36                         throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION);
42 37
43 38                 UpdateHasBuffers();
44 39         }
45 40
// Hands out the next buffer when one is ready.
// Returns nullptr when IsBufferReady() reports false; otherwise returns pBuffer.
// When the returned buffer has no error and position tracking is active
// (m_ullExpectedBufferPosition != NoPosition), the expected position is advanced by
// the buffer's requested data size. UpdateHasBuffers() is called before returning.
46 41         TOverlappedDataBuffer* TOrderedBufferQueue::Pop()
47 42         {
48 43                 if(!IsBufferReady())
49 44                         return nullptr;
50 45
 
// NOTE(review): listing lines 51-53 (46-48) are elided in this view; pBuffer is
// presumably taken from the front of m_setBuffers and erased there - confirm.
54 49                 if(!pBuffer->HasError() && m_ullExpectedBufferPosition != NoPosition)
55 50                         m_ullExpectedBufferPosition += pBuffer->GetRequestedDataSize();
56 51
57 52                 UpdateHasBuffers();
58 53
59 54                 return pBuffer;
60 55         }
61 56
// Returns the first element of m_setBuffers without removing it, or nullptr when the
// container is empty. Unlike IsBufferReady(), the expected position is not consulted,
// so the returned buffer is not necessarily the one Pop() would currently hand out.
62 57         const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const
63 58         {
64 59                 if(!m_setBuffers.empty())
65 60                         return *m_setBuffers.begin();
66 61                 return nullptr;
67 62         }
68 63
// Tells whether the first queued buffer can be handed out.
// Returns true when the container is non-empty and either position tracking is off
// (m_ullExpectedBufferPosition == NoPosition) or the first buffer's file position
// equals the expected position; returns false otherwise.
69 64         bool TOrderedBufferQueue::IsBufferReady() const
70 65         {
71 66                 return (!m_setBuffers.empty() && (m_ullExpectedBufferPosition == NoPosition || (*m_setBuffers.begin())->GetFilePosition() == m_ullExpectedBufferPosition));
72 67         }
73 68
// Empties the queue: drops every stored pointer, switches position tracking off
// (expected position = NoPosition) and resets the "has buffers" event.
// The buffer objects themselves are not touched here; only the pointers are removed.
// NOTE(review): presumably the caller owns the buffers and releases them separately -
// confirm against callers.
// NOTE(review): these lines carry a single line-number column in this diff listing,
// i.e. they appear to be lines the commit removed.
74           void TOrderedBufferQueue::Clear()
75           {
76                   m_setBuffers.clear();
77                   m_ullExpectedBufferPosition = NoPosition;
78                   m_eventHasBuffers.ResetEvent();
79           }
80  
// Removes and returns every buffer queued after the first one flagged as "last part".
// Returns an empty vector when no buffer reports IsLastPart(), or when the first such
// buffer is already the final element of the container.
// Throws TCoreException(eErr_InvalidArgument) when any buffer located after the first
// last-part buffer is not itself flagged as a last part; nothing is erased in that case.
// NOTE(review): these lines carry a single line-number column in this diff listing,
// i.e. they appear to be lines the commit removed.
81           std::vector<TOverlappedDataBuffer*> TOrderedBufferQueue::GetUnneededLastParts()
82           {
83                   auto iterFind = std::find_if(m_setBuffers.begin(), m_setBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return pBuffer->IsLastPart(); });
// The ++iterFind below is evaluated only when a last part was found; past this check
// iterFind points at the element right after the first last-part buffer.
84                   if(iterFind == m_setBuffers.end() || ++iterFind == m_setBuffers.end())
85                           return std::vector<TOverlappedDataBuffer*>();
86  
87                   auto iterInvalidParts = std::find_if(iterFind, m_setBuffers.end(), [](TOverlappedDataBuffer* pBuffer) { return !pBuffer->IsLastPart(); });
88                   if(iterInvalidParts != m_setBuffers.end())
89                           throw TCoreException(eErr_InvalidArgument, L"Found non-last-parts after last-part", LOCATION);
90  
// Copy the surplus pointers out before erasing the same range from the container.
91                   std::vector<TOverlappedDataBuffer*> vBuffers(iterFind, m_setBuffers.end());
92                   m_setBuffers.erase(iterFind, m_setBuffers.end());
93  
94                   return vBuffers;
95           }
96  
// Returns the number of buffers currently stored in m_setBuffers, regardless of
// whether the first one is ready to be popped.
97 69         size_t TOrderedBufferQueue::GetCount() const
98 70         {
99 71                 return m_setBuffers.size();
100 72         }
101 73
// Returns true when m_setBuffers holds no buffers at all.
102 74         bool TOrderedBufferQueue::IsEmpty() const
103 75         {
104 76                 return m_setBuffers.empty();
105 77         }
106 78
// Returns the raw handle of the "has buffers" event.
// NOTE(review): presumably intended for use with wait functions, and the handle stays
// owned by m_eventHasBuffers - confirm against callers.
107 79         HANDLE TOrderedBufferQueue::GetHasBuffersEvent() const
108 80         {
109 81                 return m_eventHasBuffers.Handle();
110 82         }
111 83
112 84         void TOrderedBufferQueue::ReleaseBuffers(const TBufferListPtr& spBuffers)
113 85         {
114 86                 for(TOverlappedDataBuffer* pBuffer : m_setBuffers)
115 87                 {
116 88                         spBuffers->Push(pBuffer);