1 """Thread module emulating a subset of Java's threading model."""
2
3 import sys as _sys
4
5 try:
6 import thread
7 except ImportError:
8 del _sys.modules[__name__]
9 raise
10
11 import warnings
12
13 from functools import wraps
14 from time import time as _time, sleep as _sleep
15 from traceback import format_exc as _format_exc
16 from collections import deque
17
18
19
20
21
22
23
24
25
26
27
28
29
# Names exported by ``from threading import *``.
__all__ = [
    'activeCount', 'active_count',
    'Condition',
    'currentThread', 'current_thread',
    'enumerate',
    'Event',
    'Lock', 'RLock',
    'Semaphore', 'BoundedSemaphore',
    'Thread', 'Timer',
    'setprofile', 'settrace',
    'local',
    'stack_size',
]

# Bind the low-level primitives from the ``thread`` module to module-level
# names, then delete the module reference so ``thread`` itself does not stay
# in this namespace.
ThreadError = thread.error
_get_ident = thread.get_ident
_allocate_lock = thread.allocate_lock
_start_new_thread = thread.start_new_thread
del thread

# Ignore the DeprecationWarning about sys.exc_clear when it is attributed to
# the 'threading' module.
warnings.filterwarnings('ignore', category=DeprecationWarning,
                        module='threading', message='sys.exc_clear')
46
47
48
49
50
51
52
53 _VERBOSE = False
54
55 if __debug__:
58
60 if verbose is None:
61 verbose = _VERBOSE
62 self.__verbose = verbose
63
64 - def _note(self, format, *args):
70
71 else:
78
79
80
81 _profile_hook = None
82 _trace_hook = None
87
91
92
93
94 Lock = _allocate_lock
95
def RLock(*args, **kwargs):
    """Factory: build and return a new ``_RLock``.

    Positional and keyword arguments are forwarded to ``_RLock`` unchanged.
    """
    lock = _RLock(*args, **kwargs)
    return lock
98
100
102 _Verbose.__init__(self, verbose)
103 self.__block = _allocate_lock()
104 self.__owner = None
105 self.__count = 0
106
108 owner = self.__owner
109 try:
110 owner = _active[owner].name
111 except KeyError:
112 pass
113 return "<%s owner=%r count=%d>" % (
114 self.__class__.__name__, owner, self.__count)
115
117 me = _get_ident()
118 if self.__owner == me:
119 self.__count = self.__count + 1
120 if __debug__:
121 self._note("%s.acquire(%s): recursive success", self, blocking)
122 return 1
123 rc = self.__block.acquire(blocking)
124 if rc:
125 self.__owner = me
126 self.__count = 1
127 if __debug__:
128 self._note("%s.acquire(%s): initial success", self, blocking)
129 else:
130 if __debug__:
131 self._note("%s.acquire(%s): failure", self, blocking)
132 return rc
133
134 __enter__ = acquire
135
137 if self.__owner != _get_ident():
138 raise RuntimeError("cannot release un-acquired lock")
139 self.__count = count = self.__count - 1
140 if not count:
141 self.__owner = None
142 self.__block.release()
143 if __debug__:
144 self._note("%s.release(): final release", self)
145 else:
146 if __debug__:
147 self._note("%s.release(): non-final release", self)
148
151
152
153
155 count, owner = count_owner
156 self.__block.acquire()
157 self.__count = count
158 self.__owner = owner
159 if __debug__:
160 self._note("%s._acquire_restore()", self)
161
163 if __debug__:
164 self._note("%s._release_save()", self)
165 count = self.__count
166 self.__count = 0
167 owner = self.__owner
168 self.__owner = None
169 self.__block.release()
170 return (count, owner)
171
173 return self.__owner == _get_ident()
174
178
180
def __init__(self, lock=None, verbose=None):
    """Bind this condition to *lock*, creating a new ``RLock()`` if omitted.

    The lock's ``acquire`` and ``release`` are exposed directly as
    attributes of the condition.  *verbose* is passed on to
    ``_Verbose.__init__``.
    """
    _Verbose.__init__(self, verbose)
    if lock is None:
        lock = RLock()
    self.__lock = lock

    # Re-export the lock's own acquire()/release() on the condition.
    self.acquire = lock.acquire
    self.release = lock.release

    # If the lock provides its own versions of these hooks, they take the
    # place of the ones defined on this class; a lock without a given hook
    # simply leaves the class-level one in effect.
    for hook in ("_release_save", "_acquire_restore", "_is_owned"):
        try:
            setattr(self, hook, getattr(lock, hook))
        except AttributeError:
            pass

    # One lock per thread currently blocked in wait().
    self.__waiters = []
205
208
211
213 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
214
216 self.__lock.release()
217
219 self.__lock.acquire()
220
222
223
224 if self.__lock.acquire(0):
225 self.__lock.release()
226 return False
227 else:
228 return True
229
def wait(self, timeout=None):
    """Release the underlying lock and block until notified or timed out.

    timeout -- None to block indefinitely, otherwise the maximum number of
               seconds to wait.

    Raises RuntimeError if the calling thread does not own the lock.
    The lock state saved on entry is always restored before returning,
    even when an exception interrupts the wait.
    """
    if not self._is_owned():
        raise RuntimeError("cannot wait on un-acquired lock")

    # Each waiter blocks on its own, already-held lock; notify() wakes the
    # waiter by releasing that lock.
    gate = _allocate_lock()
    gate.acquire()
    self.__waiters.append(gate)
    outer_state = self._release_save()
    try:
        if timeout is None:
            gate.acquire()
            if __debug__:
                self._note("%s.wait(): got it", self)
            return

        # Timed wait: poll the gate without blocking, sleeping between
        # attempts.  The sleep doubles each round but never exceeds 0.05s
        # or the time remaining until the deadline.
        deadline = _time() + timeout
        pause = 0.0005
        signalled = gate.acquire(0)
        while not signalled:
            left = deadline - _time()
            if left <= 0:
                break
            pause = min(pause * 2, left, .05)
            _sleep(pause)
            signalled = gate.acquire(0)

        if signalled:
            if __debug__:
                self._note("%s.wait(%s): got it", self, timeout)
            return

        if __debug__:
            self._note("%s.wait(%s): timed out", self, timeout)
        # A notify() may already have removed the gate from the list, so a
        # missing entry is not an error.
        try:
            self.__waiters.remove(gate)
        except ValueError:
            pass
    finally:
        self._acquire_restore(outer_state)
271
273 if not self._is_owned():
274 raise RuntimeError("cannot notify on un-acquired lock")
275 __waiters = self.__waiters
276 waiters = __waiters[:n]
277 if not waiters:
278 if __debug__:
279 self._note("%s.notify(): no waiters", self)
280 return
281 self._note("%s.notify(): notifying %d waiter%s", self, n,
282 n!=1 and "s" or "")
283 for waiter in waiters:
284 waiter.release()
285 try:
286 __waiters.remove(waiter)
287 except ValueError:
288 pass
289
291 self.notify(len(self.__waiters))
292
293 notify_all = notifyAll
294
298
300
301
302
def __init__(self, value=1, verbose=None):
    """Create a semaphore whose counter starts at *value* (default 1).

    Raises ValueError if *value* is negative.  *verbose* is passed on to
    ``_Verbose.__init__``.
    """
    if value < 0:
        raise ValueError("semaphore initial value must be >= 0")
    _Verbose.__init__(self, verbose)
    # The counter is guarded by a condition built on a fresh Lock().
    guard = Lock()
    self.__cond = Condition(guard)
    self.__value = value
309
311 rc = False
312 self.__cond.acquire()
313 while self.__value == 0:
314 if not blocking:
315 break
316 if __debug__:
317 self._note("%s.acquire(%s): blocked waiting, value=%s",
318 self, blocking, self.__value)
319 self.__cond.wait()
320 else:
321 self.__value = self.__value - 1
322 if __debug__:
323 self._note("%s.acquire: success, value=%s",
324 self, self.__value)
325 rc = True
326 self.__cond.release()
327 return rc
328
329 __enter__ = acquire
330
332 self.__cond.acquire()
333 self.__value = self.__value + 1
334 if __debug__:
335 self._note("%s.release: success, value=%s",
336 self, self.__value)
337 self.__cond.notify()
338 self.__cond.release()
339
342
346
348 """Semaphore that checks that # releases is <= # acquires"""
349 - def __init__(self, value=1, verbose=None):
352
354 if self._Semaphore__value >= self._initial_value:
355 raise ValueError, "Semaphore released too many times"
356 return _Semaphore.release(self)
357
358
def Event(*args, **kwargs):
    """Factory: build and return a new ``_Event``.

    Positional and keyword arguments are forwarded to ``_Event`` unchanged.
    """
    event = _Event(*args, **kwargs)
    return event
361
363
364
365
367 _Verbose.__init__(self, verbose)
368 self.__cond = Condition(Lock())
369 self.__flag = False
370
373
374 is_set = isSet
375
377 self.__cond.acquire()
378 try:
379 self.__flag = True
380 self.__cond.notify_all()
381 finally:
382 self.__cond.release()
383
385 self.__cond.acquire()
386 try:
387 self.__flag = False
388 finally:
389 self.__cond.release()
390
def wait(self, timeout=None):
    """Block until the internal flag is true or *timeout* seconds elapse.

    timeout -- None to wait indefinitely, otherwise the maximum number of
               seconds to wait on the underlying condition.

    Returns the value of the flag on exit.  Reading the flag while the
    condition is still held lets the caller tell "flag was set" from
    "timed out" without a second, unsynchronised is_set() call.  Callers
    that ignored the former None result are unaffected.
    """
    self.__cond.acquire()
    try:
        if not self.__flag:
            self.__cond.wait(timeout)
        return self.__flag
    finally:
        self.__cond.release()
398
399
400 _counter = 0
405
406
407 _active_limbo_lock = _allocate_lock()
408 _active = {}
409 _limbo = {}
410
411
412
413
414 -class Thread(_Verbose):
415
416 __initialized = False
417
418
419
420
421 __exc_info = _sys.exc_info
422
423
424 __exc_clear = _sys.exc_clear
425
def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, verbose=None):
    """Initialise the thread object; the thread itself is not started here.

    group   -- must be None (checked with an assertion).
    target  -- callable stored for later invocation, or None.
    name    -- thread name; a false value makes ``_newname()`` supply one.
    args    -- positional arguments stored for *target*.
    kwargs  -- keyword arguments stored for *target*; None means a new
               empty dict.
    verbose -- passed on to ``_Verbose.__init__``.
    """
    assert group is None, "group argument must be None for now"
    _Verbose.__init__(self, verbose)
    self.__target = target
    if not name:
        name = _newname()
    self.__name = str(name)
    self.__args = args
    self.__kwargs = {} if kwargs is None else kwargs
    self.__daemonic = self._set_daemon()
    self.__ident = None
    self.__started = Event()
    self.__stopped = False
    self.__block = Condition(Lock())
    self.__initialized = True

    # Keep our own reference to stderr: the bootstrap code writes the
    # traceback through it when the ``_sys`` global is no longer usable.
    self.__stderr = _sys.stderr
445
447
448 return current_thread().daemon
449
451 assert self.__initialized, "Thread.__init__() was not called"
452 status = "initial"
453 if self.__started.is_set():
454 status = "started"
455 if self.__stopped:
456 status = "stopped"
457 if self.__daemonic:
458 status += " daemon"
459 if self.__ident is not None:
460 status += " %s" % self.__ident
461 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
462
464 if not self.__initialized:
465 raise RuntimeError("thread.__init__() not called")
466 if self.__started.is_set():
467 raise RuntimeError("thread already started")
468 if __debug__:
469 self._note("%s.start(): starting thread", self)
470 _active_limbo_lock.acquire()
471 _limbo[self] = self
472 _active_limbo_lock.release()
473 try:
474 _start_new_thread(self.__bootstrap, ())
475 except Exception:
476 with _active_limbo_lock:
477 del _limbo[self]
478 raise
479 self.__started.wait()
480
482 try:
483 if self.__target:
484 self.__target(*self.__args, **self.__kwargs)
485 finally:
486
487
488 del self.__target, self.__args, self.__kwargs
489
491
492
493
494
495
496
497
498
499
500
501
502
503 try:
504 self.__bootstrap_inner()
505 except:
506 if self.__daemonic and _sys is None:
507 return
508 raise
509
511 self.__ident = _get_ident()
512
514 try:
515 self._set_ident()
516 self.__started.set()
517 _active_limbo_lock.acquire()
518 _active[self.__ident] = self
519 del _limbo[self]
520 _active_limbo_lock.release()
521 if __debug__:
522 self._note("%s.__bootstrap(): thread started", self)
523
524 if _trace_hook:
525 self._note("%s.__bootstrap(): registering trace hook", self)
526 _sys.settrace(_trace_hook)
527 if _profile_hook:
528 self._note("%s.__bootstrap(): registering profile hook", self)
529 _sys.setprofile(_profile_hook)
530
531 try:
532 self.run()
533 except SystemExit:
534 if __debug__:
535 self._note("%s.__bootstrap(): raised SystemExit", self)
536 except:
537 if __debug__:
538 self._note("%s.__bootstrap(): unhandled exception", self)
539
540
541
542
543 if _sys:
544 _sys.stderr.write("Exception in thread %s:\n%s\n" %
545 (self.name, _format_exc()))
546 else:
547
548
549
550 exc_type, exc_value, exc_tb = self.__exc_info()
551 try:
552 print>>self.__stderr, (
553 "Exception in thread " + self.name +
554 " (most likely raised during interpreter shutdown):")
555 print>>self.__stderr, (
556 "Traceback (most recent call last):")
557 while exc_tb:
558 print>>self.__stderr, (
559 ' File "%s", line %s, in %s' %
560 (exc_tb.tb_frame.f_code.co_filename,
561 exc_tb.tb_lineno,
562 exc_tb.tb_frame.f_code.co_name))
563 exc_tb = exc_tb.tb_next
564 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
565
566
567 finally:
568 del exc_type, exc_value, exc_tb
569 else:
570 if __debug__:
571 self._note("%s.__bootstrap(): normal return", self)
572 finally:
573
574
575
576
577 self.__exc_clear()
578 finally:
579 with _active_limbo_lock:
580 self.__stop()
581 try:
582
583
584 del _active[_get_ident()]
585 except:
586 pass
587
589 self.__block.acquire()
590 self.__stopped = True
591 self.__block.notify_all()
592 self.__block.release()
593
595 "Remove current thread from the dict of currently running threads."
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618 try:
619 with _active_limbo_lock:
620 del _active[_get_ident()]
621
622
623
624
625 except KeyError:
626 if 'dummy_threading' not in _sys.modules:
627 raise
628
def join(self, timeout=None):
    """Wait until this thread has stopped, or until *timeout* seconds pass.

    timeout -- None to wait indefinitely, otherwise the maximum number of
               seconds to wait.

    Raises RuntimeError if Thread.__init__() was not called, if the thread
    has not been started, or if a thread tries to join itself.
    """
    if not self.__initialized:
        raise RuntimeError("Thread.__init__() not called")
    if not self.__started.is_set():
        raise RuntimeError("cannot join thread before it is started")
    if self is current_thread():
        raise RuntimeError("cannot join current thread")

    if __debug__:
        if not self.__stopped:
            self._note("%s.join(): waiting until thread stops", self)

    stop_cond = self.__block
    stop_cond.acquire()
    try:
        # A deadline of None selects the untimed path inside the loop.
        if timeout is None:
            deadline = None
        else:
            deadline = _time() + timeout

        while not self.__stopped:
            if deadline is None:
                stop_cond.wait()
                continue
            remaining = deadline - _time()
            if remaining <= 0:
                if __debug__:
                    self._note("%s.join(): timed out", self)
                return
            stop_cond.wait(remaining)

        # Reached only when the loop ended because the thread stopped.
        if __debug__:
            self._note("%s.join(): thread stopped", self)
    finally:
        stop_cond.release()
661
662 @property
664 assert self.__initialized, "Thread.__init__() not called"
665 return self.__name
666
@name.setter
def name(self, name):
    """Set the thread's name; the value is converted with ``str()``."""
    assert self.__initialized, "Thread.__init__() not called"
    text = str(name)
    self.__name = text
671
672 @property
674 assert self.__initialized, "Thread.__init__() not called"
675 return self.__ident
676
678 assert self.__initialized, "Thread.__init__() not called"
679 return self.__started.is_set() and not self.__stopped
680
681 is_alive = isAlive
682
683 @property
685 assert self.__initialized, "Thread.__init__() not called"
686 return self.__daemonic
687
688 @daemon.setter
690 if not self.__initialized:
691 raise RuntimeError("Thread.__init__() not called")
692 if self.__started.is_set():
693 raise RuntimeError("cannot set daemon status of active thread");
694 self.__daemonic = daemonic
695
698
700 self.daemon = daemonic
701
704
707
708
709
def Timer(*args, **kwargs):
    """Factory: build and return a new ``_Timer``.

    Positional and keyword arguments are forwarded to ``_Timer`` unchanged.
    """
    timer = _Timer(*args, **kwargs)
    return timer
712
714 """Call a function after a specified number of seconds:
715
716 t = Timer(30.0, f, args=[], kwargs={})
717 t.start()
718 t.cancel() # stop the timer's action if it's still waiting
719 """
720
def __init__(self, interval, function, args=None, kwargs=None):
    """Store what to call and how long to wait before calling it.

    interval -- seconds to wait before *function* is invoked.
    function -- the callable to run.
    args     -- positional arguments for *function*; None (the default)
                means a new empty list.
    kwargs   -- keyword arguments for *function*; None (the default)
                means a new empty dict.

    The defaults are None rather than ``[]`` / ``{}`` so that timers
    created without arguments never share one mutable list or dict.
    """
    Thread.__init__(self)
    self.interval = interval
    self.function = function
    self.args = args if args is not None else []
    self.kwargs = kwargs if kwargs is not None else {}
    # Set by cancel() or once the function has run.
    self.finished = Event()
728
730 """Stop the timer if it hasn't finished yet"""
731 self.finished.set()
732
734 self.finished.wait(self.interval)
735 if not self.finished.is_set():
736 self.function(*self.args, **self.kwargs)
737 self.finished.set()
738
739
740
741
742 -class _MainThread(Thread):
743
def __init__(self):
    """Create the Thread object that represents the main thread.

    The thread is named "MainThread", marked as already started, given the
    caller's thread identifier, and registered in ``_active`` under that
    identifier.
    """
    Thread.__init__(self, name="MainThread")
    self._Thread__started.set()
    self._set_ident()
    # ``with`` releases the registry lock even if the insertion raises,
    # and matches how the lock is taken elsewhere in this module.
    with _active_limbo_lock:
        _active[_get_ident()] = self
751
752 - def _set_daemon(self):
754
def _exitfunc(self):
    """Stop the main thread, then join every remaining non-daemon thread.

    Threads are obtained one at a time from ``_pickSomeNonDaemonThread()``
    until it returns a false value; afterwards this thread removes itself
    via ``_Thread__delete()``.
    """
    self._Thread__stop()
    pending = _pickSomeNonDaemonThread()
    if pending and __debug__:
        self._note("%s: waiting for other threads", self)
    while pending:
        pending.join()
        pending = _pickSomeNonDaemonThread()
    if __debug__:
        self._note("%s: exiting", self)
    self._Thread__delete()
767
769 for t in enumerate():
770 if not t.daemon and t.is_alive():
771 return t
772 return None
773
774
775
776
777
778
779
780
781
782
783 -class _DummyThread(Thread):
784
798
801
def join(self, timeout=None):
    """Always fail: a dummy thread cannot be joined.

    Raises AssertionError.  The exception is raised explicitly instead of
    through an ``assert`` statement so the check is not stripped under
    ``python -O``, where join() would otherwise silently return.
    """
    raise AssertionError("cannot join a dummy thread")
804
814
815 current_thread = currentThread
822
823 active_count = activeCount
828
834
835 from thread import stack_size
836
837
838
839
840
841 _shutdown = _MainThread()._exitfunc
842
843
844
845
846 try:
847 from thread import _local as local
848 except ImportError:
849 from _threading_local import local
853
854
855
856
857
858
859 global _active_limbo_lock
860 _active_limbo_lock = _allocate_lock()
861
862
863 new_active = {}
864 current = current_thread()
865 with _active_limbo_lock:
866 for thread in _active.itervalues():
867 if thread is current:
868
869
870 ident = _get_ident()
871 thread._Thread__ident = ident
872 new_active[ident] = thread
873 else:
874
875
876
877
878 thread._Thread__stopped = True
879
880 _limbo.clear()
881 _active.clear()
882 _active.update(new_active)
883 assert len(_active) == 1
884
889
class BoundedQueue(_Verbose):
    """Capacity-limited FIFO used by the self-test's producers and consumer.

    A single lock (``mon``) guards the deque.  Two conditions share that
    lock: ``rc`` is notified after an item is appended, ``wc`` after an
    item is removed.
    """

    def __init__(self, limit):
        """Create an empty queue that put() fills to at most *limit* items."""
        _Verbose.__init__(self)
        self.mon = RLock()
        self.rc = Condition(self.mon)
        self.wc = Condition(self.mon)
        self.limit = limit
        self.queue = deque()

    def put(self, item):
        """Append *item*, blocking while the queue already holds ``limit`` items."""
        self.mon.acquire()
        # try/finally: an exception raised while the monitor is held must
        # not leave it locked, which would block every other user.
        try:
            while len(self.queue) >= self.limit:
                self._note("put(%s): queue full", item)
                self.wc.wait()
            self.queue.append(item)
            self._note("put(%s): appended, length now %d",
                       item, len(self.queue))
            self.rc.notify()
        finally:
            self.mon.release()

    def get(self):
        """Remove and return the oldest item, blocking while the queue is empty."""
        self.mon.acquire()
        # try/finally for the same reason as in put().
        try:
            while not self.queue:
                self._note("get(): queue empty")
                self.rc.wait()
            item = self.queue.popleft()
            self._note("get(): got %s, %d left", item, len(self.queue))
            self.wc.notify()
        finally:
            self.mon.release()
        return item
921
class ProducerThread(Thread):
    """Self-test thread that puts ``quota`` labelled items on a queue."""

    def __init__(self, queue, quota):
        """Remember the target *queue* and how many items to produce."""
        Thread.__init__(self, name="Producer")
        self.quota = quota
        self.queue = queue

    def run(self):
        """Put items "<thread name>.<n>" for n = 1, 2, ... up to the quota.

        A short random sleep follows each put.
        """
        from random import random as jitter
        produced = 0
        while produced < self.quota:
            produced += 1
            label = "%s.%d" % (self.name, produced)
            self.queue.put(label)
            _sleep(jitter() * 0.00001)
936
937
class ConsumerThread(Thread):
    """Self-test thread that takes ``count`` items off a queue and prints them."""

    def __init__(self, queue, count):
        """Remember the source *queue* and how many items to consume."""
        Thread.__init__(self, name="Consumer")
        self.count = count
        self.queue = queue

    def run(self):
        """Fetch and print one item at a time until ``count`` reaches zero."""
        while self.count > 0:
            # Parenthesised single-argument form prints the same text under
            # the Python 2 print statement.
            print(self.queue.get())
            self.count -= 1
950
951 NP = 3
952 QL = 4
953 NI = 5
954
955 Q = BoundedQueue(QL)
956 P = []
957 for i in range(NP):
958 t = ProducerThread(Q, NI)
959 t.name = ("Producer-%d" % (i+1))
960 P.append(t)
961 C = ConsumerThread(Q, NI*NP)
962 for t in P:
963 t.start()
964 _sleep(0.000001)
965 C.start()
966 for t in P:
967 t.join()
968 C.join()
969
970 if __name__ == '__main__':
971 _test()
972