Source code for helpers.runners

from threading import Lock, Thread, Event
import traceback
import sys

class BooleanEvent(object):
    def __init__(self):
       self._e = Event()

    def __bool__(self):
        return self._e.isSet()

    __nonzero__ = __bool__

    def set(self, state):
        if state:
            self._e.set()
        else:
            self._e.clear()

[docs]class Oneshot(object): """Oneshot runner for callables. Each instance of Oneshot will only run once, unless reset. You can query on whether the runner has finished, and whether it's still running. Args: * ``func``: callable to be run * ``*args``: positional arguments for the callable * ``**kwargs``: keyword arguments for the callable""" _running = False _finished = False def __init__(self, func, *args, **kwargs): self.func = func self.args = args self.kwargs = kwargs self.active_lock = Lock()
[docs] def run(self): """Run the callable. Sets the ``running`` and ``finished`` attributes as the function progresses. This function doesn't handle exceptions. Passes the return value through.""" with self.active_lock: if self.running or self.finished: return self._running = True value = self.func(*self.args, **self.kwargs) with self.active_lock: self._running = False self._finished = True return value
[docs] def reset(self): """Resets all flags, allowing the callable to be run once again. Will raise an Exception if the callable is still running.""" if self.running: raise Exception("Runner can't be reset while still running") self._running = False self._finished = False
@property def running(self): """Shows whether the callable is still running after it has been launched (assuming it has been launched).""" with self.active_lock: value = self._running return value @property def finished(self): """Shows whether the callable has finished running after it has been launched (assuming it has been launched).""" with self.active_lock: value = self._finished return value
[docs]class BackgroundRunner(object): """Background runner for callables. Once launched, it'll run in background until it's done.. You can query on whether the runner has finished, and whether it's still running. Args: * ``func``: function to be run * ``*args``: positional arguments for the function * ``**kwargs``: keyword arguments for the function""" exc_info = None return_value = None def __init__(self, func, *args, **kwargs): self.func = func self.args = args self.kwargs = kwargs self.active_lock = Lock() self._running = BooleanEvent() self._finished = BooleanEvent() self._failed = BooleanEvent() @property def running(self): """Shows whether the callable is still running after it has been launched (assuming it has been launched).""" with self.active_lock: value = bool(self._running) return value @property def finished(self): """Shows whether the callable has finished running after it has been launched (assuming it has been launched).""" with self.active_lock: value = bool(self._finished) return value @property def failed(self): """Shows whether the callable has thrown an exception during execution (assuming it has been launched). The exception info will be stored in ``self.exc_info``.""" with self.active_lock: value = bool(self._failed) return value
[docs] def threaded_runner(self, print_exc=True): """Actually runs the callable. Sets the ``running`` and ``finished`` attributes as the callable progresses. This method catches exceptions, stores ``sys.exc_info`` in ``self.exc_info``, unsets ``self.running`` and re-raises the exception. Function's return value is stored as ``self.return_value``. Not to be called directly!""" with self.active_lock: self._running.set(True) try: self.return_value = self.func(*self.args, **self.kwargs) except: self.exc_info = sys.exc_info if print_exc: traceback.print_exc() with self.active_lock: self._running.set(False) self._failed.set(True) else: with self.active_lock: self._running.set(False) self._finished.set(True)
[docs] def run(self, daemonize=True): """Starts a thread that will run the callable.""" if self.running: return self.thread = Thread(target=self.threaded_runner) self.thread.daemon=daemonize self.thread.start()
[docs] def reset(self): """Resets all flags, restoring a clean state of the runner.""" self._running.set(False) self._finished.set(False) self._failed.set(False) self.exc_info = None self.return_value = None