Source code for plumpy.futures

# -*- coding: utf-8 -*-
"""
Module containing future related methods and classes
"""
import asyncio
from typing import Any, Callable, Coroutine, Optional

import kiwipy

__all__ = ['Future', 'gather', 'chain', 'copy_future', 'CancelledError', 'create_task']

CancelledError = kiwipy.CancelledError


class InvalidStateError(Exception):
    """Exception for when a future or action is in an invalid state"""


copy_future = kiwipy.copy_future  # pylint: disable=invalid-name
chain = kiwipy.chain  # pylint: disable=invalid-name
gather = asyncio.gather  # pylint: disable=invalid-name

Future = asyncio.Future  # pylint: disable=invalid-name


class CancellableAction(Future):
    """
    An action that can be launched and potentially cancelled
    """

    def __init__(self, action: Callable[..., Any], cookie: Any = None):
        super().__init__()
        self._action = action
        self._cookie = cookie

    @property
    def cookie(self) -> Any:
        """ A cookie that can be used to correlate the actions with something """
        return self._cookie

    def run(self, *args: Any, **kwargs: Any) -> None:
        """Run the action

        :param args: the positional arguments to the action
        :param kwargs: the keyword arguments to the action
        """
        if self.done():
            raise InvalidStateError('Action has already been ran')

        try:
            with kiwipy.capture_exceptions(self):
                self.set_result(self._action(*args, **kwargs))
        finally:
            self._action = None  # type: ignore


[docs]def create_task(coro: Callable[[], Coroutine], loop: Optional[asyncio.AbstractEventLoop] = None) -> Future: """ Schedule a call to a coro in the event loop and wrap the outcome in a future. :param coro: a function which creates the coroutine to schedule :param loop: the event loop to schedule it in :return: the future representing the outcome of the coroutine """ loop = loop or asyncio.get_event_loop() future = loop.create_future() async def run_task() -> None: with kiwipy.capture_exceptions(future): res = await coro() future.set_result(res) asyncio.run_coroutine_threadsafe(run_task(), loop) return future
def unwrap_kiwi_future(future: kiwipy.Future) -> kiwipy.Future: """ Create a kiwi future that represents the final results of a nested series of futures, meaning that if the futures provided itself resolves to a future the returned future will not resolve to a value until the final chain of futures is not a future but a concrete value. If at any point in the chain a future resolves to an exception then the returned future will also resolve to that exception. :param future: the future to unwrap :return: the unwrapping future """ unwrapping = kiwipy.Future() def unwrap(fut: kiwipy.Future) -> None: if fut.cancelled(): unwrapping.cancel() else: with kiwipy.capture_exceptions(unwrapping): result = fut.result() if isinstance(result, kiwipy.Future): result.add_done_callback(unwrap) else: unwrapping.set_result(result) future.add_done_callback(unwrap) return unwrapping