00001
00002
00003
#include "pch.h"
00004
#include "network.h"
00005
00006 NAMESPACE_BEGIN(CryptoPP)
00007
00008 unsigned
int NonblockingSource::PumpMessages2(
unsigned int &messageCount,
bool blocking)
00009 {
00010
if (messageCount == 0)
00011
return 0;
00012
00013
unsigned long byteCount = ULONG_MAX;
00014 messageCount = 0;
00015 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00016
if (!m_messageEndSent && SourceExhausted())
00017 {
00018 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(),
true));
00019 m_messageEndSent =
true;
00020 messageCount = 1;
00021 }
00022
return 0;
00023 }
00024
00025
bool NonblockingSink::IsolatedFlush(
bool hardFlush,
bool blocking)
00026 {
00027
TimedFlush(blocking ? INFINITE_TIME : 0);
00028
return hardFlush && !!GetCurrentBufferSize();
00029 }
00030
00031
00032
00033
#ifdef HIGHRES_TIMER_AVAILABLE
00034
00035 NetworkSource::NetworkSource(
BufferedTransformation *attachment)
00036 :
NonblockingSource(attachment), m_buf(1024*4), m_bufSize(0), m_state(NORMAL)
00037 {
00038 }
00039
00040 unsigned int NetworkSource::GeneralPump2(
unsigned long &byteCount,
bool blockingOutput,
unsigned long maxTime,
bool checkDelimiter, byte delimiter)
00041 {
00042
NetworkReceiver &receiver = AccessReceiver();
00043
00044
unsigned long maxSize = byteCount;
00045 byteCount = 0;
00046
bool forever = maxTime ==
INFINITE_TIME;
00047
Timer timer(Timer::MILLISECONDS, forever);
00048
unsigned long timeout;
00049
BufferedTransformation *t = AttachedTransformation();
00050
00051
if (m_state == OUTPUT_BLOCKED)
00052
goto DoOutput;
00053
00054
while (
true)
00055 {
00056
if (m_state == WAITING_FOR_RESULT)
00057 {
00058
if (receiver.
MustWaitForResult())
00059 {
00060 timeout = SaturatingSubtract(maxTime, timer.
ElapsedTime());
00061
if (!receiver.
Wait(timeout))
00062
break;
00063 }
00064
00065
unsigned int recvResult = receiver.
GetReceiveResult();
00066
00067 m_bufSize += recvResult;
00068 m_state = NORMAL;
00069 }
00070
00071
if (m_bufSize == 0)
00072 {
00073
if (receiver.
EofReceived())
00074
break;
00075 }
00076
else
00077 {
00078 m_putSize = STDMIN((
unsigned long)m_bufSize, maxSize - byteCount);
00079
if (checkDelimiter)
00080 m_putSize = std::find(m_buf.
begin(), m_buf+m_putSize, delimiter) - m_buf;
00081
00082 DoOutput:
00083
unsigned int result = t->PutModifiable2(m_buf, m_putSize, 0, forever || blockingOutput);
00084
if (result)
00085 {
00086 timeout = SaturatingSubtract(maxTime, timer.
ElapsedTime());
00087
if (t->Wait(timeout))
00088
goto DoOutput;
00089
else
00090 {
00091 m_state = OUTPUT_BLOCKED;
00092
return result;
00093 }
00094 }
00095 m_state = NORMAL;
00096
00097 byteCount += m_putSize;
00098 m_bufSize -= m_putSize;
00099
if (m_bufSize > 0)
00100 {
00101 memmove(m_buf, m_buf+m_putSize, m_bufSize);
00102
if (checkDelimiter && m_buf[0] == delimiter)
00103
break;
00104 }
00105 }
00106
00107
if (byteCount == maxSize)
00108
break;
00109
00110
unsigned long elapsed = timer.
ElapsedTime();
00111
if (elapsed > maxTime)
00112
break;
00113
00114
if (receiver.
MustWaitToReceive())
00115 {
00116
if (!receiver.
Wait(maxTime - elapsed))
00117
break;
00118 }
00119
00120 receiver.
Receive(m_buf+m_bufSize, m_buf.
size()-m_bufSize);
00121 m_state = WAITING_FOR_RESULT;
00122 }
00123
00124
return 0;
00125 }
00126
00127
00128
00129 unsigned int NetworkSink::Put2(
const byte *inString,
unsigned int length,
int messageEnd,
bool blocking)
00130 {
00131
if (m_blockedBytes)
00132 {
00133 assert(length >= m_blockedBytes);
00134 inString += length - m_blockedBytes;
00135 length = m_blockedBytes;
00136 }
00137 m_buffer.
LazyPut(inString, length);
00138
00139
unsigned int targetSize = messageEnd ? 0 : m_maxBufferSize;
00140
TimedFlush(blocking ?
INFINITE_TIME : 0, m_autoFlush ? 0 : targetSize);
00141
00142
if (m_buffer.
CurrentSize() > targetSize)
00143 {
00144 assert(!blocking);
00145 m_blockedBytes = STDMIN(m_buffer.
CurrentSize() - targetSize, (
unsigned long)length);
00146 m_buffer.
UndoLazyPut(m_blockedBytes);
00147 m_buffer.
FinalizeLazyPut();
00148
return STDMAX(m_blockedBytes, 1U);
00149 }
00150 m_blockedBytes = 0;
00151
00152
if (messageEnd)
00153 AccessSender().
SendEof();
00154
return 0;
00155 }
00156
00157 unsigned int NetworkSink::TimedFlush(
unsigned long maxTime,
unsigned int targetSize)
00158 {
00159
if (m_buffer.
IsEmpty())
00160
return 0;
00161
00162
NetworkSender &sender = AccessSender();
00163
00164
bool forever = maxTime ==
INFINITE_TIME;
00165
Timer timer(Timer::MILLISECONDS, forever);
00166
unsigned long timeout;
00167
unsigned int totalFlushSize = 0;
00168
00169
while (
true)
00170 {
00171
if (m_needSendResult)
00172 {
00173
if (sender.
MustWaitForResult())
00174 {
00175 timeout = SaturatingSubtract(maxTime, timer.
ElapsedTime());
00176
if (!sender.
Wait(timeout))
00177
break;
00178 }
00179
00180
unsigned int sendResult = sender.
GetSendResult();
00181 m_buffer.Skip(sendResult);
00182 totalFlushSize += sendResult;
00183 m_needSendResult =
false;
00184
00185
if (m_buffer.
CurrentSize() <= targetSize)
00186
break;
00187 }
00188
00189
unsigned long elapsed = timer.
ElapsedTime();
00190
if (elapsed > maxTime)
00191
break;
00192
00193
if (sender.
MustWaitToSend())
00194 {
00195
if (!sender.
Wait(maxTime - elapsed))
00196
break;
00197 }
00198
00199
unsigned int contiguousSize = 0;
00200
const byte *block = m_buffer.
Spy(contiguousSize);
00201
00202 sender.
Send(block, contiguousSize);
00203 m_needSendResult =
true;
00204 }
00205
00206
return totalFlushSize;
00207 }
00208
00209
#endif // #ifdef HIGHRES_TIMER_AVAILABLE
00210
00211 NAMESPACE_END