plumpy.process_comms module
Module for process-level communication functions and classes.
-
class
plumpy.process_comms.
ProcessLauncher
(loop: Optional[asyncio.events.AbstractEventLoop] = None, persister: Optional[plumpy.persistence.Persister] = None, load_context: Optional[plumpy.persistence.LoadSaveContext] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None)
Bases: object
Takes incoming task messages and uses them to launch processes.
Expected format of task:
For launch:
{
    'task': <LAUNCH_TASK>,
    'process_class': <Process class to launch>,
    'args': <tuple of positional args for process constructor>,
    'kwargs': <dict of keyword args for process constructor>,
    'nowait': True or False
}
For continue:
{
    'task': <CONTINUE_TASK>,
    'pid': <Process ID>,
    'nowait': True or False
}
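The two task formats above can be illustrated with a small dispatcher. This is a minimal sketch, not plumpy's implementation: the values given to `LAUNCH_TASK` and `CONTINUE_TASK`, the helper `describe_task`, and the class identifier `my_module.MyProcess` are hypothetical placeholders. Only the dictionary keys are taken from the formats described above.

```python
from typing import Any, Dict

# Placeholder values (assumption); the real constants are defined by plumpy.
LAUNCH_TASK = 'launch'
CONTINUE_TASK = 'continue'


def describe_task(task: Dict[str, Any]) -> str:
    """Return a short description of what a launcher would do with `task`."""
    kind = task['task']
    if kind == LAUNCH_TASK:
        args = task.get('args', ())
        kwargs = task.get('kwargs', {})
        return 'launch {} with {} args, {} kwargs, nowait={}'.format(
            task['process_class'], len(args), len(kwargs), task['nowait'])
    if kind == CONTINUE_TASK:
        return 'continue pid {} nowait={}'.format(task['pid'], task['nowait'])
    raise ValueError('unknown task type: {!r}'.format(kind))


launch_msg = {
    'task': LAUNCH_TASK,
    'process_class': 'my_module.MyProcess',  # hypothetical class identifier
    'args': (1, 2),
    'kwargs': {'label': 'demo'},
    'nowait': True,
}
continue_msg = {'task': CONTINUE_TASK, 'pid': 42, 'nowait': False}

print(describe_task(launch_msg))
print(describe_task(continue_msg))
```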
-
async
_continue
(_communicator: kiwipy.communications.Communicator, pid: Hashable, nowait: bool, tag: Optional[str] = None) → Union[Hashable, Any]
Continue the process
- Parameters
_communicator – the communicator
pid – the pid of the process to continue
nowait – if True don’t wait for the process to complete
tag – the checkpoint tag to continue from
-
async
_create
(_communicator: kiwipy.communications.Communicator, process_class: str, persist: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None) → Hashable
Create the process
- Parameters
_communicator – the communicator
process_class – the process class to create
persist – should the process be persisted
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
- Returns
the pid of the created process
-
async
_launch
(_communicator: kiwipy.communications.Communicator, process_class: str, persist: bool, nowait: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None) → Union[Hashable, Any]
Launch the process
- Parameters
_communicator – the communicator
process_class – the process class to launch
persist – should the process be persisted
nowait – if True, return the pid immediately without waiting for the process to finish
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
- Returns
the pid of the created process or the outputs (if nowait=False)
-
class
plumpy.process_comms.
RemoteProcessController
(communicator: kiwipy.communications.Communicator)
Bases: object
Control remote processes using coroutines that will send messages and wait (in a non-blocking way) for their response
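Because the controller methods are coroutines, several requests can be awaited concurrently from one event loop. The sketch below shows that calling pattern only. `fake_get_status` is a hypothetical stand-in for a controller call, and the shape of its reply is invented for illustration.

```python
import asyncio
from typing import Any, Dict, Hashable, List


async def fake_get_status(pid: Hashable) -> Dict[str, Any]:
    """Hypothetical stand-in for an awaitable status request."""
    await asyncio.sleep(0.01)  # simulate waiting for a remote reply
    return {'pid': pid, 'state': 'running'}  # invented reply shape


async def main() -> List[Dict[str, Any]]:
    # The three requests wait concurrently; gather keeps the input order.
    return await asyncio.gather(*(fake_get_status(pid) for pid in (1, 2, 3)))


statuses = asyncio.run(main())
```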
-
async
continue_process
(pid: Hashable, tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False) → Optional[Any]
Continue the process
- Parameters
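pid – the pid of the process to continue
tag – the checkpoint tag to continue from
nowait – if True, don’t wait for the process to complete
no_reply – if True, this call will be fire-and-forget, i.e. no return value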
-
async
execute_process
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Any
Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.
- Parameters
process_class – the process class to execute
init_args – the positional arguments to the class constructor
init_kwargs – the keyword arguments to the class constructor
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns
the result of executing the process
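The create-then-continue sequence described for `execute_process` can be sketched with an in-memory stand-in for the communicator. Everything here is an assumption made for illustration: `ToyCommunicator`, its `send` method, the `'create'` and `'continue'` task names, and the reply values are hypothetical and are not the kiwipy or plumpy API.

```python
import asyncio
from typing import Any, Dict, List


class ToyCommunicator:
    """Hypothetical in-memory communicator that records the tasks it is sent."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []
        self._next_pid = 0

    async def send(self, body: Dict[str, Any]) -> Any:
        self.sent.append(body)
        if body['task'] == 'create':
            self._next_pid += 1
            return self._next_pid  # pid of the newly created process
        if body['task'] == 'continue':
            return 'outputs of pid {}'.format(body['pid'])
        raise ValueError('unknown task: {!r}'.format(body['task']))


async def execute(comm: ToyCommunicator, process_class: str, nowait: bool = False) -> Any:
    # Step 1: ask for the process to be created and receive its pid.
    pid = await comm.send({'task': 'create', 'process_class': process_class})
    # Step 2: ask for that process to be continued (run).
    return await comm.send({'task': 'continue', 'pid': pid, 'nowait': nowait})


comm = ToyCommunicator()
result = asyncio.run(execute(comm, 'my_module.MyProcess'))
```

Splitting the work into two messages is what lets a durable message queue hand the continue task to another worker if the sender goes away.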
-
async
get_status
(pid: Hashable) → Any
Get the status of a process with the given PID
- Parameters
pid – the process id
- Returns
the status response from the process
-
async
kill_process
(pid: Hashable, msg: Optional[Any] = None) → Any
Kill the process
- Parameters
pid – the pid of the process to kill
msg – optional kill message
- Returns
True if killed, False otherwise
-
async
launch_process
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Any
Launch a process given the class and constructor arguments
- Parameters
process_class – the class of the process to launch
init_args – the constructor positional arguments
init_kwargs – the constructor keyword arguments
persist – should the process be persisted
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response, just return the pid
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns
the result of launching the process
-
class
plumpy.process_comms.
RemoteProcessThreadController
(communicator: kiwipy.communications.Communicator)
Bases: object
A class that can be used to control and launch remote processes
-
continue_process
(pid: Hashable, tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any]
-
execute_process
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any]
Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.
- Parameters
process_class – the process class to execute
init_args – the positional arguments to the class constructor
init_kwargs – the keyword arguments to the class constructor
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns
the result of executing the process
-
get_status
(pid: Hashable) → concurrent.futures._base.Future
Get the status of a process with the given PID.
- Parameters
pid – the process id
- Returns
the status response from the process
-
kill_all
(msg: Optional[Any]) → None
Kill all processes that are subscribed to the same communicator
- Parameters
msg – an optional kill message
-
kill_process
(pid: Hashable, msg: Optional[Any] = None) → concurrent.futures._base.Future
Kill the process
- Parameters
pid – the pid of the process to kill
msg – optional kill message
- Returns
a response future from the process to be killed
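Methods that return a `concurrent.futures.Future` let a caller in ordinary threaded code block until the reply arrives, optionally with a timeout. The sketch below shows that usage with the standard library only. `fake_kill_process` is a hypothetical stand-in that resolves the future from another thread, and it is not how the thread controller is implemented.

```python
import concurrent.futures
import threading
from typing import Hashable


def fake_kill_process(pid: Hashable) -> concurrent.futures.Future:
    """Hypothetical stand-in returning a future resolved from another thread."""
    future: concurrent.futures.Future = concurrent.futures.Future()

    def respond() -> None:
        # Pretend the remote process confirmed that it was killed.
        future.set_result(True)

    threading.Thread(target=respond).start()
    return future


future = fake_kill_process(pid=7)
# Block the calling thread until the reply arrives, or raise after 5 seconds.
killed = future.result(timeout=5)
```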
-
launch_process
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any]
Launch the process
- Parameters
process_class – the process class to launch
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
persist – should the process be persisted
loader – the class loader to use
nowait – if True, return the pid immediately without waiting for the process to finish
no_reply – don’t send a reply to the sender
- Returns
the pid of the created process or the outputs (if nowait=False)
-
pause_all
(msg: Any) → None
Pause all processes that are subscribed to the same communicator
- Parameters
msg – an optional pause message
-
pause_process
(pid: Hashable, msg: Optional[Any] = None) → concurrent.futures._base.Future
Pause the process
- Parameters
pid – the pid of the process to pause
msg – optional pause message
- Returns
a response future from the process to be paused
-
-
plumpy.process_comms.
create_continue_body
(pid: Hashable, tag: Optional[str] = None, nowait: bool = False) → Dict[str, Any]
Create a message body to continue an existing process
- Parameters
pid – the pid of the existing process
tag – the optional persistence tag
nowait – if True, don’t wait for the process to finish before completing the task; just return the PID
- Returns
a dictionary with the body of the message to continue the process
-
plumpy.process_comms.
create_launch_body
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = True) → Dict[str, Any]
Create a message body for the launch action
- Parameters
process_class – the class of the process to launch
init_args – any initialisation positional arguments
init_kwargs – any initialisation keyword arguments
persist – persist this process if True, otherwise don’t
loader – the loader to use to load the persisted process
nowait – if True, don’t wait for the process to finish before completing the task; just return the PID
- Returns
a dictionary with the body of the message to launch the process
- Return type