1  """Thread module emulating a subset of Java's threading model.""" 
  2   
  3  import sys as _sys 
  4   
  5  try: 
  6      import thread 
  7  except ImportError: 
  8      del _sys.modules[__name__] 
  9      raise 
 10   
 11  import warnings 
 12   
 13  from time import time as _time, sleep as _sleep 
 14  from traceback import format_exc as _format_exc 
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28  __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread', 
 29             'current_thread', 'enumerate', 'Event', 
 30             'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 
 31             'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] 
 32   
 33  _start_new_thread = thread.start_new_thread 
 34  _allocate_lock = thread.allocate_lock 
 35  _get_ident = thread.get_ident 
 36  ThreadError = thread.error 
 37  del thread 
 38   
 39   
 40   
 41   
 42  warnings.filterwarnings('ignore', category=DeprecationWarning, 
 43                          module='threading', message='sys.exc_clear') 
 44   
 45   
 46   
 47   
 48   
 49   
 50   
 51  _VERBOSE = False 
 52   
 53  if __debug__: 
 56   
 58              if verbose is None: 
 59                  verbose = _VERBOSE 
 60              self.__verbose = verbose 
  61   
 62 -        def _note(self, format, *args): 
   74   
 75  else: 
 82   
 83   
 84   
 85  _profile_hook = None 
 86  _trace_hook = None 
 91   
 95   
 96   
 97   
 98  Lock = _allocate_lock 
 99   
100 -def RLock(*args, **kwargs): 
 101      return _RLock(*args, **kwargs) 
 102   
104   
106          _Verbose.__init__(self, verbose) 
107          self.__block = _allocate_lock() 
108          self.__owner = None 
109          self.__count = 0 
 110   
112          owner = self.__owner 
113          try: 
114              owner = _active[owner].name 
115          except KeyError: 
116              pass 
117          return "<%s owner=%r count=%d>" % ( 
118                  self.__class__.__name__, owner, self.__count) 
 119   
121          me = _get_ident() 
122          if self.__owner == me: 
123              self.__count = self.__count + 1 
124              if __debug__: 
125                  self._note("%s.acquire(%s): recursive success", self, blocking) 
126              return 1 
127          rc = self.__block.acquire(blocking) 
128          if rc: 
129              self.__owner = me 
130              self.__count = 1 
131              if __debug__: 
132                  self._note("%s.acquire(%s): initial success", self, blocking) 
133          else: 
134              if __debug__: 
135                  self._note("%s.acquire(%s): failure", self, blocking) 
136          return rc 
 137   
138      __enter__ = acquire 
139   
141          if self.__owner != _get_ident(): 
142              raise RuntimeError("cannot release un-acquired lock") 
143          self.__count = count = self.__count - 1 
144          if not count: 
145              self.__owner = None 
146              self.__block.release() 
147              if __debug__: 
148                  self._note("%s.release(): final release", self) 
149          else: 
150              if __debug__: 
151                  self._note("%s.release(): non-final release", self) 
 152   
155   
156       
157   
159          count, owner = count_owner 
160          self.__block.acquire() 
161          self.__count = count 
162          self.__owner = owner 
163          if __debug__: 
164              self._note("%s._acquire_restore()", self) 
 165   
167          if __debug__: 
168              self._note("%s._release_save()", self) 
169          count = self.__count 
170          self.__count = 0 
171          owner = self.__owner 
172          self.__owner = None 
173          self.__block.release() 
174          return (count, owner) 
 175   
177          return self.__owner == _get_ident() 
  178   
182   
184   
185 -    def __init__(self, lock=None, verbose=None): 
 186          _Verbose.__init__(self, verbose) 
187          if lock is None: 
188              lock = RLock() 
189          self.__lock = lock 
190           
191          self.acquire = lock.acquire 
192          self.release = lock.release 
193           
194           
195           
196          try: 
197              self._release_save = lock._release_save 
198          except AttributeError: 
199              pass 
200          try: 
201              self._acquire_restore = lock._acquire_restore 
202          except AttributeError: 
203              pass 
204          try: 
205              self._is_owned = lock._is_owned 
206          except AttributeError: 
207              pass 
208          self.__waiters = [] 
 209   
212   
215   
217          return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) 
 218   
220          self.__lock.release()            
 221   
223          self.__lock.acquire()            
 224   
226           
227           
228          if self.__lock.acquire(0): 
229              self.__lock.release() 
230              return False 
231          else: 
232              return True 
 233   
234 -    def wait(self, timeout=None): 
 235          if not self._is_owned(): 
236              raise RuntimeError("cannot wait on un-acquired lock") 
237          waiter = _allocate_lock() 
238          waiter.acquire() 
239          self.__waiters.append(waiter) 
240          saved_state = self._release_save() 
241          try:     
242              if timeout is None: 
243                  waiter.acquire() 
244                  if __debug__: 
245                      self._note("%s.wait(): got it", self) 
246              else: 
247                   
248                   
249                   
250                   
251                   
252                  endtime = _time() + timeout 
253                  delay = 0.0005  
254                  while True: 
255                      gotit = waiter.acquire(0) 
256                      if gotit: 
257                          break 
258                      remaining = endtime - _time() 
259                      if remaining <= 0: 
260                          break 
261                      delay = min(delay * 2, remaining, .05) 
262                      _sleep(delay) 
263                  if not gotit: 
264                      if __debug__: 
265                          self._note("%s.wait(%s): timed out", self, timeout) 
266                      try: 
267                          self.__waiters.remove(waiter) 
268                      except ValueError: 
269                          pass 
270                  else: 
271                      if __debug__: 
272                          self._note("%s.wait(%s): got it", self, timeout) 
273          finally: 
274              self._acquire_restore(saved_state) 
 275   
277          if not self._is_owned(): 
278              raise RuntimeError("cannot notify on un-acquired lock") 
279          __waiters = self.__waiters 
280          waiters = __waiters[:n] 
281          if not waiters: 
282              if __debug__: 
283                  self._note("%s.notify(): no waiters", self) 
284              return 
285          self._note("%s.notify(): notifying %d waiter%s", self, n, 
286                     n!=1 and "s" or "") 
287          for waiter in waiters: 
288              waiter.release() 
289              try: 
290                  __waiters.remove(waiter) 
291              except ValueError: 
292                  pass 
 293   
295          self.notify(len(self.__waiters)) 
 296   
297      notify_all = notifyAll 
 298   
302   
304   
305       
306   
307 -    def __init__(self, value=1, verbose=None): 
 308          if value < 0: 
309              raise ValueError("semaphore initial value must be >= 0") 
310          _Verbose.__init__(self, verbose) 
311          self.__cond = Condition(Lock()) 
312          self.__value = value 
 313   
315          rc = False 
316          self.__cond.acquire() 
317          while self.__value == 0: 
318              if not blocking: 
319                  break 
320              if __debug__: 
321                  self._note("%s.acquire(%s): blocked waiting, value=%s", 
322                             self, blocking, self.__value) 
323              self.__cond.wait() 
324          else: 
325              self.__value = self.__value - 1 
326              if __debug__: 
327                  self._note("%s.acquire: success, value=%s", 
328                             self, self.__value) 
329              rc = True 
330          self.__cond.release() 
331          return rc 
 332   
333      __enter__ = acquire 
334   
336          self.__cond.acquire() 
337          self.__value = self.__value + 1 
338          if __debug__: 
339              self._note("%s.release: success, value=%s", 
340                         self, self.__value) 
341          self.__cond.notify() 
342          self.__cond.release() 
 343   
 346   
350   
352      """Semaphore that checks that # releases is <= # acquires""" 
353 -    def __init__(self, value=1, verbose=None): 
 356   
358          if self._Semaphore__value >= self._initial_value: 
359              raise ValueError, "Semaphore released too many times" 
360          return _Semaphore.release(self) 
  361   
362   
363 -def Event(*args, **kwargs): 
 364      return _Event(*args, **kwargs) 
 365   
367   
368       
369   
371          _Verbose.__init__(self, verbose) 
372          self.__cond = Condition(Lock()) 
373          self.__flag = False 
 374   
378   
381   
382      is_set = isSet 
383   
385          self.__cond.acquire() 
386          try: 
387              self.__flag = True 
388              self.__cond.notify_all() 
389          finally: 
390              self.__cond.release() 
 391   
393          self.__cond.acquire() 
394          try: 
395              self.__flag = False 
396          finally: 
397              self.__cond.release() 
 398   
399 -    def wait(self, timeout=None): 
 400          self.__cond.acquire() 
401          try: 
402              if not self.__flag: 
403                  self.__cond.wait(timeout) 
404              return self.__flag 
405          finally: 
406              self.__cond.release() 
  407   
408   
409  _counter = 0 
414   
415   
416  _active_limbo_lock = _allocate_lock() 
417  _active = {}     
418  _limbo = {} 
419   
420   
421   
422   
423 -class Thread(_Verbose): 
 424   
425      __initialized = False 
426       
427       
428       
429       
430      __exc_info = _sys.exc_info 
431       
432       
433      __exc_clear = _sys.exc_clear 
434   
435 -    def __init__(self, group=None, target=None, name=None, 
436                   args=(), kwargs=None, verbose=None): 
 437          assert group is None, "group argument must be None for now" 
438          _Verbose.__init__(self, verbose) 
439          if kwargs is None: 
440              kwargs = {} 
441          self.__target = target 
442          self.__name = str(name or _newname()) 
443          self.__args = args 
444          self.__kwargs = kwargs 
445          self.__daemonic = self._set_daemon() 
446          self.__ident = None 
447          self.__started = Event() 
448          self.__stopped = False 
449          self.__block = Condition(Lock()) 
450          self.__initialized = True 
451           
452           
453          self.__stderr = _sys.stderr 
 454   
456           
457           
458          if hasattr(self, '_Thread__block'):   
459              self.__block.__init__() 
460          self.__started._reset_internal_locks() 
 461   
462      @property 
464           
465          return self.__block 
 466   
468           
469          return current_thread().daemon 
 470   
472          assert self.__initialized, "Thread.__init__() was not called" 
473          status = "initial" 
474          if self.__started.is_set(): 
475              status = "started" 
476          if self.__stopped: 
477              status = "stopped" 
478          if self.__daemonic: 
479              status += " daemon" 
480          if self.__ident is not None: 
481              status += " %s" % self.__ident 
482          return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) 
 483   
485          if not self.__initialized: 
486              raise RuntimeError("thread.__init__() not called") 
487          if self.__started.is_set(): 
488              raise RuntimeError("threads can only be started once") 
489          if __debug__: 
490              self._note("%s.start(): starting thread", self) 
491          with _active_limbo_lock: 
492              _limbo[self] = self 
493          try: 
494              _start_new_thread(self.__bootstrap, ()) 
495          except Exception: 
496              with _active_limbo_lock: 
497                  del _limbo[self] 
498              raise 
499          self.__started.wait() 
 500   
502          try: 
503              if self.__target: 
504                  self.__target(*self.__args, **self.__kwargs) 
505          finally: 
506               
507               
508              del self.__target, self.__args, self.__kwargs 
 509   
511           
512           
513           
514           
515           
516           
517           
518           
519           
520           
521           
522           
523          try: 
524              self.__bootstrap_inner() 
525          except: 
526              if self.__daemonic and _sys is None: 
527                  return 
528              raise 
 529   
531          self.__ident = _get_ident() 
 532   
534          try: 
535              self._set_ident() 
536              self.__started.set() 
537              with _active_limbo_lock: 
538                  _active[self.__ident] = self 
539                  del _limbo[self] 
540              if __debug__: 
541                  self._note("%s.__bootstrap(): thread started", self) 
542   
543              if _trace_hook: 
544                  self._note("%s.__bootstrap(): registering trace hook", self) 
545                  _sys.settrace(_trace_hook) 
546              if _profile_hook: 
547                  self._note("%s.__bootstrap(): registering profile hook", self) 
548                  _sys.setprofile(_profile_hook) 
549   
550              try: 
551                  self.run() 
552              except SystemExit: 
553                  if __debug__: 
554                      self._note("%s.__bootstrap(): raised SystemExit", self) 
555              except: 
556                  if __debug__: 
557                      self._note("%s.__bootstrap(): unhandled exception", self) 
558                   
559                   
560                   
561                   
562                  if _sys: 
563                      _sys.stderr.write("Exception in thread %s:\n%s\n" % 
564                                        (self.name, _format_exc())) 
565                  else: 
566                       
567                       
568                       
569                      exc_type, exc_value, exc_tb = self.__exc_info() 
570                      try: 
571                          print>>self.__stderr, ( 
572                              "Exception in thread " + self.name + 
573                              " (most likely raised during interpreter shutdown):") 
574                          print>>self.__stderr, ( 
575                              "Traceback (most recent call last):") 
576                          while exc_tb: 
577                              print>>self.__stderr, ( 
578                                  '  File "%s", line %s, in %s' % 
579                                  (exc_tb.tb_frame.f_code.co_filename, 
580                                      exc_tb.tb_lineno, 
581                                      exc_tb.tb_frame.f_code.co_name)) 
582                              exc_tb = exc_tb.tb_next 
583                          print>>self.__stderr, ("%s: %s" % (exc_type, exc_value)) 
584                       
585                       
586                      finally: 
587                          del exc_type, exc_value, exc_tb 
588              else: 
589                  if __debug__: 
590                      self._note("%s.__bootstrap(): normal return", self) 
591              finally: 
592                   
593                   
594                   
595                   
596                  self.__exc_clear() 
597          finally: 
598              with _active_limbo_lock: 
599                  self.__stop() 
600                  try: 
601                       
602                       
603                      del _active[_get_ident()] 
604                  except: 
605                      pass 
 606   
608          self.__block.acquire() 
609          self.__stopped = True 
610          self.__block.notify_all() 
611          self.__block.release() 
 612   
614          "Remove current thread from the dict of currently running threads." 
615   
616           
617           
618           
619           
620           
621           
622           
623           
624           
625           
626           
627           
628           
629           
630           
631           
632           
633           
634           
635           
636   
637          try: 
638              with _active_limbo_lock: 
639                  del _active[_get_ident()] 
640                   
641                   
642                   
643                   
644          except KeyError: 
645              if 'dummy_threading' not in _sys.modules: 
646                  raise 
 647   
648 -    def join(self, timeout=None): 
 649          if not self.__initialized: 
650              raise RuntimeError("Thread.__init__() not called") 
651          if not self.__started.is_set(): 
652              raise RuntimeError("cannot join thread before it is started") 
653          if self is current_thread(): 
654              raise RuntimeError("cannot join current thread") 
655   
656          if __debug__: 
657              if not self.__stopped: 
658                  self._note("%s.join(): waiting until thread stops", self) 
659          self.__block.acquire() 
660          try: 
661              if timeout is None: 
662                  while not self.__stopped: 
663                      self.__block.wait() 
664                  if __debug__: 
665                      self._note("%s.join(): thread stopped", self) 
666              else: 
667                  deadline = _time() + timeout 
668                  while not self.__stopped: 
669                      delay = deadline - _time() 
670                      if delay <= 0: 
671                          if __debug__: 
672                              self._note("%s.join(): timed out", self) 
673                          break 
674                      self.__block.wait(delay) 
675                  else: 
676                      if __debug__: 
677                          self._note("%s.join(): thread stopped", self) 
678          finally: 
679              self.__block.release() 
 680   
681      @property 
683          assert self.__initialized, "Thread.__init__() not called" 
684          return self.__name 
 685   
686      @name.setter 
687 -    def name(self, name): 
 688          assert self.__initialized, "Thread.__init__() not called" 
689          self.__name = str(name) 
 690   
691      @property 
693          assert self.__initialized, "Thread.__init__() not called" 
694          return self.__ident 
 695   
697          assert self.__initialized, "Thread.__init__() not called" 
698          return self.__started.is_set() and not self.__stopped 
 699   
700      is_alive = isAlive 
701   
702      @property 
704          assert self.__initialized, "Thread.__init__() not called" 
705          return self.__daemonic 
 706   
707      @daemon.setter 
709          if not self.__initialized: 
710              raise RuntimeError("Thread.__init__() not called") 
711          if self.__started.is_set(): 
712              raise RuntimeError("cannot set daemon status of active thread"); 
713          self.__daemonic = daemonic 
 714   
717   
719          self.daemon = daemonic 
 720   
723   
 726   
727   
728   
729 -def Timer(*args, **kwargs): 
 730      return _Timer(*args, **kwargs) 
 731   
733      """Call a function after a specified number of seconds: 
734   
735      t = Timer(30.0, f, args=[], kwargs={}) 
736      t.start() 
737      t.cancel() # stop the timer's action if it's still waiting 
738      """ 
739   
740 -    def __init__(self, interval, function, args=[], kwargs={}): 
 741          Thread.__init__(self) 
742          self.interval = interval 
743          self.function = function 
744          self.args = args 
745          self.kwargs = kwargs 
746          self.finished = Event() 
 747   
749          """Stop the timer if it hasn't finished yet""" 
750          self.finished.set() 
 751   
753          self.finished.wait(self.interval) 
754          if not self.finished.is_set(): 
755              self.function(*self.args, **self.kwargs) 
756          self.finished.set() 
  757   
758   
759   
760   
761 -class _MainThread(Thread): 
 762   
763 -    def __init__(self): 
 764          Thread.__init__(self, name="MainThread") 
765          self._Thread__started.set() 
766          self._set_ident() 
767          with _active_limbo_lock: 
768              _active[_get_ident()] = self 
 769   
770 -    def _set_daemon(self): 
 772   
773 -    def _exitfunc(self): 
 774          self._Thread__stop() 
775          t = _pickSomeNonDaemonThread() 
776          if t: 
777              if __debug__: 
778                  self._note("%s: waiting for other threads", self) 
779          while t: 
780              t.join() 
781              t = _pickSomeNonDaemonThread() 
782          if __debug__: 
783              self._note("%s: exiting", self) 
784          self._Thread__delete() 
  785   
787      for t in enumerate(): 
788          if not t.daemon and t.is_alive(): 
789              return t 
790      return None 
 791   
792   
793   
794   
795   
796   
797   
798   
799   
800   
801 -class _DummyThread(Thread): 
 802   
815   
818   
819 -    def join(self, timeout=None): 
 820          assert False, "cannot join a dummy thread" 
  821   
831   
832  current_thread = currentThread 
837   
838  active_count = activeCount 
843   
847   
848  from thread import stack_size 
849   
850   
851   
852   
853   
854  _shutdown = _MainThread()._exitfunc 
855   
856   
857   
858   
859  try: 
860      from thread import _local as local 
861  except ImportError: 
862      from _threading_local import local 
866       
867       
868       
869   
870       
871       
872      global _active_limbo_lock 
873      _active_limbo_lock = _allocate_lock() 
874   
875       
876      new_active = {} 
877      current = current_thread() 
878      with _active_limbo_lock: 
879          for thread in _active.itervalues(): 
880               
881               
882              if hasattr(thread, '_reset_internal_locks'): 
883                  thread._reset_internal_locks() 
884              if thread is current: 
885                   
886                   
887                  ident = _get_ident() 
888                  thread._Thread__ident = ident 
889                  new_active[ident] = thread 
890              else: 
891                   
892                  thread._Thread__stop() 
893   
894          _limbo.clear() 
895          _active.clear() 
896          _active.update(new_active) 
897          assert len(_active) == 1 
 898   
903   
904      class BoundedQueue(_Verbose): 
905   
906          def __init__(self, limit): 
907              _Verbose.__init__(self) 
908              self.mon = RLock() 
909              self.rc = Condition(self.mon) 
910              self.wc = Condition(self.mon) 
911              self.limit = limit 
912              self.queue = deque() 
 913   
914          def put(self, item): 
915              self.mon.acquire() 
916              while len(self.queue) >= self.limit: 
917                  self._note("put(%s): queue full", item) 
918                  self.wc.wait() 
919              self.queue.append(item) 
920              self._note("put(%s): appended, length now %d", 
921                         item, len(self.queue)) 
922              self.rc.notify() 
923              self.mon.release() 
924   
925          def get(self): 
926              self.mon.acquire() 
927              while not self.queue: 
928                  self._note("get(): queue empty") 
929                  self.rc.wait() 
930              item = self.queue.popleft() 
931              self._note("get(): got %s, %d left", item, len(self.queue)) 
932              self.wc.notify() 
933              self.mon.release() 
934              return item 
935   
936      class ProducerThread(Thread): 
937   
938          def __init__(self, queue, quota): 
939              Thread.__init__(self, name="Producer") 
940              self.queue = queue 
941              self.quota = quota 
942   
943          def run(self): 
944              from random import random 
945              counter = 0 
946              while counter < self.quota: 
947                  counter = counter + 1 
948                  self.queue.put("%s.%d" % (self.name, counter)) 
949                  _sleep(random() * 0.00001) 
950   
951   
952      class ConsumerThread(Thread): 
953   
954          def __init__(self, queue, count): 
955              Thread.__init__(self, name="Consumer") 
956              self.queue = queue 
957              self.count = count 
958   
959          def run(self): 
960              while self.count > 0: 
961                  item = self.queue.get() 
962                  print item 
963                  self.count = self.count - 1 
964   
965      NP = 3 
966      QL = 4 
967      NI = 5 
968   
969      Q = BoundedQueue(QL) 
970      P = [] 
971      for i in range(NP): 
972          t = ProducerThread(Q, NI) 
973          t.name = ("Producer-%d" % (i+1)) 
974          P.append(t) 
975      C = ConsumerThread(Q, NI*NP) 
976      for t in P: 
977          t.start() 
978          _sleep(0.000001) 
979      C.start() 
980      for t in P: 
981          t.join() 
982      C.join() 
983   
984  if __name__ == '__main__': 
985      _test() 
986