plumpy.processes module

The main Process module

class plumpy.processes.BundleKeys[source]#

Bases: object

String keys used by the process to save its state in the state bundle.

See plumpy.processes.Process.save_instance_state() and plumpy.processes.Process.load_instance_state().

INPUTS_PARSED = 'INPUTS_PARSED'#
INPUTS_RAW = 'INPUTS_RAW'#
OUTPUTS = 'OUTPUTS'#
class plumpy.processes.Process(*args: Any, **kwargs: Any)[source]#

Bases: StateMachine, Savable

The Process class is the base for any unit of work in plumpy.

A process can be in one of the following states:

  • CREATED

  • RUNNING

  • WAITING

  • FINISHED

  • EXCEPTED

  • KILLED

as defined in the ProcessState enum.

                  ___
                 |   v
CREATED (x) --- RUNNING (x) --- FINISHED (o)
                 |   ^          /
                 v   |         /
                WAITING (x) --
                 |   ^
                  ---

* -- EXCEPTED (o)
* -- KILLED (o)
  • (o): terminal state

  • (x): non terminal state

When a Process enters a state, it always receives a corresponding message; for example, on entering RUNNING it receives the on_run message. These messages are always delivered immediately after the state is entered but before the state is executed.
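The diagram above can be read as a transition table. The following is a minimal, self-contained sketch of that table and not plumpy's own implementation. The names ToyState, TERMINAL, ALLOWED and can_transition are hypothetical, and the rule that EXCEPTED and KILLED are reachable only from non-terminal states is an assumption read off the "* --" rows and the (o)/(x) legend.

```python
from enum import Enum


class ToyState(Enum):
    """Stand-in for the ProcessState enum (hypothetical name)."""
    CREATED = 'created'
    RUNNING = 'running'
    WAITING = 'waiting'
    FINISHED = 'finished'
    EXCEPTED = 'excepted'
    KILLED = 'killed'


# Terminal states are marked (o) in the diagram.
TERMINAL = {ToyState.FINISHED, ToyState.EXCEPTED, ToyState.KILLED}

# Arrows drawn explicitly in the diagram, including the self-loops.
ALLOWED = {
    ToyState.CREATED: {ToyState.RUNNING},
    ToyState.RUNNING: {ToyState.RUNNING, ToyState.WAITING, ToyState.FINISHED},
    ToyState.WAITING: {ToyState.RUNNING, ToyState.WAITING, ToyState.FINISHED},
}


def can_transition(src, dst):
    """Return True if the diagram allows moving from src to dst."""
    if src in TERMINAL:
        # Assumption: nothing leaves a terminal state.
        return False
    if dst in (ToyState.EXCEPTED, ToyState.KILLED):
        # The "* --" rows: reachable from any non-terminal state.
        return True
    return dst in ALLOWED[src]
```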

__called: bool = False#
_abc_impl = <_abc._abc_data object>#
_auto_persist: Set[str] | None = {'_creation_time', '_event_helper', '_future', '_paused', '_pid', '_pre_paused_status', '_status'}#
_cleanups: List[Callable[[], None]] | None = None#
_closed = False#
_create_interrupt_action(exception: Interruption) CancellableAction[source]#

Create an interrupt action from the corresponding interrupt exception

Parameters:

exception – The interrupt exception

Returns:

The interrupt action

_creation_time: float | None#
_do_pause(state_msg: Dict[str, Any] | None, next_state: State | None = None) bool[source]#

Carry out the pause procedure, optionally transitioning to the next state first

_event_callbacks: Dict[Hashable, List[EVENT_CALLBACK_TYPE]]#
_fire_event(evt: Callable[[...], Any], *args: Any, **kwargs: Any) None[source]#
_interrupt_action: CancellableAction | None = None#
_killing: CancellableAction | None = None#
_outputs: Dict[str, Any]#
_parsed_inputs: utils.AttributesFrozendict | None#
_paused: SavableFuture | None = None#
_pausing: CancellableAction | None = None#
_pre_paused_status: str | None#
_process_scope() Generator[None, None, None][source]#

This context manager ensures that the process stack is correct, so that Process.current() can be called from anywhere to get the most recent process on the call stack.
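The idea of a process stack maintained by a context manager can be sketched as follows. This is an illustrative toy under stated assumptions, not plumpy's code: ToyProcess, scope and the module-level _stack list are hypothetical names, and a plain list is used in place of whatever storage the library actually uses.

```python
from contextlib import contextmanager

# Hypothetical module-level stack of active processes.
_stack = []


class ToyProcess:
    def __init__(self, name):
        self.name = name

    @classmethod
    def current(cls):
        """Return the process at the top of the stack, or None if empty."""
        return _stack[-1] if _stack else None

    @contextmanager
    def scope(self):
        """Push this process for the duration of the with-block."""
        _stack.append(self)
        try:
            yield
        finally:
            # Pop even if the body raised, so the stack stays consistent.
            popped = _stack.pop()
            assert popped is self, 'process stack is out of order'
```

Nesting two scopes makes the inner process the current one, and leaving the inner block restores the outer process.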

async _run_task(callback: Callable[[...], T], *args: Any, **kwargs: Any) T[source]#

This method should be used to run all Process related functions and coroutines. If there is an exception the process will enter the EXCEPTED state.

Parameters:
  • callback – A function or coroutine

  • args – Optional positional arguments passed to callback

  • kwargs – Optional keyword arguments passed to callback

Returns:

The value returned by callback
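A rough sketch of the behaviour described here: accept either a plain function or a coroutine, await the result when needed, and record a failure as the EXCEPTED state. ToyRunner and its attributes are hypothetical. Returning None after a failure instead of re-raising is a choice made for this sketch only and is not confirmed by the documentation above.

```python
import asyncio
import inspect


class ToyRunner:
    def __init__(self):
        self.state = 'RUNNING'
        self.exception = None

    async def run_task(self, callback, *args, **kwargs):
        """Run a function or coroutine; on error, enter EXCEPTED."""
        try:
            result = callback(*args, **kwargs)
            if inspect.isawaitable(result):
                # The callback was a coroutine function (or returned an awaitable).
                result = await result
            return result
        except Exception as exc:
            # Sketch-only choice: store the failure and return None.
            self.state = 'EXCEPTED'
            self.exception = exc
            return None
```

Usage, with a plain function first and a failing coroutine second:

```python
runner = ToyRunner()
value = asyncio.run(runner.run_task(lambda x: x + 1, 1))


async def boom():
    raise ValueError('failed')


asyncio.run(runner.run_task(boom))
```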

_schedule_rpc(callback: Callable[[...], Any], *args: Any, **kwargs: Any) Future[source]#

Schedule a call to a callback as a result of an RPC communication call. This returns a future that resolves to the final result of the callback, even if the callback itself returns one or more layers of futures.

The callback will be scheduled at the working thread where the process event loop runs.

Parameters:
  • callback – the callback function or coroutine

  • args – the positional arguments to the callback

  • kwargs – the keyword arguments to the callback

Returns:

a kiwi future that resolves to the outcome of the callback

_set_interrupt_action(new_action: CancellableAction | None) None[source]#

Set the interrupt action, cancelling the current one if it exists.

Parameters:

new_action – The new interrupt action to set

_set_interrupt_action_from_exception(interrupt_exception: Interruption) None[source]#

Set an interrupt action from the corresponding interrupt exception

_setup_event_hooks() None[source]#

Set the event hooks of the process when it is created or loaded (recreated).

_spec_class#

alias of ProcessSpec

_state: State | None#
_status: str | None#
_stepping = False#
_uuid: uuid.UUID | None#
add_cleanup(cleanup: Callable[[], None]) None[source]#

Add a callback that will be run when the process is being closed.

add_process_listener(listener: ProcessListener) None[source]#

Add a process listener to the process.

The listener defines the actions to take when the process is triggering the specific state condition.

broadcast_receive(_comm: Communicator, msg: Dict[str, Any], sender: Any, subject: Any, correlation_id: Any) Future | None[source]#

Coroutine called when the process receives a message from the communicator

Parameters:
  • _comm – the communicator that sent the message

  • msg – the message

call_soon(callback: Callable[[...], Any], *args: Any, **kwargs: Any) ProcessCallback[source]#

Schedule a callback to what is considered an internal process function (this needn’t be a method). If it raises an exception it will cause the process to fail.

callback_excepted(_callback: Callable[[...], Any], exception: BaseException | None, trace: TracebackType | None) None[source]#
close() None[source]#

Calling this method indicates that this process should not run anymore and will trigger the cleanup of any runtime resources (such as the communicator connection). The state of the process will still be accessible.

It is safe to call this method multiple times.
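The interplay between add_cleanup and an idempotent close can be sketched as below. This is a minimal illustration, not the library's implementation: ToyClosable is a hypothetical class, and running the callbacks in registration order is an assumption.

```python
class ToyClosable:
    def __init__(self):
        self._cleanups = []
        self._closed = False

    @property
    def closed(self):
        return self._closed

    def add_cleanup(self, cleanup):
        """Register a zero-argument callable to run on close."""
        self._cleanups.append(cleanup)

    def close(self):
        """Run all cleanups once; later calls are no-ops."""
        if self._closed:
            return
        for cleanup in self._cleanups:
            cleanup()
        self._cleanups = []
        self._closed = True
```

Because of the early return, calling close() a second time does not run the callbacks again.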

create_initial_state() State[source]#

This method is here to override its superclass.

Automatically enter the CREATED state when the process is created.

Returns:

A Created state

property creation_time: float | None#

The creation time of this Process as returned by time.time() when instantiated.

Returns:

The creation time

classmethod current() Process | None[source]#

Get the currently running process i.e. the one at the top of the stack

Returns:

the currently running process

decode_input_args(encoded: Any) Any[source]#

Decode saved input arguments as they came from the saved instance state plumpy.persistence.Bundle. The decoded inputs should contain no reference to the encoded inputs that were passed in. This often will mean making a deepcopy of the encoded input dictionary.

Parameters:

encoded

Returns:

The decoded input args

classmethod define(_spec: ProcessSpec) None[source]#

Define the specification of the process.

Normally should be overridden by subclasses.

done() bool[source]#

Return True if the process was successfully killed or finished running.

Deprecated since version 0.18.6: Use the has_terminated method instead

encode_input_args(inputs: Any) Any[source]#

Encode input arguments such that they may be saved in a plumpy.persistence.Bundle. The encoded inputs should contain no reference to the inputs that were passed in. This often will mean making a deepcopy of the input dictionary.

Parameters:

inputs – A mapping of the inputs as passed to the process

Returns:

The encoded inputs
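The requirement that encoded and decoded inputs share no references with what was passed in can be met with a deep copy, as both docstrings suggest. The sketch below uses hypothetical free functions toy_encode and toy_decode on plain dictionaries; a real subclass may need to serialise its values instead.

```python
import copy


def toy_encode(inputs):
    """Return a copy that shares no mutable objects with inputs."""
    return copy.deepcopy(dict(inputs))


def toy_decode(encoded):
    """Return a copy that shares no mutable objects with encoded."""
    return copy.deepcopy(encoded)


inputs = {'settings': {'tolerance': 1e-3}}
encoded = toy_encode(inputs)

# Mutating the original afterwards must not leak into the encoded copy.
inputs['settings']['tolerance'] = 1.0
```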

exception() BaseException | None[source]#

Return the exception if the process terminated in the EXCEPTED state.

execute() Dict[str, Any] | None[source]#

Execute the process synchronously.

Returns:

None if not terminated, otherwise self.outputs

fail(exception: BaseException | None, trace_back: TracebackType | None) None[source]#

Fail the process in response to an exception.

Parameters:
  • exception – The exception that caused the failure

  • trace_back – Optional exception traceback

future() SavableFuture[source]#

Return a savable future representing an eventual result of an asynchronous operation.

The result is set at the terminal state.

classmethod get_description() Dict[str, Any][source]#

Get a human readable description of what this Process does.

Returns:

The description.

classmethod get_name() str[source]#

Return the process class name.

classmethod get_state_classes() Dict[Hashable, Type[State]][source]#
classmethod get_states() Sequence[Type[State]][source]#

Return all allowed states of the process.

get_status_info(out_status_info: dict) None[source]#

Return updated status information of process.

Parameters:

out_status_info – the old status

has_terminated() bool[source]#

Return whether the process was terminated.

init(*args: Any, **kwargs: Any) None#
property inputs: AttributesFrozendict | None#

Return the parsed inputs.

property is_excepted: bool#

Return whether the process excepted.

Returns:

boolean, True if the process is in EXCEPTED state.

property is_killing: bool#

Return if the process is already being killed.

property is_successful: bool#

Return whether the result of the process is considered successful.

Returns:

boolean, True if the process is in Finished state with successful attribute set to True

kill(msg_text: str | None = None, force_kill: bool = False) bool | Future[source]#

Kill the process.

Parameters:

msg_text – An optional kill message

killed() bool[source]#

Return whether the process is killed.

killed_msg() Dict[str, Any] | None[source]#

Return the killed message.

launch(process_class: Type[Process], inputs: dict | None = None, pid: Hashable | None = None, logger: Logger | None = None) Process[source]#

Start running the nested process.

The process is started asynchronously, without blocking other tasks in the event loop.

load_instance_state(saved_state: MutableMapping[str, Any], load_context: LoadSaveContext) None[source]#

Load the process from its saved instance state.

Parameters:
  • saved_state – A bundle to load the state from

  • load_context – The load context

log_with_pid(level: int, msg: str) None[source]#

Log the message with the process pid.

property logger: Logger#

Return the logger for this class.

If not set, return the default logger.

Returns:

The logger.

property loop: AbstractEventLoop#

Return the event loop of the process.

message_receive(_comm: Communicator, msg: Dict[str, Any]) Any[source]#

Coroutine called when the process receives a message from the communicator

Parameters:
  • _comm – the communicator that sent the message

  • msg – the message

Returns:

the outcome of processing the message, the return value will be sent back as a response to the sender

on_close(*args: Any, **kwargs: Any) None#
on_create(*args: Any, **kwargs: Any) None#
on_entered(from_state: State | None) None[source]#
on_entering(state: State) None[source]#
on_except(*args: Any, **kwargs: Any) None#
on_excepted(*args: Any, **kwargs: Any) None#
on_exit_running(*args: Any, **kwargs: Any) None#
on_exit_waiting(*args: Any, **kwargs: Any) None#
on_exiting() None[source]#
on_finish(*args: Any, **kwargs: Any) None#
on_finished(*args: Any, **kwargs: Any) None#
on_kill(*args: Any, **kwargs: Any) None#
on_killed(*args: Any, **kwargs: Any) None#
on_output_emitted(output_port: str, value: Any, dynamic: bool) None[source]#
on_output_emitting(output_port: str, value: Any) None[source]#

Output is about to be emitted.

on_paused(*args: Any, **kwargs: Any) None#
on_pausing(*args: Any, **kwargs: Any) None#
on_playing(*args: Any, **kwargs: Any) None#
on_run(*args: Any, **kwargs: Any) None#
on_running(*args: Any, **kwargs: Any) None#
on_terminated() None[source]#

Call when a terminal state is reached.

on_wait(*args: Any, **kwargs: Any) None#
on_waiting(*args: Any, **kwargs: Any) None#
out(output_port: str, value: Any) None[source]#

Record an output value for a specific output port. If the output port matches an explicitly defined Port it will be validated against that. If not it will be validated against the PortNamespace, which means it will be checked for dynamicity and whether the type of the value is valid

Parameters:
  • output_port – the name of the output port, can be namespaced

  • value – the value for the output port

Raises:

ValueError if the output value is not validated against the port
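The two validation paths described for out() can be illustrated with a toy model. This is a sketch under assumptions, not plumpy's port machinery: ToyOutputs is hypothetical, explicit ports are reduced to a name-to-type mapping, and the namespace is reduced to a dynamic flag plus an optional valid_type.

```python
class ToyOutputs:
    def __init__(self, ports, dynamic=False, valid_type=None):
        self.ports = ports            # explicit ports: name -> required type
        self.dynamic = dynamic        # does the namespace accept unknown names?
        self.valid_type = valid_type  # type required for dynamic outputs
        self.values = {}

    def out(self, name, value):
        """Record value under name, raising ValueError if it is invalid."""
        if name in self.ports:
            # Path 1: validate against the explicitly defined port.
            if not isinstance(value, self.ports[name]):
                raise ValueError(f'invalid type for output port {name!r}')
        else:
            # Path 2: validate against the namespace itself.
            if not self.dynamic:
                raise ValueError(f'unexpected output port {name!r}')
            if self.valid_type is not None and not isinstance(value, self.valid_type):
                raise ValueError(f'invalid type for dynamic output {name!r}')
        self.values[name] = value
```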

property outputs: Dict[str, Any]#

Get the current outputs emitted by the Process. These may grow over time as the process runs.

Returns:

A mapping of {output_port: value} outputs

pause(msg_text: str | None = None) bool | CancellableAction[source]#

Pause the process.

Parameters:

msg_text – an optional message to set as the status. The current status will be saved in the private _pre_paused_status attribute, such that it can be restored when the process is played again.

Returns:

False if process is already terminated, True if already paused or pausing, a CancellableAction to pause if the process was running steps

property paused: bool#

Return whether the process was being paused.

property pid: Hashable | None#

Return the pid of the process.

play() bool[source]#

Play a process.

Returns:

True if playing, False otherwise

property raw_inputs: AttributesFrozendict | None#

The AttributesFrozendict of inputs (if not None).

classmethod recreate_from(saved_state: MutableMapping[str, Any], load_context: LoadSaveContext | None = None) Process[source]#

Recreate a process from a saved state, passing any positional and keyword arguments on to load_instance_state

Parameters:
  • saved_state – The saved state to load from

  • load_context – The load context to use

Returns:

An instance of the object with its state loaded from the save state.

recreate_state(saved_state: Bundle) State[source]#

Create a state object from a saved state

Parameters:

saved_state – The saved state

Returns:

An instance of the object with its state loaded from the save state.

remove_process_listener(listener: ProcessListener) None[source]#

Remove a process listener from the process.

result() Any[source]#

Get the result from the process if it is finished. If the process was killed, a KilledError is raised. If the process has excepted, the failing exception is raised. In any other state, an InvalidStateError is raised.

Returns:

The result of the process
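The four cases of result() can be summarised in a small dispatch function. This is only a sketch: toy_result is hypothetical, states are plain strings, and the two exception classes defined here are local stand-ins that borrow the names used in the description above.

```python
class KilledError(Exception):
    """Local stand-in for the error raised for a killed process."""


class InvalidStateError(Exception):
    """Local stand-in for the error raised in a non-terminal state."""


def toy_result(state, outputs=None, exception=None, kill_msg=None):
    """Return outputs for FINISHED; raise for every other state."""
    if state == 'FINISHED':
        return outputs
    if state == 'KILLED':
        raise KilledError(kill_msg)
    if state == 'EXCEPTED':
        # Re-raise the exception that made the process fail.
        raise exception
    raise InvalidStateError(f'no result available in state {state}')
```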

resume(*args: Any) None[source]#

Start running the process again.

async run() Any[source]#

This function will be run when the process is triggered. It should be overridden by a subclass.

save_instance_state(out_state: MutableMapping[str, Any], save_context: LoadSaveContext | None) None[source]#

Ask the process to save its current instance state.

Parameters:
  • out_state – A bundle to save the state to

  • save_context – The save context

set_logger(logger: Logger) None[source]#

Set the logger of the process.

set_status(status: str | None) None[source]#

Set the status message of the process.

classmethod spec() ProcessSpec[source]#
property status: str | None#

Return the status message of the process.

step() None[source]#

Run a step.

The step is run synchronously with steps in its own process, and asynchronously with steps in other processes.

The execute function running in this method is dependent on the state of the process.

async step_until_terminated() None[source]#

If the process has not terminated, run the current step and wait until the step finished.

This is the function run by the event loop (not step).

successful() bool[source]#

Return whether the result of the process is considered successful. Raises if the process is not in the FINISHED state.

transition_failed(initial_state: Hashable, final_state: Hashable, exception: Exception, trace: TracebackType) None[source]#

Called when a state transition fails.

This method can be overridden to change the default behaviour, which is to raise the exception.

Parameters:

exception – The transition failed exception.

property uuid: UUID | None#

Return the UUID of the process

class plumpy.processes.ProcessSpec[source]#

Bases: object

A class that defines the specification of a plumpy.Process, including its inputs, outputs, etc.

All methods that modify the spec should have declarative names that describe the spec, e.g. input, output.

Every Process class has one of these.

INPUT_PORT_TYPE#

alias of InputPort

NAME_INPUTS_PORT_NAMESPACE: str = 'inputs'#
NAME_OUTPUTS_PORT_NAMESPACE: str = 'outputs'#
OUTPUT_PORT_TYPE#

alias of OutputPort

PORT_NAMESPACE_TYPE#

alias of PortNamespace

_create_port(port_namespace: PortNamespace, port_class: Type[Port | PortNamespace], name: str, **kwargs: Any) None[source]#

Create a new Port of a given class and name in a given PortNamespace

Parameters:
  • port_namespace – PortNamespace to which to add the port

  • port_class – class of the Port to create

  • name – name of the port to create

  • kwargs – options for the port

static _expose_ports(process_class: Type[Process], source: PortNamespace, destination: PortNamespace, expose_memory: Dict[str | None, Dict[Type[Process], Sequence[str]]], namespace: str | None, exclude: Sequence[str] | None, include: Sequence[str] | None, namespace_options: dict | None = None) None[source]#

Expose ports from a source PortNamespace of the ProcessSpec of a Process class into the destination PortNamespace of this ProcessSpec. If the namespace is specified, the ports will be exposed in that sub namespace. The set of ports can be restricted using the mutually exclusive exclude and include tuples. The namespace_options will be used to override the properties of the PortNamespace into which the ports are exposed, whether that has been newly created or existed already.

Parameters:
  • process_class – the Process class whose ports to expose

  • source – the PortNamespace whose ports are to be exposed

  • destination – the PortNamespace into which the ports are to be exposed

  • namespace – a namespace in which to place the PortNamespace with the exposed ports

  • exclude – ports that are to be excluded

  • include – ports that are to be included

  • namespace_options – a dictionary with mutable PortNamespace property values to override

_exposed_inputs: EXPOSED_TYPE#
_exposed_outputs: EXPOSED_TYPE#
_ports: PortNamespace#
_sealed: bool#
expose_inputs(process_class: Type[Process], namespace: str | None = None, exclude: Sequence[str] | None = None, include: Sequence[str] | None = None, namespace_options: dict | None = None) None[source]#

This method allows one to automatically add the inputs from another Process to this ProcessSpec. The optional namespace argument can be used to group the exposed inputs in a separated PortNamespace. The exclude and include arguments can be used to restrict the set of ports that are exposed. Note that these two options are mutually exclusive.

Parameters:
  • process_class – the Process class whose inputs to expose

  • namespace – a namespace in which to place the exposed inputs

  • exclude – input ports that are to be excluded

  • include – input ports that are to be included

  • namespace_options – a dictionary with mutable PortNamespace property values to override
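The selection rules for exposing ports (optional namespace, mutually exclusive exclude and include) can be sketched with plain dictionaries standing in for PortNamespace objects. The functions select_ports and expose are hypothetical helpers for illustration, and raising ValueError when both filters are given is an assumption about how the mutual exclusivity is enforced.

```python
def select_ports(ports, exclude=None, include=None):
    """Return the subset of ports allowed by exclude or include."""
    if exclude is not None and include is not None:
        raise ValueError('exclude and include are mutually exclusive')
    if include is not None:
        return {name: port for name, port in ports.items() if name in include}
    if exclude is not None:
        return {name: port for name, port in ports.items() if name not in exclude}
    return dict(ports)


def expose(source, destination, namespace=None, exclude=None, include=None):
    """Copy the selected ports into destination, optionally under a sub-namespace."""
    selected = select_ports(source, exclude=exclude, include=include)
    if namespace is None:
        target = destination
    else:
        # Reuse the sub-namespace if it already exists, else create it.
        target = destination.setdefault(namespace, {})
    target.update(selected)
    return destination


child_ports = {'a': 'port_a', 'b': 'port_b', 'c': 'port_c'}
parent_ports = {}
expose(child_ports, parent_ports, namespace='sub', exclude=('c',))
```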

expose_outputs(process_class: Type[Process], namespace: str | None = None, exclude: Sequence[str] | None = None, include: Sequence[str] | None = None, namespace_options: dict | None = None) None[source]#

This method allows one to automatically add the outputs from another Process to this ProcessSpec. The optional namespace argument can be used to group the exposed outputs in a separated PortNamespace. The exclude and include arguments can be used to restrict the set of ports that are exposed. Note that these two options are mutually exclusive.

Parameters:
  • process_class – the Process class whose outputs to expose

  • namespace – a namespace in which to place the exposed outputs

  • exclude – output ports that are to be excluded

  • include – output ports that are to be included

  • namespace_options – a dictionary with mutable PortNamespace property values to override

get_description() Dict[str, Any][source]#

Get a description of this process specification

Returns:

a dictionary with the descriptions of the input and output port namespaces

has_input(name: str) bool[source]#

Return whether the input port namespace contains a port with the given name

Parameters:

name – key of the port in the input port namespace

has_output(name: str) bool[source]#

Return whether the output port namespace contains a port with the given name

Parameters:

name – key of the port in the output port namespace

input(name: str, **kwargs: Any) None[source]#

Define an input port in the input port namespace

Parameters:
  • name – name of the input port to create

  • kwargs – options for the input port

input_namespace(name: str, **kwargs: Any) None[source]#

Create a new PortNamespace in the input port namespace. The keyword arguments will be passed to the PortNamespace constructor. Any intermediate port namespaces that need to be created for a nested namespace, will take constructor defaults

Parameters:
  • name – namespace of the new port namespace

  • kwargs – keyword arguments for the PortNamespace constructor
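How intermediate namespaces end up with defaults while only the final one receives the keyword arguments can be sketched as below. Nested dictionaries stand in for PortNamespace objects, new_namespace and create_namespace are hypothetical names, and the '.' separator is an assumption (the spec exposes it through the namespace_separator property).

```python
def new_namespace(**options):
    """Toy namespace: its own options plus a mapping of child namespaces."""
    return {'options': dict(options), 'children': {}}


def create_namespace(root, name, separator='.', **options):
    """Create the namespace called name under root, with defaults in between."""
    *parents, leaf_name = name.split(separator)
    node = root
    for part in parents:
        # Intermediate namespaces are created with default (empty) options.
        node = node['children'].setdefault(part, new_namespace())
    leaf = node['children'].setdefault(leaf_name, new_namespace())
    # Only the final namespace receives the keyword arguments.
    leaf['options'].update(options)
    return leaf


root = new_namespace()
leaf = create_namespace(root, 'a.b.c', dynamic=True)
```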

property inputs: PortNamespace#

Get the input port namespace of the process specification

Returns:

the input PortNamespace

property logger: Logger#
property namespace_separator: str#
output(name: str, **kwargs: Any) None[source]#

Define an output port in the output port namespace

Parameters:
  • name – name of the output port to create

  • kwargs – options for the output port

output_namespace(name: str, **kwargs: Any) None[source]#

Create a new PortNamespace in the output port namespace. The keyword arguments will be passed to the PortNamespace constructor. Any intermediate port namespaces that need to be created for a nested namespace, will take constructor defaults

Parameters:
  • name – namespace of the new port namespace

  • kwargs – keyword arguments for the PortNamespace constructor

property outputs: PortNamespace#

Get the output port namespace of the process specification

Returns:

the outputs PortNamespace

property ports: PortNamespace#
seal() None[source]#

Seal this specification disallowing any further changes

property sealed: bool#

Indicates if the spec is sealed or not

Returns:

True if sealed, False otherwise

Return type:

bool
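Sealing can be pictured as a flag that every modifying method checks first. The sketch below is a toy and not the ProcessSpec implementation: ToySpec is hypothetical, and raising RuntimeError on modification after sealing is an assumption made for illustration.

```python
class ToySpec:
    def __init__(self):
        self._ports = {}
        self._sealed = False

    @property
    def sealed(self):
        """True once seal() has been called."""
        return self._sealed

    def seal(self):
        """Disallow any further changes to this spec."""
        self._sealed = True

    def input(self, name, **kwargs):
        """Declare an input port, unless the spec is already sealed."""
        if self._sealed:
            raise RuntimeError('cannot add a port after the spec has been sealed')
        self._ports[name] = kwargs
```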

exception plumpy.processes.TransitionFailed(initial_state: State, final_state: State | None = None, traceback_str: str | None = None)[source]#

Bases: Exception

A state transition failed

_format_msg() str[source]#