Vegastrike 0.5.1 rc1  1.0
Original sources for Vegastrike Evolved
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
vsnet_socketset.cpp
Go to the documentation of this file.
1 #include <config.h>
2 #include <sstream>
3 
4 #include "vsnet_headers.h"
5 #include "vsnet_socket.h"
6 #include "vsnet_socketbase.h"
7 #include "vsnet_socketset.h"
8 #include "vsnet_pipe.h"
9 #include "vsnet_dloadmgr.h"
10 
11 
12 SocketSet::SocketSet( bool blockmainthread ) :
13  VSThread( false )
14  , _blockmain( blockmainthread )
15  , _blockmain_pending( 0 )
16 {
17 #ifndef USE_NO_THREAD
18  _thread_end = false;
19 #endif
20  FD_ZERO( &_blockmain_set );
21 }
22 
24 { //NOTE(review): the signature line (listing line 23) is missing from this extract - presumably SocketSet::~SocketSet(); confirm against the real file
25 #ifndef USE_NO_THREAD
26  _thread_mx.lock(); //the thread mutex is held across the whole shutdown handshake
27  _thread_end = true; //the "while (!_thread_end)" select loop further down stops once it sees this
28  _blockmain = false; //signalling would be dangerous
29  _thread_wakeup.closewrite(); //presumably makes the pipe's read end selectable so a blocked select returns - confirm
30  _thread_cond.wait( _thread_mx ); //wait for the _thread_cond.signal() issued after the select loop ends
31  _thread_wakeup.closeread(); //read end is closed only after that signal arrived
32  _thread_mx.unlock();
33 #endif
34 }
36 { //NOTE(review): the signature line (listing line 35) is missing from this extract
37  _autoset.clear(); //forget every socket registered in _autoset
38 }
39 bool SocketSet::addDownloadManager( boost::shared_ptr< VsnetDownload::Client::Manager >mgr )
40 {
41  if ( !_client_mgr.expired() ) return false;
42  _client_mgr = mgr;
43  return true;
44 }
45 
46 bool SocketSet::addDownloadManager( boost::shared_ptr< VsnetDownload::Server::Manager >mgr )
47 {
48  if ( !_server_mgr.expired() ) return false;
49  _server_mgr = mgr;
50  return true;
51 }
52 
54 { //NOTE(review): the signature line (listing line 53) is missing from this extract; s is its parameter
55  _autoset.insert( s ); //private_select builds its fd sets from _autoset on every call
56  private_wakeup(); //threaded builds write a byte to the wakeup pipe; under USE_NO_THREAD this is a no-op
57 }
58 
60 { //NOTE(review): the signature line (listing line 59) is missing from this extract; s is its parameter
61  if ( _autoset.size() ) //nothing to erase when the set is empty
62  _autoset.erase( s );
63  private_wakeup(); //issued unconditionally, whether or not s was registered
64 }
65 
66 #ifdef USE_NO_THREAD
67 int SocketSet::wait( timeval *tv )
68 {
69  //assert( _blockmain ); // can't call wait if we haven't ordered the feature (?)
70  if (_blockmain_pending == 0) {
71  int ret;
72  do {
73  if (tv == NULL) {
74  ret = private_select( NULL );
75  } else {
76  timeval tvCopy( *tv ); //select resets timeval.
77  ret = private_select( &tvCopy );
78  }
79  } while (ret == 1);
80  return ret;
81  } else {
82 #ifdef VSNET_DEBUG
83  std::ostringstream ostr;
84  for (int i = 0; i < _blockmain_pending; i++)
85  if ( FD_ISSET( i, &_blockmain_set ) ) ostr<<" "<<i;
86  COUT<<"something pending for sockets:"
87  <<ostr.str()
88  <<" ("<<_blockmain_pending<<")"<<endl;
89 #endif
90  struct timeval zerotv;
91  zerotv.tv_sec = 0;
92  zerotv.tv_usec = 0;
93  return private_select( &zerotv );
94  }
95 }
96 #else
97 #error You are using threaded network mode - do you really want this?
98 void SocketSet::wait( timeval *tv )
99 {
100  assert( tv == NULL ); //No timeval.
101  assert( _blockmain ); //can't call wait if we haven't ordered the feature
102  _blockmain_mx.lock();
103  if (_blockmain_pending == 0) {
104  _blockmain_cond.wait( _blockmain_mx );
105  }
106 #ifdef VSNET_DEBUG
107  else {
108  std::ostringstream ostr;
109  for (int i = 0; i < _blockmain_pending; i++)
110  if ( FD_ISSET( i, &_blockmain_set ) ) ostr<<" "<<i;
111  COUT<<"something pending for sockets:"
112  <<ostr.str()
113  <<" ("<<_blockmain_pending<<")"<<endl;
114  }
115 #endif
116  _blockmain_mx.unlock();
117 }
118 #endif
119 
121 { //NOTE(review): the signature line (listing line 120) is missing from this extract; fd is its parameter
122  if (_blockmain) { //bookkeeping is skipped entirely unless main-thread blocking was requested
123  _blockmain_mx.lock();
124  FD_SET( fd, &_blockmain_set ); //mark fd as pending
125  if (fd >= _blockmain_pending) _blockmain_pending = fd+1; //keep the bound at least one above fd
126  _blockmain_mx.unlock();
127  }
128 }
129 
131 { //NOTE(review): the signature line (listing line 130) is missing from this extract; fd is its parameter
132  if (_blockmain) { //bookkeeping is skipped entirely unless main-thread blocking was requested
133  _blockmain_mx.lock();
134  FD_CLR( fd, &_blockmain_set ); //fd is no longer pending
135  if (fd == _blockmain_pending-1) { //the bound only needs lowering when fd was the one just below it
136  while (_blockmain_pending > 0) { //walk down until a still-marked descriptor is found, or to 0
137  if ( FD_ISSET( _blockmain_pending-1, &_blockmain_set ) )
138  break;
139  _blockmain_pending -= 1;
140  }
141  }
142  _blockmain_mx.unlock();
143  }
144 }
145 
146 void SocketSet::private_addset( int fd, fd_set &fds, int &maxfd )
147 {
148  FD_SET( fd, &fds );
149  if (fd >= maxfd) maxfd = fd+1;
150 }
151 
152 int SocketSet::private_select( timeval *timeout )
153 {
154 #ifdef VSNET_DEBUG
155  fd_set debug_copy_of_read_set_select;
156  fd_set debug_copy_of_write_set_select;
157  FD_ZERO( &debug_copy_of_read_set_select );
158  FD_ZERO( &debug_copy_of_write_set_select );
159 #endif
160 
161  fd_set read_set_select;
162  fd_set write_set_select;
163  int max_sock_select = 0;
164 
165  FD_ZERO( &read_set_select );
166  FD_ZERO( &write_set_select );
167  if ( !_client_mgr.expired() ) {
168  boost::shared_ptr< VsnetDownload::Client::Manager >mgr( _client_mgr.lock() );
169  if ( (bool) mgr )
170  mgr->lower_check_queues();
171  }
172  if ( !_server_mgr.expired() ) {
173  boost::shared_ptr< VsnetDownload::Server::Manager >mgr( _server_mgr.lock() );
174  if ( (bool) mgr )
175  mgr->lower_check_queues();
176  }
177  //private_test_dump_request_sets( timeout );
178  for (Set::iterator it = _autoset.begin(); it != _autoset.end(); it++) {
179  VsnetSocketBase *b = (*it);
180  int fd = b->get_fd();
181  bool wrote_on_negative = false;
182  if ( fd < 0 && b->write_on_negative() ) {
183  b->lower_clean_sendbuf();
184  wrote_on_negative = true;
185  fd = b->get_fd();
186  }
187  if (fd >= 0) {
188  private_addset( fd, read_set_select, max_sock_select );
189  if (b->need_test_writable() && !wrote_on_negative) {
190  private_addset( b->get_write_fd(),
191  write_set_select,
192  max_sock_select );
193  }
194  //printf("Checking for read/writability...");
195  else {
196 //printf("Checking for readability...");
197  }
198  }
199  }
200 #ifndef USE_NO_THREAD
201  private_addset( _thread_wakeup.getread(),
202  read_set_select,
203  max_sock_select );
204 #endif
205 
206 #ifdef VSNET_DEBUG
207  for (int i = 0; i < max_sock_select; i++) {
208  if ( FD_ISSET( i, &read_set_select ) )
209  FD_SET( i, &debug_copy_of_read_set_select );
210  if ( FD_ISSET( i, &write_set_select ) )
211  FD_SET( i, &debug_copy_of_write_set_select );
212  }
213 #endif
214 
215  int ret = ::select( max_sock_select,
216  &read_set_select,
217  &write_set_select,
218  0, timeout );
219  if (ret == -1) {
220 #if defined (_WIN32) && !defined (__CYGWIN__)
221  if (WSAGetLastError() != WSAEINVAL)
222  COUT<<"WIN32 error : "<<WSAGetLastError()<<endl;
223 #else
224  perror( "Select failed : " );
225 #endif
226  } else if (ret == 0) {
227 //printf("Nothing to do.\n");
228  } else {
229  ret++;
230 #if defined (VSNET_DEBUG)
231  private_test_dump_active_sets( max_sock_select,
232  debug_copy_of_read_set_select,
233  read_set_select,
234  debug_copy_of_write_set_select,
235  write_set_select );
236 #endif
237  for (Set::iterator it = _autoset.begin(); it != _autoset.end(); it++) {
238  VsnetSocketBase *b = (*it);
239  int fd = b->get_fd();
240  if (fd >= 0) {
241  if ( FD_ISSET( fd, &read_set_select ) ) {
242 //printf("Reading.\n");
243  if ( !b->lower_selected() )
244  ret--; //No complete packet received yet.
245  }
246  if ( b->isReadyToSend( &write_set_select ) ) {
247 //printf("Writing.\n");
248  ret--;
249  b->lower_sendbuf();
250  }
251  } else {
252  if ( b->isReadyToSend( &write_set_select ) ) {
253 //printf("reconnecting?\n");
254 #ifdef VSNET_DEBUG
255  COUT<<"saw activity on "<<b->get_write_fd()
256  <<" but main file descriptor is "<<b->get_fd()<<endl;
257 #endif
258  b->lower_clean_sendbuf();
259  }
260  }
261  }
262 #ifndef USE_NO_THREAD
263  if ( FD_ISSET( _thread_wakeup.getread(), &read_set_select ) ) {
264  char c;
265  _thread_wakeup.read( &c, 1 );
266  }
267 #endif
268  }
269  if (_blockmain) {
270  //whatever the reason for leaving select, if we have been asked
271  //to signal the main thread on wakeup, we do it
272  _blockmain_mx.lock();
273  _blockmain_cond.signal();
274  _blockmain_mx.unlock();
275  }
276  return ret;
277 }
278 
280 { //NOTE(review): the signature line (listing line 279) is missing from this extract
281  private_wakeup(); //forwards to the build-specific private_wakeup (pipe write when threaded, no-op otherwise)
282 }
283 
284 #ifdef USE_NO_THREAD
285 void SocketSet::private_wakeup()
287 {}
288 
290 void SocketSet::waste_time( long sec, long usec )
291 {
292  struct timeval tv;
293  tv.tv_sec = sec;
294  tv.tv_usec = usec;
295  private_select( &tv );
296 }
297 #else
298 void SocketSet::private_wakeup()
300 {
301  char c = 'w';
302  _thread_wakeup.write( &c, 1 );
303 }
304 
306 void SocketSet::waste_time( long sec, long usec )
307 {
308  struct timeval tv;
309  tv.tv_sec = sec;
310  tv.tv_usec = usec;
311  select( 0, NULL, NULL, NULL, &tv );
312 }
313 #endif
314 
316 { //NOTE(review): the signature line (listing line 315) is missing from this extract
317 #ifndef USE_NO_THREAD
318  while (!_thread_end) //keep selecting until the shutdown code sets _thread_end
319  private_select( NULL ); //NULL timeout is passed straight to ::select, which then blocks until there is activity
320  _thread_mx.lock();
321  _thread_cond.signal(); //counterpart of the _thread_cond.wait( _thread_mx ) in the shutdown code
322  _thread_mx.unlock();
323 #endif
324 }
325 
326 #if defined (VSNET_DEBUG) || defined (__APPLE__)
327 void SocketSet::private_test_dump_active_sets( int maxfd,
328  fd_set &read_before,
329  fd_set &read_after,
330  fd_set &write_before,
331  fd_set &write_after )
332 {
333 #if 0
334  //Causes compile errors and I'm too lazy to figure it out at the moment.
335  std::ostringstream str_r;
336  std::ostringstream str_w;
337  str_r<<" *** read set: ";
338  str_w<<" *** write set: ";
339  for (int i = 0; i < maxfd; i++) {
340  if ( FD_ISSET( i, &read_before ) ) {
341  if ( FD_ISSET( i, &read_after ) )
342  str_r<<i<<"(!) ";
343  else
344  str_r<<i<<" ";
345  } else if ( FD_ISSET( i, &read_after ) ) {
346  str_r<<"(?>>"<<i<<"<<?)";
347  }
348  if ( FD_ISSET( i, &write_before ) ) {
349  if ( FD_ISSET( i, &write_after ) )
350  str_w<<i<<"(!) ";
351  else
352  str_w<<i<<" ";
353  } else if ( FD_ISSET( i, &write_after ) ) {
354  str_w<<"(?>>"<<i<<"<<?)";
355  }
356  }
357  str_r<<ends;
358  str_w<<ends;
359 
360  std::ostringstream ostr_r;
361  std::ostringstream ostr_w;
362  ostr_r<<" *** read fds tested: ";
363  ostr_w<<" *** write fds tested: ";
364  for (Set::iterator it = _autoset.begin(); it != _autoset.end(); it++) {
365  VsnetSocketBase *b = (*it);
366  int fd = b->get_fd();
367  if (fd >= 0)
368  if ( FD_ISSET( fd, &read_after ) )
369  ostr_r<<b->get_socktype()<<" ";
370  fd = b->get_write_fd();
371  if (fd >= 0)
372  if ( FD_ISSET( fd, &write_after ) )
373  ostr_w<<"+"<<b->get_socktype()<<" ";
374  }
375 #ifndef USE_NO_THREAD
376  if ( FD_ISSET( _thread_wakeup.getread(), &read_after ) )
377  ostr_r<<_thread_wakeup.getread()<<"(pipe)";
378 #endif
379 
380  ostr_r<<ends;
381  ostr_w<<ends;
382  COUT<<"select saw activity:"<<endl
383  <<str_r.str()<<endl
384  <<str_w.str()<<endl
385  <<ostr_r.str()<<endl
386  <<ostr_w.str()<<endl;
387 #endif
388 }
389 #endif /* VSNET_DEBUG */
390 
391 #if defined (VSNET_DEBUG)
392 void SocketSet::private_test_dump_request_sets( timeval *timeout )
393 {
394  std::ostringstream ostr;
395  ostr<<"calling select with fds=";
396  for (Set::iterator it = _autoset.begin(); it != _autoset.end(); it++) {
397  VsnetSocketBase *b = (*it);
398  int fd = b->get_fd();
399  if (fd >= 0) {
400  ostr<<fd<<" ";
401  if ( b->need_test_writable() )
402  ostr<<"+"<<b->get_write_fd()<<" ";
403  }
404  }
405 #ifndef USE_NO_THREAD
406  ostr<<_thread_wakeup.getread()<<"(w)";
407 #endif
408  if (timeout)
409  ostr<<" t="<<timeout->tv_sec<<":"<<timeout->tv_usec;
410  else
411  ostr<<" t=NULL (blocking)";
412  ostr<<ends;
413  if (!timeout || timeout->tv_sec >= 1) {
414  COUT<<ostr.str()<<endl;
415  } else {
416  static long waitabit = 0;
417  waitabit += 1;
418  if (waitabit > 100) {
419  COUT<<ostr.str()<<endl;
420  waitabit = 0;
421  }
422  }
423 }
424 #endif /* VSNET_DEBUG */
425 
//Streams a timeval as "<tv_sec>:<tv_usec>" (no padding) and returns the stream.
std::ostream&operator<<( std::ostream &ostr, const timeval &tv )
{
    return ostr<<tv.tv_sec<<":"<<tv.tv_usec;
}
431