00001
00002
00003
#include "pch.h"
00004
#include "wait.h"
00005
#include "misc.h"
00006
00007
#ifdef SOCKETS_AVAILABLE
00008
00009
#ifdef USE_BERKELEY_STYLE_SOCKETS
00010
#include <errno.h>
00011
#include <sys/types.h>
00012
#include <sys/time.h>
00013
#include <unistd.h>
00014
#endif
00015
00016 NAMESPACE_BEGIN(CryptoPP)
00017
00018
WaitObjectContainer::
WaitObjectContainer()
00019 {
00020 Clear();
00021 }
00022
00023
void WaitObjectContainer::Clear()
00024 {
00025
#ifdef USE_WINDOWS_STYLE_SOCKETS
00026
m_handles.clear();
00027
#else
00028
m_maxFd = 0;
00029 FD_ZERO(&m_readfds);
00030 FD_ZERO(&m_writefds);
00031
#endif
00032
m_noWait =
false;
00033 }
00034
00035
#ifdef USE_WINDOWS_STYLE_SOCKETS
00036
00037
struct WaitingThreadData
00038 {
00039
bool waitingToWait, terminate;
00040 HANDLE startWaiting, stopWaiting;
00041
const HANDLE *waitHandles;
00042
unsigned int count;
00043 HANDLE threadHandle;
00044 DWORD threadId;
00045 DWORD* error;
00046 };
00047
00048 WaitObjectContainer::~WaitObjectContainer()
00049 {
00050
try
00051 {
00052
if (!m_threads.empty())
00053 {
00054 HANDLE threadHandles[MAXIMUM_WAIT_OBJECTS];
00055
unsigned int i;
00056
for (i=0; i<m_threads.size(); i++)
00057 {
00058 WaitingThreadData &thread = *m_threads[i];
00059
while (!thread.waitingToWait)
00060 Sleep(0);
00061 thread.terminate =
true;
00062 threadHandles[i] = thread.threadHandle;
00063 }
00064 PulseEvent(m_startWaiting);
00065 ::WaitForMultipleObjects(m_threads.size(), threadHandles, TRUE, INFINITE);
00066
for (i=0; i<m_threads.size(); i++)
00067 CloseHandle(threadHandles[i]);
00068 CloseHandle(m_startWaiting);
00069 CloseHandle(m_stopWaiting);
00070 }
00071 }
00072
catch (...)
00073 {
00074 }
00075 }
00076
00077
00078
void WaitObjectContainer::AddHandle(HANDLE handle)
00079 {
00080 m_handles.push_back(handle);
00081 }
00082
00083 DWORD WINAPI WaitingThread(LPVOID lParam)
00084 {
00085 std::auto_ptr<WaitingThreadData> pThread((WaitingThreadData *)lParam);
00086 WaitingThreadData &thread = *pThread;
00087 std::vector<HANDLE> handles;
00088
00089
while (
true)
00090 {
00091 thread.waitingToWait =
true;
00092 ::WaitForSingleObject(thread.startWaiting, INFINITE);
00093 thread.waitingToWait =
false;
00094
00095
if (thread.terminate)
00096
break;
00097
if (!thread.count)
00098
continue;
00099
00100 handles.resize(thread.count + 1);
00101 handles[0] = thread.stopWaiting;
00102 std::copy(thread.waitHandles, thread.waitHandles+thread.count, handles.begin()+1);
00103
00104 DWORD result = ::WaitForMultipleObjects(handles.size(), &handles[0], FALSE, INFINITE);
00105
00106
if (result == WAIT_OBJECT_0)
00107
continue;
00108 SetEvent(thread.stopWaiting);
00109
if (!(result > WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + handles.size()))
00110 {
00111 assert(!
"error in WaitingThread");
00112 *thread.error = ::GetLastError();
00113 }
00114 }
00115
00116
return S_OK;
00117 }
00118
00119
void WaitObjectContainer::CreateThreads(
unsigned int count)
00120 {
00121
unsigned int currentCount = m_threads.size();
00122
if (currentCount == 0)
00123 {
00124 m_startWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL);
00125 m_stopWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL);
00126 }
00127
00128
if (currentCount < count)
00129 {
00130 m_threads.resize(count);
00131
for (
unsigned int i=currentCount; i<count; i++)
00132 {
00133 m_threads[i] =
new WaitingThreadData;
00134 WaitingThreadData &thread = *m_threads[i];
00135 thread.terminate =
false;
00136 thread.startWaiting = m_startWaiting;
00137 thread.stopWaiting = m_stopWaiting;
00138 thread.waitingToWait =
false;
00139 thread.threadHandle = CreateThread(NULL, 0, &WaitingThread, &thread, 0, &thread.threadId);
00140 }
00141 }
00142 }
00143
00144
bool WaitObjectContainer::Wait(
unsigned long milliseconds)
00145 {
00146
if (m_noWait || m_handles.empty())
00147
return true;
00148
00149
if (m_handles.size() > MAXIMUM_WAIT_OBJECTS)
00150 {
00151
00152
static const unsigned int WAIT_OBJECTS_PER_THREAD = MAXIMUM_WAIT_OBJECTS-1;
00153
unsigned int nThreads = (m_handles.size() + WAIT_OBJECTS_PER_THREAD - 1) / WAIT_OBJECTS_PER_THREAD;
00154
if (nThreads > MAXIMUM_WAIT_OBJECTS)
00155
throw Err(
"WaitObjectContainer: number of wait objects exceeds limit");
00156 CreateThreads(nThreads);
00157 DWORD error = S_OK;
00158
00159
for (
unsigned int i=0; i<m_threads.size(); i++)
00160 {
00161 WaitingThreadData &thread = *m_threads[i];
00162
while (!thread.waitingToWait)
00163 Sleep(0);
00164
if (i<nThreads)
00165 {
00166 thread.waitHandles = &m_handles[i*WAIT_OBJECTS_PER_THREAD];
00167 thread.count = STDMIN(WAIT_OBJECTS_PER_THREAD, m_handles.size() - i*WAIT_OBJECTS_PER_THREAD);
00168 thread.error = &error;
00169 }
00170
else
00171 thread.count = 0;
00172 }
00173
00174 ResetEvent(m_stopWaiting);
00175 PulseEvent(m_startWaiting);
00176
00177 DWORD result = ::WaitForSingleObject(m_stopWaiting, milliseconds);
00178
if (result == WAIT_OBJECT_0)
00179 {
00180
if (error == S_OK)
00181
return true;
00182
else
00183
throw Err(
"WaitObjectContainer: WaitForMultipleObjects failed with error " + IntToString(error));
00184 }
00185 SetEvent(m_stopWaiting);
00186
if (result == WAIT_TIMEOUT)
00187
return false;
00188
else
00189
throw Err(
"WaitObjectContainer: WaitForSingleObject failed with error " + IntToString(::GetLastError()));
00190 }
00191
else
00192 {
00193 DWORD result = ::WaitForMultipleObjects(m_handles.size(), &m_handles[0], FALSE, milliseconds);
00194
if (result >= WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + m_handles.size())
00195
return true;
00196
else if (result == WAIT_TIMEOUT)
00197
return false;
00198
else
00199
throw Err(
"WaitObjectContainer: WaitForMultipleObjects failed with error " + IntToString(::GetLastError()));
00200 }
00201 }
00202
00203
#else
00204
00205
void WaitObjectContainer::AddReadFd(
int fd)
00206 {
00207 FD_SET(fd, &m_readfds);
00208 m_maxFd = STDMAX(m_maxFd, fd);
00209 }
00210
00211
void WaitObjectContainer::AddWriteFd(
int fd)
00212 {
00213 FD_SET(fd, &m_writefds);
00214 m_maxFd = STDMAX(m_maxFd, fd);
00215 }
00216
00217
bool WaitObjectContainer::Wait(
unsigned long milliseconds)
00218 {
00219
if (m_noWait || m_maxFd == 0)
00220
return true;
00221
00222 timeval tv, *timeout;
00223
00224
if (milliseconds ==
INFINITE_TIME)
00225 timeout = NULL;
00226
else
00227 {
00228 tv.tv_sec = milliseconds / 1000;
00229 tv.tv_usec = (milliseconds % 1000) * 1000;
00230 timeout = &tv;
00231 }
00232
00233
int result = select(m_maxFd+1, &m_readfds, &m_writefds, NULL, timeout);
00234
00235
if (result > 0)
00236
return true;
00237
else if (result == 0)
00238
return false;
00239
else
00240
throw Err(
"WaitObjectContainer: select failed with error " + errno);
00241 }
00242
00243
#endif
00244
00245
00246
00247 bool Waitable::Wait(
unsigned long milliseconds)
00248 {
00249
WaitObjectContainer container;
00250
GetWaitObjects(container);
00251
return container.
Wait(milliseconds);
00252 }
00253
00254 NAMESPACE_END
00255
00256
#endif