Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members

network.cpp

00001 // network.cpp - written and placed in the public domain by Wei Dai
00002 
00003 #include "pch.h"
00004 #include "network.h"
00005 #include "wait.h"
00006 
00007 #define CRYPTOPP_TRACE_NETWORK 0
00008 
00009 NAMESPACE_BEGIN(CryptoPP)
00010 
00011 unsigned int NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
00012 {
00013         if (messageCount == 0)
00014                 return 0;
00015 
00016         unsigned long byteCount = ULONG_MAX;
00017         messageCount = 0;
00018         RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00019         if (!m_messageEndSent && SourceExhausted())
00020         {
00021                 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
00022                 m_messageEndSent = true;
00023                 messageCount = 1;
00024         }
00025         return 0;
00026 }
00027 
00028 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
00029 {
00030         TimedFlush(blocking ? INFINITE_TIME : 0);
00031         return hardFlush && !!GetCurrentBufferSize();
00032 }
00033 
00034 // *************************************************************
00035 
00036 #ifdef HIGHRES_TIMER_AVAILABLE
00037 
00038 NetworkSource::NetworkSource(BufferedTransformation *attachment)
00039         : NonblockingSource(attachment), m_buf(1024*16)
00040         , m_waitingForResult(false), m_outputBlocked(false)
00041         , m_dataBegin(0), m_dataEnd(0)
00042 {
00043 }
00044 
00045 void NetworkSource::GetWaitObjects(WaitObjectContainer &container)
00046 {
00047         if (!m_outputBlocked)
00048         {
00049                 if (m_dataBegin == m_dataEnd)
00050                         AccessReceiver().GetWaitObjects(container); 
00051                 else
00052                         container.SetNoWait();
00053         }
00054         AttachedTransformation()->GetWaitObjects(container);
00055 }
00056 
00057 unsigned int NetworkSource::GeneralPump2(unsigned long &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
00058 {
00059         NetworkReceiver &receiver = AccessReceiver();
00060 
00061         unsigned long maxSize = byteCount;
00062         byteCount = 0;
00063         bool forever = maxTime == INFINITE_TIME;
00064         Timer timer(Timer::MILLISECONDS, forever);
00065         BufferedTransformation *t = AttachedTransformation();
00066 
00067         if (m_outputBlocked)
00068                 goto DoOutput;
00069 
00070         while (true)
00071         {
00072                 if (m_dataBegin == m_dataEnd)
00073                 {
00074                         if (receiver.EofReceived())
00075                                 break;
00076 
00077                         if (m_waitingForResult)
00078                         {
00079                                 if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00080                                         break;
00081 
00082                                 unsigned int recvResult = receiver.GetReceiveResult();
00083 #if CRYPTOPP_TRACE_NETWORK
00084                                 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00085 #endif
00086                                 m_dataEnd += recvResult;
00087                                 m_waitingForResult = false;
00088 
00089                                 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
00090                                         goto ReceiveNoWait;
00091                         }
00092                         else
00093                         {
00094                                 m_dataEnd = m_dataBegin = 0;
00095 
00096                                 if (receiver.MustWaitToReceive())
00097                                 {
00098                                         if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00099                                                 break;
00100 
00101                                         receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
00102                                         m_waitingForResult = true;
00103                                 }
00104                                 else
00105                                 {
00106 ReceiveNoWait:
00107                                         m_waitingForResult = true;
00108                                         // call Receive repeatedly as long as data is immediately available,
00109                                         // because some receivers tend to return data in small pieces
00110 #if CRYPTOPP_TRACE_NETWORK
00111                                         OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
00112 #endif
00113                                         while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
00114                                         {
00115                                                 unsigned int recvResult = receiver.GetReceiveResult();
00116 #if CRYPTOPP_TRACE_NETWORK
00117                                                 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00118 #endif
00119                                                 m_dataEnd += recvResult;
00120                                                 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
00121                                                 {
00122                                                         m_waitingForResult = false;
00123                                                         break;
00124                                                 }
00125                                         }
00126                                 }
00127                         }
00128                 }
00129                 else
00130                 {
00131                         m_putSize = STDMIN((unsigned long)m_dataEnd-m_dataBegin, maxSize-byteCount);
00132                         if (checkDelimiter)
00133                                 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00134 
00135 DoOutput:
00136                         unsigned int result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00137                         if (result)
00138                         {
00139                                 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00140                                         goto DoOutput;
00141                                 else
00142                                 {
00143                                         m_outputBlocked = true;
00144                                         return result;
00145                                 }
00146                         }
00147                         m_outputBlocked = false;
00148 
00149                         byteCount += m_putSize;
00150                         m_dataBegin += m_putSize;
00151                         if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00152                                 break;
00153                         if (byteCount == maxSize)
00154                                 break;
00155                         // once time limit is reached, return even if there is more data waiting
00156                         // but make 0 a special case so caller can request a large amount of data to be
00157                         // pumped as long as it is immediately available
00158                         if (maxTime > 0 && timer.ElapsedTime() > maxTime)
00159                                 break;
00160                 }
00161         }
00162 
00163         return 0;
00164 }
00165 
00166 // *************************************************************
00167 
00168 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
00169         : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00170         , m_needSendResult(false), m_wasBlocked(false)
00171         , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0) 
00172         , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00173         , m_currentSpeed(0), m_maxObservedSpeed(0)
00174 {
00175 }
00176 
00177 float NetworkSink::ComputeCurrentSpeed()
00178 {
00179         if (m_speedTimer.ElapsedTime() > 1000)
00180         {
00181                 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
00182                 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00183                 m_byteCountSinceLastTimerReset = 0;
00184                 m_speedTimer.StartTimer();
00185 //              OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
00186         }
00187         return m_currentSpeed;
00188 }
00189 
00190 unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int messageEnd, bool blocking)
00191 {
00192         if (m_skipBytes)
00193         {
00194                 assert(length >= m_skipBytes);
00195                 inString += m_skipBytes;
00196                 length -= m_skipBytes;
00197         }
00198         m_buffer.LazyPut(inString, length);
00199 
00200         if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
00201                 TimedFlush(0, 0);
00202 
00203         unsigned int targetSize = messageEnd ? 0 : m_maxBufferSize;
00204         if (blocking)
00205                 TimedFlush(INFINITE_TIME, targetSize);
00206 
00207         if (m_buffer.CurrentSize() > targetSize)
00208         {
00209                 assert(!blocking);
00210                 unsigned int blockedBytes = STDMIN(m_buffer.CurrentSize() - targetSize, (unsigned long)length);
00211                 m_buffer.UndoLazyPut(blockedBytes);
00212                 m_buffer.FinalizeLazyPut();
00213                 m_wasBlocked = true;
00214                 m_skipBytes += length - blockedBytes;
00215                 return STDMAX(blockedBytes, 1U);
00216         }
00217 
00218         m_buffer.FinalizeLazyPut();
00219         m_wasBlocked = false;
00220         m_skipBytes = 0;
00221 
00222         if (messageEnd)
00223                 AccessSender().SendEof();
00224         return 0;
00225 }
00226 
00227 unsigned int NetworkSink::TimedFlush(unsigned long maxTime, unsigned int targetSize)
00228 {
00229         NetworkSender &sender = AccessSender();
00230 
00231         bool forever = maxTime == INFINITE_TIME;
00232         Timer timer(Timer::MILLISECONDS, forever);
00233         unsigned int totalFlushSize = 0;
00234 
00235         while (true)
00236         {
00237                 if (m_buffer.CurrentSize() <= targetSize)
00238                         break;
00239                 
00240                 if (m_needSendResult)
00241                 {
00242                         if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00243                                 break;
00244 
00245                         unsigned int sendResult = sender.GetSendResult();
00246 #if CRYPTOPP_TRACE_NETWORK
00247                         OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
00248 #endif
00249                         m_buffer.Skip(sendResult);
00250                         totalFlushSize += sendResult;
00251                         m_needSendResult = false;
00252 
00253                         if (!m_buffer.AnyRetrievable())
00254                                 break;
00255                 }
00256 
00257                 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00258                 if (sender.MustWaitToSend() && !sender.Wait(timeOut))
00259                         break;
00260 
00261                 unsigned int contiguousSize = 0;
00262                 const byte *block = m_buffer.Spy(contiguousSize);
00263 
00264 #if CRYPTOPP_TRACE_NETWORK
00265                 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
00266 #endif
00267                 sender.Send(block, contiguousSize);
00268                 m_needSendResult = true;
00269 
00270                 if (maxTime > 0 && timeOut == 0)
00271                         break;  // once time limit is reached, return even if there is more data waiting
00272         }
00273 
00274         m_byteCountSinceLastTimerReset += totalFlushSize;
00275         ComputeCurrentSpeed();
00276         
00277         return totalFlushSize;
00278 }
00279 
00280 #endif  // #ifdef HIGHRES_TIMER_AVAILABLE
00281 
00282 NAMESPACE_END

Generated on Tue Oct 26 18:51:39 2004 for Crypto++ by  doxygen 1.3.9.1