Vega strike Python Modules doc  0.5.1
Documentation of the " Modules " folder of Vega strike
 All Data Structures Namespaces Files Functions Variables
asyncore.py
Go to the documentation of this file.
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
4 
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
7 #
8 # All Rights Reserved
9 #
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
18 #
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
27 
28 """Basic infrastructure for asynchronous socket service clients and servers.
29 
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
38 
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
48 
49 import exceptions
50 import select
51 import socket
52 import sys
53 
54 import os
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56  ENOTCONN, ESHUTDOWN, EINTR, EISCONN
57 
# Keep an existing channel registry alive if this module body is executed
# again (e.g. on reload); otherwise start with an empty fd -> channel map.
if 'socket_map' not in globals():
    socket_map = {}
62 
class ExitNow(Exception):
    """Raised by a handler to break out of the polling functions.

    The poll functions in this module let this exception propagate to
    their caller instead of routing it to the channel's handle_error().
    """
65 
# When true, poll() prints the ready lists returned by select() on each pass.
DEBUG = 0
67 
def poll(timeout=0.0, map=None):
    """Run one select()-based pass over the channels in `map`.

    Every channel whose readable()/writable() predicate is true is handed
    to select.select(); the channels reported ready then get their
    handle_read_event()/handle_write_event() called.

    timeout -- seconds passed straight to select.select().
    map     -- fd -> channel mapping; defaults to the module socket_map.

    The exceptional-condition list is always passed empty, so this
    function never reports out-of-band conditions.  ExitNow,
    KeyboardInterrupt and SystemExit raised by a handler propagate
    unchanged; any other exception is routed to the channel's
    handle_error().  An empty map makes this a no-op.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    r = []; w = []; e = []
    # Snapshot the items so a predicate that mutates the map cannot
    # disturb the iteration.
    for fd, obj in list(map.items()):
        if obj.readable():
            r.append(fd)
        if obj.writable():
            w.append(fd)

    try:
        r, w, e = select.select(r, w, e, timeout)
    except select.error as err:
        # An interrupted system call just means "nothing ready this pass".
        if err.args[0] != EINTR:
            raise
        r = []; w = []; e = []

    if DEBUG:
        sys.stdout.write('%s %s %s\n' % (r, w, e))

    for fd in r:
        try:
            obj = map[fd]
        except KeyError:
            # The channel was removed by an earlier handler in this pass.
            continue

        try:
            obj.handle_read_event()
        except (ExitNow, KeyboardInterrupt, SystemExit):
            # Re-raise the *same* exception; never swallow interpreter
            # shutdown or Ctrl-C by closing the channel instead.
            raise
        except:
            obj.handle_error()

    for fd in w:
        try:
            obj = map[fd]
        except KeyError:
            continue

        try:
            obj.handle_write_event()
        except (ExitNow, KeyboardInterrupt, SystemExit):
            raise
        except:
            obj.handle_error()
113 
def poll2(timeout=0.0, map=None):
    """Run one polling pass using the standalone `poll` extension module.

    timeout -- seconds, or None; converted to integer milliseconds before
               being handed to poll.poll().
    map     -- fd -> channel mapping; defaults to the module socket_map.

    Channels are registered for POLLIN when readable() and POLLOUT when
    writable().  ExitNow, KeyboardInterrupt and SystemExit raised by a
    handler propagate unchanged; any other exception is routed to the
    channel's handle_error().  An empty map makes this a no-op.
    """
    import poll
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    if not map:
        return

    l = []
    for fd, obj in list(map.items()):
        flags = 0
        if obj.readable():
            flags = poll.POLLIN
        if obj.writable():
            flags = flags | poll.POLLOUT
        if flags:
            l.append((fd, flags))

    r = poll.poll(l, timeout)
    for fd, flags in r:
        try:
            obj = map[fd]
        except KeyError:
            # The channel was removed by an earlier handler in this pass.
            continue

        try:
            if flags & poll.POLLIN:
                obj.handle_read_event()
            if flags & poll.POLLOUT:
                obj.handle_write_event()
        except (ExitNow, KeyboardInterrupt, SystemExit):
            # Re-raise the same exception rather than a fresh ExitNow, and
            # never treat Ctrl-C / interpreter exit as a channel error.
            raise
        except:
            obj.handle_error()
147 
def poll3(timeout=0.0, map=None):
    """Run one polling pass using select.poll().

    timeout -- seconds, or None; converted to integer milliseconds before
               being handed to the poll object.
    map     -- fd -> channel mapping; defaults to the module socket_map.

    Channels are registered for POLLIN when readable() and POLLOUT when
    writable().  An EINTR from poll() is treated as "nothing ready".
    ExitNow, KeyboardInterrupt and SystemExit raised by a handler
    propagate unchanged; any other exception is routed to the channel's
    handle_error().  An empty map makes this a no-op.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if not map:
        return

    for fd, obj in list(map.items()):
        flags = 0
        if obj.readable():
            flags = select.POLLIN
        if obj.writable():
            flags = flags | select.POLLOUT
        if flags:
            pollster.register(fd, flags)

    try:
        r = pollster.poll(timeout)
    except select.error as err:
        if err.args[0] != EINTR:
            raise
        r = []

    for fd, flags in r:
        try:
            obj = map[fd]
        except KeyError:
            # The channel was removed by an earlier handler in this pass.
            continue

        try:
            if flags & select.POLLIN:
                obj.handle_read_event()
            if flags & select.POLLOUT:
                obj.handle_write_event()
        except (ExitNow, KeyboardInterrupt, SystemExit):
            # Re-raise the same exception rather than a fresh ExitNow, and
            # never treat Ctrl-C / interpreter exit as a channel error.
            raise
        except:
            obj.handle_error()
186 
def loop(timeout=30.0, use_poll=0, map=None):
    """Keep polling until the channel map becomes empty.

    timeout  -- seconds handed to the polling function on each pass.
    use_poll -- when true, prefer poll3 (if select has a 'poll'
                attribute) and fall back to poll2; when false, use the
                select()-based poll.
    map      -- fd -> channel mapping; defaults to the module socket_map.
    """
    if map is None:
        map = socket_map

    # Choose the polling strategy once, before entering the loop.
    if not use_poll:
        poll_fun = poll
    elif hasattr(select, 'poll'):
        poll_fun = poll3
    else:
        poll_fun = poll2

    while map:
        poll_fun(timeout, map)
202 
# Class-level defaults; instances overwrite them as their state changes.
debug = 0  # truthy makes the buffered send() further down log each payload
connected = 0  # set to 1 by __init__ (socket given), connect() and the event handlers
accepting = 0  # set to 1 by listen()
closing = 0  # not read or written anywhere else in this listing
addr = None  # filled in by bind(), connect() or __init__ (peer name)
209 
def __init__(self, sock=None, map=None):
    """Wrap an existing socket, or start without one.

    sock -- socket to adopt; it is registered in `map`, switched to
            non-blocking mode and the channel is marked connected.
    map  -- channel map forwarded to set_socket().

    Without a socket, self.socket is simply set to None.
    """
    if not sock:
        self.socket = None
        return

    self.set_socket(sock, map)
    # I think it should inherit this anyway
    self.socket.setblocking(0)
    self.connected = 1
    # XXX Does the constructor require that the socket passed
    # be connected?
    try:
        self.addr = sock.getpeername()
    except socket.error:
        # The addr isn't crucial
        pass
225 
def __repr__(self):
    """Return '<module.Class [listening|connected] [addr] at 0x...>'."""
    klass = self.__class__
    parts = [klass.__module__ + "." + klass.__name__]

    if self.accepting and self.addr:
        parts.append('listening')
    elif self.connected:
        parts.append('connected')

    if self.addr is not None:
        try:
            parts.append('%s:%d' % self.addr)
        except TypeError:
            # addr is not a (host, port) pair; show it verbatim.
            parts.append(repr(self.addr))

    return '<%s at %#x>' % (' '.join(parts), id(self))
238 
def add_channel(self, map=None):
    """Register this channel under its file descriptor.

    map -- mapping to register in; defaults to the module socket_map.
    """
    registry = socket_map if map is None else map
    registry[self._fileno] = self
244 
def del_channel(self, map=None):
    """Remove this channel's file descriptor from the channel map.

    map -- mapping to remove from; defaults to the module socket_map.

    Does nothing when the descriptor is not registered.
    """
    fd = self._fileno
    if map is None:
        map = socket_map
    # `in` replaces the deprecated dict.has_key().
    if fd in map:
        del map[fd]
252 
def create_socket(self, family, type):
    """Create a fresh non-blocking socket and register the channel.

    family, type -- passed to socket.socket(); also remembered as the
                    tuple self.family_and_type.
    """
    self.family_and_type = (family, type)
    new_sock = socket.socket(family, type)
    self.socket = new_sock
    new_sock.setblocking(0)
    self._fileno = new_sock.fileno()
    self.add_channel()
259 
def set_socket(self, sock, map=None):
    """Adopt `sock` as the channel's socket and register it in `map`."""
    self.socket = sock
    descriptor = sock.fileno()
    self._fileno = descriptor
    self.add_channel(map)
265 
def set_reuse_addr(self):
    """Turn on SO_REUSEADDR for the channel's socket, best effort.

    A socket.error from getsockopt()/setsockopt() is ignored.
    """
    # try to re-use a server port if possible
    sock = self.socket
    level, option = socket.SOL_SOCKET, socket.SO_REUSEADDR
    try:
        current = sock.getsockopt(level, option)
        sock.setsockopt(level, option, current | 1)
    except socket.error:
        pass
276 
277  # ==================================================
278  # predicates for select()
279  # these are used as filters for the lists of sockets
280  # to pass to select().
281  # ==================================================
282 
def readable(self):
    """Predicate consulted by the poll functions: always wants reads."""
    return 1
285 
if os.name != 'mac':
    def writable(self):
        """Predicate consulted by the poll functions: always wants writes."""
        return 1
else:
    # The macintosh will select a listening socket for
    # write if you let it. What might this mean?
    def writable(self):
        """Want writes only while the channel is not a listening socket."""
        return not self.accepting
294 
295  # ==================================================
296  # socket object methods.
297  # ==================================================
298 
def listen(self, num):
    """Mark the channel as accepting and start listening.

    num -- requested backlog.  On os.name == 'nt' a value above 5 is
           clamped to 5 (previously it was collapsed to 1, shrinking
           the backlog far more than the cap requires).

    Returns whatever the underlying socket's listen() returns.
    """
    self.accepting = 1
    if os.name == 'nt' and num > 5:
        num = 5
    return self.socket.listen(num)
304 
def bind(self, addr):
    """Remember `addr` on the channel and bind the socket to it.

    Returns whatever the underlying socket's bind() returns.
    """
    self.addr = addr
    outcome = self.socket.bind(addr)
    return outcome
308 
def connect(self, address):
    """Start a non-blocking connect to `address`.

    The channel is marked unconnected, then connect_ex() is tried:

    * EINPROGRESS / EALREADY / EWOULDBLOCK -- still pending; return.
    * 0 / EISCONN -- connected: record the address, set connected and
      call handle_connect().
    * anything else -- raise socket.error(errno, message).  The errno
      remains args[0]; the message from os.strerror() is new, so the
      exception is no longer a bare number.
    """
    self.connected = 0
    err = self.socket.connect_ex(address)
    if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
        return
    if err in (0, EISCONN):
        self.addr = address
        self.connected = 1
        self.handle_connect()
    else:
        raise socket.error(err, os.strerror(err))
320 
def accept(self):
    """Accept a pending connection.

    Returns the (conn, addr) pair from the underlying socket, or None
    when the accept would block (EWOULDBLOCK).  Any other socket.error
    propagates with its original traceback.
    """
    try:
        conn, addr = self.socket.accept()
        return conn, addr
    except socket.error as why:
        if why.args[0] != EWOULDBLOCK:
            raise
        # Nothing to accept right now; callers must cope with None.
        return None
330 
def send(self, data):
    """Send `data` on the underlying socket.

    Returns the number of bytes the socket accepted, or 0 when the send
    would block (EWOULDBLOCK).  Any other socket.error propagates with
    its original traceback.
    """
    try:
        return self.socket.send(data)
    except socket.error as why:
        if why.args[0] == EWOULDBLOCK:
            return 0
        raise
341 
def recv(self, buffer_size):
    """Receive up to `buffer_size` bytes from the underlying socket.

    Returns the data read.  When the socket returns nothing, or fails
    with ECONNRESET / ENOTCONN / ESHUTDOWN, handle_close() is called and
    '' is returned.  Any other socket.error propagates with its original
    traceback.
    """
    try:
        data = self.socket.recv(buffer_size)
        if not data:
            # a closed connection is indicated by signaling
            # a read condition, and having recv() return 0.
            self.handle_close()
            return ''
        return data
    except socket.error as why:
        # winsock sometimes throws ENOTCONN
        if why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN):
            self.handle_close()
            return ''
        raise
359 
def close(self):
    """Unregister the channel, then close its socket."""
    self.del_channel()
    underlying = self.socket
    underlying.close()
363 
364  # cheap inheritance, used to pass all other attribute
365  # references to the underlying socket object.
def __getattr__(self, attr):
    """Forward unknown attribute lookups to the underlying socket.

    Python only calls this hook after normal lookup has failed, so a
    request for 'socket' itself means the attribute was never assigned;
    forwarding it would re-enter this method without end.  Raise
    AttributeError for that case instead.
    """
    if attr == 'socket':
        raise AttributeError(attr)
    return getattr(self.socket, attr)
368 
369  # log and log_info may be overridden to provide more sophisticated
370  # logging and warning methods. In general, log is for 'hit' logging
371  # and 'log_info' is for informational, warning and error logging.
372 
def log(self, message):
    """Write 'log: <message>' followed by a newline to sys.stderr."""
    line = 'log: %s\n' % str(message)
    sys.stderr.write(line)
375 
def log_info(self, message, type='info'):
    """Print '<type>: <message>' to standard output.

    Messages of type 'info' are suppressed when __debug__ is false;
    every other type is always printed.
    """
    if not __debug__ and type == 'info':
        return
    print('%s: %s' % (type, message))
379 
def handle_read_event(self):
    """Translate a 'readable' notification into the right handler call.

    * accepting channel: mark it connected if needed, then
      handle_accept().
    * unconnected channel: handle_connect(), mark connected, then
      handle_read().
    * otherwise: handle_read().
    """
    if self.accepting:
        # for an accepting socket, getting a read implies
        # that we are connected
        if not self.connected:
            self.connected = 1
        self.handle_accept()
        return

    if not self.connected:
        self.handle_connect()
        self.connected = 1
    self.handle_read()
393 
def handle_write_event(self):
    """Translate a 'writable' notification into handler calls.

    An unconnected channel first gets handle_connect() and is marked
    connected; handle_write() is then called in every case.
    """
    # getting a write implies that we are connected
    pending_connect = not self.connected
    if pending_connect:
        self.handle_connect()
        self.connected = 1
    self.handle_write()
400 
def handle_expt_event(self):
    """Forward an exceptional-condition notification to handle_expt()."""
    handler = self.handle_expt
    handler()
403 
def handle_error(self):
    """Log the exception currently being handled and close the channel.

    The traceback summary comes from compact_traceback(); the message is
    emitted through log_info() with type 'error'.
    """
    _, exc_type, exc_value, tb_summary = compact_traceback()

    # sometimes a user repr method will crash.
    try:
        described = repr(self)
    except:
        described = '<__repr__ (self) failed for object at %0x>' % id(self)

    message = 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
        described,
        exc_type,
        exc_value,
        tb_summary,
    )
    self.log_info(message, 'error')
    self.close()
423 
def handle_expt(self):
    """Default hook for exceptional conditions: log a warning."""
    notice = 'unhandled exception'
    self.log_info(notice, 'warning')
426 
def handle_read(self):
    """Default hook for read events: log a warning."""
    notice = 'unhandled read event'
    self.log_info(notice, 'warning')
429 
def handle_write(self):
    """Default hook for write events: log a warning."""
    notice = 'unhandled write event'
    self.log_info(notice, 'warning')
432 
def handle_connect(self):
    """Default hook for connect events: log a warning."""
    notice = 'unhandled connect event'
    self.log_info(notice, 'warning')
435 
def handle_accept(self):
    """Default hook for accept events: log a warning."""
    notice = 'unhandled accept event'
    self.log_info(notice, 'warning')
438 
def handle_close(self):
    """Default hook for close events: log a warning, then close()."""
    notice = 'unhandled close event'
    self.log_info(notice, 'warning')
    self.close()
442 
443 # ---------------------------------------------------------------------------
444 # adds simple buffered output capability, useful for simple clients.
445 # [for more sophisticated usage use asynchat.async_chat]
446 # ---------------------------------------------------------------------------
447 
def __init__(self, sock=None):
    """Initialise the base dispatcher and start with an empty out_buffer."""
    dispatcher.__init__(self, sock)
    # Outgoing data waits here until initiate_send() pushes it out.
    self.out_buffer = ''
452 
def initiate_send(self):
    """Try to send the first 512 bytes of out_buffer.

    Whatever dispatcher.send() reports as sent is dropped from the front
    of the buffer; the rest stays queued for the next call.
    """
    chunk = self.out_buffer[:512]
    num_sent = dispatcher.send(self, chunk)
    self.out_buffer = self.out_buffer[num_sent:]
457 
def handle_write(self):
    """On a write event, push out more of the pending buffer."""
    flush = self.initiate_send
    flush()
460 
def writable(self):
    """Want write events while unconnected or while data is buffered.

    Returns True when not connected, otherwise the length of out_buffer
    (0 meaning "nothing to write").
    """
    if not self.connected:
        return True
    return len(self.out_buffer)
463 
def send(self, data):
    """Queue `data` on out_buffer and immediately try to send.

    When self.debug is true the payload is first logged via log_info().
    """
    if self.debug:
        self.log_info('sending %s' % repr(data))
    pending = self.out_buffer + data
    self.out_buffer = pending
    self.initiate_send()
469 
470 # ---------------------------------------------------------------------------
471 # used for debugging.
472 # ---------------------------------------------------------------------------
473 
# NOTE(review): the `def` line for this body is missing from this listing.
# handle_error() calls compact_traceback() with no arguments and unpacks a
# 4-tuple, which matches the return statement below -- confirm against the
# real file.
#
# Summarise the exception currently being handled.
t,v,tb = sys.exc_info()
tbinfo = []
# Walk the traceback chain, recording (filename, function name, line
# number as a string) for every frame.
while 1:
    tbinfo.append ((
        tb.tb_frame.f_code.co_filename,
        tb.tb_frame.f_code.co_name,
        str(tb.tb_lineno)
        ))
    tb = tb.tb_next
    if not tb:
        break

# just to be safe
del tb

# The last entry is the frame where the walk ended.
file, function, line = tbinfo[-1]
# Render every frame as '[file|function|line]', separated by spaces.
info = '[' + '] ['.join(map(lambda x: '|'.join(x), tbinfo)) + ']'
# Returns ((file, function, line), exception type, exception value, info).
return (file, function, line), t, v, info
493 
def close_all(map=None):
    """Close the socket of every registered channel and empty the map.

    map -- fd -> channel mapping; defaults to the module socket_map.
    """
    if map is None:
        map = socket_map
    for channel in map.values():
        channel.socket.close()
    map.clear()
500 
501 # Asynchronous File I/O:
502 #
503 # After a little research (reading man pages on various unixen, and
504 # digging through the linux kernel), I've determined that select()
505 # isn't meant for doing asynchronous file i/o.
506 # Heartening, though - reading linux/mm/filemap.c shows that linux
507 # supports asynchronous read-ahead. So _MOST_ of the time, the data
508 # will be sitting in memory for us already when we go to read it.
509 #
510 # What other OS's (besides NT) support async file i/o? [VMS?]
511 #
512 # Regardless, this is useful for pipes, and stdin/stdout...
513 
514 if os.name == 'posix':
515  import fcntl
516 
518  # here we override just enough to make a file
519  # look like a socket for the purposes of asyncore.
def __init__(self, fd):
    """Remember the file descriptor that this wrapper operates on."""
    self.fd = fd
522 
def recv(self, *args):
    """Read from the wrapped descriptor via os.read(self.fd, *args).

    Argument unpacking replaces the deprecated apply() built-in; the
    call made is the same.
    """
    return os.read(self.fd, *args)
525 
def send(self, *args):
    """Write to the wrapped descriptor via os.write(self.fd, *args).

    Argument unpacking replaces the deprecated apply() built-in; the
    call made is the same.  Returns what os.write() returns.
    """
    return os.write(self.fd, *args)
528 
# File-style aliases: read/write are bound to the same functions as
# recv/send.
read = recv
write = send
531 
def close(self):
    """Close the wrapped descriptor; returns what os.close() returns."""
    descriptor = self.fd
    return os.close(descriptor)
534 
def fileno(self):
    """Return the wrapped file descriptor."""
    descriptor = self.fd
    return descriptor
537 
def __init__(self, fd):
    """Build a channel around an already-open file descriptor.

    The channel is marked connected, O_NONBLOCK is added to the
    descriptor's status flags, and the descriptor is installed with
    set_file().
    """
    dispatcher.__init__(self)
    self.connected = 1
    # set it to non-blocking mode
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
    self.set_file(fd)
547 
def set_file(self, fd):
    """Install `fd` as this channel's descriptor and register the channel.

    The descriptor is wrapped in a file_wrapper, which is stored as
    self.socket.
    """
    self._fileno = fd
    wrapped = file_wrapper(fd)
    self.socket = wrapped
    self.add_channel()