plumpy package¶
-
class
plumpy.
AttributesDict
[source]¶ Bases:
types.SimpleNamespace
Works like a dictionary, but items can also be added / accessed as attributes.
For example:
dct = AttributeDict() dct["key1"] = "value" dct.key2 = "value"
-
class
plumpy.
Bundle
(savable: plumpy.persistence.Savable, save_context: Optional[LoadSaveContext] = None, dereference: bool = False)[source]¶ Bases:
dict
-
unbundle
(load_context: Optional[LoadSaveContext] = None) → plumpy.persistence.Savable[source]¶ This method loads the class of the object and calls its recreate_from method passing the positional and keyword arguments.
- Parameters
load_context – The optional load context
- Returns
An instance of the Savable
-
-
class
plumpy.
BundleKeys
[source]¶ Bases:
object
String keys used by the process to save its state in the state bundle.
See
plumpy.processes.Process.save_instance_state()
andplumpy.processes.Process.load_instance_state()
.-
INPUTS_PARSED
= 'INPUTS_PARSED'¶
-
INPUTS_RAW
= 'INPUTS_RAW'¶
-
OUTPUTS
= 'OUTPUTS'¶
-
-
exception
plumpy.
CancelledError
[source]¶ Bases:
concurrent.futures._base.Error
The Future was cancelled.
-
exception
plumpy.
ClosedError
[source]¶ Bases:
Exception
Raised when an mutable operation is attempted on a closed process
-
class
plumpy.
Communicator
[source]¶ Bases:
object
The interface for a communicator used to both send and receive various types of message.
-
abstract
add_broadcast_subscriber
(subscriber: Callable[[Communicator, Any, Any, Any, Any], Any], identifier=None) → Any[source]¶ Add a broadcast subscriber that will receive all broadcast messages
- Parameters
subscriber – the subscriber function to be called
identifier – an optional identifier for the subscriber
- Returns
an identifier for the subscriber and can be subsequently used to remove it
-
abstract
add_rpc_subscriber
(subscriber: Callable[[Communicator, Any], Any], identifier=None) → Any[source]¶ Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.
-
abstract
add_task_subscriber
(subscriber: Callable[[Communicator, Any], Any], identifier=None) → Any[source]¶ Add a task subscriber to the communicator’s default queue. Returns the identifier.
- Parameters
subscriber – The task callback function
identifier – the subscriber identifier
-
abstract
broadcast_send
(body, sender=None, subject=None, correlation_id=None) → bool[source]¶ Broadcast a message to all subscribers
-
abstract
close
()[source]¶ Close a communicator, free up all resources and do not allow any further operations
-
abstract
remove_broadcast_subscriber
(identifier)[source]¶ Remove a broadcast subscriber
- Parameters
identifier – the identifier of the subscriber to remove
-
abstract
remove_rpc_subscriber
(identifier)[source]¶ Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.
- Parameters
identifier – The RPC subscriber identifier
-
abstract
remove_task_subscriber
(identifier)[source]¶ Remove a task subscriber from the communicator’s default queue.
- Parameters
identifier – the subscriber to remove
- Raises
ValueError if identifier does not correspond to a known subscriber
-
abstract
rpc_send
(recipient_id, msg)[source]¶ Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.
- Parameters
recipient_id – The recipient identifier
msg – The body of the message
- Returns
A future corresponding to the outcome of the call
- Return type
kiwipy.Future
-
abstract
task_send
(task, no_reply=False) → concurrent.futures._base.Future[source]¶ Send a task messages, this will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.
- Parameters
task – The task message
no_reply (bool) – Do not send a reply containing the result of the task
- Returns
A future corresponding to the outcome of the task
-
abstract
-
class
plumpy.
ContextMixin
(*args: Any, **kwargs: Any)[source]¶ Bases:
plumpy.persistence.Savable
Add a context to a Process. The contents of the context will be saved in the instance state unlike standard instance variables.
-
property
ctx
¶
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) → None[source]¶ Add the instance state to
out_state
. .. important:The instance state will contain a pointer to the ``ctx``, and so should be deep copied or serialised before persisting.
-
property
-
class
plumpy.
Continue
(continue_fn: Callable[[…], Any], *args: Any, **kwargs: Any)[source]¶ Bases:
plumpy.process_states.Command
-
CONTINUE_FN
= 'continue_fn'¶
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
-
class
plumpy.
Created
(process: Process, run_fn: Callable[[…], Any], *args: Any, **kwargs: Any)[source]¶ Bases:
plumpy.process_states.State
-
ALLOWED
: Set[Union[None, enum.Enum, str]] = {<ProcessState.KILLED: 'killed'>, <ProcessState.RUNNING: 'running'>, <ProcessState.EXCEPTED: 'excepted'>}¶
-
RUN_FN
= 'run_fn'¶
-
execute
() → plumpy.base.state_machine.State[source]¶ Execute the state, performing the actions that this state is responsible for. :returns: a state to transition to or None if finished.
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
-
class
plumpy.
DefaultObjectLoader
[source]¶ Bases:
plumpy.loaders.ObjectLoader
A default implementation for an object loader. Can load module level classes, functions and constants.
-
_abc_impl
= <_abc_data object>¶
-
-
class
plumpy.
Excepted
(process: Process, exception: Optional[BaseException], trace_back: Optional[traceback] = None)[source]¶ Bases:
plumpy.process_states.State
-
EXC_VALUE
= 'ex_value'¶
-
TRACEBACK
= 'traceback'¶
-
get_exc_info
() → Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[traceback]][source]¶ Recreate the exc_info tuple and return it
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
-
class
plumpy.
Finished
(process: Process, result: Any, successful: bool)[source]¶ Bases:
plumpy.process_states.State
-
class
plumpy.
Future
(*, loop=None)¶ Bases:
object
This class is almost compatible with concurrent.futures.Future.
Differences:
result() and exception() do not take a timeout argument and raise an exception when the future isn’t done yet.
Callbacks registered with add_done_callback() are always called via the event loop’s call_soon_threadsafe().
This class is not compatible with the wait() and as_completed() methods in the concurrent.futures package.
-
_asyncio_future_blocking
¶
-
_callbacks
¶
-
_exception
¶
-
_log_traceback
¶
-
_loop
¶
-
_repr_info
()¶
-
_result
¶
-
_source_traceback
¶
-
_state
¶
-
add_done_callback
()¶ Add a callback to be run when the future becomes done.
The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon.
-
cancel
()¶ Cancel the future and schedule callbacks.
If the future is already done or cancelled, return False. Otherwise, change the future’s state to cancelled, schedule the callbacks and return True.
-
cancelled
()¶ Return True if the future was cancelled.
-
done
()¶ Return True if the future is done.
Done means either that a result / exception are available, or that the future was cancelled.
-
exception
()¶ Return the exception that was set on this future.
The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn’t done yet, raises InvalidStateError.
-
get_loop
()¶ Return the event loop the Future is bound to.
-
remove_done_callback
(fn, /)¶ Remove all instances of a callback from the “call when done” list.
Returns the number of callbacks removed.
-
result
()¶ Return the result this future represents.
If the future has been cancelled, raises CancelledError. If the future’s result isn’t yet available, raises InvalidStateError. If the future is done and has an exception set, this exception is raised.
-
set_exception
(exception, /)¶ Mark the future done and set an exception.
If the future is already done when this method is called, raises InvalidStateError.
-
set_result
(result, /)¶ Mark the future done and set its result.
If the future is already done when this method is called, raises InvalidStateError.
-
class
plumpy.
InMemoryPersister
(loader: Optional[plumpy.loaders.ObjectLoader] = None)[source]¶ Bases:
plumpy.persistence.Persister
Mainly to be used in testing/debugging
-
_abc_impl
= <_abc_data object>¶
-
delete_checkpoint
(pid: Hashable, tag: Optional[str] = None) → None[source]¶ Delete a persisted process checkpoint. No error will be raised if the checkpoint does not exist
- Parameters
pid – the process id of the
plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process
-
delete_process_checkpoints
(pid: Hashable) → None[source]¶ Delete all persisted checkpoints related to the given process id
- Parameters
pid – the process id of the
plumpy.Process
-
get_checkpoints
() → List[plumpy.persistence.PersistedCheckpoint][source]¶ Return a list of all the current persisted process checkpoints with each element containing the process id and optional checkpoint tag
- Returns
list of PersistedCheckpoint
-
get_process_checkpoints
(pid: Hashable) → List[plumpy.persistence.PersistedCheckpoint][source]¶ Return a list of all the current persisted process checkpoints for the specified process with each element containing the process id and optional checkpoint tag
- Parameters
pid – the process pid
- Returns
list of PersistedCheckpoint tuples
-
load_checkpoint
(pid: Hashable, tag: Optional[str] = None) → plumpy.persistence.Bundle[source]¶ Load a process from a persisted checkpoint by its process id
- Parameters
pid – the process id of the
plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process
- Returns
a bundle with the process state
- Raises
plumpy.PersistenceError
Raised if there was a problem loading the checkpoint
-
save_checkpoint
(process: Process, tag: Optional[str] = None) → None[source]¶ Persist a Process instance
- Parameters
process –
plumpy.Process
tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process
- Raises
plumpy.PersistenceError
Raised if there was a problem saving the checkpoint
-
-
class
plumpy.
InputPort
(name: str, valid_type: Optional[Type[Any]] = None, help: Optional[str] = None, default: Any = (), required: bool = True, validator: Optional[Callable[[Any, Port], Optional[str]]] = None)[source]¶ Bases:
plumpy.ports.Port
A simple input port for a value being received by a process
-
property
default
¶
-
property
-
exception
plumpy.
InvalidStateError
[source]¶ Bases:
Exception
Raised when an operation is attempted that requires the process to be in a state that is different from the current state
-
class
plumpy.
Killed
(process: Process, msg: Optional[str])[source]¶ Bases:
plumpy.process_states.State
-
class
plumpy.
LoadSaveContext
(loader: Optional[plumpy.loaders.ObjectLoader] = None, **kwargs: Any)[source]¶ Bases:
object
-
copyextend
(**kwargs: Any) → plumpy.persistence.LoadSaveContext[source]¶ Add additional information to the context by making a copy with the new values
-
-
class
plumpy.
ObjectLoader
[source]¶ Bases:
object
An abstract object loaders. Concrete implementations can be used to identify an object and load it with that identifier.
-
_abc_impl
= <_abc_data object>¶
-
-
class
plumpy.
OutputPort
(name: str, valid_type: Optional[Type[Any]] = None, help: Optional[str] = None, required: bool = True, validator: Optional[Callable[[Any, Port], Optional[str]]] = None)[source]¶ Bases:
plumpy.ports.Port
-
class
plumpy.
PersistedCheckpoint
(pid, tag)¶ Bases:
tuple
-
_asdict
()¶ Return a new dict which maps field names to their values.
-
_field_defaults
= {}¶
-
_fields
= ('pid', 'tag')¶
-
_fields_defaults
= {}¶
-
classmethod
_make
(iterable)¶ Make a new PersistedCheckpoint object from a sequence or iterable
-
_replace
(**kwds)¶ Return a new PersistedCheckpoint object replacing specified fields with new values
-
pid
¶ Alias for field number 0
-
tag
¶ Alias for field number 1
-
-
exception
plumpy.
PersistenceError
[source]¶ Bases:
Exception
Raised when there is a problem persisting the process
-
class
plumpy.
Persister
[source]¶ Bases:
object
-
_abc_impl
= <_abc_data object>¶
-
abstract
delete_checkpoint
(pid: Hashable, tag: Optional[str] = None) → None[source]¶ Delete a persisted process checkpoint. No error will be raised if the checkpoint does not exist
- Parameters
pid – the process id of the
plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process
-
abstract
delete_process_checkpoints
(pid: Hashable) → None[source]¶ Delete all persisted checkpoints related to the given process id
- Parameters
pid – the process id of the
plumpy.Process
-
abstract
get_checkpoints
() → List[plumpy.persistence.PersistedCheckpoint][source]¶ Return a list of all the current persisted process checkpoints with each element containing the process id and optional checkpoint tag
- Returns
list of PersistedCheckpoint
-
abstract
get_process_checkpoints
(pid: Hashable) → List[plumpy.persistence.PersistedCheckpoint][source]¶ Return a list of all the current persisted process checkpoints for the specified process with each element containing the process id and optional checkpoint tag
- Parameters
pid – the process pid
- Returns
list of PersistedCheckpoint tuples
-
abstract
load_checkpoint
(pid: Hashable, tag: Optional[str] = None) → plumpy.persistence.Bundle[source]¶ Load a process from a persisted checkpoint by its process id
- Parameters
pid – the process id of the
plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process
- Returns
a bundle with the process state
- Raises
plumpy.PersistenceError
Raised if there was a problem loading the checkpoint
-
abstract
save_checkpoint
(process: Process, tag: Optional[str] = None) → None[source]¶ Persist a Process instance
- Parameters
process –
plumpy.Process
tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process
- Raises
plumpy.PersistenceError
Raised if there was a problem saving the checkpoint
-
-
class
plumpy.
PicklePersister
(pickle_directory: str)[source]¶ Bases:
plumpy.persistence.Persister
Implementation of the abstract Persister class that stores Process states in pickles on a filesystem.
-
_abc_impl
= <_abc_data object>¶
-
_pickle_filepath
(pid: Hashable, tag: Optional[str] = None) → str[source]¶ Returns the full filepath of the pickle for the given process id and optional checkpoint tag
-
delete_checkpoint
(pid: Hashable, tag: Optional[str] = None) → None[source]¶ Delete a persisted process checkpoint. No error will be raised if the checkpoint does not exist
- Parameters
pid – the process id of the
plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process
-
delete_process_checkpoints
(pid: Hashable) → None[source]¶ Delete all persisted checkpoints related to the given process id
- Parameters
pid – the process id of the
plumpy.Process
-
static
ensure_pickle_directory
(dirpath: str) → None[source]¶ Will attempt to create the directory at dirpath and raise if it fails, except if the exception arose because the directory already existed
-
get_checkpoints
() → List[plumpy.persistence.PersistedCheckpoint][source]¶ Return a list of all the current persisted process checkpoints with each element containing the process id and optional checkpoint tag
- Returns
list of PersistedCheckpoint
-
get_process_checkpoints
(pid: Hashable) → List[plumpy.persistence.PersistedCheckpoint][source]¶ Return a list of all the current persisted process checkpoints for the specified process with each element containing the process id and optional checkpoint tag
- Parameters
pid – the process pid
- Returns
list of PersistedCheckpoint
-
load_checkpoint
(pid: Hashable, tag: Optional[str] = None) → plumpy.persistence.Bundle[source]¶ Load a process from a persisted checkpoint by its process id
- Parameters
pid – the process id of the
plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint for the corresponding process
- Returns
a bundle with the process state
-
static
load_pickle
(filepath: str) → plumpy.persistence.PersistedPickle[source]¶ Load a pickle from disk
- Parameters
filepath – absolute filepath to the pickle
- Returns
the loaded pickle
-
-
class
plumpy.
PlumpyEventLoopPolicy
[source]¶ Bases:
asyncio.unix_events._UnixDefaultEventLoopPolicy
Custom event policy that always returns the same event loop that is made reentrant by
nest_asyncio
.-
_loop
: Optional[asyncio.events.AbstractEventLoop] = None¶
-
-
class
plumpy.
Port
(name: str, valid_type: Optional[Type[Any]] = None, help: Optional[str] = None, required: bool = True, validator: Optional[Callable[[Any, Port], Optional[str]]] = None)[source]¶ Bases:
object
Specifications relating to a general input/output value including properties like whether it is required, valid types, the help string, etc.
-
get_description
() → Dict[str, Any][source]¶ Return a description of the Port, which will be a dictionary of its attributes
- Returns
a dictionary of the stringified Port attributes
-
property
help
¶ Get the help string for this port
- Returns
the help string
-
property
name
¶
-
property
required
¶ Is this port required?
- Returns
True if required, False otherwise
-
property
valid_type
¶ Get the valid value type for this port if one is specified
- Returns
the value value type
-
validate
(value: Any, breadcrumbs: Sequence[str] = ()) → Optional[plumpy.ports.PortValidationError][source]¶ Validate a value to see if it is valid for this port
- Parameters
value – the value to check
breadcrumbs – a tuple of the path to having reached this point in validation
-
-
class
plumpy.
PortNamespace
(name: str = '', help: Optional[str] = None, required: bool = True, validator: Optional[Callable[[Any, Port], Optional[str]]] = None, valid_type: Optional[Type[Any]] = None, default: Any = (), dynamic: bool = False, populate_defaults: bool = True)[source]¶ Bases:
collections.abc.MutableMapping
,plumpy.ports.Port
A container for Ports. Effectively it maintains a dictionary whose members are either a Port or yet another PortNamespace. This allows for the nesting of ports
-
NAMESPACE_SEPARATOR
= '.'¶
-
_abc_impl
= <_abc_data object>¶
-
absorb
(port_namespace: plumpy.ports.PortNamespace, exclude: Optional[Sequence[str]] = None, include: Optional[Sequence[str]] = None, namespace_options: Optional[Dict[str, Any]] = None) → List[str][source]¶ Absorb another PortNamespace instance into oneself, including all its mutable properties and ports.
Mutable properties of self will be overwritten with those of the port namespace that is to be absorbed. The same goes for the ports, meaning that any ports with a key that already exists in self will be overwritten. The namespace_options dictionary can be used to yet override the mutable properties of the port namespace that is to be absorbed. The exclude and include tuples can be used to exclude or include certain ports and both are mutually exclusive.
- Parameters
port_namespace – instance of PortNamespace that is to be absorbed into self
exclude – input keys to exclude from being exposed
include – input keys to include as exposed inputs
namespace_options – a dictionary with mutable PortNamespace property values to override
- Returns
list of the names of the ports that were absorbed
-
create_port_namespace
(name: str, **kwargs: Any) → plumpy.ports.PortNamespace[source]¶ Create and return a new PortNamespace in this PortNamespace. If the name is namespaced, the sub PortNamespaces will be created recursively, except if one of the namespaces is already occupied at any level by a Port in which case a ValueError will be thrown
- Parameters
name – name (potentially namespaced) of the port to create and return
kwargs – constructor arguments that will be used only for the construction of the terminal PortNamespace
- Returns
PortNamespace
- Raises
ValueError if any sub namespace is occupied by a non-PortNamespace port
-
property
default
¶
-
property
dynamic
¶
-
get_description
() → Dict[str, Dict[str, Any]][source]¶ Return a dictionary with a description of the ports this namespace contains Nested PortNamespaces will be properly recursed and Ports will print their properties in a list
- Returns
a dictionary of descriptions of the Ports contained within this PortNamespace
-
get_port
(name: str, create_dynamically: bool = False) → Union[plumpy.ports.Port, plumpy.ports.PortNamespace][source]¶ Retrieve a (namespaced) port from this PortNamespace. If any of the sub namespaces of the terminal port itself cannot be found, a ValueError will be raised
- Parameters
name – name (potentially namespaced) of the port to retrieve.
create_dynamically – If set to
True
, dynamically create the requested port if it doesn’t exist and the namespace is dynamic, instead of raising aValueError
.
- Returns
Port
- Raises
ValueError if port or namespace does not exist
-
property
populate_defaults
¶
-
property
ports
¶
-
pre_process
(port_values: MutableMapping[str, Any]) → plumpy.utils.AttributesFrozendict[source]¶ Map port values onto the port namespace, filling in values for ports with a default.
- Parameters
port_values – the dictionary with supplied port values
- Returns
an AttributesFrozenDict with pre-processed port value mapping, complemented with port default values
-
project
(port_values: MutableMapping[str, Any]) → MutableMapping[str, Any][source]¶ Project a (nested) dictionary of port values onto the port dictionary of this PortNamespace. That is to say, return those keys of the dictionary that are shared by this PortNamespace. If a matching key corresponds to another PortNamespace, this method will be called recursively, passing the sub dictionary belonging to that port name.
- Parameters
port_values – a dictionary where keys are port names and values are actual input values
-
static
strip_namespace
(namespace: str, separator: str, rules: Optional[Sequence[str]] = None) → Optional[List[str]][source]¶ Filter given exclude/include rules staring with namespace and strip the first level.
For example if the namespace is base and the rules are:
('base.a', 'base.sub.b','relax.base.c', 'd')
the function will return:
('a', 'sub.c')
If the rules are None, that is what is returned as well.
- Parameters
namespace – the string name of the namespace
separator – the namespace separator string
rules – the list or tuple of exclude or include rules to strip
- Returns
None if rules=None or the list of stripped rules
-
property
valid_type
¶ Get the valid value type for this port if one is specified
- Returns
the value value type
-
validate
(port_values: Optional[Mapping[str, Any]] = None, breadcrumbs: Sequence[str] = ()) → Optional[plumpy.ports.PortValidationError][source]¶ Validate the namespace port itself and subsequently all the port_values it contains
- Parameters
port_values – an arbitrarily nested dictionary of parsed port values
breadcrumbs – a tuple of the path to having reached this point in validation
- Returns
None or tuple containing 0: error string 1: tuple of breadcrumb strings to where the validation failed
-
validate_dynamic_ports
(port_values: MutableMapping[str, Any], breadcrumbs: Sequence[str] = ()) → Optional[plumpy.ports.PortValidationError][source]¶ Validate port values with respect to the dynamic properties of the port namespace. It will check if the namespace is actually dynamic and if all values adhere to the valid types of the namespace if those are specified
- Parameters
- Returns
if invalid returns a string with the reason for the validation failure, otherwise None
- Return type
typing.Optional[str]
-
validate_ports
(port_values: MutableMapping[str, Any], breadcrumbs: Sequence[str]) → Optional[plumpy.ports.PortValidationError][source]¶ Validate port values with respect to the explicitly defined ports of the port namespace. Ports values that are matched to an actual Port will be popped from the dictionary
- Parameters
port_values – an arbitrarily nested dictionary of parsed port values
breadcrumbs – a tuple of breadcrumbs showing the path to to the value being validated
- Returns
None or tuple containing 0: error string 1: tuple of breadcrumb strings to where the validation failed
-
-
exception
plumpy.
PortValidationError
(message: str, port: str)[source]¶ Bases:
Exception
Error when validation fails on a port
-
property
message
¶ Get the validation error message
- Returns
the error message
-
property
port
¶ Get the port breadcrumbs
- Returns
the port where the error occurred
-
property
-
class
plumpy.
Process
(*args: Any, **kwargs: Any)[source]¶ Bases:
plumpy.base.state_machine.StateMachine
,plumpy.persistence.Savable
The Process class is the base for any unit of work in plumpy.
A process can be in one of the following states:
CREATED
RUNNING
WAITING
FINISHED
EXCEPTED
KILLED
as defined in the
ProcessState
enum.___ | v CREATED (x) --- RUNNING (x) --- FINISHED (o) | ^ / v | / WAITING (x) -- | ^ --- * -- EXCEPTED (o) * -- KILLED (o)
(o): terminal state
(x): non terminal state
When a Process enters a state is always gets a corresponding message, e.g. on entering RUNNING it will receive the on_run message. These are always called immediately after that state is entered but before being executed.
-
_abc_impl
= <_abc_data object>¶
-
_auto_persist
: Optional[Set[str]] = {'_creation_time', '_event_helper', '_future', '_paused', '_pid', '_pre_paused_status', '_status'}¶
-
_closed
= False¶
-
_create_interrupt_action
(exception: plumpy.process_states.Interruption) → plumpy.futures.CancellableAction[source]¶ Create an interrupt action from the corresponding interrupt exception
- Parameters
exception – The interrupt exception
- Returns
The interrupt action
-
_do_pause
(state_msg: Optional[str], next_state: Optional[plumpy.process_states.State] = None) → bool[source]¶ Carry out the pause procedure, optionally transitioning to the next state first
-
_interrupt_action
: Optional[plumpy.futures.CancellableAction] = None¶
-
_killing
: Optional[plumpy.futures.CancellableAction] = None¶
-
_paused
: Optional[plumpy.persistence.SavableFuture] = None¶
-
_pausing
: Optional[plumpy.futures.CancellableAction] = None¶
-
_process_scope
() → Generator[None, None, None][source]¶ This context manager function is used to make sure the process stack is correct meaning that globally someone can ask for Process.current() to get the last process that is on the call stack.
-
async
_run_task
(callback: Callable[[…], Any], *args: Any, **kwargs: Any) → Any[source]¶ This method should be used to run all Process related functions and coroutines. If there is an exception the process will enter the EXCEPTED state.
- Parameters
callback – A function or coroutine
args – Optional positional arguments passed to fn
kwargs – Optional keyword arguments passed to fn
- Returns
The value as returned by fn
-
_schedule_rpc
(callback: Callable[[…], Any], *args: Any, **kwargs: Any) → concurrent.futures._base.Future[source]¶ Schedule a call to a callback as a result of an RPC communication call, this will return a future that resolves to the final result (even after one or more layer of futures being returned) of the callback.
The callback will be scheduled at the working thread where the process event loop runs.
- Parameters
callback – the callback function or coroutine
args – the positional arguments to the callback
kwargs – the keyword arguments to the callback
- Returns
a kiwi future that resolves to the outcome of the callback
-
_set_interrupt_action
(new_action: Optional[plumpy.futures.CancellableAction]) → None[source]¶ Set the interrupt action cancelling the current one if it exists :param new_action: The new interrupt action to set
-
_set_interrupt_action_from_exception
(interrupt_exception: plumpy.process_states.Interruption) → None[source]¶ Set an interrupt action from the corresponding interrupt exception
-
_setup_event_hooks
() → None[source]¶ Set the event hooks to process, when it is created or loaded(recreated).
-
_spec_class
¶ alias of
plumpy.process_spec.ProcessSpec
-
_stepping
= False¶
-
add_cleanup
(cleanup: Callable[], None]) → None[source]¶ Add callback, which will be run when the process is being closed.
-
add_process_listener
(listener: plumpy.process_listener.ProcessListener) → None[source]¶ Add a process listener to the process.
The listener defines the actions to take when the process is triggering the specific state condition.
-
broadcast_receive
(_comm: kiwipy.communications.Communicator, body: Any, sender: Any, subject: Any, correlation_id: Any) → Optional[concurrent.futures._base.Future][source]¶ Coroutine called when the process receives a message from the communicator
- Parameters
_comm – the communicator that sent the message
msg – the message
-
call_soon
(callback: Callable[[…], Any], *args: Any, **kwargs: Any) → plumpy.events.ProcessCallback[source]¶ Schedule a callback to what is considered an internal process function (this needn’t be a method). If it raises an exception it will cause the process to fail.
-
callback_excepted
(_callback: Callable[[…], Any], exception: Optional[BaseException], trace: Optional[traceback]) → None[source]¶
-
close
() → None[source]¶ Calling this method indicates that this process should not ran anymore and will trigger any runtime resources (such as the communicator connection) to be cleaned up. The state of the process will still be accessible.
It is safe to call this method multiple times.
-
create_initial_state
() → plumpy.process_states.State[source]¶ This method is here to override its superclass.
Automatically enter the CREATED state when the process is created.
- Returns
A Created state
-
property
creation_time
¶ The creation time of this Process as returned by time.time() when instantiated :return: The creation time
-
classmethod
current
() → Optional[plumpy.processes.Process][source]¶ Get the currently running process i.e. the one at the top of the stack
- Returns
the currently running process
-
decode_input_args
(encoded: Any) → Any[source]¶ Decode saved input arguments as they came from the saved instance state
plumpy.persistence.Bundle
. The decoded inputs should contain no reference to the encoded inputs that were passed in. This often will mean making a deepcopy of the encoded input dictionary.- Parameters
encoded –
- Returns
The decoded input args
-
classmethod
define
(_spec: plumpy.process_spec.ProcessSpec) → None[source]¶ Define the specification of the process.
Normally should be overridden by subclasses.
-
done
() → bool[source]¶ Return True if the call was successfully killed or finished running.
Deprecated since version 0.18.6: Use the has_terminated method instead
-
encode_input_args
(inputs: Any) → Any[source]¶ Encode input arguments such that they may be saved in a
plumpy.persistence.Bundle
. The encoded inputs should contain no reference to the inputs that were passed in. This often will mean making a deepcopy of the input dictionary.- Parameters
inputs – A mapping of the inputs as passed to the process
- Returns
The encoded inputs
-
exception
() → Optional[BaseException][source]¶ Return exception, if the process is terminated in excepted state.
-
execute
() → Optional[Dict[str, Any]][source]¶ Execute the process. This will return if the process terminates or is paused.
- Returns
None if not terminated, otherwise self.outputs
-
fail
(exception: Optional[BaseException], trace_back: Optional[traceback]) → None[source]¶ Fail the process in response to an exception :param exception: The exception that caused the failure :param trace_back: Optional exception traceback
-
future
() → plumpy.persistence.SavableFuture[source]¶ Return a savable future representing an eventual result of an asynchronous operation.
The result is set at the terminal state.
-
classmethod
get_description
() → Dict[str, Any][source]¶ Get a human readable description of what this
Process
does.- Returns
The description.
-
classmethod
get_states
() → Sequence[Type[plumpy.process_states.State]][source]¶ Return all allowed states of the process.
-
get_status_info
(out_status_info: dict) → None[source]¶ Return updated status information of process.
- Parameters
out_status_info – the old status
-
property
inputs
¶ Return the parsed inputs.
-
property
is_excepted
¶ Return whether the process excepted.
- Returns
boolean, True if the process is in
EXCEPTED
state.
-
property
is_killing
¶ Return if the process is already being killed.
-
property
is_successful
¶ Return whether the result of the process is considered successful.
- Returns
boolean, True if the process is in Finished state with successful attribute set to True
-
kill
(msg: Optional[str] = None) → Union[bool, _asyncio.Future][source]¶ Kill the process :param msg: An optional kill message
-
launch
(process_class: Type[Process], inputs: Optional[dict] = None, pid: Optional[Hashable] = None, logger: Optional[logging.Logger] = None) → plumpy.processes.Process[source]¶ Start running the nested process.
The process is started asynchronously, without blocking other task in the event loop.
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶ Load the process from its saved instance state.
- Parameters
saved_state – A bundle to load the state from
load_context – The load context
-
property
logger
¶ Return the logger for this class.
If not set, return the default logger.
- Returns
The logger.
-
property
loop
¶ Return the event loop of the process.
-
message_receive
(_comm: kiwipy.communications.Communicator, msg: Dict[str, Any]) → Any[source]¶ Coroutine called when the process receives a message from the communicator
- Parameters
_comm – the communicator that sent the message
msg – the message
- Returns
the outcome of processing the message, the return value will be sent back as a response to the sender
-
out
(output_port: str, value: Any) → None[source]¶ Record an output value for a specific output port. If the output port matches an explicitly defined Port it will be validated against that. If not it will be validated against the PortNamespace, which means it will be checked for dynamicity and whether the type of the value is valid
- Parameters
output_port – the name of the output port, can be namespaced
value – the value for the output port
- Raises
ValueError if the output value is not validated against the port
-
property
outputs
¶ Get the current outputs emitted by the Process. These may grow over time as the process runs.
- Returns
A mapping of {output_port: value} outputs
-
pause
(msg: Optional[str] = None) → Union[bool, plumpy.futures.CancellableAction][source]¶ Pause the process.
- Parameters
msg – an optional message to set as the status. The current status will be saved in the private _pre_paused_status attribute, such that it can be restored when the process is played again.
- Returns
False if process is already terminated, True if already paused or pausing, a CancellableAction to pause if the process was running steps
-
property
paused
¶ Return whether the process was being paused.
-
property
pid
¶ Return the pid of the process.
-
play
() → bool[source]¶ Play a process. Returns True if after this call the process is playing, False otherwise
- Returns
True if playing, False otherwise
-
property
raw_inputs
¶ The AttributesFrozendict of inputs (if not None).
-
classmethod
recreate_from
(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None) → plumpy.processes.Process[source]¶ Recreate a process from a saved state, passing any positional and keyword arguments on to load_instance_state
- Parameters
saved_state – The saved state to load from
load_context – The load context to use
- Returns
An instance of the object with its state loaded from the save state.
-
recreate_state
(saved_state: plumpy.persistence.Bundle) → plumpy.process_states.State[source]¶ Create a state object from a saved state
- Parameters
saved_state – The saved state
- Returns
An instance of the object with its state loaded from the save state.
-
remove_process_listener
(listener: plumpy.process_listener.ProcessListener) → None[source]¶ Remove a process listener from the process.
-
result
() → Any[source]¶ Get the result from the process if it is finished. If the process was killed then a KilledError will be raise. If the process has excepted then the failing exception will be raised. If in any other state this will raise an InvalidStateError. :return: The result of the process
-
run
() → Any[source]¶ This function will be run when the process is triggered. It should be overridden by a subclass.
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) → None[source]¶ Ask the process to save its current instance state.
- Parameters
out_state – A bundle to save the state to
save_context – The save context
-
set_logger
(logger: logging.Logger) → None[source]¶ Set the logger of the process.
-
classmethod
spec
() → plumpy.process_spec.ProcessSpec[source]¶
-
property
status
¶ Return the status massage of the process.
-
step
() → None[source]¶ Run a step.
The step is run synchronously with steps in its own process, and asynchronously with steps in other processes.
The execute function running in this method is dependent on the state of the process.
-
async
step_until_terminated
() → None[source]¶ If the process has not terminated, run the current step and wait until the step finished.
This is the function run by the event loop (not
step
).
-
successful
() → bool[source]¶ Returns whether the result of the process is considered successful Will raise if the process is not in the FINISHED state
-
transition_failed
(initial_state: Hashable, final_state: Hashable, exception: Exception, trace: traceback) → None[source]¶ Called when a state transitions fails.
This method can be overwritten to change the default behaviour which is to raise the exception.
- Parameters
exception – The transition failed exception.
-
property
uuid
¶ Return the UUID of the process
-
class
plumpy.
ProcessLauncher
(loop: Optional[asyncio.events.AbstractEventLoop] = None, persister: Optional[plumpy.persistence.Persister] = None, load_context: Optional[plumpy.persistence.LoadSaveContext] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None)[source]¶ Bases:
object
Takes incoming task messages and uses them to launch processes.
Expected format of task:
For launch:
{ 'task': <LAUNCH_TASK> 'process_class': <Process class to launch> 'args': <tuple of positional args for process constructor> 'kwargs': <dict of keyword args for process constructor>. 'nowait': True or False }
For continue:
{ 'task': <CONTINUE_TASK> 'pid': <Process ID> 'nowait': True or False }
-
async
_continue
(_communicator: kiwipy.communications.Communicator, pid: Hashable, nowait: bool, tag: Optional[str] = None) → Union[Hashable, Any][source]¶ Continue the process
- Parameters
_communicator – the communicator
pid – the pid of the process to continue
nowait – if True don’t wait for the process to complete
tag – the checkpoint tag to continue from
-
async
_create
(_communicator: kiwipy.communications.Communicator, process_class: str, persist: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None) → Hashable[source]¶ Create the process
- Parameters
_communicator – the communicator
process_class – the process class to create
persist – should the process be persisted
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
- Returns
the pid of the created process
-
async
_launch
(_communicator: kiwipy.communications.Communicator, process_class: str, persist: bool, nowait: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None) → Union[Hashable, Any][source]¶ Launch the process
- Parameters
_communicator – the communicator
process_class – the process class to launch
persist – should the process be persisted
nowait – if True only return when the process finishes
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
- Returns
the pid of the created process or the outputs (if nowait=False)
-
async
-
class
plumpy.
ProcessListener
[source]¶ Bases:
plumpy.persistence.Savable
-
_abc_impl
= <_abc_data object>¶
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext]) → None[source]¶
-
on_output_emitted
(process: Process, output_port: str, value: Any, dynamic: bool) → None[source]¶ Called when the process has emitted an output value
- Parameters
process – The process
output_port – The output port that the value was outputted on
value – The value that was outputted
dynamic – True if the port is dynamic, False otherwise
-
on_process_created
(process: Process) → None[source]¶ Called when the process has been started
- Parameters
process – The process
-
on_process_excepted
(process: Process, reason: str) → None[source]¶ Called when the process has excepted
- Parameters
process – The process
reason – A string of the exception message
-
on_process_finished
(process: Process, outputs: Any) → None[source]¶ Called when the process has finished successfully
- Parameters
process – The process
outputs – The process outputs
-
on_process_killed
(process: Process, msg: str) → None[source]¶ Called when the process was killed
- Parameters
process – The process
-
on_process_paused
(process: Process) → None[source]¶ Called when the process is about to re-enter the RUNNING state
- Parameters
process – The process
-
on_process_played
(process: Process) → None[source]¶ Called when the process is about to re-enter the RUNNING state
- Parameters
process – The process
-
-
class
plumpy.
ProcessSpec
[source]¶ Bases:
object
A class that defines the specifications of a
plumpy.Process
, this includes what its inputs, outputs, etc are.All methods to modify the spec should have declarative names describe the spec e.g.: input, output
Every Process class has one of these.
-
INPUT_PORT_TYPE
¶ alias of
plumpy.ports.InputPort
-
OUTPUT_PORT_TYPE
¶ alias of
plumpy.ports.OutputPort
-
PORT_NAMESPACE_TYPE
¶ alias of
plumpy.ports.PortNamespace
-
_create_port
(port_namespace: plumpy.ports.PortNamespace, port_class: Type[Union[plumpy.ports.Port, plumpy.ports.PortNamespace]], name: str, **kwargs: Any) → None[source]¶ Create a new Port of a given class and name in a given PortNamespace
- Parameters
port_namespace – PortNamespace to which to add the port
port_class – class of the Port to create
name – name of the port to create
kwargs – options for the port
-
static
_expose_ports
(process_class: Type[Process], source: plumpy.ports.PortNamespace, destination: plumpy.ports.PortNamespace, expose_memory: Dict[Optional[str], Dict[Type[Process], Sequence[str]]], namespace: Optional[str], exclude: Optional[Sequence[str]], include: Optional[Sequence[str]], namespace_options: Optional[dict] = None) → None[source]¶ Expose ports from a source PortNamespace of the ProcessSpec of a Process class into the destination PortNamespace of this ProcessSpec. If the namespace is specified, the ports will be exposed in that sub namespace. The set of ports can be restricted using the mutually exclusive exclude and include tuples. The namespace_options will be used to override the properties of the PortNamespace into which the ports are exposed, whether that has been newly created or existed already.
- Parameters
process_class – the Process class whose outputs to expose
source – the PortNamespace whose ports are to be exposed
destination – the PortNamespace into which the ports are to be exposed
namespace – a namespace in which to place PortNamespace with the exposed outputs
exclude – input ports that are to be excluded
include – input ports that are to be included
namespace_options – a dictionary with mutable PortNamespace property values to override
-
expose_inputs
(process_class: Type[Process], namespace: Optional[str] = None, exclude: Optional[Sequence[str]] = None, include: Optional[Sequence[str]] = None, namespace_options: Optional[dict] = None) → None[source]¶ This method allows one to automatically add the inputs from another Process to this ProcessSpec. The optional namespace argument can be used to group the exposed inputs in a separated PortNamespace. The exclude and include arguments can be used to restrict the set of ports that are exposed. Note that these two options are mutually exclusive.
- Parameters
process_class – the Process class whose inputs to expose
namespace – a namespace in which to place the exposed inputs
exclude – input ports that are to be excluded
include – input ports that are to be included
namespace_options – a dictionary with mutable PortNamespace property values to override
-
expose_outputs
(process_class: Type[Process], namespace: Optional[str] = None, exclude: Optional[Sequence[str]] = None, include: Optional[Sequence[str]] = None, namespace_options: Optional[dict] = None) → None[source]¶ This method allows one to automatically add the ouputs from another Process to this ProcessSpec. The optional namespace argument can be used to group the exposed outputs in a separated PortNamespace. The exclude and include arguments can be used to restrict the set of ports that are exposed. Note that these two options are mutually exclusive.
- Parameters
process_class – the Process class whose outputs to expose
namespace – a namespace in which to place the exposed outputs
exclude – input ports that are to be excluded
include – input ports that are to be included
namespace_options – a dictionary with mutable PortNamespace property values to override
-
get_description
() → Dict[str, Any][source]¶ Get a description of this process specification
- Returns
a dictionary with the descriptions of the input and output port namespaces
-
has_input
(name: str) → bool[source]¶ Return whether the input port namespace contains a port with the given name
- Parameters
name – key of the port in the input port namespace
-
has_output
(name: str) → bool[source]¶ Return whether the output port namespace contains a port with the given name
- Parameters
name – key of the port in the output port namespace
-
input
(name: str, **kwargs: Any) → None[source]¶ Define an input port in the input port namespace
- Parameters
name – name of the input port to create
kwargs – options for the input port
-
input_namespace
(name: str, **kwargs: Any) → None[source]¶ Create a new PortNamespace in the input port namespace. The keyword arguments will be passed to the PortNamespace constructor. Any intermediate port namespaces that need to be created for a nested namespace, will take constructor defaults
- Parameters
name – namespace of the new port namespace
kwargs – keyword arguments for the PortNamespace constructor
-
property
inputs
¶ Get the input port namespace of the process specification
- Returns
the input PortNamespace
-
property
logger
¶
-
property
namespace_separator
¶
-
output
(name: str, **kwargs: Any) → None[source]¶ Define an output port in the output port namespace
- Parameters
name – name of the output port to create
kwargs – options for the output port
-
output_namespace
(name: str, **kwargs: Any) → None[source]¶ Create a new PortNamespace in the output port namespace. The keyword arguments will be passed to the PortNamespace constructor. Any intermediate port namespaces that need to be created for a nested namespace, will take constructor defaults
- Parameters
name – namespace of the new port namespace
kwargs – keyword arguments for the PortNamespace constructor
-
property
outputs
¶ Get the output port namespace of the process specification
- Returns
the outputs PortNamespace
-
property
ports
¶
-
-
class
plumpy.
ProcessState
(value)[source]¶ Bases:
enum.Enum
The possible states that a
Process
can be in.
-
exception
plumpy.
RemoteException
[source]¶ Bases:
Exception
An exception occurred at the remote end of the call
-
class
plumpy.
RemoteProcessController
(communicator: kiwipy.communications.Communicator)[source]¶ Bases:
object
Control remote processes using coroutines that will send messages and wait (in a non-blocking way) for their response
-
async
continue_process
(pid: Hashable, tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False) → Optional[Any][source]¶ Continue the process
- Parameters
_communicator – the communicator
pid – the pid of the process to continue
tag – the checkpoint tag to continue from
-
async
execute_process
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Any[source]¶ Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.
- Parameters
process_class – the process class to execute
init_args – the positional arguments to the class constructor
init_kwargs – the keyword arguments to the class constructor
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns
the result of executing the process
-
async
get_status
(pid: Hashable) → Any[source]¶ Get the status of a process with the given PID :param pid: the process id :return: the status response from the process
-
async
kill_process
(pid: Hashable, msg: Optional[Any] = None) → Any[source]¶ Kill the process
- Parameters
pid – the pid of the process to kill
msg – optional kill message
- Returns
True if killed, False otherwise
-
async
launch_process
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Any[source]¶ Launch a process given the class and constructor arguments
- Parameters
process_class – the class of the process to launch
init_args – the constructor positional arguments
init_kwargs – the constructor keyword arguments
persist – should the process be persisted
loader – the classloader to use
nowait – if True, don’t wait for the process to send a response, just return the pid
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns
the result of launching the process
-
async
-
class
plumpy.
RemoteProcessThreadController
(communicator: kiwipy.communications.Communicator)[source]¶ Bases:
object
A class that can be used to control and launch remote processes
-
continue_process
(pid: Hashable, tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]¶
-
execute_process
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]¶ Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.
- Parameters
process_class – the process class to execute
init_args – the positional arguments to the class constructor
init_kwargs – the keyword arguments to the class constructor
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns
the result of executing the process
-
get_status
(pid: Hashable) → concurrent.futures._base.Future[source]¶ Get the status of a process with the given PID.
- Parameters
pid – the process id
- Returns
the status response from the process
-
kill_all
(msg: Optional[Any]) → None[source]¶ Kill all processes that are subscribed to the same communicator
- Parameters
msg – an optional pause message
-
kill_process
(pid: Hashable, msg: Optional[Any] = None) → concurrent.futures._base.Future[source]¶ Kill the process
- Parameters
pid – the pid of the process to kill
msg – optional kill message
- Returns
a response future from the process to be killed
-
launch_process
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False) → Union[None, Hashable, Any][source]¶ Launch the process
- Parameters
process_class – the process class to launch
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
persist – should the process be persisted
loader – the class loader to use
nowait – if True only return when the process finishes
no_reply – don’t send a reply to the sender
- Returns
the pid of the created process or the outputs (if nowait=False)
-
pause_all
(msg: Any) → None[source]¶ Pause all processes that are subscribed to the same communicator
- Parameters
msg – an optional pause message
-
pause_process
(pid: Hashable, msg: Optional[Any] = None) → concurrent.futures._base.Future[source]¶ Pause the process
- Parameters
pid – the pid of the process to pause
msg – optional pause message
- Returns
a response future from the process to be paused
-
-
class
plumpy.
Running
(process: Process, run_fn: Callable[[…], Any], *args: Any, **kwargs: Any)[source]¶ Bases:
plumpy.process_states.State
-
ALLOWED
: Set[Union[None, enum.Enum, str]] = {<ProcessState.EXCEPTED: 'excepted'>, <ProcessState.RUNNING: 'running'>, <ProcessState.KILLED: 'killed'>, <ProcessState.FINISHED: 'finished'>, <ProcessState.WAITING: 'waiting'>}¶
-
COMMAND
= 'command'¶
-
RUN_FN
= 'run_fn'¶
-
_action_command
(command: Union[plumpy.process_states.Kill, plumpy.process_states.Stop, plumpy.process_states.Wait, plumpy.process_states.Continue]) → plumpy.process_states.State[source]¶
-
_command
: Union[None, plumpy.process_states.Kill, plumpy.process_states.Stop, plumpy.process_states.Wait, plumpy.process_states.Continue] = None¶
-
_run_handle
= None¶
-
async
execute
() → plumpy.process_states.State[source]¶ Execute the state, performing the actions that this state is responsible for. :returns: a state to transition to or None if finished.
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
-
class
plumpy.
Savable
[source]¶ Bases:
object
-
_get_value
(saved_state: MutableMapping[str, Any], name: str, load_context: Optional[plumpy.persistence.LoadSaveContext]) → Union[method, plumpy.persistence.Savable][source]¶
-
_persist_configured
= False¶
-
static
_set_meta_type
(out_state: MutableMapping[str, Any], name: str, type_spec: Any) → None[source]¶
-
static
load
(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None) → plumpy.persistence.Savable[source]¶ Load a Savable from a saved instance state. The load context is a way of passing runtime data to the object being loaded.
- Parameters
saved_state – The saved state
load_context – Additional runtime state that can be passed into when loading. The type and content (if any) is completely user defined
- Returns
The loaded Savable instance
-
load_members
(members: Iterable[str], saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None) → None[source]¶
-
classmethod
recreate_from
(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None) → plumpy.persistence.Savable[source]¶ Recreate a
Savable
from a saved state using an optional load context.- Parameters
saved_state – The saved state
load_context – An optional load context
- Returns
The recreated instance
-
save
(save_context: Optional[plumpy.persistence.LoadSaveContext] = None) → MutableMapping[str, Any][source]¶
-
-
class
plumpy.
SavableFuture
(*, loop=None)[source]¶ Bases:
_asyncio.Future
,plumpy.persistence.Savable
A savable future.
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
classmethod
recreate_from
(saved_state: MutableMapping[str, Any], load_context: Optional[plumpy.persistence.LoadSaveContext] = None) → plumpy.persistence.Savable[source]¶ Recreate a
Savable
from a saved state using an optional load context.- Parameters
saved_state – The saved state
load_context – An optional load context
- Returns
The recreated instance
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
-
plumpy.
ToContext
¶ alias of
builtins.dict
-
exception
plumpy.
TransitionFailed
(initial_state: plumpy.base.state_machine.State, final_state: Optional[State] = None, traceback_str: Optional[str] = None)[source]¶ Bases:
Exception
A state transition failed
-
class
plumpy.
UnsuccessfulResult
(result: Optional[int] = None)[source]¶ Bases:
object
The result of the process was unsuccessful
-
class
plumpy.
Wait
(continue_fn: Optional[Callable[[…], Any]] = None, msg: Optional[Any] = None, data: Optional[Any] = None)[source]¶ Bases:
plumpy.process_states.Command
-
class
plumpy.
Waiting
(process: Process, done_callback: Optional[Callable[[…], Any]], msg: Optional[str] = None, data: Optional[Any] = None)[source]¶ Bases:
plumpy.process_states.State
-
ALLOWED
: Set[Union[None, enum.Enum, str]] = {<ProcessState.EXCEPTED: 'excepted'>, <ProcessState.RUNNING: 'running'>, <ProcessState.KILLED: 'killed'>, <ProcessState.FINISHED: 'finished'>, <ProcessState.WAITING: 'waiting'>}¶
-
DONE_CALLBACK
= 'DONE_CALLBACK'¶
-
_interruption
= None¶
-
async
execute
() → plumpy.process_states.State[source]¶ Execute the state, performing the actions that this state is responsible for. :returns: a state to transition to or None if finished.
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: plumpy.persistence.LoadSaveContext) → None[source]¶
-
-
class
plumpy.
WorkChain
(*args: Any, **kwargs: Any)[source]¶ Bases:
plumpy.mixins.ContextMixin
,plumpy.processes.Process
A WorkChain is a series of instructions carried out with the ability to save state in between.
-
_CONTEXT
= 'CONTEXT'¶
-
_STEPPER_STATE
= 'stepper_state'¶
-
_abc_impl
= <_abc_data object>¶
-
_spec_class
¶ alias of
WorkChainSpec
-
load_instance_state
(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) → None[source]¶ Load the process from its saved instance state.
- Parameters
saved_state – A bundle to load the state from
load_context – The load context
-
run
() → Any[source]¶ This function will be run when the process is triggered. It should be overridden by a subclass.
-
save_instance_state
(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) → None[source]¶ Add the instance state to
out_state
. .. important:The instance state will contain a pointer to the ``ctx``, and so should be deep copied or serialised before persisting.
-
classmethod
spec
() → plumpy.workchains.WorkChainSpec[source]¶
-
to_context
(**kwargs: Union[_asyncio.Future, plumpy.processes.Process]) → None[source]¶ This is a convenience method that provides syntactic sugar, for a user to add multiple intersteps that will assign a certain value to the corresponding key in the context of the workchain
-
-
class
plumpy.
WorkChainSpec
[source]¶
-
plumpy.
chain
(source, target)[source]¶ Chain two futures together so that when one completes, so does the other.
The result (success or failure) of
a
will be copied tob
, unlessb
has already been completed or cancelled by the timea
finishes.
-
plumpy.
copy_future
(source, target)[source]¶ Copy the status of future a to b unless b is already done in which case return
- Parameters
source (
kiwipy.Future
) – The source futuretarget (
kiwipy.Future
) – The target future
-
plumpy.
create_continue_body
(pid: Hashable, tag: Optional[str] = None, nowait: bool = False) → Dict[str, Any][source]¶ Create a message body to continue an existing process :param pid: the pid of the existing process :param tag: the optional persistence tag :param nowait: wait for the process to finish before completing the task, otherwise just return the PID :return: a dictionary with the body of the message to continue the process
-
plumpy.
create_launch_body
(process_class: str, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, persist: bool = False, loader: Optional[plumpy.loaders.ObjectLoader] = None, nowait: bool = True) → Dict[str, Any][source]¶ Create a message body for the launch action
- Parameters
process_class – the class of the process to launch
init_args – any initialisation positional arguments
init_kwargs – any initialisation keyword arguments
persist – persist this process if True, otherwise don’t
loader – the loader to use to load the persisted process
nowait – wait for the process to finish before completing the task, otherwise just return the PID
- Returns
a dictionary with the body of the message to launch the process
- Return type
-
plumpy.
create_task
(coro: Callable[], Coroutine], loop: Optional[asyncio.events.AbstractEventLoop] = None) → _asyncio.Future[source]¶ Schedule a call to a coro in the event loop and wrap the outcome in a future.
- Parameters
coro – a function which creates the coroutine to schedule
loop – the event loop to schedule it in
- Returns
the future representing the outcome of the coroutine
-
plumpy.
gather
(*coros_or_futures, loop=None, return_exceptions=False)[source]¶ Return a future aggregating results from the given coroutines/futures.
Coroutines will be wrapped in a future and scheduled in the event loop. They will not necessarily be scheduled in the same order as passed in.
All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. If any child is cancelled, this is treated as if it raised CancelledError – the outer Future is not cancelled in this case. (This is to prevent the cancellation of one child to cause other children to be cancelled.)
If return_exceptions is False, cancelling gather() after it has been marked done won’t cancel any submitted awaitables. For instance, gather can be marked done after propagating an exception to the caller, therefore, calling
gather.cancel()
after catching an exception (raised by one of the awaitables) from gather won’t cancel any other awaitables.
-
plumpy.
get_event_loop
()¶ Return an asyncio event loop.
When called from a coroutine or a callback (e.g. scheduled with call_soon or similar API), this function will always return the running event loop.
If there is no running event loop set, the function will return the result of get_event_loop_policy().get_event_loop() call.
-
plumpy.
get_object_loader
() → plumpy.loaders.ObjectLoader[source]¶ Get the plumpy global class loader
- Returns
A class loader
- Return type
-
plumpy.
if_
(condition: Callable[[WorkChain], bool]) → plumpy.workchains._If[source]¶ A conditional that can be used in a workchain outline.
Use as:
if_(cls.conditional)( cls.step1, cls.step2 )
Each step can, of course, also be any valid workchain step e.g. conditional.
- Parameters
condition – The workchain method that will return True or False
-
plumpy.
plum_to_kiwi_future
(plum_future: _asyncio.Future) → concurrent.futures._base.Future[source]¶ Return a kiwi future that resolves to the outcome of the plum future
- Parameters
plum_future – the plum future
- Returns
the kiwipy future
-
plumpy.
run_until_complete
(future: _asyncio.Future, loop: Optional[asyncio.events.AbstractEventLoop] = None) → Any[source]¶
-
plumpy.
set_event_loop_policy
() → None[source]¶ Enable plumpy’s event loop policy that will make event loop’s reentrant.
-
plumpy.
set_object_loader
(loader: Optional[plumpy.loaders.ObjectLoader]) → None[source]¶ Set the plumpy global object loader
- Parameters
loader (
ObjectLoader
) – An object loader- Returns
-
plumpy.
while_
(condition: Callable[[WorkChain], bool]) → plumpy.workchains._While[source]¶ A while loop that can be used in a workchain outline.
Use as:
while_(cls.conditional)( cls.step1, cls.step2 )
Each step can, of course, also be any valid workchain step e.g. conditional.
- Parameters
condition – The workchain method that will return True or False
-
plumpy.
wrap_communicator
(communicator: kiwipy.communications.Communicator, loop: Optional[asyncio.events.AbstractEventLoop] = None) → plumpy.communications.LoopCommunicator[source]¶ Wrap a communicator such that all callbacks made to any subscribers are scheduled on the given event loop.
If the communicator is already an equivalent communicator wrapper then it will not be wrapped again.
- Parameters
communicator – the communicator to wrap
loop – the event loop to schedule callbacks on
- Returns
a communicator wrapper
Subpackages¶
Submodules¶
- plumpy.communications module
- plumpy.event_helper module
- plumpy.events module
- plumpy.exceptions module
- plumpy.futures module
- plumpy.lang module
- plumpy.loaders module
- plumpy.mixins module
- plumpy.persistence module
- plumpy.ports module
- plumpy.process_comms module
- plumpy.process_listener module
- plumpy.process_spec module
- plumpy.process_states module
- plumpy.processes module
- plumpy.settings module
- plumpy.utils module
- plumpy.workchains module