earwax.promises package

Module contents

Provides the various promise classes.

class earwax.promises.PromiseStates

Bases: enum.Enum

The possible states of earwax.Promise instances.

Variables:
  • not_ready

    The promise has been created, but a function must still be added.

    How this is done depends on how the promise subclass in question has been implemented, and may not always be used.

  • ready – The promise has been created, and a function registered. The run() method has not yet been called.
  • running – The promise’s run() method has been called, but the function has not yet returned a value, or raised an error.
  • done – The promise has finished, and there was no error. The on_done() and on_finally() events have already been dispatched.
  • error

    The promise completed, but there was an error, which was handled by the on_error() event.

    The on_finally() event has been dispatched.

  • cancelled – The promise has had its cancel() method called, and its on_cancel() event has been dispatched.
cancelled = 5
done = 3
error = 4
not_ready = 0
ready = 1
running = 2
class earwax.promises.ThreadedPromise(thread_pool: concurrent.futures._base.Executor, func: Optional[Callable[[...], T]] = None, future: Optional[concurrent.futures._base.Future] = None)

Bases: earwax.promises.base.Promise

A promise that a value will be available in the future.

Uses an Executor subclass (like ThreadPoolExecutor, or ProcessPoolExecutor for threading).

You can create this class directly, or by using decorators.

Here is an example of the decorator syntax:

from concurrent.futures import ThreadPoolExecutor

promise: ThreadedPromise = ThreadedPromise(ThreadPoolExecutor())

@promise.register_func
def func() -> None:
    # Long-running task...
    return 5

@promise.event
def on_done(value: int) -> None:
    # Do something with the return value.

@promise.event
def on_error(e: Exception) -> None:
    # Do something with an error.

@promise.event
def on_finally():
    print('Done.')

promise.run()

Or you could create the promise manually:

promise = ThreadedPromise(
    ThreadPoolExecutor(), func=predefined_function
)
promise.event('on_done')(print)
promise.run()

Note the use of Pyglet’s own event system.

Variables:
  • thread_pool – The thread pool to use.
  • func – The function to submit to the thread pool.
  • future – The future that is running, or None if the run() method has not yet been called.
cancel() → None

Try to cancel self.future.

If There is no future, RuntimeError will be raised.

check(dt: float) → None

Check state and react accordingly.

Checks to see if self.future has finished or not.

If it has, dispatch the on_done() event with the resulting value.

If an error has been raised, dispatch the on_error() event with the resulting error.

If either of these things have happened, dispatch the on_finally() event.

Parameters:dt

The time since the last run.

This argument is required by pyglet.clock.schedule.

register_func(func: Callable[[...], T]) → Callable[[...], T]

Register promise function.

Registers the function to be called by the run() method.

Parameters:func – The function to use. Will be stored in self.func.
run(*args, **kwargs) → None

Start this promise running.

The result of calling submit on self.thread_pool will be stored on self.future.

If this instance does not have a function registered yet, RuntimeError will be raised.

Parameters:
  • args – The extra positional arguments to pass along to submit.
  • kwargs – The extra keyword arguments to pass along to submit.
class earwax.promises.StaggeredPromise(func: Callable[[...], Generator[float, None, T]])

Bases: earwax.promises.base.Promise

A promise that can suspend itself at will.

I found myself missing the MOO-style suspend() function, so thought I’d make the same capability available in earwax:

@StaggeredPromise.decorate
def promise() -> StaggeredPromiseGeneratorType:
    game.output('Hello.')
    yield 2.0
    game.output('World.')

promise.run()
game.run(window)

This class supports all the promise events found on earwax.Promise, and also has a on_next() event, which will fire whenever a promise suspends:

@promise.event
def on_next(delay: float) -> None:
    print(f'I waited {delay}.')
Variables:
  • func – The function to run.
  • generator – The generator returned by self.func.
cancel() → None

Cancel this promise.

Cancels self.generator, and sets the proper state.

classmethod decorate(func: Callable[[...], Generator[float, None, T]]) → earwax.promises.staggered_promise.StaggeredPromise

Make an instance from a decorated function.

This function acts as a decorator method for returning earwax.StaggeredPromise instances.

Using this function seems to help mypy figure out what type your function is.

Parameters:func – The function to decorate.
do_next(dt: Optional[float]) → None

Advance execution.

Calls next(self.generator), and then suspend for however long the function demands.

If StopIteration is raised, then the args from that exception are sent to the self.on_done event.

If any other exception is raised, then that exception is passed along to the self.on_error event.

Parameters:dt

The time since the last run, as passed by pyglet.clock.schedule_once.

If this is the first time this method is called, dt will be None.

on_next(delay: float) → None

Do something when execution is advanced.

This event is dispatched every time next is called on self.func.

Parameters:delay – The delay that was requested by the function.
run(*args, **kwargs) → None

Run this promise.

Start self.func running, and set the proper state.

Parameters:
  • args – The positional arguments passed to self.func.
  • kwargs – The keyword arguments passed to self.func.
class earwax.promises.Promise

Bases: typing.Generic, earwax.mixins.RegisterEventMixin

The base class for promises.

Instances of this class have a few possible states which are contained in the PromiseStates enumeration.

Variables:state – The state this promise is in (see above).
cancel() → None

Override to provide cancel functionality.

done(value: T) → None

Finish up.

Dispatches the on_done() event with the given value, and set self.state to earwax.PromiseStates.done.

Parameters:value – The value that was returned from whatever function this promise had.
error(e: Exception) → None

Handle an error.

This event dispatches the on_error() event with the passed exception.

Parameters:e – The exception that was raised.
on_cancel() → None

Handle cancellation.

This event is dispatched when this instance has its cancel() method called.

on_done(result: T) → None

Handle return value.

This event is dispatched when this promise completes with no error.

Parameters:result – The value returned by the function.
on_error(e: Exception) → None

Handle an error.

This event is dispatched when this promise raises an error.

Parameters:e – The exception that was raised.
on_finally() → None

Handle this promise comise completing.

This event is dispatched when this promise completes, whether or not it raises an error.

run(*args, **kwargs) → None

Start this promise running.