Crypto++
network.cpp
1 // network.cpp - written and placed in the public domain by Wei Dai
2 
3 #include "pch.h"
4 #include "network.h"
5 #include "wait.h"
6 
7 #define CRYPTOPP_TRACE_NETWORK 0
8 
9 NAMESPACE_BEGIN(CryptoPP)
10 
11 #ifdef HIGHRES_TIMER_AVAILABLE
12 
13 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
14 {
15  if (!m_maxBytesPerSecond)
16  return ULONG_MAX;
17 
18  double curTime = GetCurTimeAndCleanUp();
19  lword total = 0;
20  for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
21  total += m_ops[i].second;
22  return SaturatingSubtract(m_maxBytesPerSecond, total);
23 }
24 
25 double LimitedBandwidth::TimeToNextTransceive()
26 {
27  if (!m_maxBytesPerSecond)
28  return 0;
29 
30  if (!m_nextTransceiveTime)
31  ComputeNextTransceiveTime();
32 
33  return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
34 }
35 
36 void LimitedBandwidth::NoteTransceive(lword size)
37 {
38  if (m_maxBytesPerSecond)
39  {
40  double curTime = GetCurTimeAndCleanUp();
41  m_ops.push_back(std::make_pair(curTime, size));
42  m_nextTransceiveTime = 0;
43  }
44 }
45 
46 void LimitedBandwidth::ComputeNextTransceiveTime()
47 {
48  double curTime = GetCurTimeAndCleanUp();
49  lword total = 0;
50  for (unsigned int i=0; i!=m_ops.size(); ++i)
51  total += m_ops[i].second;
52  m_nextTransceiveTime =
53  (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
54 }
55 
56 double LimitedBandwidth::GetCurTimeAndCleanUp()
57 {
58  if (!m_maxBytesPerSecond)
59  return 0;
60 
61  double curTime = m_timer.ElapsedTimeAsDouble();
62  while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
63  m_ops.pop_front();
64  return curTime;
65 }
66 
67 void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
68 {
69  double nextTransceiveTime = TimeToNextTransceive();
70  if (nextTransceiveTime)
71  container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
72 }
73 
74 // *************************************************************
75 
77  lword& byteCount, bool blockingOutput,
78  unsigned long maxTime, bool checkDelimiter, byte delimiter)
79 {
80  m_blockedBySpeedLimit = false;
81 
82  if (!GetMaxBytesPerSecond())
83  {
84  size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
85  m_doPumpBlocked = (ret != 0);
86  return ret;
87  }
88 
89  bool forever = (maxTime == INFINITE_TIME);
90  unsigned long timeToGo = maxTime;
91  Timer timer(Timer::MILLISECONDS, forever);
92  lword maxSize = byteCount;
93  byteCount = 0;
94 
95  timer.StartTimer();
96 
97  while (true)
98  {
99  lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
100 
101  if (curMaxSize || m_doPumpBlocked)
102  {
103  if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
104  size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
105  m_doPumpBlocked = (ret != 0);
106  if (curMaxSize)
107  {
108  NoteTransceive(curMaxSize);
109  byteCount += curMaxSize;
110  }
111  if (ret)
112  return ret;
113  }
114 
115  if (maxSize != ULONG_MAX && byteCount >= maxSize)
116  break;
117 
118  if (!forever)
119  {
120  timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
121  if (!timeToGo)
122  break;
123  }
124 
125  double waitTime = TimeToNextTransceive();
126  if (!forever && waitTime > timeToGo)
127  {
128  m_blockedBySpeedLimit = true;
129  break;
130  }
131 
132  WaitObjectContainer container;
133  LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
134  container.Wait((unsigned long)waitTime);
135  }
136 
137  return 0;
138 }
139 
140 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
141 {
142  if (messageCount == 0)
143  return 0;
144 
145  messageCount = 0;
146 
147  lword byteCount;
148  do {
149  byteCount = LWORD_MAX;
150  RETURN_IF_NONZERO(Pump2(byteCount, blocking));
151  } while(byteCount == LWORD_MAX);
152 
153  if (!m_messageEndSent && SourceExhausted())
154  {
155  RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
156  m_messageEndSent = true;
157  messageCount = 1;
158  }
159  return 0;
160 }
161 
162 lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
163 {
164  m_blockedBySpeedLimit = false;
165 
166  size_t curBufSize = GetCurrentBufferSize();
167  if (curBufSize <= targetSize && (targetSize || !EofPending()))
168  return 0;
169 
170  if (!GetMaxBytesPerSecond())
171  return DoFlush(maxTime, targetSize);
172 
173  bool forever = (maxTime == INFINITE_TIME);
174  unsigned long timeToGo = maxTime;
175  Timer timer(Timer::MILLISECONDS, forever);
176  lword totalFlushed = 0;
177 
178  timer.StartTimer();
179 
180  while (true)
181  {
182  size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
183  if (flushSize || EofPending())
184  {
185  if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
186  size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
187  if (ret)
188  {
189  NoteTransceive(ret);
190  curBufSize -= ret;
191  totalFlushed += ret;
192  }
193  }
194 
195  if (curBufSize <= targetSize && (targetSize || !EofPending()))
196  break;
197 
198  if (!forever)
199  {
200  timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
201  if (!timeToGo)
202  break;
203  }
204 
205  double waitTime = TimeToNextTransceive();
206  if (!forever && waitTime > timeToGo)
207  {
208  m_blockedBySpeedLimit = true;
209  break;
210  }
211 
212  WaitObjectContainer container;
213  LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
214  container.Wait((unsigned long)waitTime);
215  }
216 
217  return totalFlushed;
218 }
219 
220 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
221 {
222  TimedFlush(blocking ? INFINITE_TIME : 0);
223  return hardFlush && (!!GetCurrentBufferSize() || EofPending());
224 }
225 
226 // *************************************************************
227 
228 NetworkSource::NetworkSource(BufferedTransformation *attachment)
229  : NonblockingSource(attachment), m_buf(1024*16)
230  , m_waitingForResult(false), m_outputBlocked(false)
231  , m_dataBegin(0), m_dataEnd(0)
232 {
233 }
234 
236 {
238  + GetReceiver().GetMaxWaitObjectCount()
240 }
241 
243 {
244  if (BlockedBySpeedLimit())
245  LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
246  else if (!m_outputBlocked)
247  {
248  if (m_dataBegin == m_dataEnd)
249  AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
250  else
251  container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
252  }
253 
254  AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
255 }
256 
 // Pump data from the receiver into the attached transformation.
 // byteCount: on entry the maximum number of bytes to pump; on exit the number
 //   of bytes handed to the attachment during this call.
 // blockingOutput: passed (or'ed with "forever") as the blocking flag of PutModifiable2().
 // maxTime: time budget for the waits; INFINITE_TIME means wait forever.
 // checkDelimiter/delimiter: when set, output stops just before the first delimiter byte.
 // Returns the non-zero PutModifiable2() result when the attachment stays blocked
 // (m_outputBlocked is then set so the next call resumes at DoOutput), otherwise 0.
 // NOTE(review): the declaration of `t` is not visible in this listing; presumably
 // it is the attached transformation - confirm against the full source.
257 size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
258 {
259  NetworkReceiver &receiver = AccessReceiver();
260 
261  lword maxSize = byteCount;
262  byteCount = 0;
263  bool forever = maxTime == INFINITE_TIME;
264  Timer timer(Timer::MILLISECONDS, forever);
266 
 // a previous call left output pending: retry it with the stored m_putSize
267  if (m_outputBlocked)
268  goto DoOutput;
269 
270  while (true)
271  {
 // buffer is empty: obtain more data from the receiver
272  if (m_dataBegin == m_dataEnd)
273  {
274  if (receiver.EofReceived())
275  break;
276 
 // a receive was started earlier: collect its result
277  if (m_waitingForResult)
278  {
279  if (receiver.MustWaitForResult() &&
280  !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
281  CallStack("NetworkSource::DoPump() - wait receive result", 0)))
282  break;
283 
284  unsigned int recvResult = receiver.GetReceiveResult();
285 #if CRYPTOPP_TRACE_NETWORK
286  OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
287 #endif
288  m_dataEnd += recvResult;
289  m_waitingForResult = false;
290 
 // more can be received without waiting and the buffer is not full: keep receiving
291  if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
292  goto ReceiveNoWait;
293  }
294  else
295  {
 // everything buffered was consumed: rewind to the start of m_buf
296  m_dataEnd = m_dataBegin = 0;
297 
298  if (receiver.MustWaitToReceive())
299  {
300  if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
301  CallStack("NetworkSource::DoPump() - wait receive", 0)))
302  break;
303 
304  receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
305  m_waitingForResult = true;
306  }
307  else
308  {
309 ReceiveNoWait:
310  m_waitingForResult = true;
311  // call Receive repeatedly as long as data is immediately available,
312  // because some receivers tend to return data in small pieces
313 #if CRYPTOPP_TRACE_NETWORK
314  OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
315 #endif
316  while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
317  {
318  unsigned int recvResult = receiver.GetReceiveResult();
319 #if CRYPTOPP_TRACE_NETWORK
320  OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
321 #endif
322  m_dataEnd += recvResult;
 // stop receiving at EOF or once more than half of the buffer is filled
323  if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
324  {
325  m_waitingForResult = false;
326  break;
327  }
328  }
329  }
330  }
331  }
332  else
333  {
 // buffer holds data: output at most what is buffered and what the caller still allows
334  m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
335 
 // shorten the output so that it ends just before the first delimiter byte
336  if (checkDelimiter)
337  m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
338 
339 DoOutput:
340  size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
341  if (result)
342  {
 // attachment blocked: retry if it becomes ready in time, otherwise report the block
343  if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
344  CallStack("NetworkSource::DoPump() - wait attachment", 0)))
345  goto DoOutput;
346  else
347  {
348  m_outputBlocked = true;
349  return result;
350  }
351  }
352  m_outputBlocked = false;
353 
354  byteCount += m_putSize;
355  m_dataBegin += m_putSize;
 // stop when the next unread byte is the delimiter
356  if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
357  break;
 // stop once the requested number of bytes has been pumped
358  if (maxSize != ULONG_MAX && byteCount == maxSize)
359  break;
360  // once time limit is reached, return even if there is more data waiting
361  // but make 0 a special case so caller can request a large amount of data to be
362  // pumped as long as it is immediately available
363  if (maxTime > 0 && timer.ElapsedTime() > maxTime)
364  break;
365  }
366  }
367 
368  return 0;
369 }
370 
371 // *************************************************************
372 
373 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
374  : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
375  , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
376  , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
377  , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
378  , m_currentSpeed(0), m_maxObservedSpeed(0)
379 {
380 }
381 
383 {
384  if (m_speedTimer.ElapsedTime() > 1000)
385  {
386  m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
387  m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
388  m_byteCountSinceLastTimerReset = 0;
389  m_speedTimer.StartTimer();
390 // OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
391  }
392  return m_currentSpeed;
393 }
394 
396 {
397  lword m = GetMaxBytesPerSecond();
398  return m ? STDMIN(m_maxObservedSpeed, float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
399 }
400 
402 {
404 }
405 
407 {
408  if (BlockedBySpeedLimit())
409  LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
410  else if (m_wasBlocked)
411  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
412  else if (!m_buffer.IsEmpty())
413  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
414  else if (EofPending())
415  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
416 }
417 
 // Buffer the input and flush it towards the sender.
 // inString/length: data to append to m_buffer.
 // messageEnd: non-zero requests that an EOF be sent after the data.
 // blocking: when true, flushes wait indefinitely; when false, only zero-time flushes are made.
 // Returns 0 on completion. Returns a non-zero value when the call is blocked:
 // at least 1 while the buffer is still above its target size, or 1 while the
 // EOF has not reached EOF_DONE.
 // Throws Exception(OTHER_ERROR) if data or a message end is supplied after EOF_DONE.
418 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
419 {
420  if (m_eofState == EOF_DONE)
421  {
422  if (length || messageEnd)
423  throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
424 
425  return 0;
426  }
427 
 // an EOF is already in progress: skip the buffering and continue at EofSite
428  if (m_eofState > EOF_NONE)
429  goto EofSite;
430 
 // extra scope so that the goto above does not jump past the initialization of the locals below
431  {
 // m_skipBytes counts input bytes already buffered by an earlier blocked call (see below)
432  if (m_skipBytes)
433  {
434  assert(length >= m_skipBytes);
435  inString += m_skipBytes;
436  length -= m_skipBytes;
437  }
438 
439  m_buffer.Put(inString, length);
440 
 // zero-time flush for non-blocking calls or once the buffer exceeds the auto-flush bound
441  if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
442  TimedFlush(0, 0);
443 
 // a message end requires the buffer to be drained completely
444  size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
445  if (blocking)
446  TimedFlush(INFINITE_TIME, targetSize);
447 
448  if (m_buffer.CurrentSize() > targetSize)
449  {
450  assert(!blocking);
451  m_wasBlocked = true;
 // remember how much of the input is already buffered so that it is skipped on the next call
452  m_skipBytes += length;
453  size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
 // never return 0 here, since 0 means the call completed
454  return STDMAX<size_t>(blockedBytes, 1);
455  }
456 
457  m_wasBlocked = false;
458  m_skipBytes = 0;
459  }
460 
461  if (messageEnd)
462  {
463  m_eofState = EOF_PENDING_SEND;
464 
465  EofSite:
466  TimedFlush(blocking ? INFINITE_TIME : 0, 0);
467  if (m_eofState != EOF_DONE)
468  return 1;
469  }
470 
471  return 0;
472 }
473 
 // Send buffered data through the sender until m_buffer holds no more than
 // targetSize bytes, a wait on the sender fails, or the time limit is reached.
 // maxTime: time budget for the waits; INFINITE_TIME means wait forever and
 //   0 means use zero timeouts.
 // Returns the number of bytes removed from m_buffer during this call (the sum
 // of the send results collected). When the buffer is empty and no send result
 // is outstanding, a pending EOF is also sent and its delivery awaited.
474 lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
475 {
476  NetworkSender &sender = AccessSender();
477 
478  bool forever = maxTime == INFINITE_TIME;
479  Timer timer(Timer::MILLISECONDS, forever);
480  unsigned int totalFlushSize = 0;
481 
482  while (true)
483  {
484  if (m_buffer.CurrentSize() <= targetSize)
485  break;
486 
 // a send was started earlier: collect its result before sending more
487  if (m_needSendResult)
488  {
489  if (sender.MustWaitForResult() &&
490  !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
491  CallStack("NetworkSink::DoFlush() - wait send result", 0)))
492  break;
493 
494  unsigned int sendResult = sender.GetSendResult();
495 #if CRYPTOPP_TRACE_NETWORK
496  OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
497 #endif
 // drop the bytes that were actually sent from the buffer
498  m_buffer.Skip(sendResult);
499  totalFlushSize += sendResult;
500  m_needSendResult = false;
501 
502  if (!m_buffer.AnyRetrievable())
503  break;
504  }
505 
506  unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
507  if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
508  break;
509 
 // peek at the buffer (Spy) and hand the returned block to the sender; the bytes
 // stay in m_buffer until the send result is collected and Skip() is called above
510  size_t contiguousSize = 0;
511  const byte *block = m_buffer.Spy(contiguousSize);
512 
513 #if CRYPTOPP_TRACE_NETWORK
514  OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
515 #endif
516  sender.Send(block, contiguousSize);
517  m_needSendResult = true;
518 
519  if (maxTime > 0 && timeOut == 0)
520  break; // once time limit is reached, return even if there is more data waiting
521  }
522 
 // account for the flushed bytes in the speed statistics
523  m_byteCountSinceLastTimerReset += totalFlushSize;
525 
 // all data is out and acknowledged: take care of a pending EOF
526  if (m_buffer.IsEmpty() && !m_needSendResult)
527  {
528  if (m_eofState == EOF_PENDING_SEND)
529  {
530  sender.SendEof();
531  m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
532  }
533 
 // wait for the EOF to be delivered, giving up when a wait fails
534  while (m_eofState == EOF_PENDING_DELIVERY)
535  {
536  unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
537  if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
538  break;
539 
540  if (sender.EofSent())
541  m_eofState = EOF_DONE;
542  }
543  }
544 
545  return totalFlushSize;
546 }
547 
548 #endif // #ifdef HIGHRES_TIMER_AVAILABLE
549 
550 NAMESPACE_END
base class for all exceptions thrown by Crypto++
Definition: cryptlib.h:110
container of wait objects
Definition: wait.h:146
float GetMaxObservedSpeed() const
get the maximum observed speed of this sink in bytes per second
Definition: network.cpp:395
high resolution timer
Definition: hrtimer.h:51
lword TimedFlush(unsigned long maxTime, size_t targetSize=0)
flush to device for no more than maxTime milliseconds
Definition: network.cpp:162
some error not belonging to any of the above categories
Definition: cryptlib.h:128
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
put wait objects into container
Definition: network.cpp:242
interface for buffered transformations
Definition: cryptlib.h:771
size_t Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
input multiple bytes for blocking or non-blocking processing
Definition: network.cpp:418
unsigned int GetMaxWaitObjectCount() const
maximum number of wait objects that this object can return
Definition: network.cpp:401
virtual unsigned int GetMaxWaitObjectCount() const =0
maximum number of wait objects that this object can return
virtual void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)=0
put wait objects into container
size_t Put(byte inByte, bool blocking=true)
input a byte for processing
Definition: cryptlib.h:785
size_t GeneralPump2(lword &byteCount, bool blockingOutput=true, unsigned long maxTime=INFINITE_TIME, bool checkDelimiter=false, byte delimiter='\n')
pump up to maxSize bytes using at most maxTime milliseconds
Definition: network.cpp:76
BufferedTransformation * AttachedTransformation()
returns the object immediately attached to this object or NULL for no attachment
Definition: filters.cpp:26
float ComputeCurrentSpeed()
compute the current speed of this sink in bytes per second
Definition: network.cpp:382
virtual lword Skip(lword skipMax=LWORD_MAX)
discard skipMax bytes from the output buffer
Definition: cryptlib.cpp:443
const unsigned long INFINITE_TIME
used to represent infinite time
Definition: cryptlib.h:96
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
put wait objects into container
Definition: cryptlib.cpp:321
unsigned int GetMaxWaitObjectCount() const
Definition: network.h:38
bool Wait(unsigned long milliseconds, CallStack const &callStack)
wait on this object
Definition: wait.cpp:388
bool AnyRetrievable() const
returns whether any bytes are currently ready for retrieval
Definition: queue.h:25
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
put wait objects into container
Definition: network.cpp:406
unsigned int GetMaxWaitObjectCount() const
maximum number of wait objects that this object can return
Definition: cryptlib.cpp:315
unsigned int GetMaxWaitObjectCount() const
maximum number of wait objects that this object can return
Definition: network.cpp:235
virtual bool Receive(byte *buf, size_t bufLen)=0
receive data from network source, returns whether result is immediately available ...
Network Receiver.
Definition: network.h:94
virtual size_t PutModifiable2(byte *inString, size_t length, int messageEnd, bool blocking)
input multiple bytes that may be modified by callee for blocking or non-blocking processing ...
Definition: cryptlib.h:817
a Source class that can pump from a device for a specified amount of time.
Definition: network.h:55
Network Sender.
Definition: network.h:152