1 """Proposed new threading module, emulating a subset of Java's threading model."""
18 _start_new_thread = thread.start_new_thread
19 _allocate_lock = thread.allocate_lock
20 _get_ident = thread.get_ident
21 ThreadError = thread.error
24 _print_exc = traceback.print_exc
44 def _note(self, format, *args):
46 format = format % args
47 format =
"%s: %s\n" % (
49 _sys.stderr.write(format)
56 def _note(self, *args):
65 return apply(_RLock, args, kwargs)
70 _Verbose.__init__(self, verbose)
76 return "<%s(%s, %d)>" % (
77 self.__class__.__name__,
78 self.
__owner and self.__owner.getName(),
86 self.
_note(
"%s.acquire(%s): recursive success", self, blocking)
88 rc = self.__block.acquire(blocking)
93 self.
_note(
"%s.acquire(%s): initial succes", self, blocking)
96 self.
_note(
"%s.acquire(%s): failure", self, blocking)
101 assert self.
__owner is me,
"release() of un-acquire()d lock"
105 self.__block.release()
107 self.
_note(
"%s.release(): final release", self)
110 self.
_note(
"%s.release(): non-final release", self)
114 def _acquire_restore(self, (count, owner)):
115 self.__block.acquire()
119 self.
_note(
"%s._acquire_restore()", self)
121 def _release_save(self):
123 self.
_note(
"%s._release_save()", self)
128 self.__block.release()
129 return (count, owner)
136 return apply(_Condition, args, kwargs)
141 _Verbose.__init__(self, verbose)
153 except AttributeError:
157 except AttributeError:
161 except AttributeError:
168 def _release_save(self):
169 self.__lock.release()
171 def _acquire_restore(self, x):
172 self.__lock.acquire()
175 if self.__lock.acquire(0):
176 self.__lock.release()
183 assert self.
_is_owned(),
"wait() of un-acquire()d lock"
186 self.__waiters.append(waiter)
192 self.
_note(
"%s.wait(): got it", self)
199 endtime =
_time() + timeout
202 gotit = waiter.acquire(0)
205 remaining = endtime -
_time()
208 delay =
min(delay * 2, remaining, .05)
212 self.
_note(
"%s.wait(%s): timed out", self, timeout)
214 self.__waiters.remove(waiter)
219 self.
_note(
"%s.wait(%s): got it", self, timeout)
225 assert self.
_is_owned(),
"notify() of un-acquire()d lock"
227 waiters = __waiters[:n]
230 self.
_note(
"%s.notify(): no waiters", self)
232 self.
_note(
"%s.notify(): notifying %d waiter%s", self, n,
234 for waiter
in waiters:
237 __waiters.remove(waiter)
246 return apply(_Semaphore, args, kwargs)
253 assert value >= 0,
"Semaphore initial value must be >= 0"
254 _Verbose.__init__(self, verbose)
260 self.__cond.acquire()
265 self.
_note(
"%s.acquire(%s): blocked waiting, value=%s",
271 self.
_note(
"%s.acquire: success, value=%s",
274 self.__cond.release()
278 self.__cond.acquire()
281 self.
_note(
"%s.release: success, value=%s",
284 self.__cond.release()
288 return apply(_BoundedSemaphore, args, kwargs)
291 """Semaphore that checks that # releases is <= # acquires"""
293 _Semaphore.__init__(self, value, verbose)
298 raise ValueError,
"Semaphore released too many times"
299 return _Semaphore.release(self)
303 return apply(_Event, args, kwargs)
310 _Verbose.__init__(self, verbose)
318 self.__cond.acquire()
320 self.__cond.notifyAll()
321 self.__cond.release()
324 self.__cond.acquire()
326 self.__cond.release()
329 self.__cond.acquire()
331 self.__cond.wait(timeout)
332 self.__cond.release()
336 def _newname(template="Thread-%d"):
338 _counter = _counter + 1
339 return template % _counter
353 def __init__(self, group=None, target=None, name=None,
354 args=(), kwargs={}, verbose=
None):
355 assert group
is None,
"group argument must be None for now"
356 _Verbose.__init__(self, verbose)
367 def _set_daemon(self):
372 assert self.
__initialized,
"Thread.__init__() was not called"
379 status = status +
" daemon"
380 return "<%s(%s, %s)>" % (self.__class__.__name__, self.
__name, status)
384 assert not self.
__started,
"thread already started"
386 self.
_note(
"%s.start(): starting thread", self)
387 _active_limbo_lock.acquire()
389 _active_limbo_lock.release()
398 def __bootstrap(self):
401 _active_limbo_lock.acquire()
404 _active_limbo_lock.release()
406 self.
_note(
"%s.__bootstrap(): thread started", self)
411 self.
_note(
"%s.__bootstrap(): raised SystemExit", self)
414 self.
_note(
"%s.__bootstrap(): unhandled exception", self)
417 _sys.stderr.write(
"Exception in thread %s:\n%s\n" %
418 (self.
getName(), s.getvalue()))
421 self.
_note(
"%s.__bootstrap(): normal return", self)
430 self.__block.acquire()
432 self.__block.notifyAll()
433 self.__block.release()
436 _active_limbo_lock.acquire()
438 _active_limbo_lock.release()
442 assert self.
__started,
"cannot join thread before it is started"
443 assert self
is not currentThread(),
"cannot join current thread"
446 self.
_note(
"%s.join(): waiting until thread stops", self)
447 self.__block.acquire()
452 self.
_note(
"%s.join(): thread stopped", self)
454 deadline =
_time() + timeout
456 delay = deadline -
_time()
459 self.
_note(
"%s.join(): timed out", self)
461 self.__block.wait(delay)
464 self.
_note(
"%s.join(): thread stopped", self)
465 self.__block.release()
485 assert not self.
__started,
"cannot set daemon status of active thread"
491 return _Timer(*args, **kwargs)
494 """Call a function after a specified number of seconds:
496 t = Timer(30.0, f, args=[], kwargs={})
498 t.cancel() # stop the timer's action if it's still waiting
501 def __init__(self, interval, function, args=[], kwargs={}):
502 Thread.__init__(self)
510 """Stop the timer if it hasn't finished yet"""
515 if not self.finished.isSet():
525 Thread.__init__(self, name=
"MainThread")
527 _active_limbo_lock.acquire()
529 _active_limbo_lock.release()
533 def _set_daemon(self):
536 def __exitfunc(self):
538 t = _pickSomeNonDaemonThread()
541 self.
_note(
"%s: waiting for other threads", self)
544 t = _pickSomeNonDaemonThread()
546 self.
_note(
"%s: exiting", self)
547 self._Thread__delete()
549 def _pickSomeNonDaemonThread():
551 if not t.isDaemon()
and t.isAlive():
566 Thread.__init__(self, name=_newname(
"Dummy-%d"))
568 _active_limbo_lock.acquire()
570 _active_limbo_lock.release()
572 def _set_daemon(self):
576 assert 0,
"cannot join a dummy thread"
589 _active_limbo_lock.acquire()
590 count = len(_active) + len(_limbo)
591 _active_limbo_lock.release()
595 _active_limbo_lock.acquire()
596 active = _active.values() + _limbo.values()
597 _active_limbo_lock.release()
613 _Verbose.__init__(self)
622 while len(self.queue) >= self.limit:
623 self._note(
"put(%s): queue full", item)
625 self.queue.append(item)
626 self._note(
"put(%s): appended, length now %d",
627 item, len(self.queue))
633 while not self.queue:
634 self._note(
"get(): queue empty")
638 self._note(
"get(): got %s, %d left", item, len(self.queue))
643 class ProducerThread(
Thread):
646 Thread.__init__(self, name=
"Producer")
651 from random
import random
653 while counter < self.quota:
654 counter = counter + 1
655 self.queue.put(
"%s.%d" % (self.getName(), counter))
659 class ConsumerThread(
Thread):
662 Thread.__init__(self, name=
"Consumer")
667 while self.count > 0:
668 item = self.queue.get()
670 self.count = self.count - 1
679 t = ProducerThread(Q, NI)
680 t.setName(
"Producer-%d" % (i+1))
682 C = ConsumerThread(Q, NI*NP)
691 if __name__ ==
'__main__':