Module threading
[hide private]
[frames] | no frames]

Source Code for Module threading

   1  """Thread module emulating a subset of Java's threading model.""" 
   2   
   3  import sys as _sys 
   4   
   5  try: 
   6      import thread 
   7  except ImportError: 
   8      del _sys.modules[__name__] 
   9      raise 
  10   
  11  import warnings 
  12   
  13  from collections import deque as _deque 
  14  from itertools import count as _count 
  15  from time import time as _time, sleep as _sleep 
  16  from traceback import format_exc as _format_exc 
  17   
  18  # Note regarding PEP 8 compliant aliases 
  19  #  This threading model was originally inspired by Java, and inherited 
  20  # the convention of camelCase function and method names from that 
  21  # language. While those names are not in any imminent danger of being 
  22  # deprecated, starting with Python 2.6, the module now provides a 
  23  # PEP 8 compliant alias for any such method name. 
  24  # Using the new PEP 8 compliant names also facilitates substitution 
  25  # with the multiprocessing module, which doesn't provide the old 
  26  # Java inspired names. 
  27   
  28   
  29  # Rename some stuff so "from threading import *" is safe 
  30  __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread', 
  31             'current_thread', 'enumerate', 'Event', 
  32             'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 
  33             'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] 
  34   
  35  _start_new_thread = thread.start_new_thread 
  36  _allocate_lock = thread.allocate_lock 
  37  _get_ident = thread.get_ident 
  38  ThreadError = thread.error 
  39  del thread 
  40   
  41   
  42  # sys.exc_clear is used to work around the fact that except blocks 
  43  # don't fully clear the exception until 3.0. 
  44  warnings.filterwarnings('ignore', category=DeprecationWarning, 
  45                          module='threading', message='sys.exc_clear') 
  46   
  47  # Debug support (adapted from ihooks.py). 
  48  # All the major classes here derive from _Verbose.  We force that to 
  49  # be a new-style class so that all the major classes here are new-style. 
  50  # This helps debugging (type(instance) is more revealing for instances 
  51  # of new-style classes). 
  52   
  53  _VERBOSE = False 
  54   
  55  if __debug__: 
56 57 - class _Verbose(object):
58
59 - def __init__(self, verbose=None):
60 if verbose is None: 61 verbose = _VERBOSE 62 self.__verbose = verbose
63
64 - def _note(self, format, *args):
65 if self.__verbose: 66 format = format % args 67 # Issue #4188: calling current_thread() can incur an infinite 68 # recursion if it has to create a DummyThread on the fly. 69 ident = _get_ident() 70 try: 71 name = _active[ident].name 72 except KeyError: 73 name = "<OS thread %d>" % ident 74 format = "%s: %s\n" % (name, format) 75 _sys.stderr.write(format)
76 77 else:
78 # Disable this when using "python -O" 79 - class _Verbose(object):
80 - def __init__(self, verbose=None):
81 pass
82 - def _note(self, *args):
83 pass
84 85 # Support for profile and trace hooks 86 87 _profile_hook = None 88 _trace_hook = None
89 90 -def setprofile(func):
91 """Set a profile function for all threads started from the threading module. 92 93 The func will be passed to sys.setprofile() for each thread, before its 94 run() method is called. 95 96 """ 97 global _profile_hook 98 _profile_hook = func
99
100 -def settrace(func):
101 """Set a trace function for all threads started from the threading module. 102 103 The func will be passed to sys.settrace() for each thread, before its run() 104 method is called. 105 106 """ 107 global _trace_hook 108 _trace_hook = func
109 110 # Synchronization classes 111 112 Lock = _allocate_lock
113 114 -def RLock(*args, **kwargs):
115 """Factory function that returns a new reentrant lock. 116 117 A reentrant lock must be released by the thread that acquired it. Once a 118 thread has acquired a reentrant lock, the same thread may acquire it again 119 without blocking; the thread must release it once for each time it has 120 acquired it. 121 122 """ 123 return _RLock(*args, **kwargs)
124
125 -class _RLock(_Verbose):
126 """A reentrant lock must be released by the thread that acquired it. Once a 127 thread has acquired a reentrant lock, the same thread may acquire it 128 again without blocking; the thread must release it once for each time it 129 has acquired it. 130 """ 131
132 - def __init__(self, verbose=None):
133 _Verbose.__init__(self, verbose) 134 self.__block = _allocate_lock() 135 self.__owner = None 136 self.__count = 0
137
138 - def __repr__(self):
139 owner = self.__owner 140 try: 141 owner = _active[owner].name 142 except KeyError: 143 pass 144 return "<%s owner=%r count=%d>" % ( 145 self.__class__.__name__, owner, self.__count)
146
147 - def acquire(self, blocking=1):
148 """Acquire a lock, blocking or non-blocking. 149 150 When invoked without arguments: if this thread already owns the lock, 151 increment the recursion level by one, and return immediately. Otherwise, 152 if another thread owns the lock, block until the lock is unlocked. Once 153 the lock is unlocked (not owned by any thread), then grab ownership, set 154 the recursion level to one, and return. If more than one thread is 155 blocked waiting until the lock is unlocked, only one at a time will be 156 able to grab ownership of the lock. There is no return value in this 157 case. 158 159 When invoked with the blocking argument set to true, do the same thing 160 as when called without arguments, and return true. 161 162 When invoked with the blocking argument set to false, do not block. If a 163 call without an argument would block, return false immediately; 164 otherwise, do the same thing as when called without arguments, and 165 return true. 166 167 """ 168 me = _get_ident() 169 if self.__owner == me: 170 self.__count = self.__count + 1 171 if __debug__: 172 self._note("%s.acquire(%s): recursive success", self, blocking) 173 return 1 174 rc = self.__block.acquire(blocking) 175 if rc: 176 self.__owner = me 177 self.__count = 1 178 if __debug__: 179 self._note("%s.acquire(%s): initial success", self, blocking) 180 else: 181 if __debug__: 182 self._note("%s.acquire(%s): failure", self, blocking) 183 return rc
184 185 __enter__ = acquire 186
187 - def release(self):
188 """Release a lock, decrementing the recursion level. 189 190 If after the decrement it is zero, reset the lock to unlocked (not owned 191 by any thread), and if any other threads are blocked waiting for the 192 lock to become unlocked, allow exactly one of them to proceed. If after 193 the decrement the recursion level is still nonzero, the lock remains 194 locked and owned by the calling thread. 195 196 Only call this method when the calling thread owns the lock. A 197 RuntimeError is raised if this method is called when the lock is 198 unlocked. 199 200 There is no return value. 201 202 """ 203 if self.__owner != _get_ident(): 204 raise RuntimeError("cannot release un-acquired lock") 205 self.__count = count = self.__count - 1 206 if not count: 207 self.__owner = None 208 self.__block.release() 209 if __debug__: 210 self._note("%s.release(): final release", self) 211 else: 212 if __debug__: 213 self._note("%s.release(): non-final release", self)
214
215 - def __exit__(self, t, v, tb):
216 self.release()
217 218 # Internal methods used by condition variables 219
220 - def _acquire_restore(self, count_owner):
221 count, owner = count_owner 222 self.__block.acquire() 223 self.__count = count 224 self.__owner = owner 225 if __debug__: 226 self._note("%s._acquire_restore()", self)
227
228 - def _release_save(self):
229 if __debug__: 230 self._note("%s._release_save()", self) 231 count = self.__count 232 self.__count = 0 233 owner = self.__owner 234 self.__owner = None 235 self.__block.release() 236 return (count, owner)
237
238 - def _is_owned(self):
239 return self.__owner == _get_ident()
240
241 242 -def Condition(*args, **kwargs):
243 """Factory function that returns a new condition variable object. 244 245 A condition variable allows one or more threads to wait until they are 246 notified by another thread. 247 248 If the lock argument is given and not None, it must be a Lock or RLock 249 object, and it is used as the underlying lock. Otherwise, a new RLock object 250 is created and used as the underlying lock. 251 252 """ 253 return _Condition(*args, **kwargs)
254
255 -class _Condition(_Verbose):
256 """Condition variables allow one or more threads to wait until they are 257 notified by another thread. 258 """ 259
260 - def __init__(self, lock=None, verbose=None):
261 _Verbose.__init__(self, verbose) 262 if lock is None: 263 lock = RLock() 264 self.__lock = lock 265 # Export the lock's acquire() and release() methods 266 self.acquire = lock.acquire 267 self.release = lock.release 268 # If the lock defines _release_save() and/or _acquire_restore(), 269 # these override the default implementations (which just call 270 # release() and acquire() on the lock). Ditto for _is_owned(). 271 try: 272 self._release_save = lock._release_save 273 except AttributeError: 274 pass 275 try: 276 self._acquire_restore = lock._acquire_restore 277 except AttributeError: 278 pass 279 try: 280 self._is_owned = lock._is_owned 281 except AttributeError: 282 pass 283 self.__waiters = []
284
285 - def __enter__(self):
286 return self.__lock.__enter__()
287
288 - def __exit__(self, *args):
289 return self.__lock.__exit__(*args)
290
291 - def __repr__(self):
292 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
293
294 - def _release_save(self):
295 self.__lock.release() # No state to save
296
297 - def _acquire_restore(self, x):
298 self.__lock.acquire() # Ignore saved state
299
300 - def _is_owned(self):
301 # Return True if lock is owned by current_thread. 302 # This method is called only if __lock doesn't have _is_owned(). 303 if self.__lock.acquire(0): 304 self.__lock.release() 305 return False 306 else: 307 return True
308
309 - def wait(self, timeout=None):
310 """Wait until notified or until a timeout occurs. 311 312 If the calling thread has not acquired the lock when this method is 313 called, a RuntimeError is raised. 314 315 This method releases the underlying lock, and then blocks until it is 316 awakened by a notify() or notifyAll() call for the same condition 317 variable in another thread, or until the optional timeout occurs. Once 318 awakened or timed out, it re-acquires the lock and returns. 319 320 When the timeout argument is present and not None, it should be a 321 floating point number specifying a timeout for the operation in seconds 322 (or fractions thereof). 323 324 When the underlying lock is an RLock, it is not released using its 325 release() method, since this may not actually unlock the lock when it 326 was acquired multiple times recursively. Instead, an internal interface 327 of the RLock class is used, which really unlocks it even when it has 328 been recursively acquired several times. Another internal interface is 329 then used to restore the recursion level when the lock is reacquired. 330 331 """ 332 if not self._is_owned(): 333 raise RuntimeError("cannot wait on un-acquired lock") 334 waiter = _allocate_lock() 335 waiter.acquire() 336 self.__waiters.append(waiter) 337 saved_state = self._release_save() 338 try: # restore state no matter what (e.g., KeyboardInterrupt) 339 if timeout is None: 340 waiter.acquire() 341 if __debug__: 342 self._note("%s.wait(): got it", self) 343 else: 344 # Balancing act: We can't afford a pure busy loop, so we 345 # have to sleep; but if we sleep the whole timeout time, 346 # we'll be unresponsive. The scheme here sleeps very 347 # little at first, longer as time goes on, but never longer 348 # than 20 times per second (or the timeout time remaining). 349 endtime = _time() + timeout 350 delay = 0.0005 # 500 us -> initial delay of 1 ms 351 while True: 352 gotit = waiter.acquire(0) 353 if gotit: 354 break 355 remaining = endtime - _time() 356 if remaining <= 0: 357 break 358 delay = min(delay * 2, remaining, .05) 359 _sleep(delay) 360 if not gotit: 361 if __debug__: 362 self._note("%s.wait(%s): timed out", self, timeout) 363 try: 364 self.__waiters.remove(waiter) 365 except ValueError: 366 pass 367 else: 368 if __debug__: 369 self._note("%s.wait(%s): got it", self, timeout) 370 finally: 371 self._acquire_restore(saved_state)
372
373 - def notify(self, n=1):
374 """Wake up one or more threads waiting on this condition, if any. 375 376 If the calling thread has not acquired the lock when this method is 377 called, a RuntimeError is raised. 378 379 This method wakes up at most n of the threads waiting for the condition 380 variable; it is a no-op if no threads are waiting. 381 382 """ 383 if not self._is_owned(): 384 raise RuntimeError("cannot notify on un-acquired lock") 385 __waiters = self.__waiters 386 waiters = __waiters[:n] 387 if not waiters: 388 if __debug__: 389 self._note("%s.notify(): no waiters", self) 390 return 391 self._note("%s.notify(): notifying %d waiter%s", self, n, 392 n!=1 and "s" or "") 393 for waiter in waiters: 394 waiter.release() 395 try: 396 __waiters.remove(waiter) 397 except ValueError: 398 pass
399
400 - def notifyAll(self):
401 """Wake up all threads waiting on this condition. 402 403 If the calling thread has not acquired the lock when this method 404 is called, a RuntimeError is raised. 405 406 """ 407 self.notify(len(self.__waiters))
408 409 notify_all = notifyAll
410
411 412 -def Semaphore(*args, **kwargs):
413 """A factory function that returns a new semaphore. 414 415 Semaphores manage a counter representing the number of release() calls minus 416 the number of acquire() calls, plus an initial value. The acquire() method 417 blocks if necessary until it can return without making the counter 418 negative. If not given, value defaults to 1. 419 420 """ 421 return _Semaphore(*args, **kwargs)
422
423 -class _Semaphore(_Verbose):
424 """Semaphores manage a counter representing the number of release() calls 425 minus the number of acquire() calls, plus an initial value. The acquire() 426 method blocks if necessary until it can return without making the counter 427 negative. If not given, value defaults to 1. 428 429 """ 430 431 # After Tim Peters' semaphore class, but not quite the same (no maximum) 432
433 - def __init__(self, value=1, verbose=None):
434 if value < 0: 435 raise ValueError("semaphore initial value must be >= 0") 436 _Verbose.__init__(self, verbose) 437 self.__cond = Condition(Lock()) 438 self.__value = value
439
440 - def acquire(self, blocking=1):
441 """Acquire a semaphore, decrementing the internal counter by one. 442 443 When invoked without arguments: if the internal counter is larger than 444 zero on entry, decrement it by one and return immediately. If it is zero 445 on entry, block, waiting until some other thread has called release() to 446 make it larger than zero. This is done with proper interlocking so that 447 if multiple acquire() calls are blocked, release() will wake exactly one 448 of them up. The implementation may pick one at random, so the order in 449 which blocked threads are awakened should not be relied on. There is no 450 return value in this case. 451 452 When invoked with blocking set to true, do the same thing as when called 453 without arguments, and return true. 454 455 When invoked with blocking set to false, do not block. If a call without 456 an argument would block, return false immediately; otherwise, do the 457 same thing as when called without arguments, and return true. 458 459 """ 460 rc = False 461 with self.__cond: 462 while self.__value == 0: 463 if not blocking: 464 break 465 if __debug__: 466 self._note("%s.acquire(%s): blocked waiting, value=%s", 467 self, blocking, self.__value) 468 self.__cond.wait() 469 else: 470 self.__value = self.__value - 1 471 if __debug__: 472 self._note("%s.acquire: success, value=%s", 473 self, self.__value) 474 rc = True 475 return rc
476 477 __enter__ = acquire 478
479 - def release(self):
480 """Release a semaphore, incrementing the internal counter by one. 481 482 When the counter is zero on entry and another thread is waiting for it 483 to become larger than zero again, wake up that thread. 484 485 """ 486 with self.__cond: 487 self.__value = self.__value + 1 488 if __debug__: 489 self._note("%s.release: success, value=%s", 490 self, self.__value) 491 self.__cond.notify()
492
493 - def __exit__(self, t, v, tb):
494 self.release()
495
496 497 -def BoundedSemaphore(*args, **kwargs):
498 """A factory function that returns a new bounded semaphore. 499 500 A bounded semaphore checks to make sure its current value doesn't exceed its 501 initial value. If it does, ValueError is raised. In most situations 502 semaphores are used to guard resources with limited capacity. 503 504 If the semaphore is released too many times it's a sign of a bug. If not 505 given, value defaults to 1. 506 507 Like regular semaphores, bounded semaphores manage a counter representing 508 the number of release() calls minus the number of acquire() calls, plus an 509 initial value. The acquire() method blocks if necessary until it can return 510 without making the counter negative. If not given, value defaults to 1. 511 512 """ 513 return _BoundedSemaphore(*args, **kwargs)
514
515 -class _BoundedSemaphore(_Semaphore):
516 """A bounded semaphore checks to make sure its current value doesn't exceed 517 its initial value. If it does, ValueError is raised. In most situations 518 semaphores are used to guard resources with limited capacity. 519 """ 520
521 - def __init__(self, value=1, verbose=None):
522 _Semaphore.__init__(self, value, verbose) 523 self._initial_value = value
524
525 - def release(self):
526 """Release a semaphore, incrementing the internal counter by one. 527 528 When the counter is zero on entry and another thread is waiting for it 529 to become larger than zero again, wake up that thread. 530 531 If the number of releases exceeds the number of acquires, 532 raise a ValueError. 533 534 """ 535 with self._Semaphore__cond: 536 if self._Semaphore__value >= self._initial_value: 537 raise ValueError("Semaphore released too many times") 538 self._Semaphore__value += 1 539 self._Semaphore__cond.notify()
540
541 542 -def Event(*args, **kwargs):
543 """A factory function that returns a new event. 544 545 Events manage a flag that can be set to true with the set() method and reset 546 to false with the clear() method. The wait() method blocks until the flag is 547 true. 548 549 """ 550 return _Event(*args, **kwargs)
551
552 -class _Event(_Verbose):
553 """A factory function that returns a new event object. An event manages a 554 flag that can be set to true with the set() method and reset to false 555 with the clear() method. The wait() method blocks until the flag is true. 556 557 """ 558 559 # After Tim Peters' event class (without is_posted()) 560
561 - def __init__(self, verbose=None):
562 _Verbose.__init__(self, verbose) 563 self.__cond = Condition(Lock()) 564 self.__flag = False
565
566 - def _reset_internal_locks(self):
567 # private! called by Thread._reset_internal_locks by _after_fork() 568 self.__cond.__init__(Lock())
569
570 - def isSet(self):
571 'Return true if and only if the internal flag is true.' 572 return self.__flag
573 574 is_set = isSet 575
576 - def set(self):
577 """Set the internal flag to true. 578 579 All threads waiting for the flag to become true are awakened. Threads 580 that call wait() once the flag is true will not block at all. 581 582 """ 583 with self.__cond: 584 self.__flag = True 585 self.__cond.notify_all()
586
587 - def clear(self):
588 """Reset the internal flag to false. 589 590 Subsequently, threads calling wait() will block until set() is called to 591 set the internal flag to true again. 592 593 """ 594 with self.__cond: 595 self.__flag = False
596
597 - def wait(self, timeout=None):
598 """Block until the internal flag is true. 599 600 If the internal flag is true on entry, return immediately. Otherwise, 601 block until another thread calls set() to set the flag to true, or until 602 the optional timeout occurs. 603 604 When the timeout argument is present and not None, it should be a 605 floating point number specifying a timeout for the operation in seconds 606 (or fractions thereof). 607 608 This method returns the internal flag on exit, so it will always return 609 True except if a timeout is given and the operation times out. 610 611 """ 612 with self.__cond: 613 if not self.__flag: 614 self.__cond.wait(timeout) 615 return self.__flag
616 617 # Helper to generate new thread names 618 _counter = _count().next 619 _counter() # Consume 0 so first non-main thread has id 1.
620 -def _newname(template="Thread-%d"):
621 return template % _counter()
622 623 # Active thread administration 624 _active_limbo_lock = _allocate_lock() 625 _active = {} # maps thread id to Thread object 626 _limbo = {}
627 628 629 # Main class for threads 630 631 -class Thread(_Verbose):
632 """A class that represents a thread of control. 633 634 This class can be safely subclassed in a limited fashion. 635 636 """ 637 __initialized = False 638 # Need to store a reference to sys.exc_info for printing 639 # out exceptions when a thread tries to use a global var. during interp. 640 # shutdown and thus raises an exception about trying to perform some 641 # operation on/with a NoneType 642 __exc_info = _sys.exc_info 643 # Keep sys.exc_clear too to clear the exception just before 644 # allowing .join() to return. 645 __exc_clear = _sys.exc_clear 646
647 - def __init__(self, group=None, target=None, name=None, 648 args=(), kwargs=None, verbose=None):
649 """This constructor should always be called with keyword arguments. Arguments are: 650 651 *group* should be None; reserved for future extension when a ThreadGroup 652 class is implemented. 653 654 *target* is the callable object to be invoked by the run() 655 method. Defaults to None, meaning nothing is called. 656 657 *name* is the thread name. By default, a unique name is constructed of 658 the form "Thread-N" where N is a small decimal number. 659 660 *args* is the argument tuple for the target invocation. Defaults to (). 661 662 *kwargs* is a dictionary of keyword arguments for the target 663 invocation. Defaults to {}. 664 665 If a subclass overrides the constructor, it must make sure to invoke 666 the base class constructor (Thread.__init__()) before doing anything 667 else to the thread. 668 669 """ 670 assert group is None, "group argument must be None for now" 671 _Verbose.__init__(self, verbose) 672 if kwargs is None: 673 kwargs = {} 674 self.__target = target 675 self.__name = str(name or _newname()) 676 self.__args = args 677 self.__kwargs = kwargs 678 self.__daemonic = self._set_daemon() 679 self.__ident = None 680 self.__started = Event() 681 self.__stopped = False 682 self.__block = Condition(Lock()) 683 self.__initialized = True 684 # sys.stderr is not stored in the class like 685 # sys.exc_info since it can be changed between instances 686 self.__stderr = _sys.stderr
687
688 - def _reset_internal_locks(self):
689 # private! Called by _after_fork() to reset our internal locks as 690 # they may be in an invalid state leading to a deadlock or crash. 691 if hasattr(self, '_Thread__block'): # DummyThread deletes self.__block 692 self.__block.__init__() 693 self.__started._reset_internal_locks()
694 695 @property
696 - def _block(self):
697 # used by a unittest 698 return self.__block
699
700 - def _set_daemon(self):
701 # Overridden in _MainThread and _DummyThread 702 return current_thread().daemon
703
704 - def __repr__(self):
705 assert self.__initialized, "Thread.__init__() was not called" 706 status = "initial" 707 if self.__started.is_set(): 708 status = "started" 709 if self.__stopped: 710 status = "stopped" 711 if self.__daemonic: 712 status += " daemon" 713 if self.__ident is not None: 714 status += " %s" % self.__ident 715 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
716
717 - def start(self):
718 """Start the thread's activity. 719 720 It must be called at most once per thread object. It arranges for the 721 object's run() method to be invoked in a separate thread of control. 722 723 This method will raise a RuntimeError if called more than once on the 724 same thread object. 725 726 """ 727 if not self.__initialized: 728 raise RuntimeError("thread.__init__() not called") 729 if self.__started.is_set(): 730 raise RuntimeError("threads can only be started once") 731 if __debug__: 732 self._note("%s.start(): starting thread", self) 733 with _active_limbo_lock: 734 _limbo[self] = self 735 try: 736 _start_new_thread(self.__bootstrap, ()) 737 except Exception: 738 with _active_limbo_lock: 739 del _limbo[self] 740 raise 741 self.__started.wait()
742
743 - def run(self):
744 """Method representing the thread's activity. 745 746 You may override this method in a subclass. The standard run() method 747 invokes the callable object passed to the object's constructor as the 748 target argument, if any, with sequential and keyword arguments taken 749 from the args and kwargs arguments, respectively. 750 751 """ 752 try: 753 if self.__target: 754 self.__target(*self.__args, **self.__kwargs) 755 finally: 756 # Avoid a refcycle if the thread is running a function with 757 # an argument that has a member that points to the thread. 758 del self.__target, self.__args, self.__kwargs
759
760 - def __bootstrap(self):
761 # Wrapper around the real bootstrap code that ignores 762 # exceptions during interpreter cleanup. Those typically 763 # happen when a daemon thread wakes up at an unfortunate 764 # moment, finds the world around it destroyed, and raises some 765 # random exception *** while trying to report the exception in 766 # __bootstrap_inner() below ***. Those random exceptions 767 # don't help anybody, and they confuse users, so we suppress 768 # them. We suppress them only when it appears that the world 769 # indeed has already been destroyed, so that exceptions in 770 # __bootstrap_inner() during normal business hours are properly 771 # reported. Also, we only suppress them for daemonic threads; 772 # if a non-daemonic encounters this, something else is wrong. 773 try: 774 self.__bootstrap_inner() 775 except: 776 if self.__daemonic and _sys is None: 777 return 778 raise
779
780 - def _set_ident(self):
781 self.__ident = _get_ident()
782
783 - def __bootstrap_inner(self):
784 try: 785 self._set_ident() 786 self.__started.set() 787 with _active_limbo_lock: 788 _active[self.__ident] = self 789 del _limbo[self] 790 if __debug__: 791 self._note("%s.__bootstrap(): thread started", self) 792 793 if _trace_hook: 794 self._note("%s.__bootstrap(): registering trace hook", self) 795 _sys.settrace(_trace_hook) 796 if _profile_hook: 797 self._note("%s.__bootstrap(): registering profile hook", self) 798 _sys.setprofile(_profile_hook) 799 800 try: 801 self.run() 802 except SystemExit: 803 if __debug__: 804 self._note("%s.__bootstrap(): raised SystemExit", self) 805 except: 806 if __debug__: 807 self._note("%s.__bootstrap(): unhandled exception", self) 808 # If sys.stderr is no more (most likely from interpreter 809 # shutdown) use self.__stderr. Otherwise still use sys (as in 810 # _sys) in case sys.stderr was redefined since the creation of 811 # self. 812 if _sys and _sys.stderr is not None: 813 print>>_sys.stderr, ("Exception in thread %s:\n%s" % 814 (self.name, _format_exc())) 815 elif self.__stderr is not None: 816 # Do the best job possible w/o a huge amt. of code to 817 # approximate a traceback (code ideas from 818 # Lib/traceback.py) 819 exc_type, exc_value, exc_tb = self.__exc_info() 820 try: 821 print>>self.__stderr, ( 822 "Exception in thread " + self.name + 823 " (most likely raised during interpreter shutdown):") 824 print>>self.__stderr, ( 825 "Traceback (most recent call last):") 826 while exc_tb: 827 print>>self.__stderr, ( 828 ' File "%s", line %s, in %s' % 829 (exc_tb.tb_frame.f_code.co_filename, 830 exc_tb.tb_lineno, 831 exc_tb.tb_frame.f_code.co_name)) 832 exc_tb = exc_tb.tb_next 833 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value)) 834 # Make sure that exc_tb gets deleted since it is a memory 835 # hog; deleting everything else is just for thoroughness 836 finally: 837 del exc_type, exc_value, exc_tb 838 else: 839 if __debug__: 840 self._note("%s.__bootstrap(): normal return", self) 841 finally: 842 # Prevent a race in 843 # test_threading.test_no_refcycle_through_target when 844 # the exception keeps the target alive past when we 845 # assert that it's dead. 846 self.__exc_clear() 847 finally: 848 with _active_limbo_lock: 849 self.__stop() 850 try: 851 # We don't call self.__delete() because it also 852 # grabs _active_limbo_lock. 853 del _active[_get_ident()] 854 except: 855 pass
856
857 - def __stop(self):
858 # DummyThreads delete self.__block, but they have no waiters to 859 # notify anyway (join() is forbidden on them). 860 if not hasattr(self, '_Thread__block'): 861 return 862 self.__block.acquire() 863 self.__stopped = True 864 self.__block.notify_all() 865 self.__block.release()
866
867 - def __delete(self):
868 "Remove current thread from the dict of currently running threads." 869 870 # Notes about running with dummy_thread: 871 # 872 # Must take care to not raise an exception if dummy_thread is being 873 # used (and thus this module is being used as an instance of 874 # dummy_threading). dummy_thread.get_ident() always returns -1 since 875 # there is only one thread if dummy_thread is being used. Thus 876 # len(_active) is always <= 1 here, and any Thread instance created 877 # overwrites the (if any) thread currently registered in _active. 878 # 879 # An instance of _MainThread is always created by 'threading'. This 880 # gets overwritten the instant an instance of Thread is created; both 881 # threads return -1 from dummy_thread.get_ident() and thus have the 882 # same key in the dict. So when the _MainThread instance created by 883 # 'threading' tries to clean itself up when atexit calls this method 884 # it gets a KeyError if another Thread instance was created. 885 # 886 # This all means that KeyError from trying to delete something from 887 # _active if dummy_threading is being used is a red herring. But 888 # since it isn't if dummy_threading is *not* being used then don't 889 # hide the exception. 890 891 try: 892 with _active_limbo_lock: 893 del _active[_get_ident()] 894 # There must not be any python code between the previous line 895 # and after the lock is released. Otherwise a tracing function 896 # could try to acquire the lock again in the same thread, (in 897 # current_thread()), and would block. 898 except KeyError: 899 if 'dummy_threading' not in _sys.modules: 900 raise
901
902 - def join(self, timeout=None):
903 """Wait until the thread terminates. 904 905 This blocks the calling thread until the thread whose join() method is 906 called terminates -- either normally or through an unhandled exception 907 or until the optional timeout occurs. 908 909 When the timeout argument is present and not None, it should be a 910 floating point number specifying a timeout for the operation in seconds 911 (or fractions thereof). As join() always returns None, you must call 912 isAlive() after join() to decide whether a timeout happened -- if the 913 thread is still alive, the join() call timed out. 914 915 When the timeout argument is not present or None, the operation will 916 block until the thread terminates. 917 918 A thread can be join()ed many times. 919 920 join() raises a RuntimeError if an attempt is made to join the current 921 thread as that would cause a deadlock. It is also an error to join() a 922 thread before it has been started and attempts to do so raises the same 923 exception. 924 925 """ 926 if not self.__initialized: 927 raise RuntimeError("Thread.__init__() not called") 928 if not self.__started.is_set(): 929 raise RuntimeError("cannot join thread before it is started") 930 if self is current_thread(): 931 raise RuntimeError("cannot join current thread") 932 933 if __debug__: 934 if not self.__stopped: 935 self._note("%s.join(): waiting until thread stops", self) 936 self.__block.acquire() 937 try: 938 if timeout is None: 939 while not self.__stopped: 940 self.__block.wait() 941 if __debug__: 942 self._note("%s.join(): thread stopped", self) 943 else: 944 deadline = _time() + timeout 945 while not self.__stopped: 946 delay = deadline - _time() 947 if delay <= 0: 948 if __debug__: 949 self._note("%s.join(): timed out", self) 950 break 951 self.__block.wait(delay) 952 else: 953 if __debug__: 954 self._note("%s.join(): thread stopped", self) 955 finally: 956 self.__block.release()
957 958 @property
959 - def name(self):
960 """A string used for identification purposes only. 961 962 It has no semantics. Multiple threads may be given the same name. The 963 initial name is set by the constructor. 964 965 """ 966 assert self.__initialized, "Thread.__init__() not called" 967 return self.__name
968 969 @name.setter
970 - def name(self, name):
971 assert self.__initialized, "Thread.__init__() not called" 972 self.__name = str(name)
973 974 @property
975 - def ident(self):
976 """Thread identifier of this thread or None if it has not been started. 977 978 This is a nonzero integer. See the thread.get_ident() function. Thread 979 identifiers may be recycled when a thread exits and another thread is 980 created. The identifier is available even after the thread has exited. 981 982 """ 983 assert self.__initialized, "Thread.__init__() not called" 984 return self.__ident
985
986 - def isAlive(self):
987 """Return whether the thread is alive. 988 989 This method returns True just before the run() method starts until just 990 after the run() method terminates. The module function enumerate() 991 returns a list of all alive threads. 992 993 """ 994 assert self.__initialized, "Thread.__init__() not called" 995 return self.__started.is_set() and not self.__stopped
996 997 is_alive = isAlive 998 999 @property
1000 - def daemon(self):
1001 """A boolean value indicating whether this thread is a daemon thread (True) or not (False). 1002 1003 This must be set before start() is called, otherwise RuntimeError is 1004 raised. Its initial value is inherited from the creating thread; the 1005 main thread is not a daemon thread and therefore all threads created in 1006 the main thread default to daemon = False. 1007 1008 The entire Python program exits when no alive non-daemon threads are 1009 left. 1010 1011 """ 1012 assert self.__initialized, "Thread.__init__() not called" 1013 return self.__daemonic
1014 1015 @daemon.setter
1016 - def daemon(self, daemonic):
1017 if not self.__initialized: 1018 raise RuntimeError("Thread.__init__() not called") 1019 if self.__started.is_set(): 1020 raise RuntimeError("cannot set daemon status of active thread"); 1021 self.__daemonic = daemonic
1022
1023 - def isDaemon(self):
1024 return self.daemon
1025
1026 - def setDaemon(self, daemonic):
1027 self.daemon = daemonic
1028
1029 - def getName(self):
1030 return self.name
1031
1032 - def setName(self, name):
1033 self.name = name
1034
1035 # The timer class was contributed by Itamar Shtull-Trauring 1036 1037 -def Timer(*args, **kwargs):
1038 """Factory function to create a Timer object. 1039 1040 Timers call a function after a specified number of seconds: 1041 1042 t = Timer(30.0, f, args=[], kwargs={}) 1043 t.start() 1044 t.cancel() # stop the timer's action if it's still waiting 1045 1046 """ 1047 return _Timer(*args, **kwargs)
1048
1049 -class _Timer(Thread):
1050 """Call a function after a specified number of seconds: 1051 1052 t = Timer(30.0, f, args=[], kwargs={}) 1053 t.start() 1054 t.cancel() # stop the timer's action if it's still waiting 1055 1056 """ 1057
1058 - def __init__(self, interval, function, args=[], kwargs={}):
1059 Thread.__init__(self) 1060 self.interval = interval 1061 self.function = function 1062 self.args = args 1063 self.kwargs = kwargs 1064 self.finished = Event()
1065
1066 - def cancel(self):
1067 """Stop the timer if it hasn't finished yet""" 1068 self.finished.set()
1069
1070 - def run(self):
1071 self.finished.wait(self.interval) 1072 if not self.finished.is_set(): 1073 self.function(*self.args, **self.kwargs) 1074 self.finished.set()
1075
1076 # Special thread class to represent the main thread 1077 # This is garbage collected through an exit handler 1078 1079 -class _MainThread(Thread):
1080
1081 - def __init__(self):
1082 Thread.__init__(self, name="MainThread") 1083 self._Thread__started.set() 1084 self._set_ident() 1085 with _active_limbo_lock: 1086 _active[_get_ident()] = self
1087
1088 - def _set_daemon(self):
1089 return False
1090
1091 - def _exitfunc(self):
1092 self._Thread__stop() 1093 t = _pickSomeNonDaemonThread() 1094 if t: 1095 if __debug__: 1096 self._note("%s: waiting for other threads", self) 1097 while t: 1098 t.join() 1099 t = _pickSomeNonDaemonThread() 1100 if __debug__: 1101 self._note("%s: exiting", self) 1102 self._Thread__delete()
1103
1104 -def _pickSomeNonDaemonThread():
1105 for t in enumerate(): 1106 if not t.daemon and t.is_alive(): 1107 return t 1108 return None
1109
1110 1111 # Dummy thread class to represent threads not started here. 1112 # These aren't garbage collected when they die, nor can they be waited for. 1113 # If they invoke anything in threading.py that calls current_thread(), they 1114 # leave an entry in the _active dict forever after. 1115 # Their purpose is to return *something* from current_thread(). 1116 # They are marked as daemon threads so we won't wait for them 1117 # when we exit (conform previous semantics). 1118 1119 -class _DummyThread(Thread):
1120
1121 - def __init__(self):
1122 Thread.__init__(self, name=_newname("Dummy-%d")) 1123 1124 # Thread.__block consumes an OS-level locking primitive, which 1125 # can never be used by a _DummyThread. Since a _DummyThread 1126 # instance is immortal, that's bad, so release this resource. 1127 del self._Thread__block 1128 1129 self._Thread__started.set() 1130 self._set_ident() 1131 with _active_limbo_lock: 1132 _active[_get_ident()] = self
1133
1134 - def _set_daemon(self):
1135 return True
1136
1137 - def join(self, timeout=None):
1138 assert False, "cannot join a dummy thread"
1139
1140 1141 # Global API functions 1142 1143 -def currentThread():
1144 """Return the current Thread object, corresponding to the caller's thread of control. 1145 1146 If the caller's thread of control was not created through the threading 1147 module, a dummy thread object with limited functionality is returned. 1148 1149 """ 1150 try: 1151 return _active[_get_ident()] 1152 except KeyError: 1153 ##print "current_thread(): no current thread for", _get_ident() 1154 return _DummyThread()
1155 1156 current_thread = currentThread
1157 1158 -def activeCount():
1159 """Return the number of Thread objects currently alive. 1160 1161 The returned count is equal to the length of the list returned by 1162 enumerate(). 1163 1164 """ 1165 with _active_limbo_lock: 1166 return len(_active) + len(_limbo)
1167 1168 active_count = activeCount
1169 1170 -def _enumerate():
1171 # Same as enumerate(), but without the lock. Internal use only. 1172 return _active.values() + _limbo.values()
1173
1174 -def enumerate():
1175 """Return a list of all Thread objects currently alive. 1176 1177 The list includes daemonic threads, dummy thread objects created by 1178 current_thread(), and the main thread. It excludes terminated threads and 1179 threads that have not yet been started. 1180 1181 """ 1182 with _active_limbo_lock: 1183 return _active.values() + _limbo.values()
1184 1185 from thread import stack_size 1186 1187 # Create the main thread object, 1188 # and make it available for the interpreter 1189 # (Py_Main) as threading._shutdown. 1190 1191 _shutdown = _MainThread()._exitfunc 1192 1193 # get thread-local implementation, either from the thread 1194 # module, or from the python fallback 1195 1196 try: 1197 from thread import _local as local 1198 except ImportError: 1199 from _threading_local import local
1200 1201 1202 -def _after_fork():
1203 # This function is called by Python/ceval.c:PyEval_ReInitThreads which 1204 # is called from PyOS_AfterFork. Here we cleanup threading module state 1205 # that should not exist after a fork. 1206 1207 # Reset _active_limbo_lock, in case we forked while the lock was held 1208 # by another (non-forked) thread. http://bugs.python.org/issue874900 1209 global _active_limbo_lock 1210 _active_limbo_lock = _allocate_lock() 1211 1212 # fork() only copied the current thread; clear references to others. 1213 new_active = {} 1214 current = current_thread() 1215 with _active_limbo_lock: 1216 for thread in _enumerate(): 1217 # Any lock/condition variable may be currently locked or in an 1218 # invalid state, so we reinitialize them. 1219 if hasattr(thread, '_reset_internal_locks'): 1220 thread._reset_internal_locks() 1221 if thread is current: 1222 # There is only one active thread. We reset the ident to 1223 # its new value since it can have changed. 1224 ident = _get_ident() 1225 thread._Thread__ident = ident 1226 new_active[ident] = thread 1227 else: 1228 # All the others are already stopped. 1229 thread._Thread__stop() 1230 1231 _limbo.clear() 1232 _active.clear() 1233 _active.update(new_active) 1234 assert len(_active) == 1
1235
1236 1237 # Self-test code 1238 1239 -def _test():
1240 1241 class BoundedQueue(_Verbose): 1242 1243 def __init__(self, limit): 1244 _Verbose.__init__(self) 1245 self.mon = RLock() 1246 self.rc = Condition(self.mon) 1247 self.wc = Condition(self.mon) 1248 self.limit = limit 1249 self.queue = _deque()
1250 1251 def put(self, item): 1252 self.mon.acquire() 1253 while len(self.queue) >= self.limit: 1254 self._note("put(%s): queue full", item) 1255 self.wc.wait() 1256 self.queue.append(item) 1257 self._note("put(%s): appended, length now %d", 1258 item, len(self.queue)) 1259 self.rc.notify() 1260 self.mon.release() 1261 1262 def get(self): 1263 self.mon.acquire() 1264 while not self.queue: 1265 self._note("get(): queue empty") 1266 self.rc.wait() 1267 item = self.queue.popleft() 1268 self._note("get(): got %s, %d left", item, len(self.queue)) 1269 self.wc.notify() 1270 self.mon.release() 1271 return item 1272 1273 class ProducerThread(Thread): 1274 1275 def __init__(self, queue, quota): 1276 Thread.__init__(self, name="Producer") 1277 self.queue = queue 1278 self.quota = quota 1279 1280 def run(self): 1281 from random import random 1282 counter = 0 1283 while counter < self.quota: 1284 counter = counter + 1 1285 self.queue.put("%s.%d" % (self.name, counter)) 1286 _sleep(random() * 0.00001) 1287 1288 1289 class ConsumerThread(Thread): 1290 1291 def __init__(self, queue, count): 1292 Thread.__init__(self, name="Consumer") 1293 self.queue = queue 1294 self.count = count 1295 1296 def run(self): 1297 while self.count > 0: 1298 item = self.queue.get() 1299 print item 1300 self.count = self.count - 1 1301 1302 NP = 3 1303 QL = 4 1304 NI = 5 1305 1306 Q = BoundedQueue(QL) 1307 P = [] 1308 for i in range(NP): 1309 t = ProducerThread(Q, NI) 1310 t.name = ("Producer-%d" % (i+1)) 1311 P.append(t) 1312 C = ConsumerThread(Q, NI*NP) 1313 for t in P: 1314 t.start() 1315 _sleep(0.000001) 1316 C.start() 1317 for t in P: 1318 t.join() 1319 C.join() 1320 1321 if __name__ == '__main__': 1322 _test() 1323