14 , _blockmain( blockmainthread )
15 , _blockmain_pending( 0 )
20 FD_ZERO( &_blockmain_set );
29 _thread_wakeup.closewrite();
30 _thread_cond.
wait( _thread_mx );
31 _thread_wakeup.closeread();
41 if ( !_client_mgr.expired() )
return false;
48 if ( !_server_mgr.expired() )
return false;
61 if ( _autoset.size() )
70 if (_blockmain_pending == 0) {
74 ret = private_select( NULL );
76 timeval tvCopy( *tv );
77 ret = private_select( &tvCopy );
83 std::ostringstream ostr;
84 for (
int i = 0;
i < _blockmain_pending;
i++)
85 if ( FD_ISSET(
i, &_blockmain_set ) ) ostr<<
" "<<
i;
86 COUT<<
"something pending for sockets:"
88 <<
" ("<<_blockmain_pending<<
")"<<endl;
90 struct timeval zerotv;
93 return private_select( &zerotv );
97 #error You are using threaded network mode - do you really want this?
100 assert( tv == NULL );
101 assert( _blockmain );
102 _blockmain_mx.
lock();
103 if (_blockmain_pending == 0) {
104 _blockmain_cond.
wait( _blockmain_mx );
108 std::ostringstream ostr;
109 for (
int i = 0;
i < _blockmain_pending;
i++)
110 if ( FD_ISSET( i, &_blockmain_set ) ) ostr<<
" "<<
i;
111 COUT<<
"something pending for sockets:"
113 <<
" ("<<_blockmain_pending<<
")"<<endl;
123 _blockmain_mx.
lock();
124 FD_SET( fd, &_blockmain_set );
125 if (fd >= _blockmain_pending) _blockmain_pending = fd+1;
133 _blockmain_mx.
lock();
134 FD_CLR( fd, &_blockmain_set );
135 if (fd == _blockmain_pending-1) {
136 while (_blockmain_pending > 0) {
137 if ( FD_ISSET( _blockmain_pending-1, &_blockmain_set ) )
139 _blockmain_pending -= 1;
146 void SocketSet::private_addset(
int fd, fd_set &fds,
int &maxfd )
149 if (fd >= maxfd) maxfd = fd+1;
152 int SocketSet::private_select( timeval *timeout )
155 fd_set debug_copy_of_read_set_select;
156 fd_set debug_copy_of_write_set_select;
157 FD_ZERO( &debug_copy_of_read_set_select );
158 FD_ZERO( &debug_copy_of_write_set_select );
161 fd_set read_set_select;
162 fd_set write_set_select;
163 int max_sock_select = 0;
165 FD_ZERO( &read_set_select );
166 FD_ZERO( &write_set_select );
167 if ( !_client_mgr.expired() ) {
168 boost::shared_ptr< VsnetDownload::Client::Manager >mgr( _client_mgr.lock() );
170 mgr->lower_check_queues();
172 if ( !_server_mgr.expired() ) {
173 boost::shared_ptr< VsnetDownload::Server::Manager >mgr( _server_mgr.lock() );
175 mgr->lower_check_queues();
178 for (Set::iterator it = _autoset.begin(); it != _autoset.end(); it++) {
181 bool wrote_on_negative =
false;
182 if ( fd < 0 && b->write_on_negative() ) {
184 wrote_on_negative =
true;
188 private_addset( fd, read_set_select, max_sock_select );
200 #ifndef USE_NO_THREAD
201 private_addset( _thread_wakeup.getread(),
207 for (
int i = 0; i < max_sock_select; i++) {
208 if ( FD_ISSET( i, &read_set_select ) )
209 FD_SET( i, &debug_copy_of_read_set_select );
210 if ( FD_ISSET( i, &write_set_select ) )
211 FD_SET( i, &debug_copy_of_write_set_select );
215 int ret = ::select( max_sock_select,
220 #if defined (_WIN32) && !defined (__CYGWIN__)
221 if (WSAGetLastError() != WSAEINVAL)
222 COUT<<
"WIN32 error : "<<WSAGetLastError()<<endl;
224 perror(
"Select failed : " );
226 }
else if (ret == 0) {
230 #if defined (VSNET_DEBUG)
231 private_test_dump_active_sets( max_sock_select,
232 debug_copy_of_read_set_select,
234 debug_copy_of_write_set_select,
237 for (Set::iterator it = _autoset.begin(); it != _autoset.end(); it++) {
241 if ( FD_ISSET( fd, &read_set_select ) ) {
256 <<
" but main file descriptor is "<<b->
get_fd()<<endl;
262 #ifndef USE_NO_THREAD
263 if ( FD_ISSET( _thread_wakeup.getread(), &read_set_select ) ) {
265 _thread_wakeup.read( &c, 1 );
272 _blockmain_mx.
lock();
285 void SocketSet::private_wakeup()
295 private_select( &tv );
298 void SocketSet::private_wakeup()
302 _thread_wakeup.write( &c, 1 );
311 select( 0, NULL, NULL, NULL, &tv );
317 #ifndef USE_NO_THREAD
319 private_select( NULL );
326 #if defined (VSNET_DEBUG) || defined (__APPLE__)
327 void SocketSet::private_test_dump_active_sets(
int maxfd,
330 fd_set &write_before,
331 fd_set &write_after )
335 std::ostringstream str_r;
336 std::ostringstream str_w;
337 str_r<<
" *** read set: ";
338 str_w<<
" *** write set: ";
339 for (
int i = 0; i < maxfd; i++) {
340 if ( FD_ISSET( i, &read_before ) ) {
341 if ( FD_ISSET( i, &read_after ) )
345 }
else if ( FD_ISSET( i, &read_after ) ) {
346 str_r<<
"(?>>"<<i<<
"<<?)";
348 if ( FD_ISSET( i, &write_before ) ) {
349 if ( FD_ISSET( i, &write_after ) )
353 }
else if ( FD_ISSET( i, &write_after ) ) {
354 str_w<<
"(?>>"<<i<<
"<<?)";
360 std::ostringstream ostr_r;
361 std::ostringstream ostr_w;
362 ostr_r<<
" *** read fds tested: ";
363 ostr_w<<
" *** write fds tested: ";
364 for (Set::iterator it = _autoset.begin(); it != _autoset.end(); it++) {
368 if ( FD_ISSET( fd, &read_after ) )
372 if ( FD_ISSET( fd, &write_after ) )
375 #ifndef USE_NO_THREAD
376 if ( FD_ISSET( _thread_wakeup.getread(), &read_after ) )
377 ostr_r<<_thread_wakeup.getread()<<
"(pipe)";
382 COUT<<
"select saw activity:"<<endl
386 <<ostr_w.str()<<endl;
391 #if defined (VSNET_DEBUG)
392 void SocketSet::private_test_dump_request_sets( timeval *timeout )
394 std::ostringstream ostr;
395 ostr<<
"calling select with fds=";
396 for (Set::iterator it = _autoset.begin(); it != _autoset.end(); it++) {
405 #ifndef USE_NO_THREAD
406 ostr<<_thread_wakeup.getread()<<
"(w)";
409 ostr<<
" t="<<timeout->tv_sec<<
":"<<timeout->tv_usec;
411 ostr<<
" t=NULL (blocking)";
413 if (!timeout || timeout->tv_sec >= 1) {
414 COUT<<ostr.str()<<endl;
416 static long waitabit = 0;
418 if (waitabit > 100) {
419 COUT<<ostr.str()<<endl;
426 std::ostream&
operator<<( std::ostream &ostr,
const timeval &tv )
428 ostr<<tv.tv_sec<<
":"<<tv.tv_usec;