Source code for plumpy.communications

# -*- coding: utf-8 -*-
"""Module for general kiwipy communication methods"""
import asyncio
import functools
from typing import TYPE_CHECKING, Any, Callable, Hashable, Optional

import kiwipy

from . import futures
from .utils import ensure_coroutine

__all__ = [
    'Communicator', 'RemoteException', 'DeliveryFailed', 'TaskRejected', 'plum_to_kiwi_future', 'wrap_communicator'
]

RemoteException = kiwipy.RemoteException
DeliveryFailed = kiwipy.DeliveryFailed
TaskRejected = kiwipy.TaskRejected
Communicator = kiwipy.Communicator

if TYPE_CHECKING:
    # identifiers for subscribers
    ID_TYPE = Hashable  # pylint: disable=invalid-name
    Subscriber = Callable[..., Any]
    # RPC subscriber params: communicator, msg
    RpcSubscriber = Callable[[kiwipy.Communicator, Any], Any]
    # Task subscriber params: communicator, task
    TaskSubscriber = Callable[[kiwipy.Communicator, Any], Any]
    # Broadcast subscribers params: communicator, body, sender, subject, correlation id
    BroadcastSubscriber = Callable[[kiwipy.Communicator, Any, Any, Any, ID_TYPE], Any]


[docs]def plum_to_kiwi_future(plum_future: futures.Future) -> kiwipy.Future: """ Return a kiwi future that resolves to the outcome of the plum future :param plum_future: the plum future :return: the kiwipy future """ kiwi_future = kiwipy.Future() def on_done(_plum_future: futures.Future) -> None: with kiwipy.capture_exceptions(kiwi_future): if plum_future.cancelled(): kiwi_future.cancel() else: result = plum_future.result() # Did we get another future? In which case convert it too if isinstance(result, futures.Future): result = plum_to_kiwi_future(result) kiwi_future.set_result(result) plum_future.add_done_callback(on_done) return kiwi_future
def convert_to_comm(callback: 'Subscriber', loop: Optional[asyncio.AbstractEventLoop] = None) -> Callable[..., kiwipy.Future]: """ Take a callback function and converted it to one that will schedule a callback on the given even loop and return a kiwi future representing the future outcome of the original method. :param callback: the function to convert :param loop: the even loop to schedule the callback in :return: a new callback function that returns a future """ if isinstance(callback, kiwipy.BroadcastFilter): # if the broadcast is filtered for this callback, # we don't want to go through the (costly) process # of setting up async tasks and callbacks def _passthrough(*args: Any, **kwargs: Any) -> bool: sender = kwargs.get('sender', args[1]) subject = kwargs.get('subject', args[2]) return callback.is_filtered(sender, subject) # type: ignore[attr-defined] else: def _passthrough(*args: Any, **kwargs: Any) -> bool: # pylint: disable=unused-argument return False coro = ensure_coroutine(callback) def converted(communicator: kiwipy.Communicator, *args: Any, **kwargs: Any) -> kiwipy.Future: if _passthrough(*args, **kwargs): kiwi_future = kiwipy.Future() kiwi_future.set_result(None) return kiwi_future msg_fn = functools.partial(coro, communicator, *args, **kwargs) task_future = futures.create_task(msg_fn, loop) return plum_to_kiwi_future(task_future) return converted
[docs]def wrap_communicator( communicator: kiwipy.Communicator, loop: Optional[asyncio.AbstractEventLoop] = None ) -> 'LoopCommunicator': """ Wrap a communicator such that all callbacks made to any subscribers are scheduled on the given event loop. If the communicator is already an equivalent communicator wrapper then it will not be wrapped again. :param communicator: the communicator to wrap :param loop: the event loop to schedule callbacks on :return: a communicator wrapper """ if isinstance(communicator, LoopCommunicator) and communicator.loop() is loop: return communicator return LoopCommunicator(communicator, loop)
class LoopCommunicator(kiwipy.Communicator): """Wrapper around a `kiwipy.Communicator` that schedules any subscriber messages on a given event loop.""" def __init__(self, communicator: kiwipy.Communicator, loop: Optional[asyncio.AbstractEventLoop] = None): """ :param communicator: The kiwipy communicator :param loop: The event loop to schedule callbacks on """ assert communicator is not None self._communicator = communicator self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop() def loop(self) -> asyncio.AbstractEventLoop: return self._loop def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: Optional['ID_TYPE'] = None) -> 'ID_TYPE': converted = convert_to_comm(subscriber, self._loop) return self._communicator.add_rpc_subscriber(converted, identifier) def remove_rpc_subscriber(self, identifier: 'ID_TYPE') -> None: return self._communicator.remove_rpc_subscriber(identifier) def add_task_subscriber(self, subscriber: 'TaskSubscriber', identifier: Optional['ID_TYPE'] = None) -> 'ID_TYPE': converted = convert_to_comm(subscriber, self._loop) return self._communicator.add_task_subscriber(converted, identifier) def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None: return self._communicator.remove_task_subscriber(identifier) def add_broadcast_subscriber( self, subscriber: 'BroadcastSubscriber', identifier: Optional['ID_TYPE'] = None ) -> 'ID_TYPE': converted = convert_to_comm(subscriber, self._loop) return self._communicator.add_broadcast_subscriber(converted, identifier) def remove_broadcast_subscriber(self, identifier: 'ID_TYPE') -> None: return self._communicator.remove_broadcast_subscriber(identifier) def task_send(self, task: Any, no_reply: bool = False) -> kiwipy.Future: return self._communicator.task_send(task, no_reply) def rpc_send(self, recipient_id: 'ID_TYPE', msg: Any) -> kiwipy.Future: return self._communicator.rpc_send(recipient_id, msg) def broadcast_send( self, body: Optional[Any], sender: Optional[str] = None, subject: Optional[str] = None, correlation_id: Optional['ID_TYPE'] = None ) -> futures.Future: return self._communicator.broadcast_send(body, sender, subject, correlation_id) def is_closed(self) -> bool: """Return `True` if the communicator was closed""" return self._communicator.is_closed() def close(self) -> None: """Close a communicator, free up all resources and do not allow any further operations""" self._communicator.close()