1 """Thread module emulating a subset of Java's threading model."""
2
3 import sys as _sys
4
5 try:
6 import thread
7 except ImportError:
8 del _sys.modules[__name__]
9 raise
10
11 import warnings
12
13 from collections import deque as _deque
14 from itertools import count as _count
15 from time import time as _time, sleep as _sleep
16 from traceback import format_exc as _format_exc
17
18
19
20
21
22
23
24
25
26
27
28
29
30 __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
31 'current_thread', 'enumerate', 'Event',
32 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
33 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
34
35 _start_new_thread = thread.start_new_thread
36 _allocate_lock = thread.allocate_lock
37 _get_ident = thread.get_ident
38 ThreadError = thread.error
39 del thread
40
41
42
43
44 warnings.filterwarnings('ignore', category=DeprecationWarning,
45 module='threading', message='sys.exc_clear')
46
47
48
49
50
51
52
53 _VERBOSE = False
54
55 if __debug__:
58
60 if verbose is None:
61 verbose = _VERBOSE
62 self.__verbose = verbose
63
64 - def _note(self, format, *args):
76
77 else:
84
85
86
87 _profile_hook = None
88 _trace_hook = None
91 """Set a profile function for all threads started from the threading module.
92
93 The func will be passed to sys.setprofile() for each thread, before its
94 run() method is called.
95
96 """
97 global _profile_hook
98 _profile_hook = func
99
101 """Set a trace function for all threads started from the threading module.
102
103 The func will be passed to sys.settrace() for each thread, before its run()
104 method is called.
105
106 """
107 global _trace_hook
108 _trace_hook = func
109
110
111
112 Lock = _allocate_lock
113
def RLock(*args, **kwargs):
    """Build a new reentrant lock and hand it back to the caller.

    Every positional and keyword argument is forwarded untouched to the
    _RLock constructor.

    Only the thread that acquired a reentrant lock may release it. While a
    thread holds the lock it can acquire it again without blocking, and it
    has to release it once for every time it acquired it.

    """
    reentrant_lock = _RLock(*args, **kwargs)
    return reentrant_lock
124
126 """A reentrant lock must be released by the thread that acquired it. Once a
127 thread has acquired a reentrant lock, the same thread may acquire it
128 again without blocking; the thread must release it once for each time it
129 has acquired it.
130 """
131
133 _Verbose.__init__(self, verbose)
134 self.__block = _allocate_lock()
135 self.__owner = None
136 self.__count = 0
137
139 owner = self.__owner
140 try:
141 owner = _active[owner].name
142 except KeyError:
143 pass
144 return "<%s owner=%r count=%d>" % (
145 self.__class__.__name__, owner, self.__count)
146
148 """Acquire a lock, blocking or non-blocking.
149
150 When invoked without arguments: if this thread already owns the lock,
151 increment the recursion level by one, and return immediately. Otherwise,
152 if another thread owns the lock, block until the lock is unlocked. Once
153 the lock is unlocked (not owned by any thread), then grab ownership, set
154 the recursion level to one, and return. If more than one thread is
155 blocked waiting until the lock is unlocked, only one at a time will be
156 able to grab ownership of the lock. There is no return value in this
157 case.
158
159 When invoked with the blocking argument set to true, do the same thing
160 as when called without arguments, and return true.
161
162 When invoked with the blocking argument set to false, do not block. If a
163 call without an argument would block, return false immediately;
164 otherwise, do the same thing as when called without arguments, and
165 return true.
166
167 """
168 me = _get_ident()
169 if self.__owner == me:
170 self.__count = self.__count + 1
171 if __debug__:
172 self._note("%s.acquire(%s): recursive success", self, blocking)
173 return 1
174 rc = self.__block.acquire(blocking)
175 if rc:
176 self.__owner = me
177 self.__count = 1
178 if __debug__:
179 self._note("%s.acquire(%s): initial success", self, blocking)
180 else:
181 if __debug__:
182 self._note("%s.acquire(%s): failure", self, blocking)
183 return rc
184
185 __enter__ = acquire
186
188 """Release a lock, decrementing the recursion level.
189
190 If after the decrement it is zero, reset the lock to unlocked (not owned
191 by any thread), and if any other threads are blocked waiting for the
192 lock to become unlocked, allow exactly one of them to proceed. If after
193 the decrement the recursion level is still nonzero, the lock remains
194 locked and owned by the calling thread.
195
196 Only call this method when the calling thread owns the lock. A
197 RuntimeError is raised if this method is called when the lock is
198 unlocked.
199
200 There is no return value.
201
202 """
203 if self.__owner != _get_ident():
204 raise RuntimeError("cannot release un-acquired lock")
205 self.__count = count = self.__count - 1
206 if not count:
207 self.__owner = None
208 self.__block.release()
209 if __debug__:
210 self._note("%s.release(): final release", self)
211 else:
212 if __debug__:
213 self._note("%s.release(): non-final release", self)
214
217
218
219
221 count, owner = count_owner
222 self.__block.acquire()
223 self.__count = count
224 self.__owner = owner
225 if __debug__:
226 self._note("%s._acquire_restore()", self)
227
229 if __debug__:
230 self._note("%s._release_save()", self)
231 count = self.__count
232 self.__count = 0
233 owner = self.__owner
234 self.__owner = None
235 self.__block.release()
236 return (count, owner)
237
239 return self.__owner == _get_ident()
240
243 """Factory function that returns a new condition variable object.
244
245 A condition variable allows one or more threads to wait until they are
246 notified by another thread.
247
248 If the lock argument is given and not None, it must be a Lock or RLock
249 object, and it is used as the underlying lock. Otherwise, a new RLock object
250 is created and used as the underlying lock.
251
252 """
253 return _Condition(*args, **kwargs)
254
256 """Condition variables allow one or more threads to wait until they are
257 notified by another thread.
258 """
259
def __init__(self, lock=None, verbose=None):
    """Set up a condition variable around *lock*.

    *lock* is the underlying lock; when it is None a new RLock is created
    and used instead. *verbose* is passed on to _Verbose.__init__().

    """
    _Verbose.__init__(self, verbose)
    if lock is None:
        lock = RLock()
    self.__lock = lock

    # Re-export the lock's own acquire() and release() on the condition.
    self.acquire = lock.acquire
    self.release = lock.release

    # When the lock provides its own _release_save(), _acquire_restore()
    # or _is_owned(), bind them on the instance so that wait() and notify()
    # use the lock's versions. A lock lacking one of them is simply skipped.
    for hook_name in ("_release_save", "_acquire_restore", "_is_owned"):
        try:
            hook = getattr(lock, hook_name)
        except AttributeError:
            continue
        setattr(self, hook_name, hook)

    self.__waiters = []
284
287
290
292 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
293
295 self.__lock.release()
296
298 self.__lock.acquire()
299
301
302
303 if self.__lock.acquire(0):
304 self.__lock.release()
305 return False
306 else:
307 return True
308
def wait(self, timeout=None):
    """Block until this condition is notified or *timeout* elapses.

    The calling thread must hold the underlying lock; otherwise a
    RuntimeError is raised.

    The underlying lock is released, and the caller then blocks until a
    notify() or notifyAll() call on this condition wakes it up, or until the
    optional timeout expires. In either case the lock is re-acquired before
    the method returns.

    *timeout*, when given and not None, is a floating point number of
    seconds (or fractions thereof).

    The lock is released and re-acquired through self._release_save() and
    self._acquire_restore() rather than release()/acquire(): for an RLock
    that has been acquired recursively, release() would not really unlock
    it, whereas this internal interface does, and restores the recursion
    level afterwards.

    """
    if not self._is_owned():
        raise RuntimeError("cannot wait on un-acquired lock")

    # A private lock that starts out held; notify() releases it, which lets
    # the second acquire below succeed.
    gate = _allocate_lock()
    gate.acquire()
    self.__waiters.append(gate)
    lock_state = self._release_save()
    try:
        if timeout is None:
            gate.acquire()
            if __debug__:
                self._note("%s.wait(): got it", self)
            return

        # No blocking acquire with a timeout is used here, so poll: try a
        # non-blocking acquire, then sleep. The sleep doubles each round,
        # giving at most 1 ms for the first one and never more than 50 ms
        # or the time that is left.
        deadline = _time() + timeout
        pause = 0.0005
        acquired = gate.acquire(0)
        while not acquired:
            time_left = deadline - _time()
            if time_left <= 0:
                break
            pause = min(pause * 2, time_left, .05)
            _sleep(pause)
            acquired = gate.acquire(0)

        if acquired:
            if __debug__:
                self._note("%s.wait(%s): got it", self, timeout)
        else:
            if __debug__:
                self._note("%s.wait(%s): timed out", self, timeout)
            # Withdraw from the waiter list; a concurrent notify() may have
            # removed the entry already.
            try:
                self.__waiters.remove(gate)
            except ValueError:
                pass
    finally:
        # Always take the underlying lock back, whatever happened above.
        self._acquire_restore(lock_state)
372
374 """Wake up one or more threads waiting on this condition, if any.
375
376 If the calling thread has not acquired the lock when this method is
377 called, a RuntimeError is raised.
378
379 This method wakes up at most n of the threads waiting for the condition
380 variable; it is a no-op if no threads are waiting.
381
382 """
383 if not self._is_owned():
384 raise RuntimeError("cannot notify on un-acquired lock")
385 __waiters = self.__waiters
386 waiters = __waiters[:n]
387 if not waiters:
388 if __debug__:
389 self._note("%s.notify(): no waiters", self)
390 return
391 self._note("%s.notify(): notifying %d waiter%s", self, n,
392 n!=1 and "s" or "")
393 for waiter in waiters:
394 waiter.release()
395 try:
396 __waiters.remove(waiter)
397 except ValueError:
398 pass
399
401 """Wake up all threads waiting on this condition.
402
403 If the calling thread has not acquired the lock when this method
404 is called, a RuntimeError is raised.
405
406 """
407 self.notify(len(self.__waiters))
408
409 notify_all = notifyAll
410
413 """A factory function that returns a new semaphore.
414
415 Semaphores manage a counter representing the number of release() calls minus
416 the number of acquire() calls, plus an initial value. The acquire() method
417 blocks if necessary until it can return without making the counter
418 negative. If not given, value defaults to 1.
419
420 """
421 return _Semaphore(*args, **kwargs)
422
424 """Semaphores manage a counter representing the number of release() calls
425 minus the number of acquire() calls, plus an initial value. The acquire()
426 method blocks if necessary until it can return without making the counter
427 negative. If not given, value defaults to 1.
428
429 """
430
431
432
def __init__(self, value=1, verbose=None):
    """Create a semaphore whose internal counter starts at *value*.

    *value* defaults to 1; a negative value raises ValueError. *verbose* is
    passed on to _Verbose.__init__().

    """
    if value < 0:
        raise ValueError("semaphore initial value must be >= 0")
    _Verbose.__init__(self, verbose)
    # The counter is protected by a condition built on a plain Lock.
    guard = Lock()
    self.__cond = Condition(guard)
    self.__value = value
439
441 """Acquire a semaphore, decrementing the internal counter by one.
442
443 When invoked without arguments: if the internal counter is larger than
444 zero on entry, decrement it by one and return immediately. If it is zero
445 on entry, block, waiting until some other thread has called release() to
446 make it larger than zero. This is done with proper interlocking so that
447 if multiple acquire() calls are blocked, release() will wake exactly one
448 of them up. The implementation may pick one at random, so the order in
449 which blocked threads are awakened should not be relied on. There is no
450 return value in this case.
451
452 When invoked with blocking set to true, do the same thing as when called
453 without arguments, and return true.
454
455 When invoked with blocking set to false, do not block. If a call without
456 an argument would block, return false immediately; otherwise, do the
457 same thing as when called without arguments, and return true.
458
459 """
460 rc = False
461 with self.__cond:
462 while self.__value == 0:
463 if not blocking:
464 break
465 if __debug__:
466 self._note("%s.acquire(%s): blocked waiting, value=%s",
467 self, blocking, self.__value)
468 self.__cond.wait()
469 else:
470 self.__value = self.__value - 1
471 if __debug__:
472 self._note("%s.acquire: success, value=%s",
473 self, self.__value)
474 rc = True
475 return rc
476
477 __enter__ = acquire
478
480 """Release a semaphore, incrementing the internal counter by one.
481
482 When the counter is zero on entry and another thread is waiting for it
483 to become larger than zero again, wake up that thread.
484
485 """
486 with self.__cond:
487 self.__value = self.__value + 1
488 if __debug__:
489 self._note("%s.release: success, value=%s",
490 self, self.__value)
491 self.__cond.notify()
492
495
498 """A factory function that returns a new bounded semaphore.
499
500 A bounded semaphore checks to make sure its current value doesn't exceed its
501 initial value. If it does, ValueError is raised. In most situations
502 semaphores are used to guard resources with limited capacity.
503
504 If the semaphore is released too many times it's a sign of a bug. If not
505 given, value defaults to 1.
506
507 Like regular semaphores, bounded semaphores manage a counter representing
508 the number of release() calls minus the number of acquire() calls, plus an
509 initial value. The acquire() method blocks if necessary until it can return
510 without making the counter negative. If not given, value defaults to 1.
511
512 """
513 return _BoundedSemaphore(*args, **kwargs)
514
516 """A bounded semaphore checks to make sure its current value doesn't exceed
517 its initial value. If it does, ValueError is raised. In most situations
518 semaphores are used to guard resources with limited capacity.
519 """
520
521 - def __init__(self, value=1, verbose=None):
524
526 """Release a semaphore, incrementing the internal counter by one.
527
528 When the counter is zero on entry and another thread is waiting for it
529 to become larger than zero again, wake up that thread.
530
531 If the number of releases exceeds the number of acquires,
532 raise a ValueError.
533
534 """
535 with self._Semaphore__cond:
536 if self._Semaphore__value >= self._initial_value:
537 raise ValueError("Semaphore released too many times")
538 self._Semaphore__value += 1
539 self._Semaphore__cond.notify()
540
541
def Event(*args, **kwargs):
    """Build a new event object and hand it back to the caller.

    Every positional and keyword argument is forwarded untouched to the
    _Event constructor.

    An event manages a flag: set() makes it true, clear() resets it to
    false, and wait() blocks until the flag is true.

    """
    event = _Event(*args, **kwargs)
    return event
551
553 """A factory function that returns a new event object. An event manages a
554 flag that can be set to true with the set() method and reset to false
555 with the clear() method. The wait() method blocks until the flag is true.
556
557 """
558
559
560
562 _Verbose.__init__(self, verbose)
563 self.__cond = Condition(Lock())
564 self.__flag = False
565
569
571 'Return true if and only if the internal flag is true.'
572 return self.__flag
573
574 is_set = isSet
575
577 """Set the internal flag to true.
578
579 All threads waiting for the flag to become true are awakened. Threads
580 that call wait() once the flag is true will not block at all.
581
582 """
583 with self.__cond:
584 self.__flag = True
585 self.__cond.notify_all()
586
588 """Reset the internal flag to false.
589
590 Subsequently, threads calling wait() will block until set() is called to
591 set the internal flag to true again.
592
593 """
594 with self.__cond:
595 self.__flag = False
596
def wait(self, timeout=None):
    """Wait for the internal flag to become true.

    Returns at once when the flag is already true. Otherwise the caller
    blocks on the event's condition until another thread calls set(), or
    until the optional timeout runs out.

    *timeout*, when given and not None, is a floating point number of
    seconds (or fractions thereof).

    The value returned is the internal flag as it stands on exit, so the
    result is always True unless a timeout was given and it expired.

    """
    condition = self.__cond
    with condition:
        flag_on_entry = self.__flag
        if not flag_on_entry:
            condition.wait(timeout)
        # Read the flag again: it may have been set while waiting.
        return self.__flag
616
617
618 _counter = _count().next
619 _counter()
622
623
624 _active_limbo_lock = _allocate_lock()
625 _active = {}
626 _limbo = {}
627
628
629
630
631 -class Thread(_Verbose):
632 """A class that represents a thread of control.
633
634 This class can be safely subclassed in a limited fashion.
635
636 """
637 __initialized = False
638
639
640
641
642 __exc_info = _sys.exc_info
643
644
645 __exc_clear = _sys.exc_clear
646
def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, verbose=None):
    """Initialise the thread; call this constructor with keyword arguments.

    *group* must be None; it is reserved for a future ThreadGroup class.

    *target* is the callable that run() invokes. The default, None, means
    nothing is called.

    *name* is the thread name. When it is omitted a unique name of the form
    "Thread-N" is generated, N being a small decimal number.

    *args* is the argument tuple for the target invocation; it defaults
    to ().

    *kwargs* is a dictionary of keyword arguments for the target
    invocation; it defaults to {}.

    *verbose* is passed on to _Verbose.__init__().

    A subclass that overrides the constructor must invoke
    Thread.__init__() before doing anything else to the thread.

    """
    assert group is None, "group argument must be None for now"
    _Verbose.__init__(self, verbose)

    # None stands for "no keyword arguments"; give each thread its own dict.
    target_kwargs = {} if kwargs is None else kwargs

    self.__target = target
    if not name:
        name = _newname()
    self.__name = str(name)
    self.__args = args
    self.__kwargs = target_kwargs
    self.__daemonic = self._set_daemon()

    # Life-cycle state: no identifier yet, neither started nor stopped.
    self.__ident = None
    self.__started = Event()
    self.__stopped = False
    self.__block = Condition(Lock())
    self.__initialized = True

    # Capture the sys.stderr in effect at construction time on the instance.
    self.__stderr = _sys.stderr
687
689
690
691 if hasattr(self, '_Thread__block'):
692 self.__block.__init__()
693 self.__started._reset_internal_locks()
694
695 @property
697
698 return self.__block
699
701
702 return current_thread().daemon
703
705 assert self.__initialized, "Thread.__init__() was not called"
706 status = "initial"
707 if self.__started.is_set():
708 status = "started"
709 if self.__stopped:
710 status = "stopped"
711 if self.__daemonic:
712 status += " daemon"
713 if self.__ident is not None:
714 status += " %s" % self.__ident
715 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
716
718 """Start the thread's activity.
719
720 It must be called at most once per thread object. It arranges for the
721 object's run() method to be invoked in a separate thread of control.
722
723 This method will raise a RuntimeError if called more than once on the
724 same thread object.
725
726 """
727 if not self.__initialized:
728 raise RuntimeError("thread.__init__() not called")
729 if self.__started.is_set():
730 raise RuntimeError("threads can only be started once")
731 if __debug__:
732 self._note("%s.start(): starting thread", self)
733 with _active_limbo_lock:
734 _limbo[self] = self
735 try:
736 _start_new_thread(self.__bootstrap, ())
737 except Exception:
738 with _active_limbo_lock:
739 del _limbo[self]
740 raise
741 self.__started.wait()
742
744 """Method representing the thread's activity.
745
746 You may override this method in a subclass. The standard run() method
747 invokes the callable object passed to the object's constructor as the
748 target argument, if any, with sequential and keyword arguments taken
749 from the args and kwargs arguments, respectively.
750
751 """
752 try:
753 if self.__target:
754 self.__target(*self.__args, **self.__kwargs)
755 finally:
756
757
758 del self.__target, self.__args, self.__kwargs
759
761
762
763
764
765
766
767
768
769
770
771
772
773 try:
774 self.__bootstrap_inner()
775 except:
776 if self.__daemonic and _sys is None:
777 return
778 raise
779
781 self.__ident = _get_ident()
782
784 try:
785 self._set_ident()
786 self.__started.set()
787 with _active_limbo_lock:
788 _active[self.__ident] = self
789 del _limbo[self]
790 if __debug__:
791 self._note("%s.__bootstrap(): thread started", self)
792
793 if _trace_hook:
794 self._note("%s.__bootstrap(): registering trace hook", self)
795 _sys.settrace(_trace_hook)
796 if _profile_hook:
797 self._note("%s.__bootstrap(): registering profile hook", self)
798 _sys.setprofile(_profile_hook)
799
800 try:
801 self.run()
802 except SystemExit:
803 if __debug__:
804 self._note("%s.__bootstrap(): raised SystemExit", self)
805 except:
806 if __debug__:
807 self._note("%s.__bootstrap(): unhandled exception", self)
808
809
810
811
812 if _sys and _sys.stderr is not None:
813 print>>_sys.stderr, ("Exception in thread %s:\n%s" %
814 (self.name, _format_exc()))
815 elif self.__stderr is not None:
816
817
818
819 exc_type, exc_value, exc_tb = self.__exc_info()
820 try:
821 print>>self.__stderr, (
822 "Exception in thread " + self.name +
823 " (most likely raised during interpreter shutdown):")
824 print>>self.__stderr, (
825 "Traceback (most recent call last):")
826 while exc_tb:
827 print>>self.__stderr, (
828 ' File "%s", line %s, in %s' %
829 (exc_tb.tb_frame.f_code.co_filename,
830 exc_tb.tb_lineno,
831 exc_tb.tb_frame.f_code.co_name))
832 exc_tb = exc_tb.tb_next
833 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
834
835
836 finally:
837 del exc_type, exc_value, exc_tb
838 else:
839 if __debug__:
840 self._note("%s.__bootstrap(): normal return", self)
841 finally:
842
843
844
845
846 self.__exc_clear()
847 finally:
848 with _active_limbo_lock:
849 self.__stop()
850 try:
851
852
853 del _active[_get_ident()]
854 except:
855 pass
856
858
859
860 if not hasattr(self, '_Thread__block'):
861 return
862 self.__block.acquire()
863 self.__stopped = True
864 self.__block.notify_all()
865 self.__block.release()
866
868 "Remove current thread from the dict of currently running threads."
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891 try:
892 with _active_limbo_lock:
893 del _active[_get_ident()]
894
895
896
897
898 except KeyError:
899 if 'dummy_threading' not in _sys.modules:
900 raise
901
def join(self, timeout=None):
    """Block the caller until this thread has terminated.

    The calling thread waits until the thread whose join() method is called
    terminates -- normally or through an unhandled exception -- or until the
    optional timeout occurs.

    *timeout*, when given and not None, is a floating point number of
    seconds (or fractions thereof). join() always returns None, so call
    isAlive() afterwards to find out whether a timeout happened: if the
    thread is still alive, the join() call timed out.

    With no timeout (or None) the call blocks until the thread terminates.

    A thread can be join()ed many times.

    RuntimeError is raised when Thread.__init__() was not called, when the
    thread has not been started yet, or when a thread tries to join itself
    (which would deadlock).

    """
    if not self.__initialized:
        raise RuntimeError("Thread.__init__() not called")
    if not self.__started.is_set():
        raise RuntimeError("cannot join thread before it is started")
    if current_thread() is self:
        raise RuntimeError("cannot join current thread")

    if __debug__ and not self.__stopped:
        self._note("%s.join(): waiting until thread stops", self)

    self.__block.acquire()
    try:
        expired = False
        if timeout is None:
            while not self.__stopped:
                self.__block.wait()
        else:
            # Wait in slices against a fixed deadline so that early wake-ups
            # do not extend the total time spent waiting.
            limit = _time() + timeout
            while not self.__stopped:
                time_left = limit - _time()
                if time_left <= 0:
                    expired = True
                    break
                self.__block.wait(time_left)

        if __debug__:
            if expired:
                self._note("%s.join(): timed out", self)
            else:
                self._note("%s.join(): thread stopped", self)
    finally:
        self.__block.release()
957
958 @property
960 """A string used for identification purposes only.
961
962 It has no semantics. Multiple threads may be given the same name. The
963 initial name is set by the constructor.
964
965 """
966 assert self.__initialized, "Thread.__init__() not called"
967 return self.__name
968
969 @name.setter
970 - def name(self, name):
973
974 @property
976 """Thread identifier of this thread or None if it has not been started.
977
978 This is a nonzero integer. See the thread.get_ident() function. Thread
979 identifiers may be recycled when a thread exits and another thread is
980 created. The identifier is available even after the thread has exited.
981
982 """
983 assert self.__initialized, "Thread.__init__() not called"
984 return self.__ident
985
987 """Return whether the thread is alive.
988
989 This method returns True just before the run() method starts until just
990 after the run() method terminates. The module function enumerate()
991 returns a list of all alive threads.
992
993 """
994 assert self.__initialized, "Thread.__init__() not called"
995 return self.__started.is_set() and not self.__stopped
996
997 is_alive = isAlive
998
999 @property
1001 """A boolean value indicating whether this thread is a daemon thread (True) or not (False).
1002
1003 This must be set before start() is called, otherwise RuntimeError is
1004 raised. Its initial value is inherited from the creating thread; the
1005 main thread is not a daemon thread and therefore all threads created in
1006 the main thread default to daemon = False.
1007
1008 The entire Python program exits when no alive non-daemon threads are
1009 left.
1010
1011 """
1012 assert self.__initialized, "Thread.__init__() not called"
1013 return self.__daemonic
1014
1015 @daemon.setter
1017 if not self.__initialized:
1018 raise RuntimeError("Thread.__init__() not called")
1019 if self.__started.is_set():
1020 raise RuntimeError("cannot set daemon status of active thread");
1021 self.__daemonic = daemonic
1022
1025
1027 self.daemon = daemonic
1028
1031
1034
1035
1036
def Timer(*args, **kwargs):
    """Build a new Timer object and hand it back to the caller.

    Every positional and keyword argument is forwarded untouched to the
    _Timer constructor.

    A timer calls a function once a given number of seconds has passed:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel() # stop the timer's action if it's still waiting

    """
    timer = _Timer(*args, **kwargs)
    return timer
1048
1050 """Call a function after a specified number of seconds:
1051
1052 t = Timer(30.0, f, args=[], kwargs={})
1053 t.start()
1054 t.cancel() # stop the timer's action if it's still waiting
1055
1056 """
1057
def __init__(self, interval, function, args=None, kwargs=None):
    """Prepare a timer that calls *function* once *interval* has elapsed.

    *interval* is the time to wait, in seconds, before the call is made.

    *function* is the callable to invoke.

    *args* is the list of positional arguments for the call. When it is
    None (the default) a new empty list is used.

    *kwargs* is the dictionary of keyword arguments for the call. When it
    is None (the default) a new empty dict is used.

    """
    Thread.__init__(self)
    # Build the containers per instance. Literal [] / {} defaults would be
    # created once and shared by every Timer, so mutating one timer's
    # args or kwargs would leak into all the others.
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    # Set by cancel(), and by run() once the function has been called.
    self.finished = Event()
1065
1067 """Stop the timer if it hasn't finished yet"""
1068 self.finished.set()
1069
1071 self.finished.wait(self.interval)
1072 if not self.finished.is_set():
1073 self.function(*self.args, **self.kwargs)
1074 self.finished.set()
1075
1076
1077
1078
1079 -class _MainThread(Thread):
1080
def __init__(self):
    """Create the Thread object that stands for the calling thread.

    The thread is named "MainThread", flagged as already started, given its
    identifier through _set_ident(), and entered into the _active table
    under the identifier of the calling thread.

    """
    Thread.__init__(self, name="MainThread")
    # This thread is already running; start() is never called on it.
    started_event = self._Thread__started
    started_event.set()
    self._set_ident()
    with _active_limbo_lock:
        me = _get_ident()
        _active[me] = self
1087
1088 - def _set_daemon(self):
1090
def _exitfunc(self):
    """Stop this thread, wait for the non-daemon threads, then unregister.

    The thread is first marked as stopped. After that every thread handed
    out by _pickSomeNonDaemonThread() is joined, one at a time, until none
    is left. Finally this thread's entry is deleted from the table of
    running threads.

    """
    self._Thread__stop()

    pending = _pickSomeNonDaemonThread()
    if __debug__ and pending:
        self._note("%s: waiting for other threads", self)
    # Ask again after each join: more threads may have been started while
    # this one was waiting.
    while pending:
        pending.join()
        pending = _pickSomeNonDaemonThread()

    if __debug__:
        self._note("%s: exiting", self)
    self._Thread__delete()
1103
1105 for t in enumerate():
1106 if not t.daemon and t.is_alive():
1107 return t
1108 return None
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119 -class _DummyThread(Thread):
1120
1133
1136
def join(self, timeout=None):
    """Refuse to join: a dummy thread cannot be joined.

    *timeout* is accepted only for signature compatibility with
    Thread.join() and is ignored.

    Always raises AssertionError. The exception is raised explicitly rather
    than with an ``assert`` statement, because asserts are removed when
    Python runs with -O and the call would then return silently without
    having waited for anything.

    """
    raise AssertionError("cannot join a dummy thread")
1139
1144 """Return the current Thread object, corresponding to the caller's thread of control.
1145
1146 If the caller's thread of control was not created through the threading
1147 module, a dummy thread object with limited functionality is returned.
1148
1149 """
1150 try:
1151 return _active[_get_ident()]
1152 except KeyError:
1153
1154 return _DummyThread()
1155
1156 current_thread = currentThread
1159 """Return the number of Thread objects currently alive.
1160
1161 The returned count is equal to the length of the list returned by
1162 enumerate().
1163
1164 """
1165 with _active_limbo_lock:
1166 return len(_active) + len(_limbo)
1167
1168 active_count = activeCount
1173
1175 """Return a list of all Thread objects currently alive.
1176
1177 The list includes daemonic threads, dummy thread objects created by
1178 current_thread(), and the main thread. It excludes terminated threads and
1179 threads that have not yet been started.
1180
1181 """
1182 with _active_limbo_lock:
1183 return _active.values() + _limbo.values()
1184
1185 from thread import stack_size
1186
1187
1188
1189
1190
1191 _shutdown = _MainThread()._exitfunc
1192
1193
1194
1195
1196 try:
1197 from thread import _local as local
1198 except ImportError:
1199 from _threading_local import local
1203
1204
1205
1206
1207
1208
1209 global _active_limbo_lock
1210 _active_limbo_lock = _allocate_lock()
1211
1212
1213 new_active = {}
1214 current = current_thread()
1215 with _active_limbo_lock:
1216 for thread in _enumerate():
1217
1218
1219 if hasattr(thread, '_reset_internal_locks'):
1220 thread._reset_internal_locks()
1221 if thread is current:
1222
1223
1224 ident = _get_ident()
1225 thread._Thread__ident = ident
1226 new_active[ident] = thread
1227 else:
1228
1229 thread._Thread__stop()
1230
1231 _limbo.clear()
1232 _active.clear()
1233 _active.update(new_active)
1234 assert len(_active) == 1
1235
1236
1237
1238
def _test():
    """Run a small producer/consumer demo built on this module's primitives.

    Three producer threads each push five items through a bounded queue
    with room for four, while one consumer thread pops and prints all
    fifteen items. The function returns once every thread has been joined.

    """

    class BoundedQueue(_Verbose):
        """FIFO queue holding at most *limit* items, guarded by one RLock."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            # Both conditions share the monitor lock: get() waits on rc and
            # is woken by put(); put() waits on wc and is woken by get().
            self.rc = Condition(self.mon)
            self.wc = Condition(self.mon)
            self.limit = limit
            self.queue = _deque()

        def put(self, item):
            """Append *item*, blocking while the queue is full."""
            self.mon.acquire()
            # try/finally so the monitor is released even if something
            # raises while it is held.
            try:
                while len(self.queue) >= self.limit:
                    self._note("put(%s): queue full", item)
                    self.wc.wait()
                self.queue.append(item)
                self._note("put(%s): appended, length now %d",
                           item, len(self.queue))
                self.rc.notify()
            finally:
                self.mon.release()

        def get(self):
            """Remove and return the oldest item, blocking while empty."""
            self.mon.acquire()
            try:
                while not self.queue:
                    self._note("get(): queue empty")
                    self.rc.wait()
                item = self.queue.popleft()
                self._note("get(): got %s, %d left", item, len(self.queue))
                self.wc.notify()
            finally:
                self.mon.release()
            return item

    class ProducerThread(Thread):
        """Thread that puts *quota* numbered items on *queue*."""

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter = counter + 1
                self.queue.put("%s.%d" % (self.name, counter))
                # Tiny random pause to vary how the producers interleave.
                _sleep(random() * 0.00001)

    class ConsumerThread(Thread):
        """Thread that takes *count* items from *queue* and prints them."""

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                # Call form with a single argument prints the same text
                # under the print statement and the print function.
                print(item)
                self.count = self.count - 1

    NP = 3  # number of producer threads
    QL = 4  # queue limit
    NI = 5  # items per producer

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.name = ("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()
1320
# Run the producer/consumer demo only when executed as a script.
if __name__ == "__main__":
    _test()
1323