Source code for kiwipy.communications

# -*- coding: utf-8 -*-
import abc
import sys
from typing import Any, Callable

import shortuuid

from . import exceptions
# For backwards compatibility import exceptions too
from .exceptions import *  # pylint: disable=wildcard-import, redefined-builtin, unused-wildcard-import
from . import futures

__all__ = 'Communicator', 'CommunicatorHelper'

# RPC subscriber params: communicator, msg
RpcSubscriber = Callable[['Communicator', Any], Any]
# Task subscriber params: communicator, task
TaskSubscriber = Callable[['Communicator', Any], Any]
# Broadcast subscribers params: communicator, body, sender, subject, correlation id
BroadcastSubscriber = Callable[['Communicator', Any, Any, Any, Any], Any]


[docs]class Communicator: """ The interface for a communicator used to both send and receive various types of message. """ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] @abc.abstractmethod def is_closed(self) -> bool: """Return `True` if the communicator was closed"""
[docs] @abc.abstractmethod def close(self): """Close a communicator, free up all resources and do not allow any further operations"""
[docs] @abc.abstractmethod def add_rpc_subscriber(self, subscriber: RpcSubscriber, identifier=None) -> Any: """Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned."""
[docs] @abc.abstractmethod def remove_rpc_subscriber(self, identifier): """ Remove an RPC subscriber given the identifier. Raises a `ValueError` if there is no such subscriber. :param identifier: The RPC subscriber identifier """
[docs] @abc.abstractmethod def add_task_subscriber(self, subscriber: TaskSubscriber, identifier=None) -> Any: """Add a task subscriber to the communicator's default queue. Returns the identifier. :param subscriber: The task callback function :param identifier: the subscriber identifier """
[docs] @abc.abstractmethod def remove_task_subscriber(self, identifier): """Remove a task subscriber from the communicator's default queue. :param identifier: the subscriber to remove :raises: ValueError if identifier does not correspond to a known subscriber """
[docs] @abc.abstractmethod def add_broadcast_subscriber(self, subscriber: BroadcastSubscriber, identifier=None) -> Any: """ Add a broadcast subscriber that will receive all broadcast messages :param subscriber: the subscriber function to be called :param identifier: an optional identifier for the subscriber :return: an identifier for the subscriber and can be subsequently used to remove it """
[docs] @abc.abstractmethod def remove_broadcast_subscriber(self, identifier): """ Remove a broadcast subscriber :param identifier: the identifier of the subscriber to remove """
[docs] @abc.abstractmethod def task_send(self, task, no_reply=False) -> futures.Future: """ Send a task messages, this will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task. :param task: The task message :param no_reply: Do not send a reply containing the result of the task :type no_reply: bool :return: A future corresponding to the outcome of the task """
[docs] @abc.abstractmethod def rpc_send(self, recipient_id, msg): """ Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call. :param recipient_id: The recipient identifier :param msg: The body of the message :return: A future corresponding to the outcome of the call :rtype: :class:`kiwipy.Future` """
[docs] @abc.abstractmethod def broadcast_send(self, body, sender=None, subject=None, correlation_id=None) -> bool: """Broadcast a message to all subscribers"""
class CommunicatorHelper(Communicator): # Have to disable this linter because this class remains abstract and it is # just used by classes that will themselves be concrete # pylint: disable=abstract-method def __init__(self): self._task_subscribers = {} self._broadcast_subscribers = {} self._rpc_subscribers = {} self._closed = False def is_closed(self) -> bool: return self._closed def close(self): if self._closed: return self._closed = True del self._task_subscribers del self._broadcast_subscribers del self._rpc_subscribers def add_rpc_subscriber(self, subscriber, identifier=None) -> Any: self._ensure_open() identifier = identifier or shortuuid.uuid() if identifier in self._rpc_subscribers: raise exceptions.DuplicateSubscriberIdentifier(f"RPC identifier '{identifier}'") self._rpc_subscribers[identifier] = subscriber return identifier def remove_rpc_subscriber(self, identifier): self._ensure_open() try: self._rpc_subscribers.pop(identifier) except KeyError: raise ValueError(f"Unknown subscriber '{identifier}'") def add_task_subscriber(self, subscriber, identifier=None): """ Register a task subscriber :param subscriber: The task callback function :param identifier: the subscriber identifier """ self._ensure_open() identifier = identifier or shortuuid.uuid() if identifier in self._rpc_subscribers: raise exceptions.DuplicateSubscriberIdentifier(f"RPC identifier '{identifier}'") self._task_subscribers[identifier] = subscriber return identifier def remove_task_subscriber(self, identifier): """ Remove a task subscriber :param identifier: the subscriber to remove :raises: ValueError if identifier does not correspond to a known subscriber """ self._ensure_open() try: self._task_subscribers.pop(identifier) except KeyError: raise ValueError(f"Unknown subscriber: '{identifier}'") def add_broadcast_subscriber(self, subscriber: BroadcastSubscriber, identifier=None) -> Any: self._ensure_open() identifier = identifier or shortuuid.uuid() if identifier in self._broadcast_subscribers: raise exceptions.DuplicateSubscriberIdentifier(f"Broadcast identifier '{identifier}'") self._broadcast_subscribers[identifier] = subscriber return identifier def remove_broadcast_subscriber(self, identifier): self._ensure_open() try: del self._broadcast_subscribers[identifier] except KeyError: raise ValueError(f"Broadcast subscriber '{identifier}' unknown") def fire_task(self, msg, no_reply=False): self._ensure_open() future = futures.Future() for subscriber in self._task_subscribers.values(): try: result = subscriber(self, msg) future.set_result(result) break except exceptions.TaskRejected: pass except Exception: # pylint: disable=broad-except future.set_exception(exceptions.RemoteException(sys.exc_info())) break if no_reply: return None return future def fire_rpc(self, recipient_id, msg): self._ensure_open() try: subscriber = self._rpc_subscribers[recipient_id] except KeyError: raise exceptions.UnroutableError(f"Unknown rpc recipient '{recipient_id}'") else: future = futures.Future() try: future.set_result(subscriber(self, msg)) except Exception: # pylint: disable=broad-except future.set_exception(exceptions.RemoteException(sys.exc_info())) return future def fire_broadcast(self, body, sender=None, subject=None, correlation_id=None): self._ensure_open() for subscriber in self._broadcast_subscribers.values(): subscriber(self, body=body, sender=sender, subject=subject, correlation_id=correlation_id) return True def _ensure_open(self): if self.is_closed(): raise exceptions.CommunicatorClosed