Source code for president.thread

# This file is placed in the Public Domain.
#
# pylint: disable=C,I,R,W0212,W0718,E0402
# flake8: noqa


"thread"


import functools
import queue
import threading
import time


from .error import Error
from .utils import name


[docs]class Thread(threading.Thread): "" def __init__(self, func, thrname, *args, daemon=True): super().__init__(None, self.run, thrname, (), {}, daemon=daemon) self._result = None self.name = thrname or name(func) self.queue = queue.Queue() self.queue.put_nowait((func, args)) self.sleep = None self.starttime = time.time() def __iter__(self): return self def __next__(self): for k in dir(self): yield k
[docs] def join(self, timeout=None): "" super().join(timeout) return self._result
[docs] def run(self): "" func, args = self.queue.get() try: self._result = func(*args) except Exception as ex: exc = ex.with_traceback(ex.__traceback__) Error.errors.append(exc) if args: try: args[0].ready() except AttributeError: pass
[docs]def launch(func, *args, **kwargs) -> Thread: thrname = kwargs.get('name', '') thread = Thread(func, thrname, *args) thread.start() return thread
[docs]def threaded(func, *args, **kwargs) -> None: @functools.wraps(func) def threadedfunc(*args, **kwargs): thread = launch(func, *args, **kwargs) if args: args[0].thr = thread return thread threadedfunc.args = args threadedfunc.kwargs = kwargs return threadedfunc