plumpy.communications module

Module for general kiwipy communication methods

class plumpy.communications.Communicator[source]

Bases: object

The interface for a communicator used to both send and receive various types of message.

abstract add_broadcast_subscriber(subscriber: Callable[[Communicator, Any, Any, Any, Any], Any], identifier=None) → Any[source]

Add a broadcast subscriber that will receive all broadcast messages

  • subscriber – the subscriber function to be called

  • identifier – an optional identifier for the subscriber


an identifier for the subscriber and can be subsequently used to remove it

abstract add_rpc_subscriber(subscriber: Callable[[Communicator, Any], Any], identifier=None) → Any[source]

Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.

abstract add_task_subscriber(subscriber: Callable[[Communicator, Any], Any], identifier=None) → Any[source]

Add a task subscriber to the communicator’s default queue. Returns the identifier.

  • subscriber – The task callback function

  • identifier – the subscriber identifier

abstract broadcast_send(body, sender=None, subject=None, correlation_id=None)bool[source]

Broadcast a message to all subscribers

abstract close()[source]

Close a communicator, free up all resources and do not allow any further operations

abstract is_closed()bool[source]

Return True if the communicator was closed

abstract remove_broadcast_subscriber(identifier)[source]

Remove a broadcast subscriber


identifier – the identifier of the subscriber to remove

abstract remove_rpc_subscriber(identifier)[source]

Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.


identifier – The RPC subscriber identifier

abstract remove_task_subscriber(identifier)[source]

Remove a task subscriber from the communicator’s default queue.


identifier – the subscriber to remove


ValueError if identifier does not correspond to a known subscriber

abstract rpc_send(recipient_id, msg)[source]

Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.

  • recipient_id – The recipient identifier

  • msg – The body of the message


A future corresponding to the outcome of the call

Return type


abstract task_send(task, no_reply=False) → concurrent.futures._base.Future[source]

Send a task messages, this will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.

  • task – The task message

  • no_reply (bool) – Do not send a reply containing the result of the task


A future corresponding to the outcome of the task

exception plumpy.communications.DeliveryFailed[source]

Bases: Exception

Failed to deliver a message

exception plumpy.communications.RemoteException[source]

Bases: Exception

An exception occurred at the remote end of the call

exception plumpy.communications.TaskRejected[source]

Bases: Exception

A task was rejected at the remote end

plumpy.communications.plum_to_kiwi_future(plum_future: _asyncio.Future) → concurrent.futures._base.Future[source]

Return a kiwi future that resolves to the outcome of the plum future


plum_future – the plum future


the kiwipy future

plumpy.communications.wrap_communicator(communicator: kiwipy.communications.Communicator, loop: Optional[] = None) → plumpy.communications.LoopCommunicator[source]

Wrap a communicator such that all callbacks made to any subscribers are scheduled on the given event loop.

If the communicator is already an equivalent communicator wrapper then it will not be wrapped again.

  • communicator – the communicator to wrap

  • loop – the event loop to schedule callbacks on


a communicator wrapper