plumpy.process_comms module
Module for process-level communication functions and classes.
- class plumpy.process_comms.MessageBuilder
Bases: object
MessageBuilder constructs the different messages that can be passed over a communicator.
- classmethod kill(text: str | None = None, force_kill: bool = False) → Dict[str, Any]
The kill message sent over the communicator.
- classmethod pause(text: str | None = None) → Dict[str, Any]
The pause message sent over the communicator.
- class plumpy.process_comms.ProcessLauncher(loop: AbstractEventLoop | None = None, persister: Persister | None = None, load_context: LoadSaveContext | None = None, loader: ObjectLoader | None = None)
Bases: object
Takes incoming task messages and uses them to launch processes.
Expected format of task:
For launch:
{
    'task': <LAUNCH_TASK>,
    'process_class': <Process class to launch>,
    'args': <tuple of positional args for process constructor>,
    'kwargs': <dict of keyword args for process constructor>,
    'nowait': True or False
}
For continue:
{
    'task': <CONTINUE_TASK>,
    'pid': <Process ID>,
    'nowait': True or False
}
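As an illustration of the two task formats above, the following sketch builds both dictionaries by hand. The string values used for `LAUNCH_TASK` and `CONTINUE_TASK`, the helper names `make_launch_task` and `make_continue_task`, and the class path `my_package.MyProcess` are assumptions made for this example only; in practice the message bodies would be produced by `create_launch_body` and `create_continue_body`.

```python
from typing import Any, Dict, Hashable, Optional, Sequence

# Placeholder values standing in for <LAUNCH_TASK> and <CONTINUE_TASK>.
# The real constants are defined by plumpy and may differ.
LAUNCH_TASK = "launch"
CONTINUE_TASK = "continue"


def make_launch_task(
    process_class: str,
    args: Sequence[Any] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    nowait: bool = True,
) -> Dict[str, Any]:
    """Build a dictionary with the keys listed under 'For launch'."""
    return {
        "task": LAUNCH_TASK,
        "process_class": process_class,
        "args": tuple(args),
        "kwargs": dict(kwargs or {}),
        "nowait": nowait,
    }


def make_continue_task(pid: Hashable, nowait: bool = True) -> Dict[str, Any]:
    """Build a dictionary with the keys listed under 'For continue'."""
    return {"task": CONTINUE_TASK, "pid": pid, "nowait": nowait}


launch_task = make_launch_task(
    "my_package.MyProcess", args=(1, 2), kwargs={"label": "demo"}
)
continue_task = make_continue_task(pid=42, nowait=False)
```

Keeping the two shapes in small helper functions makes it easy to check that every message carries the keys the launcher expects.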
- async _continue(_communicator: Communicator, pid: Hashable, nowait: bool, tag: str | None = None) → Hashable | Any
Continue the process
- Parameters:
_communicator – the communicator
pid – the pid of the process to continue
nowait – if True don’t wait for the process to complete
tag – the checkpoint tag to continue from
- async _create(_communicator: Communicator, process_class: str, persist: bool, init_args: Sequence[Any] | None = None, init_kwargs: Dict[str, Any] | None = None) → Hashable
Create the process
- Parameters:
_communicator – the communicator
process_class – the process class to create
persist – should the process be persisted
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
- Returns:
the pid of the created process
- async _launch(_communicator: Communicator, process_class: str, persist: bool, nowait: bool, init_args: Sequence[Any] | None = None, init_kwargs: Dict[str, Any] | None = None) → Hashable | Any
Launch the process
- Parameters:
_communicator – the communicator
process_class – the process class to launch
persist – should the process be persisted
nowait – if True, return the pid without waiting for the process to finish; if False, only return when the process finishes
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
- Returns:
the pid of the created process or the outputs (if nowait=False)
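The effect of `nowait` on the return value can be sketched with plain `asyncio`. This is a toy model, not plumpy's implementation: `run_process` and `launch` are hypothetical stand-ins, and the outputs dictionary is invented for the example.

```python
import asyncio
from typing import Any, Dict, Union

# Keep references to background tasks so they are not garbage collected.
_background: set = set()


async def run_process(pid: int) -> Dict[str, Any]:
    """Stand-in for a process that runs to completion and yields outputs."""
    await asyncio.sleep(0.01)
    return {"pid": pid, "result": "done"}


async def launch(pid: int, nowait: bool) -> Union[int, Dict[str, Any]]:
    """Schedule the process, then either return early or wait for it."""
    task = asyncio.create_task(run_process(pid))
    if nowait:
        # The process keeps running in the background; only the pid is returned.
        _background.add(task)
        task.add_done_callback(_background.discard)
        return pid
    # Wait for the process to finish and return its outputs.
    return await task


async def main():
    early = await launch(1, nowait=True)
    outputs = await launch(2, nowait=False)
    return early, outputs


early, outputs = asyncio.run(main())
```

With `nowait=True` the caller gets the pid back immediately, while `nowait=False` blocks the coroutine until the outputs are available.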
- class plumpy.process_comms.RemoteProcessController(communicator: Communicator)
Bases: object
Control remote processes using coroutines that will send messages and wait (in a non-blocking way) for their response
- async continue_process(pid: Hashable, tag: str | None = None, nowait: bool = False, no_reply: bool = False) → Any | None
Continue the process
- Parameters:
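pid – the pid of the process to continue
tag – the checkpoint tag to continue from
nowait – if True, don’t wait for the process to send a response
no_reply – if True, this call will be fire-and-forget, i.e. no return value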
- async execute_process(process_class: str, init_args: Sequence[Any] | None = None, init_kwargs: Dict[str, Any] | None = None, loader: ObjectLoader | None = None, nowait: bool = False, no_reply: bool = False) → Any
Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.
- Parameters:
process_class – the process class to execute
init_args – the positional arguments to the class constructor
init_kwargs – the keyword arguments to the class constructor
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns:
the result of executing the process
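The create-then-continue sequence described for `execute_process` can be sketched with an in-memory stand-in for the communicator. Everything here is hypothetical: `RecordingCommunicator`, its `send_task` method, the `'create'` and `'continue'` task names and the reply shapes are assumptions for illustration and do not reflect the real communicator API.

```python
import asyncio
from typing import Any, Dict, List, Optional, Sequence


class RecordingCommunicator:
    """Hypothetical in-memory communicator that records every task it is sent."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []
        self._next_pid = 0

    async def send_task(self, task: Dict[str, Any]) -> Any:
        self.sent.append(task)
        if task["task"] == "create":
            # Pretend a worker created the process and replied with its pid.
            self._next_pid += 1
            return self._next_pid
        # Pretend a worker continued the process and replied with a result.
        return {"pid": task["pid"], "state": "finished"}


async def execute(
    communicator: RecordingCommunicator,
    process_class: str,
    init_args: Optional[Sequence[Any]] = None,
    init_kwargs: Optional[Dict[str, Any]] = None,
    nowait: bool = False,
) -> Any:
    """First send a create task, then a continue task for the returned pid."""
    create_task = {
        "task": "create",
        "process_class": process_class,
        "args": tuple(init_args or ()),
        "kwargs": dict(init_kwargs or {}),
    }
    pid = await communicator.send_task(create_task)
    continue_task = {"task": "continue", "pid": pid, "nowait": nowait}
    return await communicator.send_task(continue_task)


comm = RecordingCommunicator()
result = asyncio.run(execute(comm, "my_package.MyProcess", init_args=(3,)))
order = [task["task"] for task in comm.sent]
```

Because the continue task is a separate message, a durable message queue could hand it to a worker even after the sender has exited, which is the property the description above relies on.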
- async get_status(pid: Hashable) → Any
Get the status of a process with the given PID.
- Parameters:
pid – the process id
- Returns:
the status response from the process
- async kill_process(pid: Hashable, msg_text: str | None = None, force_kill: bool = False) → Any
Kill the process
- Parameters:
pid – the pid of the process to kill
msg_text – optional kill message
- Returns:
True if killed, False otherwise
- async launch_process(process_class: str, init_args: Sequence[Any] | None = None, init_kwargs: Dict[str, Any] | None = None, persist: bool = False, loader: ObjectLoader | None = None, nowait: bool = False, no_reply: bool = False) → Any
Launch a process given the class and constructor arguments
- Parameters:
process_class – the class of the process to launch
init_args – the constructor positional arguments
init_kwargs – the constructor keyword arguments
persist – should the process be persisted
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response, just return the pid
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns:
the result of launching the process
- class plumpy.process_comms.RemoteProcessThreadController(communicator: Communicator)
Bases: object
A class that can be used to control and launch remote processes
- continue_process(pid: Hashable, tag: str | None = None, nowait: bool = False, no_reply: bool = False) → None | Hashable | Any
- execute_process(process_class: str, init_args: Sequence[Any] | None = None, init_kwargs: Dict[str, Any] | None = None, loader: ObjectLoader | None = None, nowait: bool = False, no_reply: bool = False) → None | Hashable | Any
Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.
- Parameters:
process_class – the process class to execute
init_args – the positional arguments to the class constructor
init_kwargs – the keyword arguments to the class constructor
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns:
the result of executing the process
- get_status(pid: Hashable) → Future
Get the status of a process with the given PID.
- Parameters:
pid – the process id
- Returns:
the status response from the process
- kill_all(msg_text: str | None) → None
Kill all processes that are subscribed to the same communicator
- Parameters:
msg_text – an optional kill message
- kill_process(pid: Hashable, msg_text: str | None = None, force_kill: bool = False) → Future
Kill the process
- Parameters:
pid – the pid of the process to kill
msg_text – optional kill message
- Returns:
a response future from the process to be killed
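Methods that return a response future hand back control before the remote process has answered. The sketch below shows the general pattern of blocking on such a future, using the standard library's `concurrent.futures.Future` as a stand-in. Whether the future returned by plumpy offers exactly this interface is an assumption, and `fake_kill_process` is a hypothetical helper that simulates a delayed reply.

```python
import threading
from concurrent.futures import Future


def fake_kill_process(pid: int) -> Future:
    """Return a future that is resolved shortly afterwards from another thread."""
    future: Future = Future()

    def respond() -> None:
        # Simulate the remote process confirming that it was killed.
        future.set_result(True)

    threading.Timer(0.01, respond).start()
    return future


future = fake_kill_process(1234)
# Block the calling thread until the reply arrives or the timeout expires.
killed = future.result(timeout=5.0)
```

Passing a timeout to `result` avoids blocking forever if no reply ever arrives.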
- launch_process(process_class: str, init_args: Sequence[Any] | None = None, init_kwargs: Dict[str, Any] | None = None, persist: bool = False, loader: ObjectLoader | None = None, nowait: bool = False, no_reply: bool = False) → None | Hashable | Any
Launch the process
- Parameters:
process_class – the process class to launch
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
persist – should the process be persisted
loader – the class loader to use
nowait – if True, return the pid without waiting for the process to finish; if False, only return when the process finishes
no_reply – don’t send a reply to the sender
- Returns:
the pid of the created process or the outputs (if nowait=False)
- pause_all(msg_text: str | None) → None
Pause all processes that are subscribed to the same communicator
- Parameters:
msg_text – an optional pause message
- pause_process(pid: Hashable, msg_text: str | None = None) → Future
Pause the process
- Parameters:
pid – the pid of the process to pause
msg_text – optional pause message
- Returns:
a response future from the process to be paused
- plumpy.process_comms.create_continue_body(pid: Hashable, tag: str | None = None, nowait: bool = False) → Dict[str, Any]
Create a message body to continue an existing process
- Parameters:
pid – the pid of the existing process
tag – the optional persistence tag
nowait – if True, just return the PID without waiting for the process to finish; if False, wait for the process to finish before completing the task
- Returns:
a dictionary with the body of the message to continue the process
- plumpy.process_comms.create_launch_body(process_class: str, init_args: Sequence[Any] | None = None, init_kwargs: Dict[str, Any] | None = None, persist: bool = False, loader: ObjectLoader | None = None, nowait: bool = True) → Dict[str, Any]
Create a message body for the launch action
- Parameters:
process_class – the class of the process to launch
init_args – any initialisation positional arguments
init_kwargs – any initialisation keyword arguments
persist – persist this process if True, otherwise don’t
loader – the loader to use to load the persisted process
nowait – if True, just return the PID without waiting for the process to finish; if False, wait for the process to finish before completing the task
- Returns:
a dictionary with the body of the message to launch the process
- Return type: