Clone
ixen <ixen@copyhandler.com>
committed
on 28 Oct 16
Added better error-checking. Fixed several hang/crash scenarios. (CH-270)
ParallelizeReaderWriter + 4 more
src/libchcore/TOrderedBufferQueue.cpp (+13 -10)
15 15 //  License along with this program; if not, write to the
16 16 //  Free Software Foundation, Inc.,
17 17 //  59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
18 18 // ============================================================================
19 19 #include "stdafx.h"
20 20 #include "TOrderedBufferQueue.h"
21 21 #include "TOverlappedDataBuffer.h"
22 22 #include "TCoreException.h"
23 23
24 24 namespace chcore
25 25 {
// Constructs the queue: both event members are created with the arguments (true, false)
// and the expected buffer position starts at ullExpectedPosition.
// NOTE(review): the meaning of the (true, false) event arguments is not visible here -
// presumably "manual reset, initially non-signaled"; confirm against the event class.
26 26         TOrderedBufferQueue::TOrderedBufferQueue(unsigned long long ullExpectedPosition) :
27 27                 m_eventHasBuffers(true, false),
28 28                 m_eventHasError(true, false),
29 29                 m_ullExpectedBufferPosition(ullExpectedPosition)
30 30         {
31 31         }
32 32
// Inserts pBuffer into m_setBuffers and then refreshes the "has buffers" state.
// Throws TCoreException(eErr_InvalidArgument) when pBuffer is null, when it reports
// HasError(), or when the set already contains it.
// Throws TCoreException(eErr_InternalProblem) when the buffer's file position equals
// m_ullErrorPosition while m_pFirstErrorBuffer is still set.
33 33         void TOrderedBufferQueue::Push(TOverlappedDataBuffer* pBuffer)
34 34         {
                      // NOTE(review): the two lines below carry only a new-side gutter number, so the
                      // null check looks like an addition made by this commit.
  35                 if(!pBuffer)
  36                         throw TCoreException(eErr_InvalidArgument, L"pBuffer is NULL", LOCATION);
35 37                 if(pBuffer->HasError())
36 38                         throw TCoreException(eErr_InvalidArgument, L"Cannot push buffer with error", LOCATION);
37 39
                      // insert() reports through .second whether the element was actually added
38 40                 auto pairInsert = m_setBuffers.insert(pBuffer);
39 41                 if (!pairInsert.second)
40 42                         throw TCoreException(eErr_InvalidArgument, L"Tried to insert duplicate buffer into the collection", LOCATION);
41 43
                      // A buffer without error arriving at the recorded error position clears that position.
42 44                 if(pBuffer->GetFilePosition() == m_ullErrorPosition)
43 45                 {
                              // NOTE(review): the buffer was already inserted above, so when this throws
                              // it remains in m_setBuffers - confirm that callers expect this.
44 46                         if(m_pFirstErrorBuffer != nullptr)
45 47                                 throw TCoreException(eErr_InternalProblem, L"Buffer with error was not retrieved prior to adding same-by-position buffer without error", LOCATION);
46 48                         m_ullErrorPosition = NoPosition;
47 49                         UpdateHasErrors();
48 50                 }
49 51
50 52                 UpdateHasBuffers();
51 53         }
52 54
// Removes the first element of m_setBuffers and returns it; returns nullptr when the
// readiness guard at the top reports that nothing can be popped.
// After removal the expected position is advanced by the buffer's requested data size
// and the "has buffers" state is refreshed.
53 55         TOverlappedDataBuffer* TOrderedBufferQueue::Pop()
54 56         {
                      // NOTE(review): the next two lines carry a single gutter number each - they look
                      // like the old (IsBufferReady) and new (HasPoppableBuffer) form of the same guard
                      // in a rendered diff; presumably only the second one exists after the commit.
55                   if(!IsBufferReady())
  57                 if(!HasPoppableBuffer())
56 58                         return nullptr;
57 59
58 60                 TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin();
59 61                 m_setBuffers.erase(m_setBuffers.begin());
60 62
                      // NOTE(review): the condition below has only an old-side gutter number, so the
                      // position advance appears to be unconditional after the commit - confirm.
61                   if(!pBuffer->HasError() && m_ullExpectedBufferPosition != NoPosition)
62 63                 m_ullExpectedBufferPosition += pBuffer->GetRequestedDataSize();
63 64
64 65                 UpdateHasBuffers();
65 66
66 67                 return pBuffer;
67 68         }
68 69
// Hands out the buffer stored in m_pFirstErrorBuffer, or nullptr when none is stored.
// On success the member is reset to nullptr and the error event is refreshed.
69 70         TOverlappedDataBuffer* TOrderedBufferQueue::PopError()
70 71         {
71 72                 if(!m_pFirstErrorBuffer)
72 73                         return nullptr;
73 74
74 75                 TOverlappedDataBuffer* pBuffer = m_pFirstErrorBuffer;
75 76                 m_pFirstErrorBuffer = nullptr;
                      // refresh the event only after the member was cleared
76 77                 UpdateHasErrors();
77 78
78 79                 return pBuffer;
79 80         }
80 81
// Returns the first element of m_setBuffers without removing it, or nullptr when the
// set is empty. Unlike Pop(), no position check is made here.
81 82         const TOverlappedDataBuffer* const TOrderedBufferQueue::Peek() const
82 83         {
83 84                 if(!m_setBuffers.empty())
84 85                         return *m_setBuffers.begin();
85 86                 return nullptr;
86 87         }
87 88
// Reports true when m_setBuffers is non-empty and either m_ullExpectedBufferPosition is
// NoPosition or the first buffer's file position equals m_ullExpectedBufferPosition.
// NOTE(review): every line of this function carries only an old-side gutter number, so
// it looks like the commit removed it - confirm before relying on it.
88           bool TOrderedBufferQueue::IsBufferReady() const
89           {
90                   return (!m_setBuffers.empty() && (m_ullExpectedBufferPosition == NoPosition || (*m_setBuffers.begin())->GetFilePosition() == m_ullExpectedBufferPosition));
91           }
92  
// Returns the number of elements in m_setBuffers, plus one when m_pFirstErrorBuffer is set.
93 89         size_t TOrderedBufferQueue::GetCount() const
94 90         {
95 91                 return m_setBuffers.size() + (m_pFirstErrorBuffer ? 1 : 0);
96 92         }
97 93
// Returns true when m_setBuffers holds no elements. m_pFirstErrorBuffer is not taken
// into account here (GetCount() does count it).
98 94         bool TOrderedBufferQueue::IsEmpty() const
99 95         {
100 96                 return m_setBuffers.empty();
101 97         }
102 98
// Returns true when m_setBuffers is non-empty and its first buffer's file position
// equals m_ullExpectedBufferPosition; returns false otherwise.
103 99         bool TOrderedBufferQueue::HasPoppableBuffer() const
104 100         {
                      // NOTE(review): the single-number lines below look like two sides of a rendered
                      // diff - the one-line return (old side) followed by its expanded equivalent
                      // (new side); presumably only the expanded form exists after the commit.
105                   return !m_setBuffers.empty() && (*m_setBuffers.begin())->GetFilePosition() == m_ullExpectedBufferPosition;
  101                 if(m_setBuffers.empty())
  102                         return false;
  103
  104                 TOverlappedDataBuffer* pBuffer = *m_setBuffers.begin();
  105                 return pBuffer->GetFilePosition() == m_ullExpectedBufferPosition;
106 106         }
107 107
// Returns the handle of m_eventHasBuffers, the event that UpdateHasBuffers() sets and resets.
108 108         HANDLE TOrderedBufferQueue::GetHasBuffersEvent() const
109 109         {
110 110                 return m_eventHasBuffers.Handle();
111 111         }
112 112
// Returns the handle of m_eventHasError, the event that UpdateHasErrors() updates.
113 113         HANDLE TOrderedBufferQueue::GetHasErrorEvent() const
114 114         {
115 115                 return m_eventHasError.Handle();
116 116         }
117 117
// Pushes every buffer held in m_setBuffers, and the buffer in m_pFirstErrorBuffer when
// one is set, into spBuffers; afterwards the set is cleared and both events are refreshed.
// Throws TCoreException(eErr_InvalidArgument) when spBuffers tests as false (null).
118 118         void TOrderedBufferQueue::ReleaseBuffers(const TBufferListPtr& spBuffers)
119 119         {
                      // NOTE(review): the three lines below carry only a new-side gutter number, so the
                      // argument check looks like an addition made by this commit.
  120                 if(!spBuffers)
  121                         throw TCoreException(eErr_InvalidArgument, L"spBuffers is NULL", LOCATION);
  122
120 123                 for(TOverlappedDataBuffer* pBuffer : m_setBuffers)
121 124                 {
122 125                         spBuffers->Push(pBuffer);
123 126                 }
124 127                 m_setBuffers.clear();
125 128
                      // m_ullErrorPosition is reset only when an error buffer was actually pending
126 129                 if(m_pFirstErrorBuffer)
127 130                 {
128 131                         spBuffers->Push(m_pFirstErrorBuffer);
129 132                         m_pFirstErrorBuffer = nullptr;
130 133                         m_ullErrorPosition = NoPosition;
131 134                 }
132 135
133 136                 UpdateHasBuffers();
134 137                 UpdateHasErrors();
135 138         }
136 139
// Synchronizes m_eventHasBuffers and m_notifier with the queue state: the event is set
// and the notifier invoked with true when the condition below holds, otherwise the event
// is reset and the notifier invoked with false. The notifier fires on every call.
137 140         void TOrderedBufferQueue::UpdateHasBuffers()
138 141         {
                      // NOTE(review): the next two lines carry a single gutter number each - they look
                      // like the old and new form of the same condition in a rendered diff. The old
                      // form also accepted m_ullExpectedBufferPosition == NoPosition; the new form
                      // delegates to HasPoppableBuffer(). Presumably only the new form remains.
139                   if(!m_setBuffers.empty() && (m_ullExpectedBufferPosition == NoPosition || (*m_setBuffers.begin())->GetFilePosition() == m_ullExpectedBufferPosition))
  142                 if(HasPoppableBuffer())
140 143                 {
141 144                         m_eventHasBuffers.SetEvent();
142 145                         m_notifier(true);
143 146                 }
144 147                 else
145 148                 {
146 149                         m_eventHasBuffers.ResetEvent();
147 150                         m_notifier(false);
148 151                 }
149 152         }
150 153
// Passes (m_pFirstErrorBuffer != nullptr) to m_eventHasError.SetEvent().
// NOTE(review): presumably SetEvent(bool) signals the event for true and resets it for
// false - confirm against the event class, which is not visible here.
151 154         void TOrderedBufferQueue::UpdateHasErrors()
152 155         {
153 156                 m_eventHasError.SetEvent(m_pFirstErrorBuffer != nullptr);
154 157         }
155 158
// Returns a non-const reference to m_notifier, the signal that UpdateHasBuffers()
// invokes with true or false.
156 159         boost::signals2::signal<void(bool bAdded)>& TOrderedBufferQueue::GetNotifier()
157 160         {
158 161                 return m_notifier;
159 162         }