Module threading
[hide private]
[frames | no frames]

Source Code for Module threading

  1  """Thread module emulating a subset of Java's threading model.""" 
  2   
  3  import sys as _sys 
  4   
  5  try: 
  6      import thread 
  7  except ImportError: 
  8      del _sys.modules[__name__] 
  9      raise 
 10   
 11  import warnings 
 12   
 13  from time import time as _time, sleep as _sleep 
 14  from traceback import format_exc as _format_exc 
 15   
 16  # Note regarding PEP 8 compliant aliases 
 17  #  This threading model was originally inspired by Java, and inherited 
 18  # the convention of camelCase function and method names from that 
 19  # language. While those names are not in any imminent danger of being 
 20  # deprecated, starting with Python 2.6, the module now provides a 
 21  # PEP 8 compliant alias for any such method name. 
 22  # Using the new PEP 8 compliant names also facilitates substitution 
 23  # with the multiprocessing module, which doesn't provide the old 
 24  # Java inspired names. 
 25   
 26   
 27  # Rename some stuff so "from threading import *" is safe 
 28  __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread', 
 29             'current_thread', 'enumerate', 'Event', 
 30             'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 
 31             'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] 
 32   
 33  _start_new_thread = thread.start_new_thread 
 34  _allocate_lock = thread.allocate_lock 
 35  _get_ident = thread.get_ident 
 36  ThreadError = thread.error 
 37  del thread 
 38   
 39   
 40  # sys.exc_clear is used to work around the fact that except blocks 
 41  # don't fully clear the exception until 3.0. 
 42  warnings.filterwarnings('ignore', category=DeprecationWarning, 
 43                          module='threading', message='sys.exc_clear') 
 44   
 45  # Debug support (adapted from ihooks.py). 
 46  # All the major classes here derive from _Verbose.  We force that to 
 47  # be a new-style class so that all the major classes here are new-style. 
 48  # This helps debugging (type(instance) is more revealing for instances 
 49  # of new-style classes). 
 50   
 51  _VERBOSE = False 
 52   
 53  if __debug__: 
54 55 - class _Verbose(object):
56
57 - def __init__(self, verbose=None):
58 if verbose is None: 59 verbose = _VERBOSE 60 self.__verbose = verbose
61
62 - def _note(self, format, *args):
63 if self.__verbose: 64 format = format % args 65 # Issue #4188: calling current_thread() can incur an infinite 66 # recursion if it has to create a DummyThread on the fly. 67 ident = _get_ident() 68 try: 69 name = _active[ident].name 70 except KeyError: 71 name = "<OS thread %d>" % ident 72 format = "%s: %s\n" % (name, format) 73 _sys.stderr.write(format)
74 75 else:
76 # Disable this when using "python -O" 77 - class _Verbose(object):
78 - def __init__(self, verbose=None):
79 pass
80 - def _note(self, *args):
81 pass
# Support for profile and trace hooks

# Hooks installed in every thread started through this module; they are
# read by Thread.__bootstrap_inner() just before run() is called.
_profile_hook = None
_trace_hook = None


def setprofile(func):
    """Remember *func* as the profile hook for threads started later.

    The hook is passed to sys.setprofile() in each new thread before its
    run() method is called.
    """
    global _profile_hook
    _profile_hook = func
91
def settrace(func):
    """Remember *func* as the trace hook for threads started later.

    The hook is passed to sys.settrace() in each new thread before its
    run() method is called.
    """
    global _trace_hook
    _trace_hook = func
# Synchronization classes

# The plain (non-reentrant) lock is the low-level primitive itself.
Lock = _allocate_lock


def RLock(*args, **kwargs):
    """Factory function: return a new reentrant lock (an _RLock instance).

    All arguments are forwarded unchanged to the _RLock constructor.
    """
    return _RLock(*args, **kwargs)
102
class _RLock(_Verbose):
    """Reentrant lock built on top of a plain lock.

    The owning thread may acquire the lock again without blocking; the
    underlying lock is released only when release() has been called as
    many times as acquire().
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()  # underlying non-reentrant lock
        self.__owner = None              # ident of the owning thread, or None
        self.__count = 0                 # number of nested acquisitions

    def __repr__(self):
        owner = self.__owner
        try:
            # Show the owning thread's name when it is a known thread.
            owner = _active[owner].name
        except KeyError:
            pass
        return "<%s owner=%r count=%d>" % (
                self.__class__.__name__, owner, self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock; return a true value on success.

        If the calling thread already owns the lock, the recursion count
        is incremented and 1 is returned immediately.  Otherwise the
        underlying lock is acquired with the given *blocking* flag and its
        result is returned.
        """
        me = _get_ident()
        if self.__owner == me:
            self.__count = self.__count + 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note("%s.acquire(%s): initial success", self, blocking)
        else:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
        return rc

    __enter__ = acquire

    def release(self):
        """Undo one acquire(); free the underlying lock at count zero.

        Raises RuntimeError if the calling thread does not own the lock.
        """
        if self.__owner != _get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self.__count = count = self.__count - 1
        if not count:
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note("%s.release(): final release", self)
        else:
            if __debug__:
                self._note("%s.release(): non-final release", self)

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        """Re-acquire the lock and restore a state saved by _release_save()."""
        count, owner = count_owner
        self.__block.acquire()
        self.__count = count
        self.__owner = owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        """Fully release the lock and return its ``(count, owner)`` state."""
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        """Return True if the calling thread owns the lock."""
        return self.__owner == _get_ident()
178
def Condition(*args, **kwargs):
    """Factory function: return a new condition variable (a _Condition).

    All arguments are forwarded unchanged to the _Condition constructor.
    """
    return _Condition(*args, **kwargs)
182
class _Condition(_Verbose):
    """Condition variable that lets threads wait until notified.

    A condition is always associated with a lock, either passed in or
    created (as an RLock) by the constructor.
    """

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # One private lock per waiting thread; notify() releases them.
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if __lock doesn't have _is_owned().
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Release the lock and block until notified or until *timeout*.

        The lock is re-acquired before returning, whatever happens.
        Raises RuntimeError if the calling thread does not hold the lock.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # The waiter lock starts out held; notify() releases it, which is
        # what lets the second acquire() below succeed.
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
                endtime = _time() + timeout
                delay = 0.0005  # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        # Already removed by a concurrent notify().
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up at most *n* threads waiting on this condition.

        Raises RuntimeError if the calling thread does not hold the lock.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        pending = self.__waiters
        waiters = pending[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        # Guarded like every other _note() call in this module.
        if __debug__:
            self._note("%s.notify(): notifying %d waiter%s", self, n,
                       n!=1 and "s" or "")
        for waiter in waiters:
            waiter.release()
            try:
                pending.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up every thread currently waiting on this condition."""
        self.notify(len(self.__waiters))

    notify_all = notifyAll
298
def Semaphore(*args, **kwargs):
    """Factory function: return a new semaphore (a _Semaphore instance).

    All arguments are forwarded unchanged to the _Semaphore constructor.
    """
    return _Semaphore(*args, **kwargs)
302
class _Semaphore(_Verbose):
    """Counting semaphore.

    acquire() decrements the internal counter, blocking while it is zero;
    release() increments it and wakes one waiter.
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        """Create the semaphore with initial counter *value*.

        Raises ValueError if *value* is negative.
        """
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter; return True on success.

        With a false *blocking* value, return False immediately instead of
        waiting when the counter is zero.
        """
        rc = False
        # "with" guarantees the condition's lock is released even if
        # wait() is interrupted by an exception.
        with self.__cond:
            while self.__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            else:
                # Reached only when the loop ended without "break",
                # i.e. the counter is non-zero.
                self.__value = self.__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                               self, self.__value)
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake up one waiting thread, if any."""
        with self.__cond:
            self.__value = self.__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
346
def BoundedSemaphore(*args, **kwargs):
    """Factory function: return a new bounded semaphore.

    All arguments are forwarded unchanged to the _BoundedSemaphore
    constructor.
    """
    return _BoundedSemaphore(*args, **kwargs)
350
class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        # Upper bound that the counter may never exceed.
        self._initial_value = value

    def release(self):
        """Increment the counter, refusing to go above the initial value.

        Raises ValueError if the semaphore has already been released as
        many times as it was acquired.
        """
        # The check and the increment happen under the same lock so that
        # concurrent releases cannot both pass the check.  The lock is not
        # reentrant, so the increment is done here rather than by calling
        # _Semaphore.release().
        with self._Semaphore__cond:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value = self._Semaphore__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self._Semaphore__value)
            self._Semaphore__cond.notify()
361
def Event(*args, **kwargs):
    """Factory function: return a new event object (an _Event instance).

    All arguments are forwarded unchanged to the _Event constructor.
    """
    return _Event(*args, **kwargs)
365
class _Event(_Verbose):
    """Boolean flag that threads can wait on.

    The flag starts out false; set() makes it true and wakes all waiters,
    clear() makes it false again.
    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def _reset_internal_locks(self):
        # private!  called by Thread._reset_internal_locks by _after_fork()
        # Re-create the condition around a plain Lock, the same kind of
        # lock __init__ uses, rather than the default RLock.
        self.__cond.__init__(Lock())

    def isSet(self):
        """Return the current value of the flag."""
        return self.__flag

    is_set = isSet

    def set(self):
        """Set the flag to true and wake up every waiting thread."""
        self.__cond.acquire()
        try:
            self.__flag = True
            self.__cond.notify_all()
        finally:
            self.__cond.release()

    def clear(self):
        """Reset the flag to false."""
        self.__cond.acquire()
        try:
            self.__flag = False
        finally:
            self.__cond.release()

    def wait(self, timeout=None):
        """Block until the flag is true or *timeout* seconds have passed.

        Returns the value of the flag on exit, so the result is False
        only when the wait timed out.
        """
        self.__cond.acquire()
        try:
            if not self.__flag:
                self.__cond.wait(timeout)
            return self.__flag
        finally:
            self.__cond.release()
# Helper to generate new thread names
_counter = 0


def _newname(template="Thread-%d"):
    """Return *template* formatted with the next value of a global counter.

    The counter is shared by all templates and is incremented on every
    call.
    """
    global _counter
    _counter = _counter + 1
    return template % _counter
# Active thread administration
_active_limbo_lock = _allocate_lock()   # guards _active and _limbo
_active = {}    # maps thread id to Thread object
_limbo = {}     # threads between start() and registration in _active
# Main class for threads

class Thread(_Verbose):
    """A thread of control.

    The activity is given either as the *target* callable passed to the
    constructor or by overriding run() in a subclass.
    """

    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info
    # Keep sys.exc_clear too to clear the exception just before
    # allowing .join() to return.
    __exc_clear = _sys.exc_clear

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Set up the thread; it does not run until start() is called.

        *group* must be None.  *target* is the callable invoked by run()
        with *args* and *kwargs*.  *name* defaults to a generated
        "Thread-N" name.
        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _reset_internal_locks(self):
        # private!  Called by _after_fork() to reset our internal locks as
        # they may be in an invalid state leading to a deadlock or crash.
        if hasattr(self, '_Thread__block'):  # DummyThread deletes self.__block
            self.__block.__init__()
        self.__started._reset_internal_locks()

    @property
    def _block(self):
        # used by a unittest
        return self.__block

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return current_thread().daemon

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started.is_set():
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status += " daemon"
        if self.__ident is not None:
            status += " %s" % self.__ident
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Start the thread's activity in a new thread of control.

        Returns once the new thread has signalled that it started.
        Raises RuntimeError if __init__() was not called or if the thread
        was already started.
        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("threads can only be started once")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            # The thread never ran, so take it back out of limbo.
            with _active_limbo_lock:
                del _limbo[self]
            raise
        self.__started.wait()

    def run(self):
        """Invoke the target callable, if any, with the stored arguments."""
        try:
            if self.__target:
                self.__target(*self.__args, **self.__kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self.__target, self.__args, self.__kwargs

    def __bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # __bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # __bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self.__bootstrap_inner()
        except:
            if self.__daemonic and _sys is None:
                return
            raise

    def _set_ident(self):
        self.__ident = _get_ident()

    def __bootstrap_inner(self):
        try:
            self._set_ident()
            self.__started.set()
            with _active_limbo_lock:
                _active[self.__ident] = self
                del _limbo[self]
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering trace hook",
                               self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering profile hook",
                               self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys:
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                      (self.name, _format_exc()))
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                ' File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
            finally:
                # Prevent a race in
                # test_threading.test_no_refcycle_through_target when
                # the exception keeps the target alive past when we
                # assert that it's dead.
                self.__exc_clear()
        finally:
            with _active_limbo_lock:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    pass

    def __stop(self):
        # Mark the thread as stopped and wake up every join() waiter.
        self.__block.acquire()
        self.__stopped = True
        self.__block.notify_all()
        self.__block.release()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        try:
            with _active_limbo_lock:
                del _active[_get_ident()]
                # There must not be any python code between the previous line
                # and after the lock is released.  Otherwise a tracing function
                # could try to acquire the lock again in the same thread, (in
                # current_thread()), and would block.
        except KeyError:
            if 'dummy_threading' not in _sys.modules:
                raise

    def join(self, timeout=None):
        """Wait until the thread stops or *timeout* seconds have passed.

        Always returns None; call isAlive() afterwards to find out whether
        a timeout happened.  Raises RuntimeError if __init__() was not
        called, if the thread has not been started, or if a thread tries
        to join itself.
        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

    @property
    def name(self):
        """The thread's name, used for identification purposes only."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    @name.setter
    def name(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    @property
    def ident(self):
        """Identifier of the thread, or None if it has not been started."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__ident

    def isAlive(self):
        """Return whether the thread has been started and not yet stopped."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started.is_set() and not self.__stopped

    is_alive = isAlive

    @property
    def daemon(self):
        """Whether this thread is a daemon thread."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    @daemon.setter
    def daemon(self, daemonic):
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("cannot set daemon status of active thread")
        self.__daemonic = daemonic

    # Java-style accessors kept alongside the properties above.

    def isDaemon(self):
        return self.daemon

    def setDaemon(self, daemonic):
        self.daemon = daemonic

    def getName(self):
        return self.name

    def setName(self, name):
        self.name = name
726
# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    """Factory function: return a new timer thread (a _Timer instance).

    All arguments are forwarded unchanged to the _Timer constructor.
    """
    return _Timer(*args, **kwargs)
731
class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Prepare to call ``function(*args, **kwargs)`` after *interval* s.

        *args* defaults to an empty list and *kwargs* to an empty dict.
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Fresh containers per instance: a mutable default in the
        # signature would be shared by every timer created without
        # explicit arguments.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
757
# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):
    """Thread object standing in for the thread that creates it."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        # This thread is already running, so mark it started and register
        # it directly instead of going through start().
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return False

    def _exitfunc(self):
        """Mark this thread stopped, then join every live non-daemon thread."""
        self._Thread__stop()
        t = _pickSomeNonDaemonThread()
        if t:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        while t:
            t.join()
            t = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()
785
def _pickSomeNonDaemonThread():
    """Return some live non-daemon thread, or None if there is none."""
    for t in enumerate():
        if not t.daemon and t.is_alive():
            return t
    return None
791
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls current_thread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).

class _DummyThread(Thread):
    """Placeholder Thread object for a thread not started by this module."""

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return True

    def join(self, timeout=None):
        assert False, "cannot join a dummy thread"
821
# Global API functions

def currentThread():
    """Return the Thread object for the calling thread.

    If the calling thread is not registered in _active, a new
    _DummyThread is created (which registers itself) and returned.
    """
    try:
        return _active[_get_ident()]
    except KeyError:
        ##print "current_thread(): no current thread for", _get_ident()
        return _DummyThread()

current_thread = currentThread
def activeCount():
    """Return the number of registered threads (active plus limbo)."""
    with _active_limbo_lock:
        return len(_active) + len(_limbo)

active_count = activeCount
def _enumerate():
    # Same as enumerate(), but without the lock. Internal use only.
    return _active.values() + _limbo.values()
843
def enumerate():
    """Return a list of all registered Thread objects (active plus limbo).

    The snapshot is taken while holding _active_limbo_lock.
    """
    with _active_limbo_lock:
        return _active.values() + _limbo.values()
from thread import stack_size

# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.

_shutdown = _MainThread()._exitfunc

# get thread-local implementation, either from the thread
# module, or from the python fallback

try:
    from thread import _local as local
except ImportError:
    from _threading_local import local
def _after_fork():
    """Reset module state in a freshly forked child process.

    Only the calling thread is kept in _active; every other registered
    thread is marked as stopped and dropped.
    """
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
    # that should not exist after a fork.

    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}
    current = current_thread()
    with _active_limbo_lock:
        for thread in _active.itervalues():
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if hasattr(thread, '_reset_internal_locks'):
                thread._reset_internal_locks()
            if thread is current:
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
                ident = _get_ident()
                thread._Thread__ident = ident
                new_active[ident] = thread
            else:
                # All the others are already stopped.
                thread._Thread__stop()

        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1
898
# Self-test code

def _test():
    """Self-test: several producers and one consumer on a bounded queue."""
    # Imported here because the module does not import deque at top level.
    from collections import deque

    class BoundedQueue(_Verbose):
        """FIFO queue of at most *limit* items, guarded by one RLock."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            self.rc = Condition(self.mon)   # signalled when an item is added
            self.wc = Condition(self.mon)   # signalled when an item is removed
            self.limit = limit
            self.queue = deque()

        def put(self, item):
            self.mon.acquire()
            while len(self.queue) >= self.limit:
                self._note("put(%s): queue full", item)
                self.wc.wait()
            self.queue.append(item)
            self._note("put(%s): appended, length now %d",
                       item, len(self.queue))
            self.rc.notify()
            self.mon.release()

        def get(self):
            self.mon.acquire()
            while not self.queue:
                self._note("get(): queue empty")
                self.rc.wait()
            item = self.queue.popleft()
            self._note("get(): got %s, %d left", item, len(self.queue))
            self.wc.notify()
            self.mon.release()
            return item

    class ProducerThread(Thread):
        """Puts *quota* numbered items on the queue."""

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter = counter + 1
                self.queue.put("%s.%d" % (self.name, counter))
                _sleep(random() * 0.00001)


    class ConsumerThread(Thread):
        """Takes *count* items off the queue and prints each one."""

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                print(item)
                self.count = self.count - 1

    NP = 3      # number of producers
    QL = 4      # queue limit
    NI = 5      # items per producer

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.name = ("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()

if __name__ == '__main__':
    _test()