Module threading
[hide private]
[frames | no frames]

Source Code for Module threading

  1  """Thread module emulating a subset of Java's threading model.""" 
  2   
  3  import sys as _sys 
  4   
  5  try: 
  6      import thread 
  7  except ImportError: 
  8      del _sys.modules[__name__] 
  9      raise 
 10   
 11  import warnings 
 12   
 13  from functools import wraps 
 14  from time import time as _time, sleep as _sleep 
 15  from traceback import format_exc as _format_exc 
 16  from collections import deque 
 17   
 18  # Note regarding PEP 8 compliant aliases 
 19  #  This threading model was originally inspired by Java, and inherited 
 20  # the convention of camelCase function and method names from that 
 21  # language. While those names are not in any imminent danger of being 
 22  # deprecated, starting with Python 2.6, the module now provides a 
 23  # PEP 8 compliant alias for any such method name. 
 24  # Using the new PEP 8 compliant names also facilitates substitution 
 25  # with the multiprocessing module, which doesn't provide the old 
 26  # Java inspired names. 
 27   
 28   
 29  # Rename some stuff so "from threading import *" is safe 
 30  __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread', 
 31             'current_thread', 'enumerate', 'Event', 
 32             'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 
 33             'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] 
 34   
 35  _start_new_thread = thread.start_new_thread 
 36  _allocate_lock = thread.allocate_lock 
 37  _get_ident = thread.get_ident 
 38  ThreadError = thread.error 
 39  del thread 
 40   
 41   
 42  # sys.exc_clear is used to work around the fact that except blocks 
 43  # don't fully clear the exception until 3.0. 
 44  warnings.filterwarnings('ignore', category=DeprecationWarning, 
 45                          module='threading', message='sys.exc_clear') 
 46   
 47  # Debug support (adapted from ihooks.py). 
 48  # All the major classes here derive from _Verbose.  We force that to 
 49  # be a new-style class so that all the major classes here are new-style. 
 50  # This helps debugging (type(instance) is more revealing for instances 
 51  # of new-style classes). 
 52   
 53  _VERBOSE = False 
 54   
 55  if __debug__: 
56 57 - class _Verbose(object):
58
59 - def __init__(self, verbose=None):
60 if verbose is None: 61 verbose = _VERBOSE 62 self.__verbose = verbose
63
64 - def _note(self, format, *args):
65 if self.__verbose: 66 format = format % args 67 format = "%s: %s\n" % ( 68 current_thread().name, format) 69 _sys.stderr.write(format)
70 71 else:
72 # Disable this when using "python -O" 73 - class _Verbose(object):
74 - def __init__(self, verbose=None):
75 pass
76 - def _note(self, *args):
77 pass
# Support for profile and trace hooks

# Module-wide hooks; each stays None until the matching setter is called.
_profile_hook = None
_trace_hook = None


def setprofile(func):
    """Remember *func* as the module-level profile hook.

    Thread.__bootstrap_inner() passes a truthy hook to sys.setprofile()
    before calling run().
    """
    global _profile_hook
    _profile_hook = func
87
def settrace(func):
    """Remember *func* as the module-level trace hook.

    Thread.__bootstrap_inner() passes a truthy hook to sys.settrace()
    before calling run().
    """
    global _trace_hook
    _trace_hook = func
# Synchronization classes

# A plain lock is simply the low-level lock allocator.
Lock = _allocate_lock


def RLock(*args, **kwargs):
    """Factory returning a new reentrant lock (an _RLock instance)."""
    return _RLock(*args, **kwargs)
98
class _RLock(_Verbose):
    """Reentrant lock: a plain lock plus owner and recursion bookkeeping."""

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()
        self.__owner = None     # ident of the owning thread, or None
        self.__count = 0        # recursion depth held by the owner

    def __repr__(self):
        holder = self.__owner
        try:
            # Prefer the owning thread's name when it is registered.
            holder = _active[holder].name
        except KeyError:
            pass
        return "<%s owner=%r count=%d>" % (
            self.__class__.__name__, holder, self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock, recursively if the caller already owns it.

        Returns 1 on a recursive acquire, otherwise whatever the
        underlying lock's acquire(blocking) returned.
        """
        caller = _get_ident()
        if self.__owner == caller:
            self.__count += 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        got = self.__block.acquire(blocking)
        if not got:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
            return got
        self.__owner = caller
        self.__count = 1
        if __debug__:
            self._note("%s.acquire(%s): initial success", self, blocking)
        return got

    __enter__ = acquire

    def release(self):
        """Undo one acquire(); raise RuntimeError if the caller is not owner."""
        if self.__owner != _get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        remaining = self.__count - 1
        self.__count = remaining
        if remaining:
            if __debug__:
                self._note("%s.release(): non-final release", self)
            return
        self.__owner = None
        self.__block.release()
        if __debug__:
            self._note("%s.release(): final release", self)

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        """Re-acquire the lock and reinstate a (count, owner) pair."""
        saved_count, saved_owner = count_owner
        self.__block.acquire()
        self.__count = saved_count
        self.__owner = saved_owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        """Fully release the lock; return the (count, owner) it held."""
        if __debug__:
            self._note("%s._release_save()", self)
        saved_count = self.__count
        self.__count = 0
        saved_owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (saved_count, saved_owner)

    def _is_owned(self):
        """Return True if the calling thread is the recorded owner."""
        return self.__owner == _get_ident()
174
def Condition(*args, **kwargs):
    """Factory returning a new condition variable (a _Condition instance)."""
    return _Condition(*args, **kwargs)
178
class _Condition(_Verbose):
    """Condition variable layered on a caller-supplied lock (RLock default)."""

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save(), _acquire_restore() or
        # _is_owned(), its version shadows the default implementation
        # below (the defaults just call release()/acquire() on the lock).
        for hook in ("_release_save", "_acquire_restore", "_is_owned"):
            try:
                setattr(self, hook, getattr(lock, hook))
            except AttributeError:
                pass
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if __lock doesn't have _is_owned().
        if not self.__lock.acquire(0):
            return True
        self.__lock.release()
        return False

    def wait(self, timeout=None):
        """Release the lock, block until notified or timed out, re-acquire.

        Raises RuntimeError if the underlying lock is not held.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # A private, already-held lock acts as the wake-up gate:
        # notify() releases it.
        gate = _allocate_lock()
        gate.acquire()
        self.__waiters.append(gate)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                gate.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
                deadline = _time() + timeout
                pause = 0.0005  # 500 us -> initial delay of 1 ms
                signalled = gate.acquire(0)
                while not signalled:
                    left = deadline - _time()
                    if left <= 0:
                        break
                    pause = min(pause * 2, left, .05)
                    _sleep(pause)
                    signalled = gate.acquire(0)
                if signalled:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
                else:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    try:
                        self.__waiters.remove(gate)
                    except ValueError:
                        pass
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up to *n* waiters; RuntimeError if the lock is not held."""
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        pending = self.__waiters
        chosen = pending[:n]
        if not chosen:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        plural = "s" if n != 1 else ""
        self._note("%s.notify(): notifying %d waiter%s", self, n, plural)
        for gate in chosen:
            gate.release()
            try:
                pending.remove(gate)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake every thread currently waiting on this condition."""
        self.notify(len(self.__waiters))

    notify_all = notifyAll
294
def Semaphore(*args, **kwargs):
    """Factory returning a new counting semaphore (a _Semaphore instance)."""
    return _Semaphore(*args, **kwargs)
298
class _Semaphore(_Verbose):
    """Counting semaphore guarded by a condition variable.

    The internal condition lock is always taken through a ``with`` block,
    so it is released even when wait() is interrupted by an exception
    (e.g. KeyboardInterrupt).  Holding it past such an exception would
    deadlock every later acquire()/release().
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        """Create the semaphore; raise ValueError if *value* is negative."""
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter, waiting while it is zero.

        Returns True once the counter was decremented.  With a false
        *blocking* argument and a zero counter, returns False at once.
        """
        rc = False
        with self.__cond:
            while self.__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            else:
                # Reached only when the loop ended without "break",
                # i.e. the counter is positive.
                self.__value = self.__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                               self, self.__value)
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake one waiting thread, if any."""
        with self.__cond:
            self.__value = self.__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
342
def BoundedSemaphore(*args, **kwargs):
    """Factory returning a new _BoundedSemaphore instance."""
    return _BoundedSemaphore(*args, **kwargs)
346
class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        # Upper bound the counter may never exceed.
        self._initial_value = value

    def release(self):
        """Increment the counter; raise ValueError if it would pass the bound.

        The bound check and the increment happen under the same hold of
        the semaphore's condition lock.  Checking first and locking later
        would let two concurrent releases both pass the check and push
        the counter above its initial value.
        """
        # _Semaphore's condition wraps a non-reentrant Lock, so the
        # increment is done inline here instead of calling
        # _Semaphore.release(), which would try to take the lock again.
        with self._Semaphore__cond:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value = self._Semaphore__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self._Semaphore__value)
            self._Semaphore__cond.notify()
357
def Event(*args, **kwargs):
    """Factory returning a new event object (an _Event instance)."""
    return _Event(*args, **kwargs)
361
class _Event(_Verbose):
    """A boolean flag that threads can wait on until it becomes true."""

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def isSet(self):
        """Return the current value of the flag."""
        return self.__flag

    is_set = isSet

    def set(self):
        """Set the flag to true and wake every waiting thread."""
        with self.__cond:
            self.__flag = True
            self.__cond.notify_all()

    def clear(self):
        """Reset the flag to false."""
        with self.__cond:
            self.__flag = False

    def wait(self, timeout=None):
        """Block until the flag is true or *timeout* seconds have passed.

        Returns the flag as read after waiting, so a caller can tell a
        timeout (False) from the event being set (True).  Earlier code
        always returned None; callers that ignore the result are
        unaffected.
        """
        with self.__cond:
            if not self.__flag:
                self.__cond.wait(timeout)
            return self.__flag
# Helper to generate new thread names
_counter = 0


def _newname(template="Thread-%d"):
    """Bump the module-wide counter and format it into *template*."""
    global _counter
    _counter += 1
    return template % _counter
# Active thread administration
# _active_limbo_lock guards both registries below.
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
_limbo = {}     # threads passed to start() whose bootstrap has not run yet


# Main class for threads

class Thread(_Verbose):
    """A thread of control whose activity is run() (by default, *target*)."""

    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info
    # Keep sys.exc_clear too to clear the exception just before
    # allowing .join() to return.
    __exc_clear = _sys.exc_clear

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Set up the thread object; the thread itself starts in start().

        *group* must be None (asserted).  *target* is called by run()
        with *args* and *kwargs*.  *name* defaults to a generated
        "Thread-N" name.  *verbose* is forwarded to _Verbose.
        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        # join() waits on this condition; __stop() notifies it.
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _set_daemon(self):
        """Return the initial daemon flag: that of the creating thread."""
        # Overridden in _MainThread and _DummyThread
        return current_thread().daemon

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started.is_set():
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status += " daemon"
        if self.__ident is not None:
            status += " %s" % self.__ident
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Launch the thread and wait until it has signalled that it started.

        Raises RuntimeError if __init__() was not called or the thread
        was already started.  If the low-level start fails, the thread is
        removed from _limbo again and the exception is re-raised.
        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("thread already started")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        # Park the thread in _limbo until its bootstrap moves it to _active.
        _active_limbo_lock.acquire()
        _limbo[self] = self
        _active_limbo_lock.release()
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            with _active_limbo_lock:
                del _limbo[self]
            raise
        self.__started.wait()

    def run(self):
        """Call the target (if truthy) with the stored args and kwargs."""
        try:
            if self.__target:
                self.__target(*self.__args, **self.__kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self.__target, self.__args, self.__kwargs

    def __bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # __bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # __bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self.__bootstrap_inner()
        except:
            if self.__daemonic and _sys is None:
                return
            raise

    def _set_ident(self):
        """Record the calling thread's identifier as this thread's ident."""
        self.__ident = _get_ident()

    def __bootstrap_inner(self):
        """Register the thread, install hooks, call run(), then clean up.

        SystemExit from run() is swallowed.  Any other exception is
        reported on stderr and not re-raised.
        """
        try:
            self._set_ident()
            self.__started.set()
            # Move this thread from _limbo to _active.
            _active_limbo_lock.acquire()
            _active[self.__ident] = self
            del _limbo[self]
            _active_limbo_lock.release()
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                self._note("%s.__bootstrap(): registering trace hook", self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                self._note("%s.__bootstrap(): registering profile hook", self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys:
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                      (self.name, _format_exc()))
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                ' File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
            finally:
                # Prevent a race in
                # test_threading.test_no_refcycle_through_target when
                # the exception keeps the target alive past when we
                # assert that it's dead.
                self.__exc_clear()
        finally:
            with _active_limbo_lock:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    pass

    def __stop(self):
        """Mark the thread stopped and wake every thread blocked in join()."""
        self.__block.acquire()
        self.__stopped = True
        self.__block.notify_all()
        self.__block.release()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        try:
            with _active_limbo_lock:
                del _active[_get_ident()]
                # There must not be any python code between the previous line
                # and after the lock is released.  Otherwise a tracing function
                # could try to acquire the lock again in the same thread, (in
                # current_thread()), and would block.
        except KeyError:
            if 'dummy_threading' not in _sys.modules:
                raise

    def join(self, timeout=None):
        """Wait until the thread stops or *timeout* seconds have passed.

        Raises RuntimeError if __init__() was not called, if the thread
        has not been started, or when called on the current thread.
        Returns None in every case; use isAlive() to detect a timeout.
        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    # Loop ended without "break": the thread has stopped.
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

    @property
    def name(self):
        """The thread's name, always stored as a str."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    @name.setter
    def name(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    @property
    def ident(self):
        """The thread identifier; None until _set_ident() has been called."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__ident

    def isAlive(self):
        """Return True if the thread has started and has not yet stopped."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started.is_set() and not self.__stopped

    is_alive = isAlive

    @property
    def daemon(self):
        """The daemon flag; it can only be changed before start()."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    @daemon.setter
    def daemon(self, daemonic):
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("cannot set daemon status of active thread");
        self.__daemonic = daemonic

    # Java-style accessors that delegate to the properties above.

    def isDaemon(self):
        return self.daemon

    def setDaemon(self, daemonic):
        self.daemon = daemonic

    def getName(self):
        return self.name

    def setName(self, name):
        self.name = name
707
# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    """Factory returning a new timer thread (a _Timer instance)."""
    return _Timer(*args, **kwargs)
712
class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Store the delay and the call to make once it has elapsed.

        *args* and *kwargs* default to a fresh empty list and dict for
        each instance.  The earlier literal ``[]``/``{}`` defaults were
        single objects shared by every timer created without explicit
        arguments, so mutating one timer's ``args`` leaked into the
        others.
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
738
# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):
    """Thread object standing in for the thread that created it."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        # This thread is already running: mark it started and register it.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return False

    def _exitfunc(self):
        """Mark this thread stopped, join non-daemon threads, deregister."""
        self._Thread__stop()
        pending = _pickSomeNonDaemonThread()
        if pending and __debug__:
            self._note("%s: waiting for other threads", self)
        while pending:
            pending.join()
            pending = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()
767
def _pickSomeNonDaemonThread():
    """Return some live non-daemon thread, or None when there is none."""
    for candidate in enumerate():
        if candidate.daemon:
            continue
        if candidate.is_alive():
            return candidate
    return None
773
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls current_thread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).

class _DummyThread(Thread):
    """Placeholder Thread for a thread that was not started by this module."""

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        # The thread is already running: mark it started and register it.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return True

    def join(self, timeout=None):
        assert False, "cannot join a dummy thread"
804
# Global API functions

def currentThread():
    """Return the Thread object registered for the calling thread.

    A thread with no entry in _active gets a fresh _DummyThread.
    """
    try:
        found = _active[_get_ident()]
    except KeyError:
        ##print "current_thread(): no current thread for", _get_ident()
        return _DummyThread()
    return found

current_thread = currentThread
def activeCount():
    """Return the number of threads in _active plus those in _limbo."""
    with _active_limbo_lock:
        return len(_active) + len(_limbo)

active_count = activeCount
def _enumerate():
    """List active and limbo threads without taking the registry lock."""
    # Same as enumerate(), but without the lock. Internal use only.
    registered = _active.values()
    return registered + _limbo.values()
828
def enumerate():
    """Return a list of all active and limbo Thread objects."""
    with _active_limbo_lock:
        return _active.values() + _limbo.values()
from thread import stack_size

# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.
# Instantiating _MainThread here registers the importing thread in _active
# as a side effect of importing this module.

_shutdown = _MainThread()._exitfunc

# get thread-local implementation, either from the thread
# module, or from the python fallback

try:
    from thread import _local as local
except ImportError:
    from _threading_local import local
def _after_fork():
    """Drop threading state that is no longer valid in a forked child."""
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
    # that should not exist after a fork.

    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    survivors = {}
    me = current_thread()
    with _active_limbo_lock:
        for t in _active.itervalues():
            if t is not me:
                # All the others are already stopped.
                # We don't call _Thread__stop() because it tries to acquire
                # thread._Thread__block which could also have been held while
                # we forked.
                t._Thread__stopped = True
                continue
            # There is only one active thread. We reset the ident to
            # its new value since it can have changed.
            new_ident = _get_ident()
            t._Thread__ident = new_ident
            survivors[new_ident] = t

        _limbo.clear()
        _active.clear()
        _active.update(survivors)
        assert len(_active) == 1
884
# Self-test code

def _test():
    """Run a small producer/consumer demo over a bounded queue."""

    class BoundedQueue(_Verbose):
        """Fixed-capacity FIFO guarded by one RLock and two conditions."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            self.rc = Condition(self.mon)   # readers wait here when empty
            self.wc = Condition(self.mon)   # writers wait here when full
            self.limit = limit
            self.queue = deque()

        def put(self, item):
            self.mon.acquire()
            while not len(self.queue) < self.limit:
                self._note("put(%s): queue full", item)
                self.wc.wait()
            self.queue.append(item)
            self._note("put(%s): appended, length now %d",
                       item, len(self.queue))
            self.rc.notify()
            self.mon.release()

        def get(self):
            self.mon.acquire()
            while not self.queue:
                self._note("get(): queue empty")
                self.rc.wait()
            item = self.queue.popleft()
            self._note("get(): got %s, %d left", item, len(self.queue))
            self.wc.notify()
            self.mon.release()
            return item

    class ProducerThread(Thread):
        """Put *quota* numbered items on the queue, pausing briefly between."""

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter += 1
                label = "%s.%d" % (self.name, counter)
                self.queue.put(label)
                _sleep(random() * 0.00001)

    class ConsumerThread(Thread):
        """Take *count* items off the queue and print each one."""

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                print(item)
                self.count -= 1

    NP = 3      # number of producers
    QL = 4      # queue limit
    NI = 5      # items per producer

    Q = BoundedQueue(QL)
    P = []
    for index in range(NP):
        producer = ProducerThread(Q, NI)
        producer.name = "Producer-%d" % (index + 1)
        P.append(producer)
    C = ConsumerThread(Q, NI * NP)
    for producer in P:
        producer.start()
        _sleep(0.000001)
    C.start()
    for producer in P:
        producer.join()
    C.join()

if __name__ == '__main__':
    _test()