1 """Thread module emulating a subset of Java's threading model."""
2
3 import sys as _sys
4
5 try:
6 import thread
7 except ImportError:
8 del _sys.modules[__name__]
9 raise
10
11 import warnings
12
13 from functools import wraps
14 from time import time as _time, sleep as _sleep
15 from traceback import format_exc as _format_exc
16 from collections import deque
17
18
19
20
21
22
23
24
25
26
27
28
29
30 __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
31 'current_thread', 'enumerate', 'Event',
32 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
33 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
34
35 _start_new_thread = thread.start_new_thread
36 _allocate_lock = thread.allocate_lock
37 _get_ident = thread.get_ident
38 ThreadError = thread.error
39 del thread
40
41
42
43
44 warnings.filterwarnings('ignore', category=DeprecationWarning,
45 module='threading', message='sys.exc_clear')
46
47
48
49
50
51
52
53 _VERBOSE = False
54
55 if __debug__:
58
60 if verbose is None:
61 verbose = _VERBOSE
62 self.__verbose = verbose
63
64 - def _note(self, format, *args):
70
71 else:
78
79
80
81 _profile_hook = None
82 _trace_hook = None
87
91
92
93
94 Lock = _allocate_lock
95
96 -def RLock(*args, **kwargs):
97 return _RLock(*args, **kwargs)
98
100
102 _Verbose.__init__(self, verbose)
103 self.__block = _allocate_lock()
104 self.__owner = None
105 self.__count = 0
106
108 owner = self.__owner
109 try:
110 owner = _active[owner].name
111 except KeyError:
112 pass
113 return "<%s owner=%r count=%d>" % (
114 self.__class__.__name__, owner, self.__count)
115
117 me = _get_ident()
118 if self.__owner == me:
119 self.__count = self.__count + 1
120 if __debug__:
121 self._note("%s.acquire(%s): recursive success", self, blocking)
122 return 1
123 rc = self.__block.acquire(blocking)
124 if rc:
125 self.__owner = me
126 self.__count = 1
127 if __debug__:
128 self._note("%s.acquire(%s): initial success", self, blocking)
129 else:
130 if __debug__:
131 self._note("%s.acquire(%s): failure", self, blocking)
132 return rc
133
134 __enter__ = acquire
135
137 if self.__owner != _get_ident():
138 raise RuntimeError("cannot release un-acquired lock")
139 self.__count = count = self.__count - 1
140 if not count:
141 self.__owner = None
142 self.__block.release()
143 if __debug__:
144 self._note("%s.release(): final release", self)
145 else:
146 if __debug__:
147 self._note("%s.release(): non-final release", self)
148
151
152
153
155 count, owner = count_owner
156 self.__block.acquire()
157 self.__count = count
158 self.__owner = owner
159 if __debug__:
160 self._note("%s._acquire_restore()", self)
161
163 if __debug__:
164 self._note("%s._release_save()", self)
165 count = self.__count
166 self.__count = 0
167 owner = self.__owner
168 self.__owner = None
169 self.__block.release()
170 return (count, owner)
171
173 return self.__owner == _get_ident()
174
178
180
181 - def __init__(self, lock=None, verbose=None):
182 _Verbose.__init__(self, verbose)
183 if lock is None:
184 lock = RLock()
185 self.__lock = lock
186
187 self.acquire = lock.acquire
188 self.release = lock.release
189
190
191
192 try:
193 self._release_save = lock._release_save
194 except AttributeError:
195 pass
196 try:
197 self._acquire_restore = lock._acquire_restore
198 except AttributeError:
199 pass
200 try:
201 self._is_owned = lock._is_owned
202 except AttributeError:
203 pass
204 self.__waiters = []
205
208
211
213 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
214
216 self.__lock.release()
217
219 self.__lock.acquire()
220
222
223
224 if self.__lock.acquire(0):
225 self.__lock.release()
226 return False
227 else:
228 return True
229
230 - def wait(self, timeout=None):
231 if not self._is_owned():
232 raise RuntimeError("cannot wait on un-acquired lock")
233 waiter = _allocate_lock()
234 waiter.acquire()
235 self.__waiters.append(waiter)
236 saved_state = self._release_save()
237 try:
238 if timeout is None:
239 waiter.acquire()
240 if __debug__:
241 self._note("%s.wait(): got it", self)
242 else:
243
244
245
246
247
248 endtime = _time() + timeout
249 delay = 0.0005
250 while True:
251 gotit = waiter.acquire(0)
252 if gotit:
253 break
254 remaining = endtime - _time()
255 if remaining <= 0:
256 break
257 delay = min(delay * 2, remaining, .05)
258 _sleep(delay)
259 if not gotit:
260 if __debug__:
261 self._note("%s.wait(%s): timed out", self, timeout)
262 try:
263 self.__waiters.remove(waiter)
264 except ValueError:
265 pass
266 else:
267 if __debug__:
268 self._note("%s.wait(%s): got it", self, timeout)
269 finally:
270 self._acquire_restore(saved_state)
271
273 if not self._is_owned():
274 raise RuntimeError("cannot notify on un-acquired lock")
275 __waiters = self.__waiters
276 waiters = __waiters[:n]
277 if not waiters:
278 if __debug__:
279 self._note("%s.notify(): no waiters", self)
280 return
281 self._note("%s.notify(): notifying %d waiter%s", self, n,
282 n!=1 and "s" or "")
283 for waiter in waiters:
284 waiter.release()
285 try:
286 __waiters.remove(waiter)
287 except ValueError:
288 pass
289
291 self.notify(len(self.__waiters))
292
293 notify_all = notifyAll
294
298
300
301
302
303 - def __init__(self, value=1, verbose=None):
304 if value < 0:
305 raise ValueError("semaphore initial value must be >= 0")
306 _Verbose.__init__(self, verbose)
307 self.__cond = Condition(Lock())
308 self.__value = value
309
311 rc = False
312 self.__cond.acquire()
313 while self.__value == 0:
314 if not blocking:
315 break
316 if __debug__:
317 self._note("%s.acquire(%s): blocked waiting, value=%s",
318 self, blocking, self.__value)
319 self.__cond.wait()
320 else:
321 self.__value = self.__value - 1
322 if __debug__:
323 self._note("%s.acquire: success, value=%s",
324 self, self.__value)
325 rc = True
326 self.__cond.release()
327 return rc
328
329 __enter__ = acquire
330
332 self.__cond.acquire()
333 self.__value = self.__value + 1
334 if __debug__:
335 self._note("%s.release: success, value=%s",
336 self, self.__value)
337 self.__cond.notify()
338 self.__cond.release()
339
342
346
348 """Semaphore that checks that # releases is <= # acquires"""
349 - def __init__(self, value=1, verbose=None):
352
354 if self._Semaphore__value >= self._initial_value:
355 raise ValueError, "Semaphore released too many times"
356 return _Semaphore.release(self)
357
358
359 -def Event(*args, **kwargs):
360 return _Event(*args, **kwargs)
361
363
364
365
367 _Verbose.__init__(self, verbose)
368 self.__cond = Condition(Lock())
369 self.__flag = False
370
373
374 is_set = isSet
375
377 self.__cond.acquire()
378 try:
379 self.__flag = True
380 self.__cond.notify_all()
381 finally:
382 self.__cond.release()
383
385 self.__cond.acquire()
386 try:
387 self.__flag = False
388 finally:
389 self.__cond.release()
390
391 - def wait(self, timeout=None):
392 self.__cond.acquire()
393 try:
394 if not self.__flag:
395 self.__cond.wait(timeout)
396 finally:
397 self.__cond.release()
398
399
400 _counter = 0
405
406
407 _active_limbo_lock = _allocate_lock()
408 _active = {}
409 _limbo = {}
410
411
412
413
414 -class Thread(_Verbose):
415
416 __initialized = False
417
418
419
420
421 __exc_info = _sys.exc_info
422
423
424 __exc_clear = _sys.exc_clear
425
426 - def __init__(self, group=None, target=None, name=None,
427 args=(), kwargs=None, verbose=None):
428 assert group is None, "group argument must be None for now"
429 _Verbose.__init__(self, verbose)
430 if kwargs is None:
431 kwargs = {}
432 self.__target = target
433 self.__name = str(name or _newname())
434 self.__args = args
435 self.__kwargs = kwargs
436 self.__daemonic = self._set_daemon()
437 self.__ident = None
438 self.__started = Event()
439 self.__stopped = False
440 self.__block = Condition(Lock())
441 self.__initialized = True
442
443
444 self.__stderr = _sys.stderr
445
447
448 return current_thread().daemon
449
451 assert self.__initialized, "Thread.__init__() was not called"
452 status = "initial"
453 if self.__started.is_set():
454 status = "started"
455 if self.__stopped:
456 status = "stopped"
457 if self.__daemonic:
458 status += " daemon"
459 if self.__ident is not None:
460 status += " %s" % self.__ident
461 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
462
464 if not self.__initialized:
465 raise RuntimeError("thread.__init__() not called")
466 if self.__started.is_set():
467 raise RuntimeError("thread already started")
468 if __debug__:
469 self._note("%s.start(): starting thread", self)
470 _active_limbo_lock.acquire()
471 _limbo[self] = self
472 _active_limbo_lock.release()
473 try:
474 _start_new_thread(self.__bootstrap, ())
475 except Exception:
476 with _active_limbo_lock:
477 del _limbo[self]
478 raise
479 self.__started.wait()
480
482 try:
483 if self.__target:
484 self.__target(*self.__args, **self.__kwargs)
485 finally:
486
487
488 del self.__target, self.__args, self.__kwargs
489
491
492
493
494
495
496
497
498
499
500
501
502
503 try:
504 self.__bootstrap_inner()
505 except:
506 if self.__daemonic and _sys is None:
507 return
508 raise
509
511 self.__ident = _get_ident()
512
514 try:
515 self._set_ident()
516 self.__started.set()
517 _active_limbo_lock.acquire()
518 _active[self.__ident] = self
519 del _limbo[self]
520 _active_limbo_lock.release()
521 if __debug__:
522 self._note("%s.__bootstrap(): thread started", self)
523
524 if _trace_hook:
525 self._note("%s.__bootstrap(): registering trace hook", self)
526 _sys.settrace(_trace_hook)
527 if _profile_hook:
528 self._note("%s.__bootstrap(): registering profile hook", self)
529 _sys.setprofile(_profile_hook)
530
531 try:
532 self.run()
533 except SystemExit:
534 if __debug__:
535 self._note("%s.__bootstrap(): raised SystemExit", self)
536 except:
537 if __debug__:
538 self._note("%s.__bootstrap(): unhandled exception", self)
539
540
541
542
543 if _sys:
544 _sys.stderr.write("Exception in thread %s:\n%s\n" %
545 (self.name, _format_exc()))
546 else:
547
548
549
550 exc_type, exc_value, exc_tb = self.__exc_info()
551 try:
552 print>>self.__stderr, (
553 "Exception in thread " + self.name +
554 " (most likely raised during interpreter shutdown):")
555 print>>self.__stderr, (
556 "Traceback (most recent call last):")
557 while exc_tb:
558 print>>self.__stderr, (
559 ' File "%s", line %s, in %s' %
560 (exc_tb.tb_frame.f_code.co_filename,
561 exc_tb.tb_lineno,
562 exc_tb.tb_frame.f_code.co_name))
563 exc_tb = exc_tb.tb_next
564 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
565
566
567 finally:
568 del exc_type, exc_value, exc_tb
569 else:
570 if __debug__:
571 self._note("%s.__bootstrap(): normal return", self)
572 finally:
573
574
575
576
577 self.__exc_clear()
578 finally:
579 with _active_limbo_lock:
580 self.__stop()
581 try:
582
583
584 del _active[_get_ident()]
585 except:
586 pass
587
589 self.__block.acquire()
590 self.__stopped = True
591 self.__block.notify_all()
592 self.__block.release()
593
595 "Remove current thread from the dict of currently running threads."
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618 try:
619 with _active_limbo_lock:
620 del _active[_get_ident()]
621
622
623
624
625 except KeyError:
626 if 'dummy_threading' not in _sys.modules:
627 raise
628
629 - def join(self, timeout=None):
630 if not self.__initialized:
631 raise RuntimeError("Thread.__init__() not called")
632 if not self.__started.is_set():
633 raise RuntimeError("cannot join thread before it is started")
634 if self is current_thread():
635 raise RuntimeError("cannot join current thread")
636
637 if __debug__:
638 if not self.__stopped:
639 self._note("%s.join(): waiting until thread stops", self)
640 self.__block.acquire()
641 try:
642 if timeout is None:
643 while not self.__stopped:
644 self.__block.wait()
645 if __debug__:
646 self._note("%s.join(): thread stopped", self)
647 else:
648 deadline = _time() + timeout
649 while not self.__stopped:
650 delay = deadline - _time()
651 if delay <= 0:
652 if __debug__:
653 self._note("%s.join(): timed out", self)
654 break
655 self.__block.wait(delay)
656 else:
657 if __debug__:
658 self._note("%s.join(): thread stopped", self)
659 finally:
660 self.__block.release()
661
662 @property
664 assert self.__initialized, "Thread.__init__() not called"
665 return self.__name
666
667 @name.setter
668 - def name(self, name):
669 assert self.__initialized, "Thread.__init__() not called"
670 self.__name = str(name)
671
672 @property
674 assert self.__initialized, "Thread.__init__() not called"
675 return self.__ident
676
678 assert self.__initialized, "Thread.__init__() not called"
679 return self.__started.is_set() and not self.__stopped
680
681 is_alive = isAlive
682
683 @property
685 assert self.__initialized, "Thread.__init__() not called"
686 return self.__daemonic
687
688 @daemon.setter
690 if not self.__initialized:
691 raise RuntimeError("Thread.__init__() not called")
692 if self.__started.is_set():
693 raise RuntimeError("cannot set daemon status of active thread");
694 self.__daemonic = daemonic
695
698
700 self.daemon = daemonic
701
704
707
708
709
710 -def Timer(*args, **kwargs):
711 return _Timer(*args, **kwargs)
712
714 """Call a function after a specified number of seconds:
715
716 t = Timer(30.0, f, args=[], kwargs={})
717 t.start()
718 t.cancel() # stop the timer's action if it's still waiting
719 """
720
721 - def __init__(self, interval, function, args=[], kwargs={}):
722 Thread.__init__(self)
723 self.interval = interval
724 self.function = function
725 self.args = args
726 self.kwargs = kwargs
727 self.finished = Event()
728
730 """Stop the timer if it hasn't finished yet"""
731 self.finished.set()
732
734 self.finished.wait(self.interval)
735 if not self.finished.is_set():
736 self.function(*self.args, **self.kwargs)
737 self.finished.set()
738
739
740
741
742 -class _MainThread(Thread):
743
746
747 - def __init__(self):
748 Thread.__init__(self, name="MainThread")
749 self._Thread__started.set()
750 self._set_ident()
751 self.waitForThreadsOnExitFunc = self.waitForThreadsOnExit
752 _active_limbo_lock.acquire()
753 _active[_get_ident()] = self
754 _active_limbo_lock.release()
755
756 - def _set_daemon(self):
758
759 - def _exitfunc(self):
760 self._Thread__stop()
761
762 t = _pickSomeNonDaemonThread()
763 if t:
764 wait = self.waitForThreadsOnExitFunc()
765 if wait and __debug__:
766 self._note("%s: waiting for other threads", self)
767 while t and wait:
768 t.join()
769 t = _pickSomeNonDaemonThread()
770 if __debug__:
771 self._note("%s: exiting", self)
772
773 self._Thread__delete()
774
776 for t in enumerate():
777 if not t.daemon and t.is_alive():
778 return t
779 return None
780
781
782
783
784
785
786
787
788
789
790 -class _DummyThread(Thread):
791
805
808
809 - def join(self, timeout=None):
810 assert False, "cannot join a dummy thread"
811
821
822 current_thread = currentThread
829
830 active_count = activeCount
835
841
842 from thread import stack_size
843
844
845
846
847
848 _shutdown = _MainThread()._exitfunc
849
850
851
852
853 try:
854 from thread import _local as local
855 except ImportError:
856 from _threading_local import local
860
861
862
863
864
865
866 global _active_limbo_lock
867 _active_limbo_lock = _allocate_lock()
868
869
870 new_active = {}
871 current = current_thread()
872 with _active_limbo_lock:
873 for thread in _active.itervalues():
874 if thread is current:
875
876
877 ident = _get_ident()
878 thread._Thread__ident = ident
879 new_active[ident] = thread
880 else:
881
882
883
884
885 thread._Thread__stopped = True
886
887 _limbo.clear()
888 _active.clear()
889 _active.update(new_active)
890 assert len(_active) == 1
891
896
897 class BoundedQueue(_Verbose):
898
899 def __init__(self, limit):
900 _Verbose.__init__(self)
901 self.mon = RLock()
902 self.rc = Condition(self.mon)
903 self.wc = Condition(self.mon)
904 self.limit = limit
905 self.queue = deque()
906
907 def put(self, item):
908 self.mon.acquire()
909 while len(self.queue) >= self.limit:
910 self._note("put(%s): queue full", item)
911 self.wc.wait()
912 self.queue.append(item)
913 self._note("put(%s): appended, length now %d",
914 item, len(self.queue))
915 self.rc.notify()
916 self.mon.release()
917
918 def get(self):
919 self.mon.acquire()
920 while not self.queue:
921 self._note("get(): queue empty")
922 self.rc.wait()
923 item = self.queue.popleft()
924 self._note("get(): got %s, %d left", item, len(self.queue))
925 self.wc.notify()
926 self.mon.release()
927 return item
928
929 class ProducerThread(Thread):
930
931 def __init__(self, queue, quota):
932 Thread.__init__(self, name="Producer")
933 self.queue = queue
934 self.quota = quota
935
936 def run(self):
937 from random import random
938 counter = 0
939 while counter < self.quota:
940 counter = counter + 1
941 self.queue.put("%s.%d" % (self.name, counter))
942 _sleep(random() * 0.00001)
943
944
945 class ConsumerThread(Thread):
946
947 def __init__(self, queue, count):
948 Thread.__init__(self, name="Consumer")
949 self.queue = queue
950 self.count = count
951
952 def run(self):
953 while self.count > 0:
954 item = self.queue.get()
955 print item
956 self.count = self.count - 1
957
958 NP = 3
959 QL = 4
960 NI = 5
961
962 Q = BoundedQueue(QL)
963 P = []
964 for i in range(NP):
965 t = ProducerThread(Q, NI)
966 t.name = ("Producer-%d" % (i+1))
967 P.append(t)
968 C = ConsumerThread(Q, NI*NP)
969 for t in P:
970 t.start()
971 _sleep(0.000001)
972 C.start()
973 for t in P:
974 t.join()
975 C.join()
976
977 if __name__ == '__main__':
978 _test()
979