Vegastrike 0.5.1 rc1  1.0
Original sources for Vegastrike Evolved
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
vsnet_sockettcp.cpp
Go to the documentation of this file.
1 #include <config.h>
2 
3 #include "networking/const.h"
4 #include "vsnet_headers.h"
5 
6 #include <list>
7 
8 #include "vsnet_sockettcp.h"
9 #include "vsnet_err.h"
10 #include "vsnet_oss.h"
11 #include "vsnet_debug.h"
12 #include "packet.h"
13 
14 using std::cout;
15 using std::cerr;
16 using std::endl;
17 using std::ostream;
18 
19 /***********************************************************************
20 * VsnetTCPSocket::Blob - declaration
21 ***********************************************************************/
22 
24 {
25  char *buf;
26  size_t present_len;
27  size_t expected_len;
28 
29  Blob() : buf( 0 )
30  , present_len( 0 )
31  , expected_len( 0 ) {}
32 
33  Blob( size_t len ) : present_len( 0 )
34  , expected_len( len )
35  {
36  buf = new char[len];
37  }
38 
40  {
41  delete[] buf;
42  }
43 
44  inline size_t missing() const
45  {
47  }
48 
49  inline char * base()
50  {
51  return &buf[present_len];
52  }
53 
54 private:
55  Blob( const Blob &orig ); //forbidden
56  Blob& operator=( const Blob &orig ); //forbidden
57 };
58 
59 /***********************************************************************
60 * VsnetTCPSocket - definition
61 ***********************************************************************/
62 
63 VsnetTCPSocket::VsnetTCPSocket( int sock, const AddressIP &remote_ip, SocketSet &sets ) :
64  VsnetSocket( sock, remote_ip, "VsnetTCPSocket", sets )
65  , _incomplete_packet( 0 )
66  , _incomplete_header( 0 )
67  , _connection_closed( false )
68  , _sq_off( 0 )
69  , _mtu_size_estimation( 1024 )
70 {
71  COUT<<"enter "<<__PRETTY_FUNCTION__<<endl;
72  _sq_fd = sock;
73 }
74 
76 {
77  COUT<<"enter "<<__PRETTY_FUNCTION__<<endl;
79  delete _incomplete_packet;
80  if (valid() && !_connection_closed && get_fd() >= 0)
81  close_fd();
82 }
83 
84 //int VsnetTCPSocket::sendbuf( PacketMem& packet, const AddressIP*, int pcktflags )
85 //{
86 //int idx = 1;
87 //if( pcktflags & LOPRI ) idx = 0;
88 //if( pcktflags & HIPRI ) idx = 2;
89 //
91 //* Add a timestamp here -- drop packets when they get too old.
92 //*/
93 //
94 //_sq_mx.lock( );
95 //bool e = _sq.empty();
96 //
97 //_sq.push( idx, packet );
98 //_sq_mx.unlock( );
99 //
100 //if( e && _set ) _set->wakeup( );
101 //return packet.len();
102 //}
103 
104 int VsnetTCPSocket::sendbuf( Packet *packet, const AddressIP*, int pcktflags )
105 {
106  int idx = 1;
107  if (pcktflags&LOPRI) idx = 0;
108  if (pcktflags&HIPRI) idx = 2;
109  /* Add a timestamp here -- drop packets when they get too old.
110  */
111 
112  PacketPtr enq = PacketPtr( new Packet( *packet ) );
113 
114  _sq_mx.lock();
115  bool e = _sq.empty();
116 
117  _sq.push( idx, enq );
118  _sq_mx.unlock();
119  if (e && _set) _set->wakeup();
120  return packet->getSendBufferLength();
121 }
122 
124 {
125 #ifdef USE_NO_THREAD
126  if (_connection_closed) return;
127  if (_set) _set->waste_time( 0, 0 ); //waste zero time, but check sockets
128 #endif
129 }
130 
132 {
133  _sq_mx.lock();
134  bool e = ( _sq.empty() && _sq_current.empty() );
135 #ifdef VSNET_DEBUG
136  if (!e)
137  COUT<<"_sq "
138  <<(_sq.empty() ? "empty" : "not empty")
139  <<", _sq_current "
140  <<(_sq_current.empty() ? "empty" : "not empty")<<endl;
141 #endif
142  _sq_mx.unlock();
143  return !e;
144 }
145 
147 {
148  return _sq_fd;
149 }
150 
152 {
153  return _mtu_size_estimation;
154 }
155 
157 {
158  _sq_mx.lock();
159  int retval = _sq.getLength( pri );
160  _sq_mx.unlock();
161  return retval;
162 }
163 
165 {
166  _sq_mx.lock();
167  if ( _sq_current.empty() ) {
168  if ( _sq.empty() ) {
169 #ifdef VSNET_DEBUG
170  COUT<<"both queues are empty"<<endl;
171 #endif
172  _sq_mx.unlock();
173  return 0;
174  } else {
175  PacketPtr m = _sq.pop();
176  int len = m->getSendBufferLength();
177  Header h( len );
178  _sq_current.push( PacketMem( &h, sizeof (Header) ) );
179  _sq_current.push( PacketMem( m->getSendBuffer(), len ) );
180  _sq_off = 0;
181  }
182  }
183  PacketMem packet( _sq_current.front() );
184 
185  int numsent;
186  unsigned int len = packet.len();
187  assert( len > _sq_off );
188  len -= _sq_off;
189  const char *buf = packet.getConstBuf();
190  numsent = ::send( _sq_fd, &buf[_sq_off], len, 0 );
191  if (numsent < 0) {
192  switch (errno)
193  {
194 #if defined (_WIN32) && !defined (__CYGWIN__)
195  case WSAECONNRESET: //other side closed socket
196  case WSAECONNABORTED: //other side closed socket
197  case WSAEWOULDBLOCK:
198 #else
199  case ECONNRESET: //other side closed socket
200  case ECONNABORTED: //other side closed socket
201  case EWOULDBLOCK:
202 #endif
203  case EINTR:
204  _sq_mx.unlock();
205  return 0;
206 
207  case EFAULT:
208  _sq_mx.unlock();
209  COUT<<"EFAULT"<<endl
210  <<" *** An invalid user space address was specified for a parameter."<<endl
211  <<" *** fd : "<<_sq_fd<<endl
212  <<" *** buf : "<<std::hex<<(long) buf<<std::dec<<endl
213  <<" *** sq offset : "<<_sq_off<<endl
214  <<" *** packet len: "<<packet.len()<<endl;
215  return 0;
216 
217  default:
218  fprintf( stderr, "Failed sending TCP data of %d len to socket %d\n", len, _sq_fd );
219  perror( "\tsending TCP data" );
220  _sq_mx.unlock();
221  return -1;
222  }
223  } else if (numsent == 0) {
224  //other side closed socket - what to do now?
225  _sq_mx.unlock();
226  return numsent;
227  } else if ( (unsigned int) numsent == len ) {
228  _sq_off = 0;
229  _sq_current.pop();
230  _sq_mx.unlock();
231 #ifdef VSNET_DEBUG
232  //
233  //DEBUG block - remove soon
234  //
235  {
236  COUT<<"sent buffer with len "<<packet.len()<<": "<<endl;
237 #if 0
238  packet.dump( cout, 0 );
239 #endif
240  }
241 #endif
242  return numsent;
243  } else {
244  assert( (unsigned int) numsent < len ); //should be impossible
245  _sq_off += numsent;
246  _sq_mx.unlock();
247  return numsent;
248  }
249 }
250 
252 {
253  _sq_mx.lock();
254  while ( !_sq.empty() ) {
255  _sq.pop();
256 #ifdef VSNET_DEBUG
257  COUT<<"forgetting a packet in _sq"<<endl;
258 #endif
259  }
260  while ( !_sq_current.empty() ) {
261  _sq_current.pop();
262 #ifdef VSNET_DEBUG
263  COUT<<"forgetting a segment in _sq_current"<<endl;
264 #endif
265  _sq_off = 0;
266  }
267  _sq_mx.unlock();
268 }
269 
271 {
272  _cpq_mx.lock();
273  if (_cpq.empty() == false) {
274  PacketPtr ptr = _cpq.front();
275  _cpq.pop();
276  _cpq_mx.unlock();
277 
278  AddressIP dummy;
279  if (ipadr) *ipadr = _remote_ip;
280  p->copyfrom( *ptr.get() );
281  return ptr->getDataLength()+ptr->getHeaderLength();
282  } else if (_connection_closed) {
283  if (_set) _set->rem_pending( _sq_fd );
284  _cpq_mx.unlock();
285  close_fd();
286  COUT<<__PRETTY_FUNCTION__<<" connection is closed"<<endl;
287  return 0;
288  } else {
289  if (_set) _set->rem_pending( _sq_fd );
290  _cpq_mx.unlock();
291  return -1;
292  }
293 }
294 
295 void VsnetTCPSocket::child_disconnect( const char *s )
296 {
297  if (get_fd() > 0) {
298  if (close_fd() < 0)
299  COUT<<s<<" :\tWarning: disconnect error: "<<strerror( errno )<<endl;
300  else
301  COUT<<s<<" :\tWarning: disconnected"<<endl;
302  } else {
303  COUT<<s<<" :\tWarning: disconnect null socket: "<<strerror( errno )<<endl;
304  }
305 }
306 
307 void VsnetTCPSocket::dump( std::ostream &ostr ) const
308 {
309  ostr<<"( s="<<get_fd()<<" TCP r="<<_remote_ip<<" )";
310 }
311 
312 ostream & operator<<( ostream &ostr, const VsnetSocket &s )
313 {
314  s.dump( ostr );
315  return ostr;
316 }
317 
319 {
320  /* True is the correct answer when the connection is closed:
321  * the app must call recvbuf once after this to receive 0
322  * and start close processing.
323  * We could return packets from the queue first, but that may
324  * trigger an answer packet from the application, and give
325  * us trouble because of the closed socket.
326  */
327  if (_connection_closed)
328  return true;
329  bool retval = false;
330 
331  _cpq_mx.lock();
332  if (_cpq.empty() == false)
333  retval = true;
334  else if (_set)
335  _set->rem_pending( _sq_fd );
336  _cpq_mx.unlock();
337 
339 
340  return retval;
341 }
342 
344 {
345  if (_connection_closed) {
346  COUT<<"Connection already closed"<<endl;
347  return true; /* Pretty sure that recv will return 0. */
348  }
349  bool endless = true;
350  bool successful = false;
351  if (get_nonblock() == false && datalen == -1) endless = false;
352  do {
353  if ( (_incomplete_header > 0)
354  || (_incomplete_header == 0 && _incomplete_packet == 0) ) {
355  assert( _incomplete_packet == 0 ); //we expect a len, can not have data yet
356  assert( (unsigned int) _incomplete_header < sizeof (Header) ); //len is coded in sizeof(Header) bytes
357  unsigned int len = sizeof (Header)-_incomplete_header;
358  char *b = (char*) &_header;
359  int ret = VsnetOSS::recv( get_fd(), &b[_incomplete_header], len, 0 );
360  if (ret <= 0) {
361  if ( ret == 0 || vsnetEConnAborted() || vsnetEConnReset() ) {
362  COUT<<"Connection closed in header"<<endl;
363  _connection_closed = true;
364  close_fd();
365  if (_set) _set->add_pending( _sq_fd );
366  return true;
367  } else if (vsnetEWouldBlock() == false) {
368  COUT<<"recv returned "<<ret
369  <<" errno "<<vsnetGetLastError()
370  <<" = "<<vsnetLastError()
371  <<endl;
372  perror( "receiving TCP packetlength bytes" );
373  _connection_closed = true;
374  close_fd();
375  return true;
376  } else {
377  //COUT << "Received EWOULDBLOCK." << (get_nonblock()?"true":"false") << endl;
378  }
379  return successful;
380  }
381  assert( (unsigned int)ret <= len );
382  if (ret > 0) _incomplete_header += ret;
383  if (datalen != -1) datalen -= ret;
384  if ( _incomplete_header == sizeof (Header) ) {
385  _incomplete_header = 0;
386  len = _header.h_len();
387  _incomplete_packet = new Blob( len );
388  }
389  }
390  if (_incomplete_packet != 0) {
391  int len = _incomplete_packet->missing();
392  if (datalen > 0)
393  len = datalen < len ? datalen : len;
394  int ret = VsnetOSS::recv( get_fd(), _incomplete_packet->base(), len, 0 );
395  assert( ret <= len );
396  if (ret <= 0) {
397  if (ret == 0) {
398  COUT<<"Connection closed in data"<<endl;
399  _connection_closed = true;
400  close_fd();
401  if (_set) _set->add_pending( _sq_fd );
402  return true;
403  } else if ( vsnetEConnAborted() ) {
404  perror( "receiving TCP packet" );
405  if (get_fd() == -1) {
406  perror( "receiving dead TCP packet" );
407  } else {
408  COUT<<"Connection closed in error"<<endl;
409  _connection_closed = true;
410  close_fd();
411  if (_set) _set->add_pending( _sq_fd );
412  }
413  return true;
414  } else if ( vsnetEWouldBlock() ) {
415  static int i = 0;
416  if (i++%128 == 0)
417  COUT<<"Received EWOULDBLOCK in data."<<(get_nonblock() ? "true" : "false")<<endl;
418  }
419  return successful;
420  } else {
421  if (datalen != -1) {
422  datalen -= ret;
423  if (datalen <= 0)
424  endless = false;
425  }
427  if (ret == len) {
430 #ifdef VSNET_DEBUG
431  //
432  //DEBUG block - remove soon
433  //
434  {
436  COUT<<"received buffer with len "<<b->present_len<<": "<<endl;
437  PacketMem m( b->buf, b->present_len, PacketMem::LeaveOwnership );
438 #if 0
439  m.dump( cout, 3 );
440 #endif
441  }
442 #endif
443 
445  _incomplete_packet = 0;
446  successful = true;
447  endless = false; //If we're done, let's stop while we're ahead.
448 
449  //either endless is false, or we exit with EWOULDBLOCK
450  }
451  }
452  }
453  } while (endless); //exit only for EWOULDBLOCK or closed socket
454  return successful;
455 }
456 
458 {
459  assert( b );
460  assert( b->present_len == b->expected_len );
461 
462  PacketMem mem( b->buf, b->present_len, PacketMem::TakeOwnership );
463  PacketPtr ptr = PacketPtr( new Packet( mem ) );
464  //if (ptr->getCommand()!=CMD_POSUPDATE && ptr->getCommand()!=CMD_SNAPSHOT)
465  //COUT << "Completely received a packet of type " << ptr->getCommand() << endl;
466  b->buf = NULL;
467  _cpq_mx.lock();
468  _cpq.push( ptr );
469  _cpq_mx.unlock();
470  if (_set) _set->add_pending( _sq_fd );
471  delete b;
472 }
473 
474 /***********************************************************************
475 * VsnetTCPSocket::SqQueues
476 ***********************************************************************/
477 
479 {
480  _ct = 0;
481  //_debug_array[0] = 0;
482  //_debug_array[1] = 0;
483  //_debug_array[2] = 0;
484  //_debug_array[3] = 0;
485 }
486 
488 {
489  return _ct == 0;
490 }
491 
493 {
494  _q[idx].push( m );
495  _ct++;
496  //if( idx >= 0 && idx < 3 ) _debug_array[idx]++;
497  //else _debug_array[3]++;
498 }
499 
501 {
502  std::map< int, SqQueue >::iterator it;
503  it = _q.find( idx );
504  if ( it == _q.end() ) return 0;
505  return it->second.size();
506 }
507 
509 {
510  /* We need the reverse iterators because in this code, higher numbers
511  * indicate higher priorities, and those are further "back" in the
512  * map.
513  */
514  assert( _ct > 0 );
515  std::map< int, SqQueue >::reverse_iterator it;
516  for (it = _q.rbegin(); it != _q.rend(); it++)
517  if (it->second.empty() == false) {
518  PacketPtr r = it->second.front();
519  it->second.pop();
520  _ct--;
521  return r;
522  }
523  assert( 0 );
524  return PacketPtr();
525 }
526