28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once, doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
55 from errno
import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56 ENOTCONN, ESHUTDOWN, EINTR, EISCONN
68 def poll (timeout=0.0, map=None):
72 r = []; w = []; e = []
73 for fd, obj
in map.items():
79 r,w,e = select.select (r,w,e, timeout)
80 except select.error, err:
83 r = []; w = []; e = []
95 obj.handle_read_event()
108 obj.handle_write_event()
118 if timeout
is not None:
120 timeout = int(timeout*1000)
123 for fd, obj
in map.items():
128 flags = flags | poll.POLLOUT
130 l.append ((fd, flags))
131 r = poll.poll (l, timeout)
139 if (flags & poll.POLLIN):
140 obj.handle_read_event()
141 if (flags & poll.POLLOUT):
142 obj.handle_write_event()
152 if timeout
is not None:
154 timeout = int(timeout*1000)
155 pollster = select.poll()
157 for fd, obj
in map.items():
160 flags = select.POLLIN
162 flags = flags | select.POLLOUT
164 pollster.register(fd, flags)
166 r = pollster.poll (timeout)
167 except select.error, err:
178 if (flags & select.POLLIN):
179 obj.handle_read_event()
180 if (flags & select.POLLOUT):
181 obj.handle_write_event()
187 def loop (timeout=30.0, use_poll=0, map=None):
193 if hasattr (select,
'poll'):
201 poll_fun (timeout, map)
214 self.socket.setblocking (0)
219 self.
addr = sock.getpeername()
227 status = [self.__class__.__module__+
"."+self.__class__.__name__]
229 status.append (
'listening')
231 status.append (
'connected')
232 if self.
addr is not None:
234 status.append (
'%s:%d' % self.
addr)
237 return '<%s at %#x>' % (
' '.join (status), id (self))
255 self.
socket = socket.socket (family, type)
256 self.socket.setblocking(0)
257 self.
_fileno = self.socket.fileno()
269 self.socket.setsockopt (
270 socket.SOL_SOCKET, socket.SO_REUSEADDR,
271 self.socket.getsockopt (socket.SOL_SOCKET,
272 socket.SO_REUSEADDR) | 1
290 return not self.accepting
301 if os.name ==
'nt' and num > 5:
303 return self.socket.listen (num)
307 return self.socket.bind (addr)
311 err = self.socket.connect_ex(address)
312 if err
in (EINPROGRESS, EALREADY, EWOULDBLOCK):
314 if err
in (0, EISCONN):
317 self.handle_connect()
319 raise socket.error, err
323 conn, addr = self.socket.accept()
325 except socket.error, why:
326 if why[0] == EWOULDBLOCK:
329 raise socket.error, why
333 result = self.socket.send (data)
335 except socket.error, why:
336 if why[0] == EWOULDBLOCK:
339 raise socket.error, why
344 data = self.socket.recv (buffer_size)
352 except socket.error, why:
354 if why[0]
in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
358 raise socket.error, why
367 return getattr (self.socket, attr)
374 sys.stderr.write (
'log: %s\n' %
str(message))
377 if __debug__
or type !=
'info':
378 print '%s: %s' % (type, message)
384 if not self.connected:
387 elif not self.connected:
388 self.handle_connect()
396 if not self.connected:
397 self.handle_connect()
409 self_repr = repr (self)
411 self_repr =
'<__repr__ (self) failed for object at %0x>' % id(self)
414 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
425 self.log_info (
'unhandled exception',
'warning')
428 self.log_info (
'unhandled read event',
'warning')
431 self.log_info (
'unhandled write event',
'warning')
434 self.log_info (
'unhandled connect event',
'warning')
437 self.log_info (
'unhandled accept event',
'warning')
440 self.log_info (
'unhandled close event',
'warning')
450 dispatcher.__init__ (self, sock)
455 num_sent = dispatcher.send (self, self.
out_buffer[:512])
466 self.log_info (
'sending %s' %
repr(data))
475 t,v,tb = sys.exc_info()
479 tb.tb_frame.f_code.co_filename,
480 tb.tb_frame.f_code.co_name,
490 file, function, line = tbinfo[-1]
491 info =
'[' +
'] ['.
join(map(
lambda x:
'|'.
join(x), tbinfo)) +
']'
492 return (file, function, line), t, v, info
497 for x
in map.values():
514 if os.name ==
'posix':
524 return apply (os.read, (self.
fd,)+args)
527 return apply (os.write, (self.
fd,)+args)
533 return os.close (self.
fd)
540 dispatcher.__init__ (self)
543 flags = fcntl.fcntl (fd, fcntl.F_GETFL, 0)
544 flags = flags | os.O_NONBLOCK
545 fcntl.fcntl (fd, fcntl.F_SETFL, flags)