plumpy package

class plumpy.AttributesDict[source]

Bases: types.SimpleNamespace

Works like a dictionary, but items can also be added / accessed as attributes.

For example:

dct = AttributesDict()
dct["key1"] = "value"
dct.key2 = "value"
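
The behaviour described above can be sketched with the standard library alone. AttrDictSketch below is a hypothetical stand-in and not plumpy's implementation; it only illustrates how a types.SimpleNamespace subclass could offer item access on top of attribute access.

```python
from types import SimpleNamespace
from typing import Any


class AttrDictSketch(SimpleNamespace):
    """Hypothetical stand-in: attributes double as dictionary items."""

    def __getitem__(self, key: str) -> Any:
        return self.__dict__[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self.__dict__[key] = value

    def __contains__(self, key: str) -> bool:
        return key in self.__dict__

    def get(self, key: str, default: Any = None) -> Any:
        return self.__dict__.get(key, default)

    def setdefault(self, key: str, value: Any) -> Any:
        return self.__dict__.setdefault(key, value)


dct = AttrDictSketch()
dct["key1"] = "value"   # stored as an item, readable as dct.key1
dct.key2 = "value"      # stored as an attribute, readable as dct["key2"]
```

Both spellings read and write the same underlying namespace, so either can be used interchangeably.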
get(*args: Any, **kwargs: Any) → Any[source]
setdefault(key: str, value: Any) → Any[source]
class plumpy.Bundle(savable: plumpy.persistence.Savable, save_context: Optional[LoadSaveContext] = None, dereference: bool = False)[source]

Bases: dict

unbundle(load_context: Optional[LoadSaveContext] = None) → plumpy.persistence.Savable[source]

This method loads the class of the object and calls its recreate_from method passing the positional and keyword arguments.

Parameters

load_context – The optional load context

Returns

An instance of the Savable

class plumpy.BundleKeys[source]

Bases: object

String keys used by the process to save its state in the state bundle.

See plumpy.processes.Process.save_instance_state() and plumpy.processes.Process.load_instance_state().

INPUTS_PARSED = 'INPUTS_PARSED'
INPUTS_RAW = 'INPUTS_RAW'
OUTPUTS = 'OUTPUTS'
exception plumpy.CancelledError[source]

Bases: concurrent.futures._base.Error

The Future was cancelled.

exception plumpy.ClosedError[source]

Bases: Exception

Raised when a mutating operation is attempted on a closed process

class plumpy.Communicator[source]

Bases: object

The interface for a communicator used to both send and receive various types of message.

abstract add_broadcast_subscriber(subscriber: Callable[[Communicator, Any, Any, Any, Any], Any], identifier=None) → Any[source]

Add a broadcast subscriber that will receive all broadcast messages

Parameters
  • subscriber – the subscriber function to be called

  • identifier – an optional identifier for the subscriber

Returns

an identifier for the subscriber that can subsequently be used to remove it

abstract add_rpc_subscriber(subscriber: Callable[[Communicator, Any], Any], identifier=None) → Any[source]

Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.

abstract add_task_subscriber(subscriber: Callable[[Communicator, Any], Any], identifier=None) → Any[source]

Add a task subscriber to the communicator’s default queue. Returns the identifier.

Parameters
  • subscriber – The task callback function

  • identifier – the subscriber identifier

abstract broadcast_send(body, sender=None, subject=None, correlation_id=None) → bool[source]

Broadcast a message to all subscribers

abstract close()[source]

Close a communicator, free up all resources and do not allow any further operations

abstract is_closed() → bool[source]

Return True if the communicator was closed

abstract remove_broadcast_subscriber(identifier)[source]

Remove a broadcast subscriber

Parameters

identifier – the identifier of the subscriber to remove

abstract remove_rpc_subscriber(identifier)[source]

Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.

Parameters

identifier – The RPC subscriber identifier
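
The identifier contract described for the subscriber methods (generate an identifier when none is given, always return it, raise ValueError on removal of an unknown one) can be sketched as follows. SubscriberRegistrySketch is a hypothetical helper written for illustration; it is not part of plumpy or kiwipy, and the use of uuid for generated identifiers is an assumption.

```python
import uuid
from typing import Any, Callable, Dict, Optional


class SubscriberRegistrySketch:
    """Hypothetical helper that tracks subscribers by identifier."""

    def __init__(self) -> None:
        self._subscribers: Dict[Any, Callable[..., Any]] = {}

    def add(self, subscriber: Callable[..., Any], identifier: Optional[Any] = None) -> Any:
        # Generate a unique identifier when the caller does not supply one.
        if identifier is None:
            identifier = uuid.uuid4().hex
        self._subscribers[identifier] = subscriber
        return identifier  # returned in all cases

    def remove(self, identifier: Any) -> None:
        # Unknown identifiers are reported as ValueError, as documented.
        try:
            del self._subscribers[identifier]
        except KeyError:
            raise ValueError(f"Unknown subscriber '{identifier}'") from None


registry = SubscriberRegistrySketch()
generated_id = registry.add(lambda comm, msg: msg)
named_id = registry.add(lambda comm, msg: msg, identifier="my-subscriber")
registry.remove(generated_id)
```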

abstract remove_task_subscriber(identifier)[source]

Remove a task subscriber from the communicator’s default queue.

Parameters

identifier – the subscriber to remove

Raises

ValueError if identifier does not correspond to a known subscriber

abstract rpc_send(recipient_id, msg)[source]

Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.

Parameters
  • recipient_id – The recipient identifier

  • msg – The body of the message

Returns

A future corresponding to the outcome of the call

Return type

kiwipy.Future

abstract task_send(task, no_reply=False) → concurrent.futures._base.Future[source]

Send a task message. It will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.

Parameters
  • task – The task message

  • no_reply (bool) – Do not send a reply containing the result of the task

Returns

A future corresponding to the outcome of the task

class plumpy.ContextMixin(*args: Any, **kwargs: Any)[source]

Bases: plumpy.persistence.Savable

Add a context to a Process. The contents of the context will be saved in the instance state unlike standard instance variables.

CONTEXT: str = '_context'
property ctx
load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]
save_instance_state(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) → None[source]

Add the instance state to out_state.

Important: the instance state will contain a pointer to the ctx, and so should be deep copied or serialised before persisting.
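
Why the deep copy matters can be shown with plain dictionaries. This is a minimal sketch: the ctx and out_state objects below are stand-ins, and storing the context directly under the '_context' key is an assumption made for illustration.

```python
import copy

ctx = {"counter": 1, "items": ["a"]}   # stand-in for a process context
out_state = {"_context": ctx}          # holds a reference, not a copy

shallow_snapshot = dict(out_state)         # still shares the ctx object
deep_snapshot = copy.deepcopy(out_state)   # fully independent of ctx

# The process keeps running and mutates its context after the "save".
ctx["counter"] = 2
ctx["items"].append("b")
```

The shallow snapshot now reflects the later mutations, while the deep copy still holds the state as it was when it was taken.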
class plumpy.Continue(continue_fn: Callable[[], Any], *args: Any, **kwargs: Any)[source]

Bases: plumpy.process_states.Command

CONTINUE_FN = 'continue_fn'
_auto_persist: Optional[Set[str]] = {'args', 'kwargs'}
load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]
save_instance_state(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]
class plumpy.Created(process: Process, run_fn: Callable[[], Any], *args: Any, **kwargs: Any)[source]

Bases: plumpy.process_states.State

ALLOWED: Set[Union[None, enum.Enum, str]] = {<ProcessState.RUNNING: 'running'>, <ProcessState.KILLED: 'killed'>, <ProcessState.EXCEPTED: 'excepted'>}
LABEL: Union[None, enum.Enum, str] = 'created'
RUN_FN = 'run_fn'
_auto_persist: Optional[Set[str]] = {'args', 'in_state', 'kwargs'}
execute() → plumpy.base.state_machine.State[source]

Execute the state, performing the actions that this state is responsible for.

Returns

a state to transition to or None if finished

load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]
save_instance_state(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]
class plumpy.DefaultObjectLoader[source]

Bases: plumpy.loaders.ObjectLoader

A default implementation for an object loader. Can load module level classes, functions and constants.

_abc_impl = <_abc_data object>
identify_object(obj: Any) → str[source]

Get an identifier for an object.

Throws a ValueError if the object cannot be identified.

Parameters

obj – The object to identify

Returns

An identifier for the object

load_object(identifier: str) → Any[source]

Given an identifier load an object.

Throws a ValueError if the object cannot be loaded.

Parameters

identifier – The identifier

Returns

The loaded object
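
A loader with this identify/load contract could be sketched with importlib as below. The 'module:qualified.name' identifier format is an assumption made for the example, not a statement about the format plumpy uses, and this sketch only handles objects that carry __module__ and __qualname__ (classes and functions, not arbitrary constants).

```python
import importlib
import json
from typing import Any


def identify_object(obj: Any) -> str:
    """Build a 'module:qualified.name' identifier (assumed format)."""
    try:
        return f"{obj.__module__}:{obj.__qualname__}"
    except AttributeError:
        raise ValueError(f"Cannot identify {obj!r}") from None


def load_object(identifier: str) -> Any:
    """Import the module, then walk the dotted name down to the object."""
    try:
        module_name, qualname = identifier.split(":")
        obj = importlib.import_module(module_name)
        for part in qualname.split("."):
            obj = getattr(obj, part)
    except (ValueError, ImportError, AttributeError):
        raise ValueError(f"Cannot load '{identifier}'") from None
    return obj


identifier = identify_object(json.dumps)
loaded = load_object(identifier)  # round-trips to the same function object
```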

exception plumpy.DeliveryFailed[source]

Bases: Exception

Failed to deliver a message

class plumpy.Excepted(process: Process, exception: Optional[BaseException], trace_back: Optional[traceback] = None)[source]

Bases: plumpy.process_states.State

EXC_VALUE = 'ex_value'
LABEL: Union[None, enum.Enum, str] = 'excepted'
TRACEBACK = 'traceback'
get_exc_info() → Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[traceback]][source]

Recreate the exc_info tuple and return it
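
Recreating an exc_info style tuple from a stored exception and traceback can be sketched as follows. rebuild_exc_info is a hypothetical free function for illustration; how the state actually stores and restores these values is not shown in this reference.

```python
import sys
from types import TracebackType
from typing import Optional, Tuple, Type

ExcInfo = Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]]


def rebuild_exc_info(exception: Optional[BaseException],
                     trace_back: Optional[TracebackType] = None) -> ExcInfo:
    """Return (type, value, traceback) in the layout of sys.exc_info()."""
    exc_type = type(exception) if exception is not None else None
    return exc_type, exception, trace_back


try:
    raise RuntimeError("boom")
except RuntimeError as exc:
    original = sys.exc_info()
    rebuilt = rebuild_exc_info(exc, exc.__traceback__)
```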

load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]
save_instance_state(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]
class plumpy.Finished(process: Process, result: Any, successful: bool)[source]

Bases: plumpy.process_states.State

LABEL: Union[None, enum.Enum, str] = 'finished'
_auto_persist: Optional[Set[str]] = {'in_state', 'result', 'successful'}
class plumpy.Future(*, loop=None)

Bases: object

This class is almost compatible with concurrent.futures.Future.

Differences:

  • result() and exception() do not take a timeout argument and raise an exception when the future isn’t done yet.

  • Callbacks registered with add_done_callback() are always called via the event loop’s call_soon_threadsafe().

  • This class is not compatible with the wait() and as_completed() methods in the concurrent.futures package.
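
The first two differences can be observed with the standard library's asyncio.Future, whose documented behaviour matches the list above. This sketch assumes nothing about plumpy beyond that description.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # result() takes no timeout and raises if the future is not done yet.
    try:
        fut.result()
        not_ready = False
    except asyncio.InvalidStateError:
        not_ready = True

    # Done callbacks are scheduled on the event loop, not run inline.
    calls = []
    fut.add_done_callback(lambda f: calls.append(f.result()))
    fut.set_result(42)
    ran_inline = bool(calls)

    await asyncio.sleep(0)  # yield once so the loop can run the callback
    return not_ready, ran_inline, calls, fut.done()


outcome = asyncio.run(main())
```

Because the callback only runs once control returns to the event loop, code that sets a result and immediately inspects side effects of a callback must yield first.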

_asyncio_future_blocking
_callbacks
_exception
_log_traceback
_loop
_repr_info()
_result
_source_traceback
_state
add_done_callback()

Add a callback to be run when the future becomes done.

The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon.

cancel()

Cancel the future and schedule callbacks.

If the future is already done or cancelled, return False. Otherwise, change the future’s state to cancelled, schedule the callbacks and return True.

cancelled()

Return True if the future was cancelled.

done()

Return True if the future is done.

Done means either that a result / exception are available, or that the future was cancelled.

exception()

Return the exception that was set on this future.

The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn’t done yet, raises InvalidStateError.

get_loop()

Return the event loop the Future is bound to.

remove_done_callback(fn, /)

Remove all instances of a callback from the “call when done” list.

Returns the number of callbacks removed.

result()

Return the result this future represents.

If the future has been cancelled, raises CancelledError. If the future’s result isn’t yet available, raises InvalidStateError. If the future is done and has an exception set, this exception is raised.

set_exception(exception, /)

Mark the future done and set an exception.

If the future is already done when this method is called, raises InvalidStateError.

set_result(result, /)

Mark the future done and set its result.

If the future is already done when this method is called, raises InvalidStateError.

class plumpy.InMemoryPersister(loader: Optional[plumpy.loaders.ObjectLoader] = None)[source]

Bases: plumpy.persistence.Persister

Mainly to be used in testing/debugging

_abc_impl = <_abc_data object>
delete_checkpoint(pid: Hashable, tag: Optional[str] = None) → None[source]

Delete a persisted process checkpoint. No error will be raised if the checkpoint does not exist

Parameters
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process

delete_process_checkpoints(pid: Hashable) → None[source]

Delete all persisted checkpoints related to the given process id

Parameters

pid – the process id of the plumpy.Process

get_checkpoints() → List[plumpy.persistence.PersistedCheckpoint][source]

Return a list of all the current persisted process checkpoints with each element containing the process id and optional checkpoint tag

Returns

list of PersistedCheckpoint

get_process_checkpoints(pid: Hashable) → List[plumpy.persistence.PersistedCheckpoint][source]

Return a list of all the current persisted process checkpoints for the specified process with each element containing the process id and optional checkpoint tag

Parameters

pid – the process pid

Returns

list of PersistedCheckpoint tuples

load_checkpoint(pid: Hashable, tag: Optional[str] = None) → plumpy.persistence.Bundle[source]

Load a process from a persisted checkpoint by its process id

Parameters
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process

Returns

a bundle with the process state

Raises

plumpy.PersistenceError – Raised if there was a problem loading the checkpoint

save_checkpoint(process: Process, tag: Optional[str] = None) → None[source]

Persist a Process instance

Parameters
  • process – plumpy.Process

  • tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process

Raises

plumpy.PersistenceError – Raised if there was a problem saving the checkpoint
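
The checkpoint bookkeeping described by these methods can be sketched with a nested dictionary keyed by process id and tag. DictPersisterSketch is hypothetical and deliberately simplified: it stores an arbitrary state object passed in by the caller rather than a Process, returns plain (pid, tag) tuples in place of PersistedCheckpoint, and lets a missing checkpoint surface as KeyError.

```python
import copy
from typing import Any, Dict, Hashable, List, Optional, Tuple


class DictPersisterSketch:
    """Hypothetical in-memory store: pid -> tag -> saved state."""

    def __init__(self) -> None:
        self._store: Dict[Hashable, Dict[Optional[str], Any]] = {}

    def save_checkpoint(self, pid: Hashable, state: Any, tag: Optional[str] = None) -> None:
        # Deep copy so later mutations of `state` do not leak into the store.
        self._store.setdefault(pid, {})[tag] = copy.deepcopy(state)

    def load_checkpoint(self, pid: Hashable, tag: Optional[str] = None) -> Any:
        return copy.deepcopy(self._store[pid][tag])

    def get_checkpoints(self) -> List[Tuple[Hashable, Optional[str]]]:
        return [(pid, tag) for pid, tags in self._store.items() for tag in tags]

    def get_process_checkpoints(self, pid: Hashable) -> List[Tuple[Hashable, Optional[str]]]:
        return [(pid, tag) for tag in self._store.get(pid, {})]

    def delete_checkpoint(self, pid: Hashable, tag: Optional[str] = None) -> None:
        # No error if the checkpoint does not exist.
        self._store.get(pid, {}).pop(tag, None)

    def delete_process_checkpoints(self, pid: Hashable) -> None:
        self._store.pop(pid, None)


persister = DictPersisterSketch()
persister.save_checkpoint(1, {"step": 0})
persister.save_checkpoint(1, {"step": 5}, tag="midway")
persister.save_checkpoint(2, {"step": 9})
persister.delete_checkpoint(1, tag="does-not-exist")  # silently ignored
```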

class plumpy.InputPort(name: str, valid_type: Optional[Type[Any]] = None, help: Optional[str] = None, default: Any = (), required: bool = True, validator: Optional[Callable[[Any, Port], Optional[str]]] = None)[source]

Bases: plumpy.ports.Port

A simple input port for a value being received by a process

property default
get_description() → Dict[str, str][source]

Return a description of the InputPort, which will be a dictionary of its attributes

Returns

a dictionary of the stringified InputPort attributes

has_default() → bool[source]
static required_override(required: bool, default: Any) → bool[source]

If a default is specified an input should no longer be marked as required. Otherwise the input should always be marked explicitly to be not required even if a default is specified.

exception plumpy.Interruption[source]

Bases: Exception

exception plumpy.InvalidStateError[source]

Bases: Exception

Raised when an operation is attempted that requires the process to be in a state that is different from the current state

class plumpy.Kill(msg: Optional[Any] = None)[source]

Bases: plumpy.process_states.Command

_auto_persist: Optional[Set[str]] = {'msg'}
exception plumpy.KillInterruption[source]

Bases: plumpy.process_states.Interruption

class plumpy.Killed(process: Process, msg: Optional[str])[source]

Bases: plumpy.process_states.State

LABEL: Union[None, enum.Enum, str] = 'killed'
_auto_persist: Optional[Set[str]] = {'in_state', 'msg'}
exception plumpy.KilledError[source]

Bases: Exception

The process was killed.

class plumpy.LoadSaveContext(loader: Optional[plumpy.loaders.ObjectLoader] = None, **kwargs: Any)[source]

Bases: object

copyextend(**kwargs: Any) → plumpy.persistence.LoadSaveContext[source]

Add additional information to the context by making a copy with the new values

class plumpy.ObjectLoader[source]

Bases: object

An abstract object loader. Concrete implementations can be used to identify an object and load it with that identifier.

_abc_impl = <_abc_data object>
abstract identify_object(obj: Any) → str[source]

Get an identifier for an object.

Throws a ValueError if the object cannot be identified.

Parameters

obj – The object to identify

Returns

An identifier for the object

abstract load_object(identifier: str) → Any[source]

Given an identifier load an object.

Throws a ValueError if the object cannot be loaded.

Parameters

identifier – The identifier

Returns

The loaded object

class plumpy.OutputPort(name: str, valid_type: Optional[Type[Any]] = None, help: Optional[str] = None, required: bool = True, validator: Optional[Callable[[Any, Port], Optional[str]]] = None)[source]

Bases: plumpy.ports.Port

exception plumpy.PauseInterruption[source]

Bases: plumpy.process_states.Interruption

class plumpy.PersistedCheckpoint(pid, tag)

Bases: tuple

_asdict()

Return a new dict which maps field names to their values.

_field_defaults = {}
_fields = ('pid', 'tag')
_fields_defaults = {}
classmethod _make(iterable)

Make a new PersistedCheckpoint object from a sequence or iterable

_replace(**kwds)

Return a new PersistedCheckpoint object replacing specified fields with new values

pid

Alias for field number 0

tag

Alias for field number 1
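
The members listed above are the standard ones of a named tuple. The sketch below rebuilds an equivalent type with collections.namedtuple to show how they behave; the pid and tag values are made up for the example.

```python
from collections import namedtuple

# Local re-declaration with the same field names as documented above.
PersistedCheckpoint = namedtuple("PersistedCheckpoint", ["pid", "tag"])

checkpoint = PersistedCheckpoint(pid=1234, tag=None)
as_dict = checkpoint._asdict()                          # field name -> value
tagged = checkpoint._replace(tag="before-step-2")       # returns a new tuple
from_iterable = PersistedCheckpoint._make([5678, "final"])
pid, tag = tagged                                       # unpacks like a tuple
```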

exception plumpy.PersistenceError[source]

Bases: Exception

Raised when there is a problem persisting the process

class plumpy.Persister[source]

Bases: object

_abc_impl = <_abc_data object>
abstract delete_checkpoint(pid: Hashable, tag: Optional[str] = None) → None[source]

Delete a persisted process checkpoint. No error will be raised if the checkpoint does not exist

Parameters
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process

abstract delete_process_checkpoints(pid: Hashable) → None[source]

Delete all persisted checkpoints related to the given process id

Parameters

pid – the process id of the plumpy.Process

abstract get_checkpoints() → List[plumpy.persistence.PersistedCheckpoint][source]

Return a list of all the current persisted process checkpoints with each element containing the process id and optional checkpoint tag

Returns

list of PersistedCheckpoint

abstract get_process_checkpoints(pid: Hashable) → List[plumpy.persistence.PersistedCheckpoint][source]

Return a list of all the current persisted process checkpoints for the specified process with each element containing the process id and optional checkpoint tag

Parameters

pid – the process pid

Returns

list of PersistedCheckpoint tuples

abstract load_checkpoint(pid: Hashable, tag: Optional[str] = None) → plumpy.persistence.Bundle[source]

Load a process from a persisted checkpoint by its process id

Parameters
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process

Returns

a bundle with the process state

Raises

plumpy.PersistenceError – Raised if there was a problem loading the checkpoint

abstract save_checkpoint(process: Process, tag: Optional[str] = None) → None[source]

Persist a Process instance

Parameters
  • process – plumpy.Process

  • tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process

Raises

plumpy.PersistenceError – Raised if there was a problem saving the checkpoint

class plumpy.PicklePersister(pickle_directory: str)[source]

Bases: plumpy.persistence.Persister

Implementation of the abstract Persister class that stores Process states in pickles on a filesystem.

_abc_impl = <_abc_data object>
_pickle_filepath(pid: Hashable, tag: Optional[str] = None) → str[source]

Returns the full filepath of the pickle for the given process id and optional checkpoint tag

delete_checkpoint(pid: Hashable, tag: Optional[str] = None) → None[source]

Delete a persisted process checkpoint. No error will be raised if the checkpoint does not exist

Parameters
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process

delete_process_checkpoints(pid: Hashable) → None[source]

Delete all persisted checkpoints related to the given process id

Parameters

pid – the process id of the plumpy.Process

static ensure_pickle_directory(dirpath: str) → None[source]

Will attempt to create the directory at dirpath and raise if it fails, except if the exception arose because the directory already existed
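
The "create, but tolerate an existing directory" behaviour can be sketched with os.makedirs and an errno check. ensure_directory is a hypothetical stand-alone function written for illustration, not the method's actual source.

```python
import errno
import os
import tempfile


def ensure_directory(dirpath: str) -> None:
    """Create dirpath; re-raise any failure except 'already exists'."""
    try:
        os.makedirs(dirpath)
    except OSError as exception:
        if exception.errno != errno.EEXIST:
            raise


base = tempfile.mkdtemp()
target = os.path.join(base, "pickles")
ensure_directory(target)
ensure_directory(target)  # second call finds the directory and does nothing
created = os.path.isdir(target)
```

On current Python versions, os.makedirs(dirpath, exist_ok=True) covers the common case more compactly, although it still raises if the path exists and is not a directory.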

get_checkpoints() → List[plumpy.persistence.PersistedCheckpoint][source]

Return a list of all the current persisted process checkpoints with each element containing the process id and optional checkpoint tag

Returns

list of PersistedCheckpoint

get_process_checkpoints(pid: Hashable) → List[plumpy.persistence.PersistedCheckpoint][source]

Return a list of all the current persisted process checkpoints for the specified process with each element containing the process id and optional checkpoint tag

Parameters

pid – the process pid

Returns

list of PersistedCheckpoint

load_checkpoint(pid: Hashable, tag: Optional[str] = None) → plumpy.persistence.Bundle[source]

Load a process from a persisted checkpoint by its process id

Parameters
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process

Returns

a bundle with the process state

static load_pickle(filepath: str) → plumpy.persistence.PersistedPickle[source]

Load a pickle from disk

Parameters

filepath – absolute filepath to the pickle

Returns

the loaded pickle

static pickle_filename(pid: Hashable, tag: Optional[str] = None) → str[source]

Returns the relative filepath of the pickle for the given process id and optional checkpoint tag

save_checkpoint(process: Process, tag: Optional[str] = None) → None[source]

Persist a process to a pickle on disk

Parameters
  • process – plumpy.Process

  • tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process

class plumpy.PlumpyEventLoopPolicy[source]

Bases: asyncio.unix_events._UnixDefaultEventLoopPolicy

Custom event policy that always returns the same event loop that is made reentrant by nest_asyncio.

_loop: Optional[asyncio.events.AbstractEventLoop] = None
get_event_loop() → asyncio.events.AbstractEventLoop[source]

Return the patched event loop.

class plumpy.Port(name: str, valid_type: Optional[Type[Any]] = None, help: Optional[str] = None, required: bool = True, validator: Optional[Callable[[Any, Port], Optional[str]]] = None)[source]

Bases: object

Specifications relating to a general input/output value including properties like whether it is required, valid types, the help string, etc.

get_description() → Dict[str, Any][source]

Return a description of the Port, which will be a dictionary of its attributes

Returns

a dictionary of the stringified Port attributes

property help

Get the help string for this port

Returns

the help string

property name
property required

Is this port required?

Returns

True if required, False otherwise

property valid_type

Get the valid value type for this port if one is specified

Returns

the valid value type

validate(value: Any, breadcrumbs: Sequence[str] = ()) → Optional[plumpy.ports.PortValidationError][source]

Validate a value to see if it is valid for this port

Parameters
  • value – the value to check

  • breadcrumbs – a tuple of the path to having reached this point in validation

property validator

Get the validator for this port

Returns

the validator

Return type

typing.Callable[[typing.Any], typing.Tuple[bool, typing.Optional[str]]]

class plumpy.PortNamespace(name: str = '', help: Optional[str] = None, required: bool = True, validator: Optional[Callable[[Any, Port], Optional[str]]] = None, valid_type: Optional[Type[Any]] = None, default: Any = (), dynamic: bool = False, populate_defaults: bool = True)[source]

Bases: collections.abc.MutableMapping, plumpy.ports.Port

A container for Ports. Effectively it maintains a dictionary whose members are either a Port or yet another PortNamespace. This allows for the nesting of ports

NAMESPACE_SEPARATOR = '.'
_abc_impl = <_abc_data object>
absorb(port_namespace: plumpy.ports.PortNamespace, exclude: Optional[Sequence[str]] = None, include: Optional[Sequence[str]] = None, namespace_options: Optional[Dict[str, Any]] = None) → List[str][source]

Absorb another PortNamespace instance into oneself, including all its mutable properties and ports.

Mutable properties of self will be overwritten with those of the port namespace that is to be absorbed. The same goes for the ports, meaning that any ports with a key that already exists in self will be overwritten. The namespace_options dictionary can be used to further override the mutable properties of the port namespace that is to be absorbed. The exclude and include tuples can be used to exclude or include certain ports; the two are mutually exclusive.

Parameters
  • port_namespace – instance of PortNamespace that is to be absorbed into self

  • exclude – input keys to exclude from being exposed

  • include – input keys to include as exposed inputs

  • namespace_options – a dictionary with mutable PortNamespace property values to override

Returns

list of the names of the ports that were absorbed

create_port_namespace(name: str, **kwargs: Any) → plumpy.ports.PortNamespace[source]

Create and return a new PortNamespace in this PortNamespace. If the name is namespaced, the sub PortNamespaces will be created recursively, except if one of the namespaces is already occupied at any level by a Port in which case a ValueError will be thrown

Parameters
  • name – name (potentially namespaced) of the port to create and return

  • kwargs – constructor arguments that will be used only for the construction of the terminal PortNamespace

Returns

PortNamespace

Raises

ValueError if any sub namespace is occupied by a non-PortNamespace port

property default
property dynamic
get_description() → Dict[str, Dict[str, Any]][source]

Return a dictionary with a description of the ports this namespace contains. Nested PortNamespaces will be properly recursed and Ports will print their properties in a list

Returns

a dictionary of descriptions of the Ports contained within this PortNamespace

get_port(name: str) → Union[plumpy.ports.Port, plumpy.ports.PortNamespace][source]

Retrieve a (namespaced) port from this PortNamespace. If any of the sub namespaces or the terminal port itself cannot be found, a ValueError will be raised

Parameters

name – name (potentially namespaced) of the port to retrieve

Returns

Port

Raises

ValueError if port or namespace does not exist

has_default() → bool[source]
property populate_defaults
property ports
pre_process(port_values: MutableMapping[str, Any]) → plumpy.utils.AttributesFrozendict[source]

Map port values onto the port namespace, filling in values for ports with a default.

Parameters

port_values – the dictionary with supplied port values

Returns

an AttributesFrozenDict with pre-processed port value mapping, complemented with port default values

project(port_values: MutableMapping[str, Any]) → MutableMapping[str, Any][source]

Project a (nested) dictionary of port values onto the port dictionary of this PortNamespace. That is to say, return those keys of the dictionary that are shared by this PortNamespace. If a matching key corresponds to another PortNamespace, this method will be called recursively, passing the sub dictionary belonging to that port name.

Parameters

port_values – a dictionary where keys are port names and values are actual input values
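
The recursive projection can be sketched on plain dictionaries. In this illustration a nested dict stands in for a PortNamespace and None stands in for a leaf Port; both conventions, and the free function project_sketch itself, are assumptions made for the example.

```python
from typing import Any, Dict, Mapping


def project_sketch(spec: Mapping[str, Any], port_values: Mapping[str, Any]) -> Dict[str, Any]:
    """Keep only the keys of port_values that also exist in spec."""
    result: Dict[str, Any] = {}
    for name, value in port_values.items():
        if name not in spec:
            continue  # not a known port: dropped from the projection
        sub_spec = spec[name]
        if isinstance(sub_spec, Mapping) and isinstance(value, Mapping):
            # Matching key is itself a namespace: recurse into it.
            result[name] = project_sketch(sub_spec, value)
        else:
            result[name] = value
    return result


spec = {"a": None, "sub": {"b": None}}
values = {"a": 1, "x": 2, "sub": {"b": 3, "y": 4}}
projected = project_sketch(spec, values)
```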

static strip_namespace(namespace: str, separator: str, rules: Optional[Sequence[str]] = None) → Optional[List[str]][source]

Filter given exclude/include rules starting with namespace and strip the first level.

For example if the namespace is base and the rules are:

('base.a', 'base.sub.b','relax.base.c', 'd')

the function will return:

('a', 'sub.b')

If the rules are None, that is what is returned as well.

Parameters
  • namespace – the string name of the namespace

  • separator – the namespace separator string

  • rules – the list or tuple of exclude or include rules to strip

Returns

None if rules=None or the list of stripped rules
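
The filtering and stripping described above can be sketched in a few lines. This is a re-implementation written from the description, not the library's source. Under this reading only the rules that begin with the namespace followed by the separator are kept, so 'relax.base.c' and 'd' are dropped.

```python
from typing import List, Optional, Sequence


def strip_namespace_sketch(namespace: str, separator: str,
                           rules: Optional[Sequence[str]] = None) -> Optional[List[str]]:
    """Keep rules under `namespace` and drop that leading level."""
    if rules is None:
        return None
    prefix = namespace + separator
    return [rule[len(prefix):] for rule in rules if rule.startswith(prefix)]


stripped = strip_namespace_sketch(
    "base", ".", ("base.a", "base.sub.b", "relax.base.c", "d")
)
# stripped == ['a', 'sub.b']
```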

property valid_type

Get the valid value type for this port if one is specified

Returns

the valid value type

validate(port_values: Mapping[str, Any] = None, breadcrumbs: Sequence[str] = ()) → Optional[plumpy.ports.PortValidationError][source]

Validate the namespace port itself and subsequently all the port_values it contains

Parameters
  • port_values – an arbitrarily nested dictionary of parsed port values

  • breadcrumbs – a tuple of the path to having reached this point in validation

Returns

None or tuple containing 0: error string 1: tuple of breadcrumb strings to where the validation failed

validate_dynamic_ports(port_values: MutableMapping[str, Any], breadcrumbs: Sequence[str] = ()) → Optional[plumpy.ports.PortValidationError][source]

Validate port values with respect to the dynamic properties of the port namespace. It will check if the namespace is actually dynamic and if all values adhere to the valid types of the namespace if those are specified

Parameters
  • port_values (dict) – an arbitrarily nested dictionary of parsed port values

  • breadcrumbs (typing.Tuple[str]) – a tuple of the path to having reached this point in validation

Returns

if invalid returns a string with the reason for the validation failure, otherwise None

Return type

typing.Optional[str]

validate_ports(port_values: MutableMapping[str, Any], breadcrumbs: Sequence[str]) → Optional[plumpy.ports.PortValidationError][source]

Validate port values with respect to the explicitly defined ports of the port namespace. Ports values that are matched to an actual Port will be popped from the dictionary

Parameters
  • port_values – an arbitrarily nested dictionary of parsed port values

  • breadcrumbs – a tuple of breadcrumbs showing the path to the value being validated

Returns

None or tuple containing 0: error string 1: tuple of breadcrumb strings to where the validation failed

exception plumpy.PortValidationError(message: str, port: str)[source]

Bases: Exception

Error when validation fails on a port

property message

Get the validation error message

Returns

the error message

property port

Get the port breadcrumbs

Returns

the port where the error occurred

class plumpy.Process(*args: Any, **kwargs: Any)[source]

Bases: plumpy.base.state_machine.StateMachine, plumpy.persistence.Savable

The Process class is the base for any unit of work in plumpy.

A process can be in one of the following states:

  • CREATED

  • RUNNING

  • WAITING

  • FINISHED

  • EXCEPTED

  • KILLED

as defined in the ProcessState enum.

                  ___
                 |   v
CREATED (x) --- RUNNING (x) --- FINISHED (o)
                 |   ^          /
                 v   |         /
                WAITING (x) --
                 |   ^
                  ---

* -- EXCEPTED (o)
* -- KILLED (o)
  • (o): terminal state

  • (x): non terminal state

When a Process enters a state it always gets a corresponding message, e.g. on entering RUNNING it will receive the on_run message. These are always called immediately after that state is entered but before being executed.
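
The diagram can be written down as a transition table. This is a sketch read off the diagram, under the assumption that the "*" rows mean any non-terminal state may move to EXCEPTED or KILLED; SketchState is a local enum for illustration (the 'waiting' value is assumed) and is not the library's ProcessState.

```python
from enum import Enum
from typing import Dict, Set


class SketchState(Enum):
    CREATED = "created"
    RUNNING = "running"
    WAITING = "waiting"
    FINISHED = "finished"
    EXCEPTED = "excepted"
    KILLED = "killed"


S = SketchState
TERMINAL: Set[SketchState] = {S.FINISHED, S.EXCEPTED, S.KILLED}

# Arrows drawn explicitly in the diagram.
ALLOWED: Dict[SketchState, Set[SketchState]] = {
    S.CREATED: {S.RUNNING},
    S.RUNNING: {S.RUNNING, S.WAITING, S.FINISHED},
    S.WAITING: {S.WAITING, S.RUNNING, S.FINISHED},
}


def can_transition(src: SketchState, dst: SketchState) -> bool:
    if src in TERMINAL:
        return False  # (o) states have no outgoing transitions
    if dst in (S.EXCEPTED, S.KILLED):
        return True   # the "*" rows: reachable from any non-terminal state
    return dst in ALLOWED.get(src, set())
```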

__called: bool = False
_abc_impl = <_abc_data object>
_auto_persist: Optional[Set[str]] = {'_creation_time', '_future', '_paused', '_pid', '_pre_paused_status', '_status'}
_cleanups: Optional[List[Callable[[], None]]] = None
_closed = False
_create_interrupt_action(exception: plumpy.process_states.Interruption) → plumpy.futures.CancellableAction[source]

Create an interrupt action from the corresponding interrupt exception

Parameters

exception – The interrupt exception

Returns

The interrupt action

_do_pause(state_msg: Optional[str], next_state: Optional[plumpy.process_states.State] = None) → bool[source]

Carry out the pause procedure, optionally transitioning to the next state first

_fire_event(evt: Callable[[], Any], *args: Any, **kwargs: Any) → None[source]
_interrupt_action: Optional[plumpy.futures.CancellableAction] = None
_killing: Optional[plumpy.futures.CancellableAction] = None
_paused: Optional[plumpy.persistence.SavableFuture] = None
_pausing: Optional[plumpy.futures.CancellableAction] = None
_process_scope() → Generator[None, None, None][source]

This context manager function is used to make sure the process stack is correct meaning that globally someone can ask for Process.current() to get the last process that is on the call stack.

async _run_task(callback: Callable[[], Any], *args: Any, **kwargs: Any) → Any[source]

This method should be used to run all Process related functions and coroutines. If there is an exception the process will enter the EXCEPTED state.

Parameters
  • callback – A function or coroutine

  • args – Optional positional arguments passed to callback

  • kwargs – Optional keyword arguments passed to callback

Returns

The value as returned by callback

_schedule_rpc(callback: Callable[[], Any], *args: Any, **kwargs: Any) → concurrent.futures._base.Future[source]

Schedule a call to a callback as a result of an RPC communication call. This will return a future that resolves to the final result of the callback (even after one or more layers of futures being returned).

The callback will be scheduled at the working thread where the process event loop runs.

Parameters
  • callback – the callback function or coroutine

  • args – the positional arguments to the callback

  • kwargs – the keyword arguments to the callback

Returns

a kiwi future that resolves to the outcome of the callback

_set_interrupt_action(new_action: Optional[plumpy.futures.CancellableAction]) → None[source]

Set the interrupt action, cancelling the current one if it exists

Parameters

new_action – The new interrupt action to set

_set_interrupt_action_from_exception(interrupt_exception: plumpy.process_states.Interruption)None[source]

Set an interrupt action from the corresponding interrupt exception

_setup_event_hooks()None[source]

Set the event hooks on the process when it is created or loaded (recreated).

_spec_class

alias of plumpy.process_spec.ProcessSpec

_stepping = False
add_cleanup(cleanup: Callable[[], None]) → None[source]

Add a callback that will be run when the process is closed.

add_process_listener(listener: plumpy.process_listener.ProcessListener)None[source]

Add a process listener to the process.

The listener defines the actions to take when the process triggers a specific state condition.

broadcast_receive(_comm: kiwipy.communications.Communicator, body: Any, sender: Any, subject: Any, correlation_id: Any) → Optional[concurrent.futures._base.Future][source]

Coroutine called when the process receives a message from the communicator

Parameters
  • _comm – the communicator that sent the message

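  • body – the body of the message

  • sender – the sender of the message

  • subject – the subject of the message

  • correlation_id – the correlation id of the message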

call_soon(callback: Callable[[], Any], *args: Any, **kwargs: Any) → plumpy.events.ProcessCallback[source]

Schedule a callback to what is considered an internal process function (this needn’t be a method). If it raises an exception it will cause the process to fail.

callback_excepted(_callback: Callable[[], Any], exception: Optional[BaseException], trace: Optional[traceback])None[source]
close()None[source]

Calling this method indicates that this process should not run anymore and will cause any runtime resources (such as the communicator connection) to be cleaned up. The state of the process will still be accessible.

It is safe to call this method multiple times.
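
How add_cleanup and an idempotent close fit together can be sketched as follows. ClosableSketch is a hypothetical stand-in and not plumpy's Process; the order in which callbacks run is an assumption.

```python
from typing import Callable, List


class ClosableSketch:
    """Hypothetical stand-in, not plumpy's Process."""

    def __init__(self) -> None:
        self._closed = False
        self._cleanups: List[Callable[[], None]] = []

    def add_cleanup(self, cleanup: Callable[[], None]) -> None:
        # Register a zero-argument callback to run on close.
        self._cleanups.append(cleanup)

    def close(self) -> None:
        if self._closed:
            return  # repeated calls are harmless no-ops
        self._closed = True
        # Assumption: callbacks run once, in registration order.
        for cleanup in self._cleanups:
            cleanup()
        self._cleanups.clear()


released: List[str] = []
proc = ClosableSketch()
proc.add_cleanup(lambda: released.append("communicator"))
proc.close()
proc.close()  # safe: the cleanup is not run a second time
```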

create_initial_state() → plumpy.process_states.State[source]

This method is here to override its superclass.

Automatically enter the CREATED state when the process is created.

Returns

A Created state

property creation_time

The creation time of this Process as returned by time.time() when instantiated.

Returns

The creation time

classmethod current() → Optional[plumpy.processes.Process][source]

Get the currently running process i.e. the one at the top of the stack

Returns

the currently running process
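
The stack discipline behind current() can be illustrated with a small standard-library sketch. It does not use plumpy: FakeProcess and _process_stack are hypothetical stand-ins, and the real bookkeeping may differ.

```python
import contextlib
from typing import Iterator, List, Optional

# Hypothetical module-level stack; plumpy's real bookkeeping may differ.
_process_stack: List["FakeProcess"] = []


class FakeProcess:
    """Stand-in for a process, used only to illustrate the call stack."""

    def __init__(self, name: str) -> None:
        self.name = name

    @contextlib.contextmanager
    def process_scope(self) -> Iterator[None]:
        # Push on entry and pop on exit, even if the body raises.
        _process_stack.append(self)
        try:
            yield
        finally:
            popped = _process_stack.pop()
            assert popped is self, "process stack is out of order"

    @classmethod
    def current(cls) -> Optional["FakeProcess"]:
        # The process at the top of the stack, or None when nothing is running.
        return _process_stack[-1] if _process_stack else None


outer, inner = FakeProcess("outer"), FakeProcess("inner")
with outer.process_scope():
    with inner.process_scope():
        innermost = FakeProcess.current()
    after_inner = FakeProcess.current()
after_all = FakeProcess.current()
```

Nested scopes unwind in reverse order, so the value reported by current() always matches the innermost active scope.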

decode_input_args(encoded: Any) → Any[source]

Decode saved input arguments as they came from the saved instance state plumpy.persistence.Bundle. The decoded inputs should contain no reference to the encoded inputs that were passed in. This often will mean making a deepcopy of the encoded input dictionary.

Parameters

encoded – The encoded input arguments, as read from the saved instance state

Returns

The decoded input args

classmethod define(_spec: plumpy.process_spec.ProcessSpec)None[source]

Define the specification of the process.

Normally should be overridden by subclasses.

done()bool[source]

Return True if the call was successfully killed or finished running.

Deprecated since version 0.18.6: Use the has_terminated method instead

encode_input_args(inputs: Any) → Any[source]

Encode input arguments such that they may be saved in a plumpy.persistence.Bundle. The encoded inputs should contain no reference to the inputs that were passed in. This often will mean making a deepcopy of the input dictionary.

Parameters

inputs – A mapping of the inputs as passed to the process

Returns

The encoded inputs
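
The "no shared references" requirement for encode_input_args and decode_input_args can be met with a deep copy, as the descriptions suggest. The following is a minimal sketch written as free functions with made-up example inputs, not the methods themselves.

```python
import copy
from typing import Any, Dict


def encode_input_args(inputs: Dict[str, Any]) -> Dict[str, Any]:
    # A deep copy shares no mutable objects with the original mapping.
    return copy.deepcopy(inputs)


def decode_input_args(encoded: Dict[str, Any]) -> Dict[str, Any]:
    # Same idea in the other direction: the result is independent of `encoded`.
    return copy.deepcopy(encoded)


inputs = {"structure": {"atoms": ["H", "H"]}, "cutoff": 30.0}
encoded = encode_input_args(inputs)
inputs["structure"]["atoms"].append("O")  # mutate the original afterwards
decoded = decode_input_args(encoded)
```

Because the copy is deep, the later mutation of inputs does not leak into encoded or decoded.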

exception() → Optional[BaseException][source]

Return exception, if the process is terminated in excepted state.

execute() → Optional[Dict[str, Any]][source]

Execute the process. This will return if the process terminates or is paused.

Returns

None if not terminated, otherwise self.outputs

fail(exception: Optional[BaseException], trace_back: Optional[traceback])None[source]

Fail the process in response to an exception.

Parameters
  • exception – The exception that caused the failure

  • trace_back – Optional exception traceback

future()plumpy.persistence.SavableFuture[source]

Return a savable future representing an eventual result of an asynchronous operation.

The result is set at the terminal state.

classmethod get_description() → Dict[str, Any][source]

Get a human readable description of what this Process does.

Returns

The description.

classmethod get_name()str[source]

Return the process class name.

classmethod get_state_classes() → Dict[Hashable, Type[plumpy.process_states.State]][source]
classmethod get_states() → Sequence[Type[plumpy.process_states.State]][source]

Return all allowed states of the process.

get_status_info(out_status_info: dict)None[source]

Return updated status information of process.

Parameters

out_status_info – the old status

has_terminated()bool[source]

Return whether the process was terminated.

init(*args: Any, **kwargs: Any)None
property inputs

Return the parsed inputs.

property is_killing

Return if the process is already being killed.

property is_successful

Return whether the result of the process is considered successful.

Returns

boolean, True if the process is in Finished state with successful attribute set to True

kill(msg: Optional[str] = None) → Union[bool, _asyncio.Future][source]

Kill the process.

Parameters

msg – An optional kill message

killed()bool[source]

Return whether the process is killed.

killed_msg() → Optional[str][source]

Return the killed message.

launch(process_class: Type[Process], inputs: Optional[dict] = None, pid: Optional[Hashable] = None, logger: Optional[logging.Logger] = None)plumpy.processes.Process[source]

Start running the nested process.

The process is started asynchronously, without blocking other tasks in the event loop.

load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext)None[source]

Load the process from its saved instance state.

Parameters
  • saved_state – A bundle to load the state from

  • load_context – The load context

log_with_pid(level: int, msg: str)None[source]

Log the message with the process pid.

property logger

Return the logger for this class.

If not set, return the default logger.

Returns

The logger.

property loop

Return the event loop of the process.

message_receive(_comm: kiwipy.communications.Communicator, msg: Dict[str, Any]) → Any[source]

Coroutine called when the process receives a message from the communicator

Parameters
  • _comm – the communicator that sent the message

  • msg – the message

Returns

the outcome of processing the message, the return value will be sent back as a response to the sender

on_close(*args: Any, **kwargs: Any)None
on_create(*args: Any, **kwargs: Any)None
on_entered(from_state: Optional[plumpy.process_states.State])None[source]
on_entering(state: plumpy.process_states.State)None[source]
on_except(*args: Any, **kwargs: Any)None
on_excepted(*args: Any, **kwargs: Any)None
on_exit_running(*args: Any, **kwargs: Any)None
on_exit_waiting(*args: Any, **kwargs: Any)None
on_exiting()None[source]
on_finish(*args: Any, **kwargs: Any)None
on_finished(*args: Any, **kwargs: Any)None
on_kill(*args: Any, **kwargs: Any)None
on_killed(*args: Any, **kwargs: Any)None
on_output_emitted(output_port: str, value: Any, dynamic: bool)None[source]
on_output_emitting(output_port: str, value: Any)None[source]

Output is about to be emitted.

on_paused(*args: Any, **kwargs: Any)None
on_pausing(*args: Any, **kwargs: Any)None
on_playing(*args: Any, **kwargs: Any)None
on_run(*args: Any, **kwargs: Any)None
on_running(*args: Any, **kwargs: Any)None
on_terminated()None[source]

Call when a terminal state is reached.

on_wait(*args: Any, **kwargs: Any)None
on_waiting(*args: Any, **kwargs: Any)None
out(output_port: str, value: Any)None[source]

Record an output value for a specific output port. If the output port matches an explicitly defined Port it will be validated against that. If not it will be validated against the PortNamespace, which means it will be checked for dynamicity and whether the type of the value is valid

Parameters
  • output_port – the name of the output port, can be namespaced

  • value – the value for the output port

Raises

ValueError if the output value is not validated against the port

property outputs

Get the current outputs emitted by the Process. These may grow over time as the process runs.

Returns

A mapping of {output_port: value} outputs

pause(msg: Optional[str] = None) → Union[bool, plumpy.futures.CancellableAction][source]

Pause the process.

Parameters

msg – an optional message to set as the status. The current status will be saved in the private _pre_paused_status attribute, such that it can be restored when the process is played again.

Returns

False if process is already terminated, True if already paused or pausing, a CancellableAction to pause if the process was running steps

property paused

Return whether the process is paused.

property pid

Return the pid of the process.

play()bool[source]

Play a process. Returns True if after this call the process is playing, False otherwise

Returns

True if playing, False otherwise

property raw_inputs

The AttributesFrozendict of inputs (if not None).

classmethod recreate_from(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None)plumpy.processes.Process[source]

Recreate a process from a saved state, passing any positional and keyword arguments on to load_instance_state

Parameters
  • saved_state – The saved state to load from

  • load_context – The load context to use

Returns

An instance of the object with its state loaded from the save state.

recreate_state(saved_state: plumpy.persistence.Bundle) → plumpy.process_states.State[source]

Create a state object from a saved state

Parameters

saved_state – The saved state

Returns

An instance of the object with its state loaded from the save state.

remove_process_listener(listener: plumpy.process_listener.ProcessListener)None[source]

Remove a process listener from the process.

result() → Any[source]

Get the result from the process if it is finished. If the process was killed then a KilledError will be raised. If the process has excepted then the failing exception will be raised. If in any other state this will raise an InvalidStateError.

Returns

The result of the process

resume(*args: Any)None[source]

Start running the process again.

run() → Any[source]

This function will be run when the process is triggered. It should be overridden by a subclass.

save_instance_state(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext])None[source]

Ask the process to save its current instance state.

Parameters
  • out_state – A bundle to save the state to

  • save_context – The save context

set_logger(logger: logging.Logger)None[source]

Set the logger of the process.

set_status(status: Optional[str])None[source]

Set the status message of the process.

classmethod spec()plumpy.process_spec.ProcessSpec[source]
property status

Return the status message of the process.

step()None[source]

Run a step.

The step is run synchronously with steps in its own process, and asynchronously with steps in other processes.

The execute function running in this method is dependent on the state of the process.

async step_until_terminated()None[source]

If the process has not terminated, run the current step and wait until the step finished.

This is the function run by the event loop (not step).

successful()bool[source]

Returns whether the result of the process is considered successful. Will raise if the process is not in the FINISHED state.

transition_excepted(_initial_state: Any, final_state: plumpy.process_states.ProcessState, exception: Exception, trace: traceback)None[source]
property uuid

Return the UUID of the process

class plumpy.ProcessLauncher(loop: Optional[asyncio.events.AbstractEventLoop] = None, persister: Optional[plumpy.persistence.Persister] = None, load_context: Optional[plumpy.persistence.LoadSaveContext] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None)[source]

Bases: object

Takes incoming task messages and uses them to launch processes.

Expected format of task:

For launch:

{
    'task': <LAUNCH_TASK>,
    'process_class': <Process class to launch>,
    'args': <tuple of positional args for process constructor>,
    'kwargs': <dict of keyword args for process constructor>,
    'nowait': True or False
}

For continue:

{
    'task': <CONTINUE_TASK>,
    'pid': <Process ID>,
    'nowait': True or False
}
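
As a concrete illustration of the two formats above, the sketch below builds such messages as plain dictionaries. The helper names and the string values given to LAUNCH_TASK and CONTINUE_TASK are placeholders chosen for this example; the actual constants are not shown in this section.

```python
from typing import Any, Dict, Hashable, Optional, Sequence

# Placeholder values: the actual LAUNCH_TASK and CONTINUE_TASK constants
# are not given in this section.
LAUNCH_TASK = "launch"
CONTINUE_TASK = "continue"


def make_launch_task(
    process_class: str,
    args: Sequence[Any] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    nowait: bool = False,
) -> Dict[str, Any]:
    """Build a launch message with the keys listed above (hypothetical helper)."""
    return {
        "task": LAUNCH_TASK,
        "process_class": process_class,
        "args": tuple(args),
        "kwargs": dict(kwargs or {}),
        "nowait": nowait,
    }


def make_continue_task(pid: Hashable, nowait: bool = False) -> Dict[str, Any]:
    """Build a continue message with the keys listed above (hypothetical helper)."""
    return {"task": CONTINUE_TASK, "pid": pid, "nowait": nowait}


launch_msg = make_launch_task("my_package.MyProcess", kwargs={"x": 1}, nowait=True)
continue_msg = make_continue_task(pid=42)
```
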
async _continue(_communicator: kiwipy.communications.Communicator, pid: Hashable, nowait: bool, tag: Optional[str] = None) → Union[Hashable, Any][source]

Continue the process

Parameters
  • _communicator – the communicator

  • pid – the pid of the process to continue

  • nowait – if True don’t wait for the process to complete

  • tag – the checkpoint tag to continue from

async _create(_communicator: kiwipy.communications.Communicator, process_class: str, persist: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None) → Hashable[source]

Create the process

Parameters
  • _communicator – the communicator

  • process_class – the process class to create

  • persist – should the process be persisted

  • init_args – positional arguments to the process constructor

  • init_kwargs – keyword arguments to the process constructor

Returns

the pid of the created process

async _launch(_communicator: kiwipy.communications.Communicator, process_class: str, persist: bool, nowait: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None) → Union[Hashable, Any][source]

Launch the process

Parameters
  • _communicator – the communicator

  • process_class – the process class to launch

  • persist – should the process be persisted

  • nowait – if True, don’t wait for the process to finish and return its pid

  • init_args – positional arguments to the process constructor

  • init_kwargs – keyword arguments to the process constructor

Returns

the pid of the created process or the outputs (if nowait=False)

class plumpy.ProcessListener[source]

Bases: object

_abc_impl = <_abc_data object>
on_output_emitted(process: Process, output_port: str, value: Any, dynamic: bool)None[source]

Called when the process has emitted an output value

Parameters
  • process – The process

  • output_port – The output port that the value was outputted on

  • value – The value that was outputted

  • dynamic – True if the port is dynamic, False otherwise

on_process_created(process: Process)None[source]

Called when the process has been started

Parameters

process – The process

on_process_excepted(process: Process, reason: str)None[source]

Called when the process has excepted

Parameters
  • process – The process

  • reason – A string of the exception message

on_process_finished(process: Process, outputs: Any)None[source]

Called when the process has finished successfully

Parameters
  • process – The process

  • outputs – The process outputs

on_process_killed(process: Process, msg: str)None[source]

Called when the process was killed

Parameters
  • process – The process

  • msg – The kill message

on_process_paused(process: Process)None[source]

Called when the process has been paused

Parameters

process – The process

on_process_played(process: Process)None[source]

Called when the process is about to re-enter the RUNNING state

Parameters

process – The process

on_process_running(process: Process)None[source]

Called when the process is about to enter the RUNNING state

Parameters

process – The process

on_process_waiting(process: Process)None[source]

Called when the process is about to enter the WAITING state

Parameters

process – The process
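
The listener pattern documented above can be sketched without plumpy. ListenerSketch, RecordingListener and EmitterSketch are hypothetical stand-ins that mirror two of the hooks and the add/remove methods; they are not the library's classes.

```python
from typing import Any, List, Tuple


class ListenerSketch:
    """Hypothetical base with no-op hooks, mirroring two of the hooks above."""

    def on_process_running(self, process: Any) -> None:
        pass

    def on_process_finished(self, process: Any, outputs: Any) -> None:
        pass


class RecordingListener(ListenerSketch):
    """Overrides only the hooks it cares about and records what it saw."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, Any]] = []

    def on_process_running(self, process: Any) -> None:
        self.events.append(("running", None))

    def on_process_finished(self, process: Any, outputs: Any) -> None:
        self.events.append(("finished", outputs))


class EmitterSketch:
    """Stand-in for a process that notifies its registered listeners."""

    def __init__(self) -> None:
        self._listeners: List[ListenerSketch] = []

    def add_process_listener(self, listener: ListenerSketch) -> None:
        self._listeners.append(listener)

    def remove_process_listener(self, listener: ListenerSketch) -> None:
        self._listeners.remove(listener)

    def run(self, outputs: Any) -> None:
        # Iterate over a copy so listeners may unregister during a callback.
        for listener in list(self._listeners):
            listener.on_process_running(self)
        for listener in list(self._listeners):
            listener.on_process_finished(self, outputs)


listener = RecordingListener()
emitter = EmitterSketch()
emitter.add_process_listener(listener)
emitter.run({"sum": 3})
emitter.remove_process_listener(listener)
emitter.run({"sum": 4})  # no longer observed
```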

class plumpy.ProcessSpec[source]

Bases: object

A class that defines the specifications of a plumpy.Process, this includes what its inputs, outputs, etc are.

All methods that modify the spec should have declarative names that describe the spec, e.g. input, output.

Every Process class has one of these.

INPUT_PORT_TYPE

alias of plumpy.ports.InputPort

NAME_INPUTS_PORT_NAMESPACE: str = 'inputs'
NAME_OUTPUTS_PORT_NAMESPACE: str = 'outputs'
OUTPUT_PORT_TYPE

alias of plumpy.ports.OutputPort

PORT_NAMESPACE_TYPE

alias of plumpy.ports.PortNamespace

_create_port(port_namespace: plumpy.ports.PortNamespace, port_class: Type[Union[plumpy.ports.Port, plumpy.ports.PortNamespace]], name: str, **kwargs: Any)None[source]

Create a new Port of a given class and name in a given PortNamespace

Parameters
  • port_namespace – PortNamespace to which to add the port

  • port_class – class of the Port to create

  • name – name of the port to create

  • kwargs – options for the port

static _expose_ports(process_class: Type[Process], source: plumpy.ports.PortNamespace, destination: plumpy.ports.PortNamespace, expose_memory: Dict[Optional[str], Dict[Type[Process], Sequence[str]]], namespace: Optional[str], exclude: Optional[Sequence[str]], include: Optional[Sequence[str]], namespace_options: Optional[dict] = None)None[source]

Expose ports from a source PortNamespace of the ProcessSpec of a Process class into the destination PortNamespace of this ProcessSpec. If the namespace is specified, the ports will be exposed in that sub namespace. The set of ports can be restricted using the mutually exclusive exclude and include tuples. The namespace_options will be used to override the properties of the PortNamespace into which the ports are exposed, whether that has been newly created or existed already.

Parameters
  • process_class – the Process class whose ports to expose

  • source – the PortNamespace whose ports are to be exposed

  • destination – the PortNamespace into which the ports are to be exposed

  • namespace – a namespace in which to place the PortNamespace with the exposed ports

  • exclude – ports that are to be excluded

  • include – ports that are to be included

  • namespace_options – a dictionary with mutable PortNamespace property values to override

expose_inputs(process_class: Type[Process], namespace: Optional[str] = None, exclude: Optional[Sequence[str]] = None, include: Optional[Sequence[str]] = None, namespace_options: Optional[dict] = None)None[source]

This method allows one to automatically add the inputs from another Process to this ProcessSpec. The optional namespace argument can be used to group the exposed inputs in a separated PortNamespace. The exclude and include arguments can be used to restrict the set of ports that are exposed. Note that these two options are mutually exclusive.

Parameters
  • process_class – the Process class whose inputs to expose

  • namespace – a namespace in which to place the exposed inputs

  • exclude – input ports that are to be excluded

  • include – input ports that are to be included

  • namespace_options – a dictionary with mutable PortNamespace property values to override
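
The mutually exclusive exclude and include options amount to a simple filter over port names. The sketch below shows that rule on a plain dictionary; select_ports is a hypothetical helper and the choice of ValueError is an assumption, not necessarily what the library raises.

```python
from typing import Any, Dict, Optional, Sequence


def select_ports(
    ports: Dict[str, Any],
    exclude: Optional[Sequence[str]] = None,
    include: Optional[Sequence[str]] = None,
) -> Dict[str, Any]:
    """Return the subset of ports to expose (hypothetical helper)."""
    # Assumption: passing both options is an error.
    if exclude is not None and include is not None:
        raise ValueError("exclude and include are mutually exclusive")
    if include is not None:
        return {name: port for name, port in ports.items() if name in include}
    if exclude is not None:
        return {name: port for name, port in ports.items() if name not in exclude}
    return dict(ports)


ports = {"a": "port-a", "b": "port-b", "c": "port-c"}
only_a = select_ports(ports, include=("a",))
without_a = select_ports(ports, exclude=("a",))
everything = select_ports(ports)
try:
    select_ports(ports, exclude=("a",), include=("b",))
    both_rejected = False
except ValueError:
    both_rejected = True
```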

expose_outputs(process_class: Type[Process], namespace: Optional[str] = None, exclude: Optional[Sequence[str]] = None, include: Optional[Sequence[str]] = None, namespace_options: Optional[dict] = None)None[source]

This method allows one to automatically add the outputs from another Process to this ProcessSpec. The optional namespace argument can be used to group the exposed outputs in a separated PortNamespace. The exclude and include arguments can be used to restrict the set of ports that are exposed. Note that these two options are mutually exclusive.

Parameters
  • process_class – the Process class whose outputs to expose

  • namespace – a namespace in which to place the exposed outputs

  • exclude – output ports that are to be excluded

  • include – output ports that are to be included

  • namespace_options – a dictionary with mutable PortNamespace property values to override

get_description() → Dict[str, Any][source]

Get a description of this process specification

Returns

a dictionary with the descriptions of the input and output port namespaces

has_input(name: str)bool[source]

Return whether the input port namespace contains a port with the given name

Parameters

name – key of the port in the input port namespace

has_output(name: str)bool[source]

Return whether the output port namespace contains a port with the given name

Parameters

name – key of the port in the output port namespace

input(name: str, **kwargs: Any)None[source]

Define an input port in the input port namespace

Parameters
  • name – name of the input port to create

  • kwargs – options for the input port

input_namespace(name: str, **kwargs: Any)None[source]

Create a new PortNamespace in the input port namespace. The keyword arguments will be passed to the PortNamespace constructor. Any intermediate port namespaces that need to be created for a nested namespace, will take constructor defaults

Parameters
  • name – namespace of the new port namespace

  • kwargs – keyword arguments for the PortNamespace constructor

property inputs

Get the input port namespace of the process specification

Returns

the input PortNamespace

property logger
property namespace_separator
output(name: str, **kwargs: Any)None[source]

Define an output port in the output port namespace

Parameters
  • name – name of the output port to create

  • kwargs – options for the output port

output_namespace(name: str, **kwargs: Any)None[source]

Create a new PortNamespace in the output port namespace. The keyword arguments will be passed to the PortNamespace constructor. Any intermediate port namespaces that need to be created for a nested namespace, will take constructor defaults

Parameters
  • name – namespace of the new port namespace

  • kwargs – keyword arguments for the PortNamespace constructor

property outputs

Get the output port namespace of the process specification

Returns

the outputs PortNamespace

property ports
seal()None[source]

Seal this specification disallowing any further changes

property sealed

Indicates if the spec is sealed or not

Returns

True if sealed, False otherwise

Return type

bool
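
The effect of sealing can be sketched with a small stand-in class. SpecSketch is hypothetical, and raising RuntimeError on a change after sealing is an assumption made for this example.

```python
from typing import Any, Dict


class SpecSketch:
    """Hypothetical stand-in for a sealable specification."""

    def __init__(self) -> None:
        self._inputs: Dict[str, Dict[str, Any]] = {}
        self._sealed = False

    @property
    def sealed(self) -> bool:
        return self._sealed

    def seal(self) -> None:
        self._sealed = True

    def input(self, name: str, **kwargs: Any) -> None:
        # Assumption: modifying a sealed spec raises RuntimeError.
        if self._sealed:
            raise RuntimeError("cannot add a port after the spec has been sealed")
        self._inputs[name] = kwargs

    def has_input(self, name: str) -> bool:
        return name in self._inputs


spec = SpecSketch()
spec.input("x", default=1)
spec.seal()
try:
    spec.input("y")
    rejected = False
except RuntimeError:
    rejected = True
```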

class plumpy.ProcessState(value)[source]

Bases: enum.Enum

The possible states that a Process can be in.

CREATED: str = 'created'
EXCEPTED: str = 'excepted'
FINISHED: str = 'finished'
KILLED: str = 'killed'
RUNNING: str = 'running'
WAITING: str = 'waiting'
exception plumpy.RemoteException[source]

Bases: Exception

An exception occurred at the remote end of the call

class plumpy.RemoteProcessController(communicator: kiwipy.communications.Communicator)[source]

Bases: object

Control remote processes using coroutines that will send messages and wait (in a non-blocking way) for their response

async continue_process(pid: Hashable, tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False) → Optional[Any][source]

Continue the process

Parameters
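  • pid – the pid of the process to continue

  • tag – the checkpoint tag to continue from

  • nowait – if True, don’t wait for the process to complete

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value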

async execute_process(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Any[source]

Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.

Parameters
  • process_class – the process class to execute

  • init_args – the positional arguments to the class constructor

  • init_kwargs – the keyword arguments to the class constructor

  • loader – the class loader to use

  • nowait – if True, don’t wait for the process to send a response

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the result of executing the process

async get_status(pid: Hashable) → Any[source]

Get the status of a process with the given PID.

Parameters

pid – the process id

Returns

the status response from the process

async kill_process(pid: Hashable, msg: Optional[Any] = None) → Any[source]

Kill the process

Parameters
  • pid – the pid of the process to kill

  • msg – optional kill message

Returns

True if killed, False otherwise

async launch_process(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Any[source]

Launch a process given the class and constructor arguments

Parameters
  • process_class – the class of the process to launch

  • init_args – the constructor positional arguments

  • init_kwargs – the constructor keyword arguments

  • persist – should the process be persisted

  • loader – the classloader to use

  • nowait – if True, don’t wait for the process to send a response, just return the pid

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the result of launching the process

async pause_process(pid: Hashable, msg: Optional[Any] = None) → Any[source]

Pause the process

Parameters
  • pid – the pid of the process to pause

  • msg – optional pause message

Returns

True if paused, False otherwise

async play_process(pid: Hashable) → Any[source]

Play the process

Parameters

pid – the pid of the process to play

Returns

True if played, False otherwise

class plumpy.RemoteProcessThreadController(communicator: kiwipy.communications.Communicator)[source]

Bases: object

A class that can be used to control and launch remote processes

continue_process(pid: Hashable, tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]
execute_process(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]

Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.

Parameters
  • process_class – the process class to execute

  • init_args – the positional arguments to the class constructor

  • init_kwargs – the keyword arguments to the class constructor

  • loader – the class loader to use

  • nowait – if True, don’t wait for the process to send a response

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the result of executing the process

get_status(pid: Hashable) → concurrent.futures._base.Future[source]

Get the status of a process with the given PID.

Parameters

pid – the process id

Returns

the status response from the process

kill_all(msg: Optional[Any])None[source]

Kill all processes that are subscribed to the same communicator

Parameters

msg – an optional kill message

kill_process(pid: Hashable, msg: Optional[Any] = None) → concurrent.futures._base.Future[source]

Kill the process

Parameters
  • pid – the pid of the process to kill

  • msg – optional kill message

Returns

a response future from the process to be killed

launch_process(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]

Launch the process

Parameters
  • process_class – the process class to launch

  • init_args – positional arguments to the process constructor

  • init_kwargs – keyword arguments to the process constructor

  • persist – should the process be persisted

  • loader – the class loader to use

  • nowait – if True, don’t wait for the process to finish and return its pid

  • no_reply – don’t send a reply to the sender

Returns

the pid of the created process or the outputs (if nowait=False)

pause_all(msg: Any)None[source]

Pause all processes that are subscribed to the same communicator

Parameters

msg – an optional pause message

pause_process(pid: Hashable, msg: Optional[Any] = None) → concurrent.futures._base.Future[source]

Pause the process

Parameters
  • pid – the pid of the process to pause

  • msg – optional pause message

Returns

a response future from the process to be paused

play_all()None[source]

Play all processes that are subscribed to the same communicator

play_process(pid: Hashable) → concurrent.futures._base.Future[source]

Play the process

Parameters

pid – the pid of the process to play

Returns

a response future from the process to be played

task_send(message: Any, no_reply: bool = False) → Optional[Any][source]

Send a task to be performed using the communicator

Parameters
  • message – the task message

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the response from the remote side (if no_reply=False)

class plumpy.Running(process: Process, run_fn: Callable[[], Any], *args: Any, **kwargs: Any)[source]

Bases: plumpy.process_states.State

ALLOWED: Set[Union[None, enum.Enum, str]] = {<ProcessState.RUNNING: 'running'>, <ProcessState.KILLED: 'killed'>, <ProcessState.EXCEPTED: 'excepted'>, <ProcessState.FINISHED: 'finished'>, <ProcessState.WAITING: 'waiting'>}
COMMAND = 'command'
LABEL: Union[None, enum.Enum, str] = 'running'
RUN_FN = 'run_fn'
_action_command(command: Union[plumpy.process_states.Kill, plumpy.process_states.Stop, plumpy.process_states.Wait, plumpy.process_states.Continue]) → plumpy.process_states.State[source]
_auto_persist: Optional[Set[str]] = {'args', 'in_state', 'kwargs'}
_command: Union[None, plumpy.process_states.Kill, plumpy.process_states.Stop, plumpy.process_states.Wait, plumpy.process_states.Continue] = None
_run_handle = None
_running: bool = False
async execute() → plumpy.process_states.State[source]

Execute the state, performing the actions that this state is responsible for.

Returns

a state to transition to or None if finished.

interrupt(reason: Any)None[source]
load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext)None[source]
save_instance_state(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext)None[source]
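
The ALLOWED set listed for Running above can be read as the set of states a running process may move to. The sketch below re-declares the ProcessState values documented earlier in a standalone enum; RUNNING_ALLOWED and can_leave_running_for are hypothetical names, and the membership check is an assumption about how such a set would be used.

```python
import enum


class ProcessState(enum.Enum):
    """Standalone copy of the documented state values, for illustration only."""

    CREATED = "created"
    EXCEPTED = "excepted"
    FINISHED = "finished"
    KILLED = "killed"
    RUNNING = "running"
    WAITING = "waiting"


# Mirrors the ALLOWED set documented for the Running state.
RUNNING_ALLOWED = frozenset(
    {
        ProcessState.RUNNING,
        ProcessState.KILLED,
        ProcessState.EXCEPTED,
        ProcessState.FINISHED,
        ProcessState.WAITING,
    }
)


def can_leave_running_for(target: ProcessState) -> bool:
    # Assumption: a transition is permitted only if the target is in ALLOWED.
    return target in RUNNING_ALLOWED


to_waiting = can_leave_running_for(ProcessState.WAITING)
to_created = can_leave_running_for(ProcessState.CREATED)
by_value = ProcessState("killed")  # enum members can be looked up by value
```
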
class plumpy.Savable[source]

Bases: object

CLASS_NAME: str = 'class_name'
_auto_persist: Optional[Set[str]] = None
_ensure_persist_configured()None[source]
static _get_class_name(saved_state: MutableMapping[str, Any])str[source]
static _get_create_meta(out_state: MutableMapping[str, Any]) → Dict[str, Any][source]
static _get_meta_type(saved_state: MutableMapping[str, Any], name: str) → Any[source]
_get_value(saved_state: MutableMapping[str, Any], name: str, load_context: Optional[plumpy.persistence.LoadSaveContext]) → Union[method, plumpy.persistence.Savable][source]
_persist_configured = False
static _set_class_name(out_state: MutableMapping[str, Any], name: str)None[source]
static _set_meta_type(out_state: MutableMapping[str, Any], name: str, type_spec: Any)None[source]
classmethod auto_persist(*members: str)None[source]
static get_custom_meta(saved_state: MutableMapping[str, Any], name: str) → Any[source]
static load(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None)plumpy.persistence.Savable[source]

Load a Savable from a saved instance state. The load context is a way of passing runtime data to the object being loaded.

Parameters
  • saved_state – The saved state

  • load_context – Additional runtime state that can be passed into when loading. The type and content (if any) is completely user defined

Returns

The loaded Savable instance

load_instance_state(*args: Any, **kwargs: Any) → None
load_members(members: Iterable[str], saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None) → None[source]
classmethod persist() → None[source]
classmethod recreate_from(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None) → plumpy.persistence.Savable[source]

Recreate a Savable from a saved state using an optional load context.

Parameters
  • saved_state – The saved state

  • load_context – An optional load context

Returns

The recreated instance

save(save_context: Optional[plumpy.persistence.LoadSaveContext] = None) → MutableMapping[str, Any][source]
save_instance_state(*args: Any, **kwargs: Any) → None
save_members(members: Iterable[str], out_state: MutableMapping[str, Any]) → None[source]
static set_custom_meta(out_state: MutableMapping[str, Any], name: str, value: Any) → None[source]
class plumpy.SavableFuture(*, loop=None)[source]

Bases: _asyncio.Future, plumpy.persistence.Savable

A savable future.

_auto_persist: Optional[Set[str]] = {'_result', '_state'}
load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]
classmethod recreate_from(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None) → plumpy.persistence.Savable[source]

Recreate a Savable from a saved state using an optional load context.

Parameters
  • saved_state – The saved state

  • load_context – An optional load context

Returns

The recreated instance

save_instance_state(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]
class plumpy.Stop(result: Any, successful: bool)[source]

Bases: plumpy.process_states.Command

_auto_persist: Optional[Set[str]] = {'result'}
exception plumpy.TaskRejected[source]

Bases: Exception

A task was rejected at the remote end

plumpy.ToContext

alias of builtins.dict

exception plumpy.TransitionFailed(initial_state: plumpy.base.state_machine.State, final_state: Optional[State] = None, traceback_str: Optional[str] = None)[source]

Bases: Exception

A state transition failed

_format_msg() → str[source]
class plumpy.UnsuccessfulResult(result: Optional[int] = None)[source]

Bases: object

The result of the process was unsuccessful

class plumpy.Wait(continue_fn: Optional[Callable[[], Any]] = None, msg: Optional[Any] = None, data: Optional[Any] = None)[source]

Bases: plumpy.process_states.Command

_auto_persist: Optional[Set[str]] = {'data', 'msg'}
class plumpy.Waiting(process: Process, done_callback: Optional[Callable[[], Any]], msg: Optional[str] = None, data: Optional[Any] = None)[source]

Bases: plumpy.process_states.State

ALLOWED: Set[Union[None, enum.Enum, str]] = {<ProcessState.RUNNING: 'running'>, <ProcessState.KILLED: 'killed'>, <ProcessState.EXCEPTED: 'excepted'>, <ProcessState.FINISHED: 'finished'>, <ProcessState.WAITING: 'waiting'>}
DONE_CALLBACK = 'DONE_CALLBACK'
LABEL: Union[None, enum.Enum, str] = 'waiting'
_auto_persist: Optional[Set[str]] = {'data', 'in_state', 'msg'}
_interruption = None
async execute() → plumpy.process_states.State[source]

Execute the state, performing the actions that this state is responsible for.

Returns

a state to transition to or None if finished

interrupt(reason: Any) → None[source]
load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]
resume(value: Any = <plumpy.lang.__NULL object>) → None[source]
save_instance_state(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]
class plumpy.WorkChain(*args: Any, **kwargs: Any)[source]

Bases: plumpy.mixins.ContextMixin, plumpy.processes.Process

A WorkChain is a series of instructions carried out with the ability to save state in between.

_CONTEXT = 'CONTEXT'
_STEPPER_STATE = 'stepper_state'
_abc_impl = <_abc_data object>
_do_step() → Any[source]
_spec_class

alias of WorkChainSpec

classmethod get_state_classes() → Dict[Hashable, Type[plumpy.process_states.State]][source]
load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]

Load the process from its saved instance state.

Parameters
  • saved_state – A bundle to load the state from

  • load_context – The load context

on_create() → None[source]
run() → Any[source]

This function will be run when the process is triggered. It should be overridden by a subclass.

save_instance_state(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) → None[source]

Add the instance state to out_state.

Important

The instance state will contain a pointer to the ctx, and so should be deep copied or serialised before persisting.

classmethod spec() → plumpy.workchains.WorkChainSpec[source]
to_context(**kwargs: Union[_asyncio.Future, plumpy.processes.Process]) → None[source]

A convenience method that lets the user add multiple intersteps in one call. Each interstep assigns a value to the corresponding key in the context of the workchain.

class plumpy.WorkChainSpec[source]

Bases: plumpy.process_spec.ProcessSpec

get_description() → Dict[str, str][source]

Get a description of this process specification

Returns

a dictionary with the descriptions of the input and output port namespaces

get_outline() → Union[plumpy.workchains._Instruction, plumpy.workchains._FunctionCall][source]
outline(*commands: Union[_Instruction, Callable[[WorkChain], Any]]) → None[source]

Define the outline that describes this work chain.

Parameters

commands – One or more functions that make up this work chain.

plumpy.auto_persist(*members: str) → Callable[[Type[plumpy.persistence.Savable]], Type[plumpy.persistence.Savable]][source]
plumpy.chain(source, target)[source]

Chain two futures together so that when one completes, so does the other.

The result (success or failure) of source will be copied to target, unless target has already been completed or cancelled by the time source finishes.

plumpy.copy_future(source, target)[source]

Copy the status of the source future to the target future, unless the target is already done, in which case return without copying.

Parameters
  • source (kiwipy.Future) – The source future

  • target (kiwipy.Future) – The target future
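
The copy and chain behaviour described above can be sketched with the standard library alone. The block below is a stand-alone illustration written against concurrent.futures.Future; the helper names copy_future_sketch and chain_sketch are hypothetical and this is not the library's implementation.

```python
from concurrent.futures import Future


def copy_future_sketch(source: Future, target: Future) -> None:
    """Copy the outcome of a finished `source` to `target` (hypothetical helper)."""
    if target.done():
        # The target already completed or was cancelled: leave it untouched.
        return
    if source.cancelled():
        target.cancel()
        return
    exc = source.exception()
    if exc is not None:
        target.set_exception(exc)
    else:
        target.set_result(source.result())


def chain_sketch(source: Future, target: Future) -> None:
    """Copy the outcome to `target` as soon as `source` completes."""
    source.add_done_callback(lambda done: copy_future_sketch(done, target))


# Success is forwarded from source to target.
a, b = Future(), Future()
chain_sketch(a, b)
a.set_result(42)

# A target that is already done keeps its own result.
c, d = Future(), Future()
d.set_result("already done")
chain_sketch(c, d)
c.set_result("ignored")
```

Checking target.done() first is what makes the copy a no-op for a target that has already been completed or cancelled.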

plumpy.create_continue_body(pid: Hashable, tag: Optional[str] = None, nowait: bool = False) → Dict[str, Any][source]

Create a message body to continue an existing process

Parameters
  • pid – the pid of the existing process

  • tag – the optional persistence tag

  • nowait – if True, just return the PID without waiting; if False, wait for the process to finish before completing the task

Returns

a dictionary with the body of the message to continue the process

plumpy.create_launch_body(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = True) → Dict[str, Any][source]

Create a message body for the launch action

Parameters
  • process_class – the class of the process to launch

  • init_args – any initialisation positional arguments

  • init_kwargs – any initialisation keyword arguments

  • persist – persist this process if True, otherwise don’t

  • loader – the loader to use to load the persisted process

  • nowait – if True, just return the PID without waiting; if False, wait for the process to finish before completing the task

Returns

a dictionary with the body of the message to launch the process

Return type

dict

plumpy.create_task(coro: Callable[[], Coroutine], loop: Optional[asyncio.events.AbstractEventLoop] = None) → _asyncio.Future[source]

Schedule a call to a coro in the event loop and wrap the outcome in a future.

Parameters
  • coro – a function which creates the coroutine to schedule

  • loop – the event loop to schedule it in

Returns

the future representing the outcome of the coroutine
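
As a rough illustration of the pattern described here (a coroutine factory is scheduled on a loop and its outcome is forwarded to a future), the following sketch uses only asyncio. The name create_task_sketch is hypothetical and the body is an assumption about the general technique, not plumpy's implementation.

```python
import asyncio


def create_task_sketch(coro_factory, loop):
    """Schedule coro_factory() on `loop`; return a future holding its outcome.

    Hypothetical stand-in, not plumpy's implementation.
    """
    future = loop.create_future()

    async def run_task():
        try:
            future.set_result(await coro_factory())
        except Exception as exc:
            # Forward failures to the future instead of losing them.
            future.set_exception(exc)

    loop.create_task(run_task())
    return future


async def answer():
    await asyncio.sleep(0)
    return 42


loop = asyncio.new_event_loop()
try:
    outcome = loop.run_until_complete(create_task_sketch(answer, loop))
finally:
    loop.close()
```

Note that the first argument is a function that creates the coroutine, not a coroutine object, which matches the description of the coro parameter.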

plumpy.gather(*coros_or_futures, loop=None, return_exceptions=False)[source]

Return a future aggregating results from the given coroutines/futures.

Coroutines will be wrapped in a future and scheduled in the event loop. They will not necessarily be scheduled in the same order as passed in.

All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. If any child is cancelled, this is treated as if it raised CancelledError – the outer Future is not cancelled in this case. (This is to prevent the cancellation of one child from causing other children to be cancelled.)

If return_exceptions is False, cancelling gather() after it has been marked done won’t cancel any submitted awaitables. For instance, gather can be marked done after propagating an exception to the caller, therefore, calling gather.cancel() after catching an exception (raised by one of the awaitables) from gather won’t cancel any other awaitables.
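
A short sketch of the return_exceptions behaviour described above. It uses asyncio.gather from the standard library, on the assumption that plumpy.gather behaves the same way (the description here matches asyncio's); the coroutine names are illustrative only.

```python
import asyncio


async def ok(value):
    # A coroutine that completes successfully.
    await asyncio.sleep(0)
    return value


async def boom():
    # A coroutine that fails.
    await asyncio.sleep(0)
    raise ValueError("failed")


async def collect():
    # With return_exceptions=True the exception is gathered into the
    # result list instead of being propagated to the caller.
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)


async def propagate():
    # With the default return_exceptions=False the first raised exception
    # propagates to whoever awaits the gather future.
    try:
        await asyncio.gather(ok(1), boom())
    except ValueError as exc:
        return str(exc)
    return None


results = asyncio.run(collect())
message = asyncio.run(propagate())
```

The results list keeps the order of the arguments, so the exception object sits at index 1 between the two successful values.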

plumpy.get_event_loop()

Return an asyncio event loop.

When called from a coroutine or a callback (e.g. scheduled with call_soon or similar API), this function will always return the running event loop.

If there is no running event loop set, the function will return the result of get_event_loop_policy().get_event_loop() call.

plumpy.get_object_loader() → plumpy.loaders.ObjectLoader[source]

Get the plumpy global class loader

Returns

A class loader

Return type

ObjectLoader

plumpy.if_(condition: Callable[[WorkChain], bool]) → plumpy.workchains._If[source]

A conditional that can be used in a workchain outline.

Use as:

if_(cls.conditional)(
  cls.step1,
  cls.step2
)

Each step can, of course, also be any valid workchain step e.g. conditional.

Parameters

condition – The workchain method that will return True or False

plumpy.new_event_loop(*args: Any, **kwargs: Any) → asyncio.events.AbstractEventLoop[source]
plumpy.plum_to_kiwi_future(plum_future: _asyncio.Future) → concurrent.futures._base.Future[source]

Return a kiwi future that resolves to the outcome of the plum future

Parameters

plum_future – the plum future

Returns

the kiwipy future

plumpy.reset_event_loop_policy() → None[source]

Reset the event loop policy to the default.

plumpy.run_until_complete(future: _asyncio.Future, loop: Optional[asyncio.events.AbstractEventLoop] = None) → Any[source]
plumpy.set_event_loop(*args: Any, **kwargs: Any) → None[source]
plumpy.set_event_loop_policy() → None[source]

Enable plumpy’s event loop policy that will make event loops reentrant.

plumpy.set_object_loader(loader: Optional[plumpy.loaders.ObjectLoader]) → None[source]

Set the plumpy global object loader

Parameters

loader (ObjectLoader) – An object loader

plumpy.while_(condition: Callable[[WorkChain], bool]) → plumpy.workchains._While[source]

A while loop that can be used in a workchain outline.

Use as:

while_(cls.conditional)(
  cls.step1,
  cls.step2
)

Each step can, of course, also be any valid workchain step e.g. conditional.

Parameters

condition – The workchain method that will return True or False
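
The two-stage call shape used by while_ (and likewise if_), where the condition is passed first and the steps second, can be modelled in plain Python. Everything in this block (WhileSketch, while_sketch, Countdown) is a hypothetical stand-in for illustration; it does not use plumpy and should not be read as how the library implements its outline instructions.

```python
class WhileSketch:
    """Hypothetical stand-in for the object returned by while_(condition)."""

    def __init__(self, condition):
        self.condition = condition
        self.body = ()

    def __call__(self, *steps):
        # Second call: attach the body and return the finished instruction.
        self.body = steps
        return self

    def run(self, chain):
        # Repeat the body for as long as the condition holds.
        while self.condition(chain):
            for step in self.body:
                step(chain)


def while_sketch(condition):
    return WhileSketch(condition)


class Countdown:
    """Toy object playing the role of the workchain instance."""

    def __init__(self):
        self.remaining = 3
        self.log = []

    def not_done(self):
        return self.remaining > 0

    def tick(self):
        self.log.append(self.remaining)
        self.remaining -= 1


# Unbound methods are passed in, mirroring cls.conditional and cls.step1.
instruction = while_sketch(Countdown.not_done)(Countdown.tick)
chain = Countdown()
instruction.run(chain)
```

Passing unbound methods keeps the instruction independent of any particular instance, so the same outline can be run against each new object.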

plumpy.wrap_communicator(communicator: kiwipy.communications.Communicator, loop: Optional[asyncio.events.AbstractEventLoop] = None) → plumpy.communications.LoopCommunicator[source]

Wrap a communicator such that all callbacks made to any subscribers are scheduled on the given event loop.

If the communicator is already an equivalent communicator wrapper then it will not be wrapped again.

Parameters
  • communicator – the communicator to wrap

  • loop – the event loop to schedule callbacks on

Returns

a communicator wrapper

Subpackages

Submodules