1 """Thread module emulating a subset of Java's threading model."""
2
3 import sys as _sys
4
5 try:
6 import thread
7 except ImportError:
8 del _sys.modules[__name__]
9 raise
10
11 import warnings
12
13 from time import time as _time, sleep as _sleep
14 from traceback import format_exc as _format_exc
15
16
17
18
19
20
21
22
23
24
25
26
27
28 __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
29 'current_thread', 'enumerate', 'Event',
30 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
31 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
32
33 _start_new_thread = thread.start_new_thread
34 _allocate_lock = thread.allocate_lock
35 _get_ident = thread.get_ident
36 ThreadError = thread.error
37 del thread
38
39
40
41
42 warnings.filterwarnings('ignore', category=DeprecationWarning,
43 module='threading', message='sys.exc_clear')
44
45
46
47
48
49
50
51 _VERBOSE = False
52
53 if __debug__:
56
58 if verbose is None:
59 verbose = _VERBOSE
60 self.__verbose = verbose
61
62 - def _note(self, format, *args):
74
75 else:
82
83
84
85 _profile_hook = None
86 _trace_hook = None
91
95
96
97
98 Lock = _allocate_lock
99
def RLock(*args, **kwargs):
    """Factory function: create and return a new reentrant lock.

    Every positional and keyword argument is forwarded unchanged to the
    ``_RLock`` constructor.
    """
    lock = _RLock(*args, **kwargs)
    return lock
102
104
106 _Verbose.__init__(self, verbose)
107 self.__block = _allocate_lock()
108 self.__owner = None
109 self.__count = 0
110
112 owner = self.__owner
113 try:
114 owner = _active[owner].name
115 except KeyError:
116 pass
117 return "<%s owner=%r count=%d>" % (
118 self.__class__.__name__, owner, self.__count)
119
121 me = _get_ident()
122 if self.__owner == me:
123 self.__count = self.__count + 1
124 if __debug__:
125 self._note("%s.acquire(%s): recursive success", self, blocking)
126 return 1
127 rc = self.__block.acquire(blocking)
128 if rc:
129 self.__owner = me
130 self.__count = 1
131 if __debug__:
132 self._note("%s.acquire(%s): initial success", self, blocking)
133 else:
134 if __debug__:
135 self._note("%s.acquire(%s): failure", self, blocking)
136 return rc
137
138 __enter__ = acquire
139
141 if self.__owner != _get_ident():
142 raise RuntimeError("cannot release un-acquired lock")
143 self.__count = count = self.__count - 1
144 if not count:
145 self.__owner = None
146 self.__block.release()
147 if __debug__:
148 self._note("%s.release(): final release", self)
149 else:
150 if __debug__:
151 self._note("%s.release(): non-final release", self)
152
155
156
157
159 count, owner = count_owner
160 self.__block.acquire()
161 self.__count = count
162 self.__owner = owner
163 if __debug__:
164 self._note("%s._acquire_restore()", self)
165
167 if __debug__:
168 self._note("%s._release_save()", self)
169 count = self.__count
170 self.__count = 0
171 owner = self.__owner
172 self.__owner = None
173 self.__block.release()
174 return (count, owner)
175
177 return self.__owner == _get_ident()
178
182
184
def __init__(self, lock=None, verbose=None):
    """Set up a condition variable on top of *lock*.

    lock    -- the underlying lock; a fresh RLock() is created when None.
    verbose -- forwarded to _Verbose.__init__().
    """
    _Verbose.__init__(self, verbose)
    lock = RLock() if lock is None else lock
    self.__lock = lock

    # Re-export the lock's own acquire()/release() as instance attributes.
    self.acquire = lock.acquire
    self.release = lock.release

    # When the lock supplies its own _release_save(), _acquire_restore()
    # or _is_owned(), bind them on the instance; instance attributes take
    # precedence over any same-named attribute found on the class.  A
    # lock lacking one of them is simply skipped.
    for hook in ("_release_save", "_acquire_restore", "_is_owned"):
        try:
            setattr(self, hook, getattr(lock, hook))
        except AttributeError:
            pass

    # Locks of the threads currently blocked in wait().
    self.__waiters = []
209
212
215
217 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
218
220 self.__lock.release()
221
223 self.__lock.acquire()
224
226
227
228 if self.__lock.acquire(0):
229 self.__lock.release()
230 return False
231 else:
232 return True
233
def wait(self, timeout=None):
    """Block until notified or until *timeout* seconds have elapsed.

    The caller must own the underlying lock, otherwise RuntimeError is
    raised.  A fresh lock is allocated, acquired once and queued on the
    waiter list; the condition's lock is then released through
    _release_save() and the method blocks on a second acquire of the
    waiter lock.  With timeout=None that acquire blocks indefinitely;
    otherwise the waiter lock is polled without blocking, sleeping
    between attempts.  The saved lock state is always put back with
    _acquire_restore() before returning.  Nothing is returned.
    """
    if not self._is_owned():
        raise RuntimeError("cannot wait on un-acquired lock")

    waiter = _allocate_lock()
    waiter.acquire()
    self.__waiters.append(waiter)
    saved_state = self._release_save()
    try:
        # The finally clause restores the lock state no matter how this
        # block is left.
        if timeout is None:
            waiter.acquire()
            if __debug__:
                self._note("%s.wait(): got it", self)
        else:
            # Busy-wait with back-off: the pause doubles on every round,
            # so the first sleep lasts 1 ms; it never exceeds 50 ms nor
            # the time left before the deadline.
            deadline = _time() + timeout
            pause = 0.0005
            acquired = waiter.acquire(0)
            while not acquired:
                time_left = deadline - _time()
                if time_left <= 0:
                    break
                pause = min(pause * 2, time_left, .05)
                _sleep(pause)
                acquired = waiter.acquire(0)

            if acquired:
                if __debug__:
                    self._note("%s.wait(%s): got it", self, timeout)
            else:
                if __debug__:
                    self._note("%s.wait(%s): timed out", self, timeout)
                # Timed out: take ourselves off the queue.  The entry may
                # already be gone, hence the tolerated ValueError.
                try:
                    self.__waiters.remove(waiter)
                except ValueError:
                    pass
    finally:
        self._acquire_restore(saved_state)
275
277 if not self._is_owned():
278 raise RuntimeError("cannot notify on un-acquired lock")
279 __waiters = self.__waiters
280 waiters = __waiters[:n]
281 if not waiters:
282 if __debug__:
283 self._note("%s.notify(): no waiters", self)
284 return
285 self._note("%s.notify(): notifying %d waiter%s", self, n,
286 n!=1 and "s" or "")
287 for waiter in waiters:
288 waiter.release()
289 try:
290 __waiters.remove(waiter)
291 except ValueError:
292 pass
293
295 self.notify(len(self.__waiters))
296
297 notify_all = notifyAll
298
302
304
305
306
def __init__(self, value=1, verbose=None):
    """Create a semaphore whose counter starts at *value*.

    value   -- initial counter; ValueError is raised when it is negative.
    verbose -- forwarded to _Verbose.__init__().
    """
    if value < 0:
        raise ValueError("semaphore initial value must be >= 0")
    _Verbose.__init__(self, verbose)
    # The counter is guarded by a condition built on a plain Lock().
    guard = Lock()
    self.__cond = Condition(guard)
    self.__value = value
313
315 rc = False
316 self.__cond.acquire()
317 while self.__value == 0:
318 if not blocking:
319 break
320 if __debug__:
321 self._note("%s.acquire(%s): blocked waiting, value=%s",
322 self, blocking, self.__value)
323 self.__cond.wait()
324 else:
325 self.__value = self.__value - 1
326 if __debug__:
327 self._note("%s.acquire: success, value=%s",
328 self, self.__value)
329 rc = True
330 self.__cond.release()
331 return rc
332
333 __enter__ = acquire
334
336 self.__cond.acquire()
337 self.__value = self.__value + 1
338 if __debug__:
339 self._note("%s.release: success, value=%s",
340 self, self.__value)
341 self.__cond.notify()
342 self.__cond.release()
343
346
350
352 """Semaphore that checks that # releases is <= # acquires"""
353 - def __init__(self, value=1, verbose=None):
356
358 if self._Semaphore__value >= self._initial_value:
359 raise ValueError, "Semaphore released too many times"
360 return _Semaphore.release(self)
361
362
def Event(*args, **kwargs):
    """Factory function: create and return a new event object.

    Every positional and keyword argument is forwarded unchanged to the
    ``_Event`` constructor.
    """
    event = _Event(*args, **kwargs)
    return event
365
367
368
369
371 _Verbose.__init__(self, verbose)
372 self.__cond = Condition(Lock())
373 self.__flag = False
374
378
381
382 is_set = isSet
383
385 self.__cond.acquire()
386 try:
387 self.__flag = True
388 self.__cond.notify_all()
389 finally:
390 self.__cond.release()
391
393 self.__cond.acquire()
394 try:
395 self.__flag = False
396 finally:
397 self.__cond.release()
398
def wait(self, timeout=None):
    """Wait for the internal flag and return its value.

    When the flag is already true the condition is not waited on.
    Otherwise a single wait(timeout) is issued on the internal
    condition.  Either way the flag, as read afterwards while the
    condition is still held, is the return value.
    """
    cond = self.__cond
    cond.acquire()
    try:
        if not self.__flag:
            cond.wait(timeout)
        flag = self.__flag
    finally:
        cond.release()
    return flag
407
408
409 _counter = 0
414
415
416 _active_limbo_lock = _allocate_lock()
417 _active = {}
418 _limbo = {}
419
420
421
422
423 -class Thread(_Verbose):
424
425 __initialized = False
426
427
428
429
430 __exc_info = _sys.exc_info
431
432
433 __exc_clear = _sys.exc_clear
434
def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, verbose=None):
    """Initialise the thread object; nothing is started here.

    group   -- must be None (checked with an assertion).
    target  -- callable stored for later invocation, or None.
    name    -- thread name; a falsy value is replaced by _newname().
    args    -- positional arguments stored for the target.
    kwargs  -- keyword arguments stored for the target; None means {}.
    verbose -- forwarded to _Verbose.__init__().
    """
    assert group is None, "group argument must be None for now"
    _Verbose.__init__(self, verbose)

    self.__target = target
    self.__name = str(name or _newname())
    self.__args = args
    self.__kwargs = {} if kwargs is None else kwargs

    # The initial daemon flag is whatever _set_daemon() reports.
    self.__daemonic = self._set_daemon()
    self.__ident = None
    self.__started = Event()
    self.__stopped = False
    self.__block = Condition(Lock())
    self.__initialized = True

    # Snapshot sys.stderr on the instance; the bootstrap code prints
    # through this reference when the _sys module global is no longer
    # usable.
    self.__stderr = _sys.stderr
454
456
457
458 if hasattr(self, '_Thread__block'):
459 self.__block.__init__()
460 self.__started._reset_internal_locks()
461
462 @property
464
465 return self.__block
466
468
469 return current_thread().daemon
470
472 assert self.__initialized, "Thread.__init__() was not called"
473 status = "initial"
474 if self.__started.is_set():
475 status = "started"
476 if self.__stopped:
477 status = "stopped"
478 if self.__daemonic:
479 status += " daemon"
480 if self.__ident is not None:
481 status += " %s" % self.__ident
482 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
483
485 if not self.__initialized:
486 raise RuntimeError("thread.__init__() not called")
487 if self.__started.is_set():
488 raise RuntimeError("threads can only be started once")
489 if __debug__:
490 self._note("%s.start(): starting thread", self)
491 with _active_limbo_lock:
492 _limbo[self] = self
493 try:
494 _start_new_thread(self.__bootstrap, ())
495 except Exception:
496 with _active_limbo_lock:
497 del _limbo[self]
498 raise
499 self.__started.wait()
500
502 try:
503 if self.__target:
504 self.__target(*self.__args, **self.__kwargs)
505 finally:
506
507
508 del self.__target, self.__args, self.__kwargs
509
511
512
513
514
515
516
517
518
519
520
521
522
523 try:
524 self.__bootstrap_inner()
525 except:
526 if self.__daemonic and _sys is None:
527 return
528 raise
529
531 self.__ident = _get_ident()
532
534 try:
535 self._set_ident()
536 self.__started.set()
537 with _active_limbo_lock:
538 _active[self.__ident] = self
539 del _limbo[self]
540 if __debug__:
541 self._note("%s.__bootstrap(): thread started", self)
542
543 if _trace_hook:
544 self._note("%s.__bootstrap(): registering trace hook", self)
545 _sys.settrace(_trace_hook)
546 if _profile_hook:
547 self._note("%s.__bootstrap(): registering profile hook", self)
548 _sys.setprofile(_profile_hook)
549
550 try:
551 self.run()
552 except SystemExit:
553 if __debug__:
554 self._note("%s.__bootstrap(): raised SystemExit", self)
555 except:
556 if __debug__:
557 self._note("%s.__bootstrap(): unhandled exception", self)
558
559
560
561
562 if _sys:
563 _sys.stderr.write("Exception in thread %s:\n%s\n" %
564 (self.name, _format_exc()))
565 else:
566
567
568
569 exc_type, exc_value, exc_tb = self.__exc_info()
570 try:
571 print>>self.__stderr, (
572 "Exception in thread " + self.name +
573 " (most likely raised during interpreter shutdown):")
574 print>>self.__stderr, (
575 "Traceback (most recent call last):")
576 while exc_tb:
577 print>>self.__stderr, (
578 ' File "%s", line %s, in %s' %
579 (exc_tb.tb_frame.f_code.co_filename,
580 exc_tb.tb_lineno,
581 exc_tb.tb_frame.f_code.co_name))
582 exc_tb = exc_tb.tb_next
583 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
584
585
586 finally:
587 del exc_type, exc_value, exc_tb
588 else:
589 if __debug__:
590 self._note("%s.__bootstrap(): normal return", self)
591 finally:
592
593
594
595
596 self.__exc_clear()
597 finally:
598 with _active_limbo_lock:
599 self.__stop()
600 try:
601
602
603 del _active[_get_ident()]
604 except:
605 pass
606
608 self.__block.acquire()
609 self.__stopped = True
610 self.__block.notify_all()
611 self.__block.release()
612
614 "Remove current thread from the dict of currently running threads."
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637 try:
638 with _active_limbo_lock:
639 del _active[_get_ident()]
640
641
642
643
644 except KeyError:
645 if 'dummy_threading' not in _sys.modules:
646 raise
647
def join(self, timeout=None):
    """Wait until the thread has stopped, or until *timeout* expires.

    timeout -- None to wait without limit, otherwise a number of seconds.

    Raises RuntimeError when Thread.__init__() was never called, when
    the thread has not been started, or when a thread tries to join
    itself.  Returns None in every other case, including a timeout.
    """
    if not self.__initialized:
        raise RuntimeError("Thread.__init__() not called")
    if not self.__started.is_set():
        raise RuntimeError("cannot join thread before it is started")
    if self is current_thread():
        raise RuntimeError("cannot join current thread")

    if __debug__ and not self.__stopped:
        self._note("%s.join(): waiting until thread stops", self)

    block = self.__block
    block.acquire()
    try:
        if timeout is not None:
            # Re-derive the remaining time after every wake-up so the
            # total wait stays bounded by the deadline.
            deadline = _time() + timeout
            while not self.__stopped:
                remaining = deadline - _time()
                if remaining <= 0:
                    if __debug__:
                        self._note("%s.join(): timed out", self)
                    break
                block.wait(remaining)
            else:
                # Loop ended without break: the stopped flag was seen.
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
        else:
            while not self.__stopped:
                block.wait()
            if __debug__:
                self._note("%s.join(): thread stopped", self)
    finally:
        block.release()
680
681 @property
683 assert self.__initialized, "Thread.__init__() not called"
684 return self.__name
685
@name.setter
def name(self, name):
    """Set the thread's name; the value is stored as str(name)."""
    assert self.__initialized, "Thread.__init__() not called"
    new_name = str(name)
    self.__name = new_name
690
691 @property
693 assert self.__initialized, "Thread.__init__() not called"
694 return self.__ident
695
697 assert self.__initialized, "Thread.__init__() not called"
698 return self.__started.is_set() and not self.__stopped
699
700 is_alive = isAlive
701
702 @property
704 assert self.__initialized, "Thread.__init__() not called"
705 return self.__daemonic
706
707 @daemon.setter
709 if not self.__initialized:
710 raise RuntimeError("Thread.__init__() not called")
711 if self.__started.is_set():
712 raise RuntimeError("cannot set daemon status of active thread");
713 self.__daemonic = daemonic
714
717
719 self.daemon = daemonic
720
723
726
727
728
def Timer(*args, **kwargs):
    """Factory function: create and return a new timer thread.

    Every positional and keyword argument is forwarded unchanged to the
    ``_Timer`` constructor.
    """
    timer = _Timer(*args, **kwargs)
    return timer
731
733 """Call a function after a specified number of seconds:
734
735 t = Timer(30.0, f, args=[], kwargs={})
736 t.start()
737 t.cancel() # stop the timer's action if it's still waiting
738 """
739
def __init__(self, interval, function, args=None, kwargs=None):
    """Prepare a timer that calls *function* after *interval* seconds.

    interval -- delay handed to the wait on the ``finished`` event.
    function -- callable to invoke once the delay has elapsed.
    args     -- positional arguments for *function*; None means [].
    kwargs   -- keyword arguments for *function*; None means {}.

    The defaults are None rather than literal [] / {}: a literal
    default is created once at definition time and would be shared by
    every timer built without explicit arguments, so a mutation through
    one timer's ``args``/``kwargs`` would leak into all the others.
    """
    Thread.__init__(self)
    self.interval = interval
    self.function = function
    # Each instance gets its own fresh container when none was supplied.
    self.args = args if args is not None else []
    self.kwargs = kwargs if kwargs is not None else {}
    # Setting this event marks the timer as finished/cancelled.
    self.finished = Event()
747
749 """Stop the timer if it hasn't finished yet"""
750 self.finished.set()
751
753 self.finished.wait(self.interval)
754 if not self.finished.is_set():
755 self.function(*self.args, **self.kwargs)
756 self.finished.set()
757
758
759
760
761 -class _MainThread(Thread):
762
def __init__(self):
    """Create the object standing for the main thread.

    The thread is marked as started straight away, its identifier is
    recorded through _set_ident(), and it is registered in ``_active``
    under the calling thread's identifier.
    """
    Thread.__init__(self, name="MainThread")
    self._Thread__started.set()
    self._set_ident()
    _active_limbo_lock.acquire()
    try:
        _active[_get_ident()] = self
    finally:
        _active_limbo_lock.release()
769
770 - def _set_daemon(self):
772
def _exitfunc(self):
    """Stop the main thread object and wait for non-daemon threads.

    Marks this thread as stopped, then keeps joining whatever
    _pickSomeNonDaemonThread() returns until it returns a false value,
    and finally removes this thread via _Thread__delete().
    """
    self._Thread__stop()
    pending = _pickSomeNonDaemonThread()
    if pending and __debug__:
        self._note("%s: waiting for other threads", self)
    while pending:
        pending.join()
        pending = _pickSomeNonDaemonThread()
    if __debug__:
        self._note("%s: exiting", self)
    self._Thread__delete()
785
787 for t in enumerate():
788 if not t.daemon and t.is_alive():
789 return t
790 return None
791
792
793
794
795
796
797
798
799
800
801 -class _DummyThread(Thread):
802
815
818
def join(self, timeout=None):
    """Always fail: a dummy thread cannot be joined.

    timeout -- accepted for signature compatibility; never used.

    Raises AssertionError unconditionally.  The exception is raised
    explicitly instead of through an ``assert`` statement because
    assertions are removed when Python runs with -O, in which case this
    method would return silently as if the join had succeeded.
    """
    raise AssertionError("cannot join a dummy thread")
821
831
832 current_thread = currentThread
837
838 active_count = activeCount
843
847
848 from thread import stack_size
849
850
851
852
853
854 _shutdown = _MainThread()._exitfunc
855
856
857
858
859 try:
860 from thread import _local as local
861 except ImportError:
862 from _threading_local import local
866
867
868
869
870
871
872 global _active_limbo_lock
873 _active_limbo_lock = _allocate_lock()
874
875
876 new_active = {}
877 current = current_thread()
878 with _active_limbo_lock:
879 for thread in _active.itervalues():
880
881
882 if hasattr(thread, '_reset_internal_locks'):
883 thread._reset_internal_locks()
884 if thread is current:
885
886
887 ident = _get_ident()
888 thread._Thread__ident = ident
889 new_active[ident] = thread
890 else:
891
892 thread._Thread__stop()
893
894 _limbo.clear()
895 _active.clear()
896 _active.update(new_active)
897 assert len(_active) == 1
898
903
class BoundedQueue(_Verbose):
    """Fixed-capacity FIFO queue used by the self-test.

    One RLock (``mon``) guards the queue.  Two conditions share that
    lock: ``rc`` is notified after an item is appended, ``wc`` after an
    item is removed.
    """

    def __init__(self, limit):
        """Create an empty queue holding at most *limit* items."""
        _Verbose.__init__(self)
        self.mon = RLock()
        self.rc = Condition(self.mon)
        self.wc = Condition(self.mon)
        self.limit = limit
        self.queue = deque()

    def put(self, item):
        """Append *item*, blocking while the queue is at its limit."""
        self.mon.acquire()
        # try/finally: an exception raised while waiting (or by _note)
        # must not leave the monitor locked.
        try:
            while len(self.queue) >= self.limit:
                self._note("put(%s): queue full", item)
                self.wc.wait()
            self.queue.append(item)
            self._note("put(%s): appended, length now %d",
                       item, len(self.queue))
            self.rc.notify()
        finally:
            self.mon.release()

    def get(self):
        """Remove and return the oldest item, blocking while empty."""
        self.mon.acquire()
        # try/finally: see put().
        try:
            while not self.queue:
                self._note("get(): queue empty")
                self.rc.wait()
            item = self.queue.popleft()
            self._note("get(): got %s, %d left", item, len(self.queue))
            self.wc.notify()
        finally:
            self.mon.release()
        return item
935
class ProducerThread(Thread):
    """Self-test thread that puts ``quota`` labelled items on a queue."""

    def __init__(self, queue, quota):
        """Remember the target *queue* and how many items to produce."""
        Thread.__init__(self, name="Producer")
        self.queue = queue
        self.quota = quota

    def run(self):
        """Put items named "<thread name>.<n>" with a tiny random pause."""
        from random import random
        produced = 0
        while produced < self.quota:
            produced += 1
            label = "%s.%d" % (self.name, produced)
            self.queue.put(label)
            _sleep(random() * 0.00001)
950
951
class ConsumerThread(Thread):
    """Self-test thread that takes ``count`` items off a queue."""

    def __init__(self, queue, count):
        """Remember the source *queue* and how many items to consume."""
        Thread.__init__(self, name="Consumer")
        self.queue = queue
        self.count = count

    def run(self):
        """Fetch and print items until ``count`` has dropped to zero."""
        while self.count > 0:
            item = self.queue.get()
            # Parenthesised form prints the same text under the print
            # statement.
            print(item)
            self.count -= 1
964
965 NP = 3
966 QL = 4
967 NI = 5
968
969 Q = BoundedQueue(QL)
970 P = []
971 for i in range(NP):
972 t = ProducerThread(Q, NI)
973 t.name = ("Producer-%d" % (i+1))
974 P.append(t)
975 C = ConsumerThread(Q, NI*NP)
976 for t in P:
977 t.start()
978 _sleep(0.000001)
979 C.start()
980 for t in P:
981 t.join()
982 C.join()
983
984 if __name__ == '__main__':
985 _test()
986