plumpy.process_comms module

Module for process level communication functions and classes

class plumpy.process_comms.ProcessLauncher(loop: Optional[asyncio.events.AbstractEventLoop] = None, persister: Optional[plumpy.persistence.Persister] = None, load_context: Optional[plumpy.persistence.LoadSaveContext] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None)[source]

Bases: object

Takes incoming task messages and uses them to launch processes.

Expected format of task:

For launch:

{
    'task': <LAUNCH_TASK>
    'process_class': <Process class to launch>
    'args': <tuple of positional args for process constructor>
    'kwargs': <dict of keyword args for process constructor>.
    'nowait': True or False
}

For continue:

{
    'task': <CONTINUE_TASK>
    'pid': <Process ID>
    'nowait': True or False
}
async _continue(_communicator: kiwipy.communications.Communicator, pid: Hashable, nowait: bool, tag: Optional[str] = None) → Union[Hashable, Any][source]

Continue the process

Parameters
  • _communicator – the communicator

  • pid – the pid of the process to continue

  • nowait – if True don’t wait for the process to complete

  • tag – the checkpoint tag to continue from

async _create(_communicator: kiwipy.communications.Communicator, process_class: str, persist: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None) → Hashable[source]

Create the process

Parameters
  • _communicator – the communicator

  • process_class – the process class to create

  • persist – should the process be persisted

  • init_args – positional arguments to the process constructor

  • init_kwargs – keyword arguments to the process constructor

Returns

the pid of the created process

async _launch(_communicator: kiwipy.communications.Communicator, process_class: str, persist: bool, nowait: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None) → Union[Hashable, Any][source]

Launch the process

Parameters
  • _communicator – the communicator

  • process_class – the process class to launch

  • persist – should the process be persisted

  • nowait – if True only return when the process finishes

  • init_args – positional arguments to the process constructor

  • init_kwargs – keyword arguments to the process constructor

Returns

the pid of the created process or the outputs (if nowait=False)

class plumpy.process_comms.RemoteProcessController(communicator: kiwipy.communications.Communicator)[source]

Bases: object

Control remote processes using coroutines that will send messages and wait (in a non-blocking way) for their response

async continue_process(pid: Hashable, tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False) → Optional[Any][source]

Continue the process

Parameters
  • _communicator – the communicator

  • pid – the pid of the process to continue

  • tag – the checkpoint tag to continue from

async execute_process(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Any[source]

Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.

Parameters
  • process_class – the process class to execute

  • init_args – the positional arguments to the class constructor

  • init_kwargs – the keyword arguments to the class constructor

  • loader – the class loader to use

  • nowait – if True, don’t wait for the process to send a response

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the result of executing the process

async get_status(pid: Hashable) → Any[source]

Get the status of a process with the given PID :param pid: the process id :return: the status response from the process

async kill_process(pid: Hashable, msg: Optional[Any] = None) → Any[source]

Kill the process

Parameters
  • pid – the pid of the process to kill

  • msg – optional kill message

Returns

True if killed, False otherwise

async launch_process(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Any[source]

Launch a process given the class and constructor arguments

Parameters
  • process_class – the class of the process to launch

  • init_args – the constructor positional arguments

  • init_kwargs – the constructor keyword arguments

  • persist – should the process be persisted

  • loader – the classloader to use

  • nowait – if True, don’t wait for the process to send a response, just return the pid

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the result of launching the process

async pause_process(pid: Hashable, msg: Optional[Any] = None) → Any[source]

Pause the process

Parameters
  • pid – the pid of the process to pause

  • msg – optional pause message

Returns

True if paused, False otherwise

async play_process(pid: Hashable) → Any[source]

Play the process

Parameters

pid – the pid of the process to play

Returns

True if played, False otherwise

class plumpy.process_comms.RemoteProcessThreadController(communicator: kiwipy.communications.Communicator)[source]

Bases: object

A class that can be used to control and launch remote processes

continue_process(pid: Hashable, tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]
execute_process(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]

Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.

Parameters
  • process_class – the process class to execute

  • init_args – the positional arguments to the class constructor

  • init_kwargs – the keyword arguments to the class constructor

  • loader – the class loader to use

  • nowait – if True, don’t wait for the process to send a response

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the result of executing the process

get_status(pid: Hashable) → concurrent.futures._base.Future[source]

Get the status of a process with the given PID.

Parameters

pid – the process id

Returns

the status response from the process

kill_all(msg: Optional[Any])None[source]

Kill all processes that are subscribed to the same communicator

Parameters

msg – an optional pause message

kill_process(pid: Hashable, msg: Optional[Any] = None) → concurrent.futures._base.Future[source]

Kill the process

Parameters
  • pid – the pid of the process to kill

  • msg – optional kill message

Returns

a response future from the process to be killed

launch_process(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]

Launch the process

Parameters
  • process_class – the process class to launch

  • init_args – positional arguments to the process constructor

  • init_kwargs – keyword arguments to the process constructor

  • persist – should the process be persisted

  • loader – the class loader to use

  • nowait – if True only return when the process finishes

  • no_reply – don’t send a reply to the sender

Returns

the pid of the created process or the outputs (if nowait=False)

pause_all(msg: Any)None[source]

Pause all processes that are subscribed to the same communicator

Parameters

msg – an optional pause message

pause_process(pid: Hashable, msg: Optional[Any] = None) → concurrent.futures._base.Future[source]

Pause the process

Parameters
  • pid – the pid of the process to pause

  • msg – optional pause message

Returns

a response future from the process to be paused

play_all()None[source]

Play all processes that are subscribed to the same communicator

play_process(pid: Hashable) → concurrent.futures._base.Future[source]

Play the process

Parameters

pid – the pid of the process to pause

Returns

a response future from the process to be played

task_send(message: Any, no_reply: bool = False) → Optional[Any][source]

Send a task to be performed using the communicator

Parameters
  • message – the task message

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the response from the remote side (if no_reply=False)

plumpy.process_comms.create_continue_body(pid: Hashable, tag: Optional[str] = None, nowait: bool = False) → Dict[str, Any][source]

Create a message body to continue an existing process :param pid: the pid of the existing process :param tag: the optional persistence tag :param nowait: wait for the process to finish before completing the task, otherwise just return the PID :return: a dictionary with the body of the message to continue the process

plumpy.process_comms.create_launch_body(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = True) → Dict[str, Any][source]

Create a message body for the launch action

Parameters
  • process_class – the class of the process to launch

  • init_args – any initialisation positional arguments

  • init_kwargs – any initialisation keyword arguments

  • persist – persist this process if True, otherwise don’t

  • loader – the loader to use to load the persisted process

  • nowait – wait for the process to finish before completing the task, otherwise just return the PID

Returns

a dictionary with the body of the message to launch the process

Return type

dict