You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

threading.py 18KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. """Proposed new threading module, emulating a subset of Java's threading model."""
  2. import sys
  3. import time
  4. import thread
  5. import traceback
  6. import StringIO
# Rename some stuff so "from threading import *" is safe
# (each imported module is reduced to the underscore-prefixed callables
# bound below, and the module name itself is then deleted from this
# namespace).

_sys = sys
del sys

# Note: _time is the time.time *function*, not the time module.
_time = time.time
_sleep = time.sleep
del time

_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
# Public alias for the low-level thread module's exception class.
ThreadError = thread.error
del thread

_print_exc = traceback.print_exc
del traceback

_StringIO = StringIO.StringIO
del StringIO
  22. # Debug support (adapted from ihooks.py)
  23. _VERBOSE = 0
  24. if __debug__:
  25. class _Verbose:
  26. def __init__(self, verbose=None):
  27. if verbose is None:
  28. verbose = _VERBOSE
  29. self.__verbose = verbose
  30. def _note(self, format, *args):
  31. if self.__verbose:
  32. format = format % args
  33. format = "%s: %s\n" % (
  34. currentThread().getName(), format)
  35. _sys.stderr.write(format)
  36. else:
  37. # Disable this when using "python -O"
  38. class _Verbose:
  39. def __init__(self, verbose=None):
  40. pass
  41. def _note(self, *args):
  42. pass
  43. # Synchronization classes
  44. Lock = _allocate_lock
  45. def RLock(*args, **kwargs):
  46. return apply(_RLock, args, kwargs)
  47. class _RLock(_Verbose):
  48. def __init__(self, verbose=None):
  49. _Verbose.__init__(self, verbose)
  50. self.__block = _allocate_lock()
  51. self.__owner = None
  52. self.__count = 0
  53. def __repr__(self):
  54. return "<%s(%s, %d)>" % (
  55. self.__class__.__name__,
  56. self.__owner and self.__owner.getName(),
  57. self.__count)
  58. def acquire(self, blocking=1):
  59. me = currentThread()
  60. if self.__owner is me:
  61. self.__count = self.__count + 1
  62. if __debug__:
  63. self._note("%s.acquire(%s): recursive success", self, blocking)
  64. return 1
  65. rc = self.__block.acquire(blocking)
  66. if rc:
  67. self.__owner = me
  68. self.__count = 1
  69. if __debug__:
  70. self._note("%s.acquire(%s): initial succes", self, blocking)
  71. else:
  72. if __debug__:
  73. self._note("%s.acquire(%s): failure", self, blocking)
  74. return rc
  75. def release(self):
  76. me = currentThread()
  77. assert self.__owner is me, "release() of un-acquire()d lock"
  78. self.__count = count = self.__count - 1
  79. if not count:
  80. self.__owner = None
  81. self.__block.release()
  82. if __debug__:
  83. self._note("%s.release(): final release", self)
  84. else:
  85. if __debug__:
  86. self._note("%s.release(): non-final release", self)
  87. # Internal methods used by condition variables
  88. def _acquire_restore(self, (count, owner)):
  89. self.__block.acquire()
  90. self.__count = count
  91. self.__owner = owner
  92. if __debug__:
  93. self._note("%s._acquire_restore()", self)
  94. def _release_save(self):
  95. if __debug__:
  96. self._note("%s._release_save()", self)
  97. count = self.__count
  98. self.__count = 0
  99. owner = self.__owner
  100. self.__owner = None
  101. self.__block.release()
  102. return (count, owner)
  103. def _is_owned(self):
  104. return self.__owner is currentThread()
  105. def Condition(*args, **kwargs):
  106. return apply(_Condition, args, kwargs)
  107. class _Condition(_Verbose):
  108. def __init__(self, lock=None, verbose=None):
  109. _Verbose.__init__(self, verbose)
  110. if lock is None:
  111. lock = RLock()
  112. self.__lock = lock
  113. # Export the lock's acquire() and release() methods
  114. self.acquire = lock.acquire
  115. self.release = lock.release
  116. # If the lock defines _release_save() and/or _acquire_restore(),
  117. # these override the default implementations (which just call
  118. # release() and acquire() on the lock). Ditto for _is_owned().
  119. try:
  120. self._release_save = lock._release_save
  121. except AttributeError:
  122. pass
  123. try:
  124. self._acquire_restore = lock._acquire_restore
  125. except AttributeError:
  126. pass
  127. try:
  128. self._is_owned = lock._is_owned
  129. except AttributeError:
  130. pass
  131. self.__waiters = []
  132. def __repr__(self):
  133. return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
  134. def _release_save(self):
  135. self.__lock.release() # No state to save
  136. def _acquire_restore(self, x):
  137. self.__lock.acquire() # Ignore saved state
  138. def _is_owned(self):
  139. if self.__lock.acquire(0):
  140. self.__lock.release()
  141. return 0
  142. else:
  143. return 1
  144. def wait(self, timeout=None):
  145. me = currentThread()
  146. assert self._is_owned(), "wait() of un-acquire()d lock"
  147. waiter = _allocate_lock()
  148. waiter.acquire()
  149. self.__waiters.append(waiter)
  150. saved_state = self._release_save()
  151. try: # restore state no matter what (e.g., KeyboardInterrupt)
  152. if timeout is None:
  153. waiter.acquire()
  154. if __debug__:
  155. self._note("%s.wait(): got it", self)
  156. else:
  157. endtime = _time() + timeout
  158. delay = 0.000001 # 1 usec
  159. while 1:
  160. gotit = waiter.acquire(0)
  161. if gotit or _time() >= endtime:
  162. break
  163. _sleep(delay)
  164. if delay < 1.0:
  165. delay = delay * 2.0
  166. if not gotit:
  167. if __debug__:
  168. self._note("%s.wait(%s): timed out", self, timeout)
  169. try:
  170. self.__waiters.remove(waiter)
  171. except ValueError:
  172. pass
  173. else:
  174. if __debug__:
  175. self._note("%s.wait(%s): got it", self, timeout)
  176. finally:
  177. self._acquire_restore(saved_state)
  178. def notify(self, n=1):
  179. me = currentThread()
  180. assert self._is_owned(), "notify() of un-acquire()d lock"
  181. __waiters = self.__waiters
  182. waiters = __waiters[:n]
  183. if not waiters:
  184. if __debug__:
  185. self._note("%s.notify(): no waiters", self)
  186. return
  187. self._note("%s.notify(): notifying %d waiter%s", self, n,
  188. n!=1 and "s" or "")
  189. for waiter in waiters:
  190. waiter.release()
  191. try:
  192. __waiters.remove(waiter)
  193. except ValueError:
  194. pass
  195. def notifyAll(self):
  196. self.notify(len(self.__waiters))
  197. def Semaphore(*args, **kwargs):
  198. return apply(_Semaphore, args, kwargs)
  199. class _Semaphore(_Verbose):
  200. # After Tim Peters' semaphore class, but not quite the same (no maximum)
  201. def __init__(self, value=1, verbose=None):
  202. assert value >= 0, "Semaphore initial value must be >= 0"
  203. _Verbose.__init__(self, verbose)
  204. self.__cond = Condition(Lock())
  205. self.__value = value
  206. def acquire(self, blocking=1):
  207. rc = 0
  208. self.__cond.acquire()
  209. while self.__value == 0:
  210. if not blocking:
  211. break
  212. self.__cond.wait()
  213. else:
  214. self.__value = self.__value - 1
  215. rc = 1
  216. self.__cond.release()
  217. return rc
  218. def release(self):
  219. self.__cond.acquire()
  220. self.__value = self.__value + 1
  221. self.__cond.notify()
  222. self.__cond.release()
  223. def Event(*args, **kwargs):
  224. return apply(_Event, args, kwargs)
  225. class _Event(_Verbose):
  226. # After Tim Peters' event class (without is_posted())
  227. def __init__(self, verbose=None):
  228. _Verbose.__init__(self, verbose)
  229. self.__cond = Condition(Lock())
  230. self.__flag = 0
  231. def isSet(self):
  232. return self.__flag
  233. def set(self):
  234. self.__cond.acquire()
  235. self.__flag = 1
  236. self.__cond.notifyAll()
  237. self.__cond.release()
  238. def clear(self):
  239. self.__cond.acquire()
  240. self.__flag = 0
  241. self.__cond.release()
  242. def wait(self, timeout=None):
  243. self.__cond.acquire()
  244. if not self.__flag:
  245. self.__cond.wait(timeout)
  246. self.__cond.release()
  247. # Helper to generate new thread names
  248. _counter = 0
  249. def _newname(template="Thread-%d"):
  250. global _counter
  251. _counter = _counter + 1
  252. return template % _counter
# Active thread administration
# _active_limbo_lock is held around every update to, and every snapshot
# of, the two dictionaries below (currentThread() reads _active without it).
_active_limbo_lock = _allocate_lock()
_active = {}    # thread ident (from _get_ident()) -> thread object
_limbo = {}     # thread object -> itself, between start() and bootstrap
  257. # Main class for threads
  258. class Thread(_Verbose):
  259. __initialized = 0
  260. def __init__(self, group=None, target=None, name=None,
  261. args=(), kwargs={}, verbose=None):
  262. assert group is None, "group argument must be None for now"
  263. _Verbose.__init__(self, verbose)
  264. self.__target = target
  265. self.__name = str(name or _newname())
  266. self.__args = args
  267. self.__kwargs = kwargs
  268. self.__daemonic = self._set_daemon()
  269. self.__started = 0
  270. self.__stopped = 0
  271. self.__block = Condition(Lock())
  272. self.__initialized = 1
  273. def _set_daemon(self):
  274. # Overridden in _MainThread and _DummyThread
  275. return currentThread().isDaemon()
  276. def __repr__(self):
  277. assert self.__initialized, "Thread.__init__() was not called"
  278. status = "initial"
  279. if self.__started:
  280. status = "started"
  281. if self.__stopped:
  282. status = "stopped"
  283. if self.__daemonic:
  284. status = status + " daemon"
  285. return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
  286. def start(self):
  287. assert self.__initialized, "Thread.__init__() not called"
  288. assert not self.__started, "thread already started"
  289. if __debug__:
  290. self._note("%s.start(): starting thread", self)
  291. _active_limbo_lock.acquire()
  292. _limbo[self] = self
  293. _active_limbo_lock.release()
  294. _start_new_thread(self.__bootstrap, ())
  295. self.__started = 1
  296. _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
  297. def run(self):
  298. if self.__target:
  299. apply(self.__target, self.__args, self.__kwargs)
  300. def __bootstrap(self):
  301. try:
  302. self.__started = 1
  303. _active_limbo_lock.acquire()
  304. _active[_get_ident()] = self
  305. del _limbo[self]
  306. _active_limbo_lock.release()
  307. if __debug__:
  308. self._note("%s.__bootstrap(): thread started", self)
  309. try:
  310. self.run()
  311. except SystemExit:
  312. if __debug__:
  313. self._note("%s.__bootstrap(): raised SystemExit", self)
  314. except:
  315. if __debug__:
  316. self._note("%s.__bootstrap(): unhandled exception", self)
  317. s = _StringIO()
  318. _print_exc(file=s)
  319. _sys.stderr.write("Exception in thread %s:\n%s\n" %
  320. (self.getName(), s.getvalue()))
  321. else:
  322. if __debug__:
  323. self._note("%s.__bootstrap(): normal return", self)
  324. finally:
  325. self.__stop()
  326. self.__delete()
  327. def __stop(self):
  328. self.__block.acquire()
  329. self.__stopped = 1
  330. self.__block.notifyAll()
  331. self.__block.release()
  332. def __delete(self):
  333. _active_limbo_lock.acquire()
  334. del _active[_get_ident()]
  335. _active_limbo_lock.release()
  336. def join(self, timeout=None):
  337. assert self.__initialized, "Thread.__init__() not called"
  338. assert self.__started, "cannot join thread before it is started"
  339. assert self is not currentThread(), "cannot join current thread"
  340. if __debug__:
  341. if not self.__stopped:
  342. self._note("%s.join(): waiting until thread stops", self)
  343. self.__block.acquire()
  344. if timeout is None:
  345. while not self.__stopped:
  346. self.__block.wait()
  347. if __debug__:
  348. self._note("%s.join(): thread stopped", self)
  349. else:
  350. deadline = _time() + timeout
  351. while not self.__stopped:
  352. delay = deadline - _time()
  353. if delay <= 0:
  354. if __debug__:
  355. self._note("%s.join(): timed out", self)
  356. break
  357. self.__block.wait(delay)
  358. else:
  359. if __debug__:
  360. self._note("%s.join(): thread stopped", self)
  361. self.__block.release()
  362. def getName(self):
  363. assert self.__initialized, "Thread.__init__() not called"
  364. return self.__name
  365. def setName(self, name):
  366. assert self.__initialized, "Thread.__init__() not called"
  367. self.__name = str(name)
  368. def isAlive(self):
  369. assert self.__initialized, "Thread.__init__() not called"
  370. return self.__started and not self.__stopped
  371. def isDaemon(self):
  372. assert self.__initialized, "Thread.__init__() not called"
  373. return self.__daemonic
  374. def setDaemon(self, daemonic):
  375. assert self.__initialized, "Thread.__init__() not called"
  376. assert not self.__started, "cannot set daemon status of active thread"
  377. self.__daemonic = daemonic
  378. # Special thread class to represent the main thread
  379. # This is garbage collected through an exit handler
  380. class _MainThread(Thread):
  381. def __init__(self):
  382. Thread.__init__(self, name="MainThread")
  383. self._Thread__started = 1
  384. _active_limbo_lock.acquire()
  385. _active[_get_ident()] = self
  386. _active_limbo_lock.release()
  387. import atexit
  388. atexit.register(self.__exitfunc)
  389. def _set_daemon(self):
  390. return 0
  391. def __exitfunc(self):
  392. self._Thread__stop()
  393. t = _pickSomeNonDaemonThread()
  394. if t:
  395. if __debug__:
  396. self._note("%s: waiting for other threads", self)
  397. while t:
  398. t.join()
  399. t = _pickSomeNonDaemonThread()
  400. if __debug__:
  401. self._note("%s: exiting", self)
  402. self._Thread__delete()
  403. def _pickSomeNonDaemonThread():
  404. for t in enumerate():
  405. if not t.isDaemon() and t.isAlive():
  406. return t
  407. return None
  408. # Dummy thread class to represent threads not started here.
  409. # These aren't garbage collected when they die,
  410. # nor can they be waited for.
  411. # Their purpose is to return *something* from currentThread().
  412. # They are marked as daemon threads so we won't wait for them
  413. # when we exit (conform previous semantics).
  414. class _DummyThread(Thread):
  415. def __init__(self):
  416. Thread.__init__(self, name=_newname("Dummy-%d"))
  417. self._Thread__started = 1
  418. _active_limbo_lock.acquire()
  419. _active[_get_ident()] = self
  420. _active_limbo_lock.release()
  421. def _set_daemon(self):
  422. return 1
  423. def join(self):
  424. assert 0, "cannot join a dummy thread"
  425. # Global API functions
  426. def currentThread():
  427. try:
  428. return _active[_get_ident()]
  429. except KeyError:
  430. ##print "currentThread(): no current thread for", _get_ident()
  431. return _DummyThread()
  432. def activeCount():
  433. _active_limbo_lock.acquire()
  434. count = len(_active) + len(_limbo)
  435. _active_limbo_lock.release()
  436. return count
  437. def enumerate():
  438. _active_limbo_lock.acquire()
  439. active = _active.values() + _limbo.values()
  440. _active_limbo_lock.release()
  441. return active
# Create the main thread object
# (the constructor enters the new object in _active under the ident of the
# thread executing this statement, so no reference is kept here)
_MainThread()
  444. # Self-test code
  445. def _test():
  446. import random
  447. class BoundedQueue(_Verbose):
  448. def __init__(self, limit):
  449. _Verbose.__init__(self)
  450. self.mon = RLock()
  451. self.rc = Condition(self.mon)
  452. self.wc = Condition(self.mon)
  453. self.limit = limit
  454. self.queue = []
  455. def put(self, item):
  456. self.mon.acquire()
  457. while len(self.queue) >= self.limit:
  458. self._note("put(%s): queue full", item)
  459. self.wc.wait()
  460. self.queue.append(item)
  461. self._note("put(%s): appended, length now %d",
  462. item, len(self.queue))
  463. self.rc.notify()
  464. self.mon.release()
  465. def get(self):
  466. self.mon.acquire()
  467. while not self.queue:
  468. self._note("get(): queue empty")
  469. self.rc.wait()
  470. item = self.queue[0]
  471. del self.queue[0]
  472. self._note("get(): got %s, %d left", item, len(self.queue))
  473. self.wc.notify()
  474. self.mon.release()
  475. return item
  476. class ProducerThread(Thread):
  477. def __init__(self, queue, quota):
  478. Thread.__init__(self, name="Producer")
  479. self.queue = queue
  480. self.quota = quota
  481. def run(self):
  482. from random import random
  483. counter = 0
  484. while counter < self.quota:
  485. counter = counter + 1
  486. self.queue.put("%s.%d" % (self.getName(), counter))
  487. _sleep(random() * 0.00001)
  488. class ConsumerThread(Thread):
  489. def __init__(self, queue, count):
  490. Thread.__init__(self, name="Consumer")
  491. self.queue = queue
  492. self.count = count
  493. def run(self):
  494. while self.count > 0:
  495. item = self.queue.get()
  496. print item
  497. self.count = self.count - 1
  498. import time
  499. NP = 3
  500. QL = 4
  501. NI = 5
  502. Q = BoundedQueue(QL)
  503. P = []
  504. for i in range(NP):
  505. t = ProducerThread(Q, NI)
  506. t.setName("Producer-%d" % (i+1))
  507. P.append(t)
  508. C = ConsumerThread(Q, NI*NP)
  509. for t in P:
  510. t.start()
  511. _sleep(0.000001)
  512. C.start()
  513. for t in P:
  514. t.join()
  515. C.join()
# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    _test()