blocxx
Select.cpp
Go to the documentation of this file.
1/*******************************************************************************
2* Copyright (C) 2005, Vintela, Inc. All rights reserved.
3* Copyright (C) 2006, Novell, Inc. All rights reserved.
4*
5* Redistribution and use in source and binary forms, with or without
6* modification, are permitted provided that the following conditions are met:
7*
8* * Redistributions of source code must retain the above copyright notice,
9* this list of conditions and the following disclaimer.
10* * Redistributions in binary form must reproduce the above copyright
11* notice, this list of conditions and the following disclaimer in the
12* documentation and/or other materials provided with the distribution.
13* * Neither the name of
14* Vintela, Inc.,
15* nor Novell, Inc.,
16* nor the names of its contributors or employees may be used to
17* endorse or promote products derived from this software without
18* specific prior written permission.
19*
20* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30* POSSIBILITY OF SUCH DAMAGE.
31*******************************************************************************/
32
33
39#include "blocxx/BLOCXX_config.h"
40#include "blocxx/Select.hpp"
41#include "blocxx/AutoPtr.hpp"
42#include "blocxx/Assertion.hpp"
43#include "blocxx/Thread.hpp" // for testCancel()
46
47#if defined(BLOCXX_WIN32)
48#include <cassert>
49#endif
50
51extern "C"
52{
53
54#ifndef BLOCXX_WIN32
55 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
56 #include <sys/epoll.h>
57 #endif
58 #if defined (BLOCXX_HAVE_SYS_POLL_H)
59 #include <sys/poll.h>
60 #endif
61 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
62 #include <sys/select.h>
63 #endif
64#endif
65
66#ifdef BLOCXX_HAVE_SYS_TIME_H
67 #include <sys/time.h>
68#endif
69
70#include <sys/types.h>
71
72#ifdef BLOCXX_HAVE_UNISTD_H
73 #include <unistd.h>
74#endif
75
76#include <errno.h>
77}
78
79namespace BLOCXX_NAMESPACE
80{
81
82namespace Select
83{
84
85namespace
86{
87 const float LOOP_TIMEOUT = 10.0;
88}
90// deprecated in 4.0.0
91int
93{
94 return selectRW(selarray, Timeout::relative(static_cast<float>(ms) * 1000));
95}
96
97#if defined(BLOCXX_WIN32)
99int
101{
102 int rc;
103 size_t hcount = static_cast<DWORD>(selarray.size());
105
106 size_t handleidx = 0;
107 for (size_t i = 0; i < selarray.size(); i++, handleidx++)
108 {
109 if(selarray[i].s.isSocket && selarray[i].s.networkevents)
110 {
111 ::WSAEventSelect(selarray[i].s.sockfd,
112 selarray[i].s.event, selarray[i].s.networkevents);
113 }
114
115 hdls[handleidx] = selarray[i].s.event;
116 }
117
118 TimeoutTimer timer(timeout);
119 timer.start();
120 DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timer.asDWORDMs());
121
123
124 switch (cc)
125 {
126 case WAIT_FAILED:
128 break;
129 case WAIT_TIMEOUT:
131 break;
132 default:
133 rc = cc - WAIT_OBJECT_0;
134
135 // If this is a socket, set it back to
136 // blocking mode
137 if(selarray[rc].s.isSocket)
138 {
139 if(selarray[rc].s.networkevents
140 && selarray[rc].s.doreset == false)
141 {
142 ::WSAEventSelect(selarray[rc].s.sockfd,
143 selarray[rc].s.event, selarray[rc].s.networkevents);
144 }
145 else
146 {
147 // Set socket back to blocking
148 ::WSAEventSelect(selarray[rc].s.sockfd,
149 selarray[rc].s.event, 0);
150 u_long ioctlarg = 0;
152 }
153 }
154 break;
155 }
156
157 if( rc < 0 )
158 return rc;
159
160 int availableCount = 0;
161 for (size_t i = 0; i < selarray.size(); i++)
162 {
163 if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
164 {
165 if( selarray[i].waitForRead )
166 selarray[i].readAvailable = true;
167 if( selarray[i].waitForWrite )
168 selarray[i].writeAvailable = true;
170 }
171 else
172 {
173 selarray[i].readAvailable = false;
174 selarray[i].writeAvailable = false;
175 }
176 }
177 return availableCount;
178}
179
180
181#else
182
184// epoll version
185int
187{
188#ifdef BLOCXX_HAVE_SYS_EPOLL_H
189 int ecc = 0;
192 if(epfd.get() == -1)
193 {
194 if (errno == ENOSYS) // kernel doesn't support it
195 {
197 }
198 // Need to return something else?
200 }
201
204 for (size_t i = 0; i < selarray.size(); i++)
205 {
206 BLOCXX_ASSERT(selarray[i].s >= 0);
207 selarray[i].readAvailable = false;
208 selarray[i].writeAvailable = false;
209 selarray[i].wasError = false;
210 events[i].data = epoll_data_t(); // zero-init to make valgrind happy
211 events[i].data.u32 = i;
212 events[i].events = 0;
213 if(selarray[i].waitForRead)
214 {
215 events[i].events |= read_events;
216 }
217 if(selarray[i].waitForWrite)
218 {
219 events[i].events |= write_events;
220 }
221
222 if(epoll_ctl(epfd.get(), EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
223 {
225 }
226 }
227
228 // here we spin checking for thread cancellation every so often.
229
231 timer.start();
232 int savedErrno;
233 do
234 {
236 const float maxWaitSec = LOOP_TIMEOUT;
237 ecc = epoll_wait(epfd.get(), events.get(), selarray.size(), timer.asIntMs(maxWaitSec));
239 if (ecc < 0 && errno == EINTR)
240 {
241 ecc = 0;
242 errno = 0;
244 }
245 timer.loop();
246 } while ((ecc == 0) && !timer.expired());
247
248 if (ecc < 0)
249 {
252 }
253 if (ecc == 0)
254 {
256 }
257
258 for(int i = 0; i < ecc; i++)
259 {
260 SelectObject & so = selarray[events[i].data.u32];
261 so.readAvailable = so.waitForRead && (events[i].events & read_events);
262 so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
263 }
264
265 return ecc;
266#else
268#endif
269}
270
272// poll() version
273int
275{
276#if defined (BLOCXX_HAVE_SYS_POLL_H)
277 int rc = 0;
278
280
281 // here we spin checking for thread cancellation every so often.
283 timer.start();
284
285 int savedErrno;
286 do
287 {
288 for (size_t i = 0; i < selarray.size(); i++)
289 {
290 BLOCXX_ASSERT(selarray[i].s >= 0);
291 selarray[i].readAvailable = false;
292 selarray[i].writeAvailable = false;
293 selarray[i].wasError = false;
294 pfds[i].revents = 0;
295 pfds[i].fd = selarray[i].s;
296 pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
297 if(selarray[i].waitForWrite)
298 pfds[i].events |= POLLOUT;
299 }
300
302 const float maxWaitSec = LOOP_TIMEOUT;
303 rc = ::poll(pfds.get(), selarray.size(), timer.asIntMs(maxWaitSec));
305 if (rc < 0 && errno == EINTR)
306 {
307 rc = 0;
308 errno = 0;
310#ifdef BLOCXX_NETWARE
311 // When the NetWare server is shutting down, select will
312 // set errno to EINTR on return. If this thread does not
313 // yield control (cooperative multitasking) then we end
314 // up in a very tight loop and get a CPUHog server abbend.
316#endif
317 }
318
319 timer.loop();
320 } while ((rc == 0) && !timer.expired());
321
322 if (rc < 0)
323 {
326 }
327 if (rc == 0)
328 {
330 }
331 for (size_t i = 0; i < selarray.size(); i++)
332 {
333 if (pfds[i].revents & (POLLERR | POLLNVAL))
334 {
335 selarray[i].wasError = true;
336 }
337
338 if(selarray[i].waitForRead)
339 {
340 selarray[i].readAvailable = (pfds[i].revents &
341 (POLLIN | POLLPRI | POLLHUP));
342 }
343
344 if(selarray[i].waitForWrite)
345 {
346 selarray[i].writeAvailable = (pfds[i].revents &
347 (POLLOUT | POLLHUP));
348 }
349 }
350
351 return rc;
352#else
354#endif
355}
357// ::select() version
358int
360{
361#if defined (BLOCXX_HAVE_SYS_SELECT_H)
362 int rc = 0;
363 fd_set ifds;
364 fd_set ofds;
365
366 // here we spin checking for thread cancellation every so often.
368 timer.start();
369
370 int savedErrno;
371 do
372 {
373 int maxfd = 0;
374 FD_ZERO(&ifds);
375 FD_ZERO(&ofds);
376 for (size_t i = 0; i < selarray.size(); ++i)
377 {
378 int fd = selarray[i].s;
379 BLOCXX_ASSERT(fd >= 0);
380 if (maxfd < fd)
381 {
382 maxfd = fd;
383 }
385 {
386 errno = EINVAL;
388 }
389 if (selarray[i].waitForRead)
390 {
391 FD_SET(fd, &ifds);
392 }
393 if (selarray[i].waitForWrite)
394 {
395 FD_SET(fd, &ofds);
396 }
397 }
398
400 struct timeval tv;
401 const float maxWaitSec = LOOP_TIMEOUT;
402 rc = ::select(maxfd+1, &ifds, &ofds, NULL, timer.asTimeval(tv, maxWaitSec));
404 if (rc < 0 && errno == EINTR)
405 {
406 rc = 0;
407 errno = 0;
409#ifdef BLOCXX_NETWARE
410 // When the NetWare server is shutting down, select will
411 // set errno to EINTR on return. If this thread does not
412 // yield control (cooperative multitasking) then we end
413 // up in a very tight loop and get a CPUHog server abbend.
415#endif
416 }
417
418 timer.loop();
419 } while ((rc == 0) && !timer.expired());
420
421 if (rc < 0)
422 {
425 }
426 if (rc == 0)
427 {
429 }
430 int availableCount = 0;
431 int cval;
432 for (size_t i = 0; i < selarray.size(); i++)
433 {
434 selarray[i].wasError = false;
435 cval = 0;
436 if (FD_ISSET(selarray[i].s, &ifds))
437 {
438 selarray[i].readAvailable = true;
439 cval = 1;
440 }
441 else
442 {
443 selarray[i].readAvailable = false;
444 }
445
446 if (FD_ISSET(selarray[i].s, &ofds))
447 {
448 selarray[i].writeAvailable = true;
449 cval = 1;
450 }
451 else
452 {
453 selarray[i].writeAvailable = false;
454 }
455
457
458 }
459
460 return availableCount;
461#else
463#endif
464}
465
466int
468{
471 {
472 return rv;
473 }
474
477 {
478 return rv;
479 }
480
483 return rv;
484}
485
487#endif // #else BLOCXX_WIN32
488
489int
491{
492 return select(selarray, Timeout::relative(static_cast<float>(ms) * 1000.0));
493}
494
496int
498{
500 soa.reserve(selarray.size());
501 for (size_t i = 0; i < selarray.size(); ++i)
502 {
504 curObj.waitForRead = true;
505 soa.push_back(curObj);
506 }
507 int rv = selectRW(soa, timeout);
508 if (rv < 0)
509 {
510 return rv;
511 }
512
513 // find the first selected object
514 for (size_t i = 0; i < soa.size(); ++i)
515 {
516 if (soa[i].readAvailable)
517 {
518 return i;
519 }
520 }
521 errno = 0;
522 return SELECT_ERROR;
523}
524
525} // end namespace Select
526
527} // end namespace BLOCXX_NAMESPACE
528
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similar to the assert() macro, but instead of calling abort(),...
Definition Assertion.hpp:57
Array<> wraps std::vector<> in COWReference<> adding ref counting and copy on write capability.
Definition Array.hpp:66
void reserve(size_type n)
Ensure the capacity is at least the size of a given value.
The AutoPtrVec class provides a simple class for smart pointers to a dynamically allocated array of o...
Definition AutoPtr.hpp:185
PURPOSE: The AutoResource class template is an analog of std::auto_ptr for managing arbitrary resourc...
static void testCancel()
Test if this thread has been cancelled.
Definition Thread.cpp:432
A timeout can be absolute, which means that it will happen at the specified DateTime.
Definition Timeout.hpp:56
static Timeout relative(float seconds)
Definition Timeout.cpp:58
A TimeoutTimer is used by an algorithm to determine when a timeout has expired.
int selectRWSelect(SelectObjectArray &selarray, const Timeout &timeout)
Definition Select.cpp:359
Array< SelectObject > SelectObjectArray
Definition Select.hpp:113
int selectRWPoll(SelectObjectArray &selarray, const Timeout &timeout)
Definition Select.cpp:274
int select(const SelectTypeArray &selarray, UInt32 ms)
Select returns as soon as input is available on any of Select_t objects that are in given array.
Definition Select.cpp:490
const int SELECT_TIMEOUT
The value returned from select when the timeout value has expired.
Definition Select.hpp:59
const int SELECT_NOT_IMPLEMENTED
Used internally, but listed here to prevent conflicts.
Definition Select.hpp:67
int selectRW(SelectObjectArray &selarray, UInt32 ms)
Definition Select.cpp:92
int selectRWEpoll(SelectObjectArray &selarray, const Timeout &timeout)
Definition Select.cpp:186
const int SELECT_ERROR
The value returned from select when any error occurs other than timeout.
Definition Select.hpp:63
Taken from RFC 1321.
bool operator==(const Array< T > &x, const Array< T > &y)
bool readAvailable
Ouput parameter. Will be set to true to indicate that s has become available for reading.
Definition Select.hpp:107