Tutorial¶
This Jupyter Notebook provides a tutorial for the features of plumpy.
To use, create an environment with:
$ conda create -n plumpy-tutorial "plumpy>=0.18" jupyterlab
$ conda activate plumpy-tutorial
and open jupyterlab in the notebook folder:
$ jupyter lab
import asyncio
from pprint import pprint
import time
import kiwipy
import plumpy
# this is required because jupyter is already running an event loop
plumpy.set_event_loop_policy()
Introduction¶
Plumpy is a library used to create and control long-running workflows.
The library consists of a number of key components, which we shall cover:
- The Process: to run a user-defined action, with well-defined inputs and outputs.
- The WorkChain: a subclass of Process that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.
- The process Controller (principally the RemoteProcessThreadController): to control the process or workchain throughout its lifetime.
Processes¶
The Process is the minimal component of the plumpy workflow.
It utilises the concept of a finite state machine to run a user-defined action, transitioning between a set of states.
The simplest process implements a run method and can be executed with the execute method:
class SimpleProcess(plumpy.Process):

    def run(self):
        print(self.state.name)

process = SimpleProcess()
print(process.state.name)
process.execute()
print(process.state.name)
print("Success", process.is_successful)
print("Result", process.result())
CREATED
RUNNING
FINISHED
Success True
Result None
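The finite-state-machine idea can be sketched without plumpy. The snippet below is a minimal illustration only: the `State` enum, the `ALLOWED` transition table and the `ToyMachine` class are hypothetical names, and the table is an assumption made for this example rather than plumpy's actual transition rules.

```python
from enum import Enum


class State(Enum):
    CREATED = "created"
    RUNNING = "running"
    WAITING = "waiting"
    FINISHED = "finished"
    KILLED = "killed"


# Illustrative transition table (an assumption, not copied from plumpy).
ALLOWED = {
    State.CREATED: {State.RUNNING, State.KILLED},
    State.RUNNING: {State.RUNNING, State.WAITING, State.FINISHED, State.KILLED},
    State.WAITING: {State.RUNNING, State.KILLED},
    State.FINISHED: set(),  # terminal
    State.KILLED: set(),    # terminal
}


class ToyMachine:
    """Hypothetical state machine that records every state it enters."""

    def __init__(self):
        self.state = State.CREATED
        self.history = [self.state.name]

    def transition_to(self, new_state):
        # Refuse any transition that the table does not list.
        if new_state not in ALLOWED[self.state]:
            raise ValueError(f"{self.state.name} -> {new_state.name} is not allowed")
        self.state = new_state
        self.history.append(new_state.name)


machine = ToyMachine()
machine.transition_to(State.RUNNING)
machine.transition_to(State.FINISHED)
print(machine.history)  # ['CREATED', 'RUNNING', 'FINISHED']
```

The recorded history mirrors the CREATED, RUNNING, FINISHED sequence printed by SimpleProcess above; once a terminal state is reached, further transitions raise an error.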
Defining inputs and outputs¶
To integrate the process into a more complex workflow, you can define its inputs and outputs on a ProcessSpec.
Important
define is a class method and should always call the super method.
The process specification creates Port objects, which can have help strings, defaults and validators, and can be nested in a PortNamespace.
class SpecProcess(plumpy.Process):

    @classmethod
    def define(cls, spec: plumpy.ProcessSpec):
        super().define(spec)
        spec.input('input1', valid_type=str, help='A help string')
        spec.output('output1')

        spec.input_namespace('input2')
        spec.input('input2.input2a')
        spec.input('input2.input2b', default='default')

        spec.output_namespace('output2')
        spec.output('output2.output2a')
        spec.output('output2.output2b')

    def run(self):
        self.out('output1', self.inputs.input1)
        self.out('output2.output2a', self.inputs.input2.input2a)
        self.out('output2.output2b', self.inputs.input2.input2b)

pprint(SpecProcess.spec().get_description())
process = SpecProcess(inputs={
    'input1': 'my input',
    'input2': {'input2a': 'other input'}
})
process.execute()
process.outputs
{'inputs': {'_attrs': {'default': (),
'dynamic': False,
'help': None,
'required': 'True',
'valid_type': 'None'},
'input1': {'help': 'A help string',
'name': 'input1',
'required': 'True',
'valid_type': "<class 'str'>"},
'input2': {'_attrs': {'default': (),
'dynamic': False,
'help': None,
'required': 'True',
'valid_type': 'None'},
'input2a': {'name': 'input2a', 'required': 'True'},
'input2b': {'default': 'default',
'name': 'input2b',
'required': 'False'}}},
'outputs': {'_attrs': {'default': (),
'dynamic': False,
'help': None,
'required': 'True',
'valid_type': 'None'},
'output1': {'name': 'output1', 'required': 'True'},
'output2': {'_attrs': {'default': (),
'dynamic': False,
'help': None,
'required': 'True',
'valid_type': 'None'},
'output2a': {'name': 'output2a', 'required': 'True'},
'output2b': {'name': 'output2b', 'required': 'True'}}}}
{'output1': 'my input',
'output2': {'output2a': 'other input', 'output2b': 'default'}}
Commands and actions¶
The run method can also return a Command, to tell the process what action to perform next.
By default the command is plumpy.Stop(result=None, successful=True), but you can also, for example, return a Continue command with another function to run:
class ContinueProcess(plumpy.Process):

    def run(self):
        print("running")
        return plumpy.Continue(self.continue_fn)

    def continue_fn(self):
        print("continuing")
        # message is stored in the process status
        return plumpy.Kill("I was killed")

process = ContinueProcess()
try:
    process.execute()
except plumpy.KilledError as error:
    pass

print(process.state)
print(process.status)
process.is_successful
running
continuing
ProcessState.KILLED
I was killed
False
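The way a returned command decides what happens next can be pictured as a small dispatch loop. This is a plain-Python sketch under stated assumptions: the `Continue` and `Stop` dataclasses and the `drive` function are hypothetical stand-ins written for this example, not plumpy's own classes or its real stepping logic.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Continue:
    """Hypothetical command: call next_fn as the following step."""
    next_fn: Callable[[], Any]


@dataclass
class Stop:
    """Hypothetical command: finish with a result."""
    result: Any = None
    successful: bool = True


def drive(first_fn):
    """Call step functions until one of them yields a Stop command."""
    command = first_fn()
    while True:
        if command is None:
            # A step that returns nothing is treated as a successful stop.
            command = Stop()
        if isinstance(command, Stop):
            return command
        command = command.next_fn()


trace = []


def run():
    trace.append("running")
    return Continue(continue_fn)


def continue_fn():
    trace.append("continuing")
    return Stop(result=42, successful=True)


final = drive(run)
print(trace)         # ['running', 'continuing']
print(final.result)  # 42
```

Treating a missing return value as `Stop()` reflects the default described above, where a run method that returns nothing finishes successfully with a result of None.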
Listeners¶
By defining and adding a ProcessListener to a process, we can add functions which are triggered when the process enters a particular state.
class WaitListener(plumpy.ProcessListener):

    def on_process_running(self, process):
        print(process.state.name)

    def on_process_waiting(self, process):
        print(process.state.name)
        process.resume()

class WaitProcess(plumpy.Process):

    def run(self):
        return plumpy.Wait(self.resume_fn)

    def resume_fn(self):
        return plumpy.Stop(None, True)

process = WaitProcess()
print(process.state.name)

listener = WaitListener()
process.add_process_listener(listener)

process.execute()
print(process.state.name)
CREATED
RUNNING
WAITING
RUNNING
FINISHED
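The listener mechanism is an instance of the observer pattern. The following sketch shows the general idea in plain Python; `ToyListener`, `ToyProcess` and the `on_<state>` naming scheme are hypothetical choices for this example and do not reproduce plumpy's internals.

```python
class ToyListener:
    """Hypothetical listener that records the states it is told about."""

    def __init__(self):
        self.seen = []

    def on_running(self, process):
        self.seen.append("RUNNING")

    def on_finished(self, process):
        self.seen.append("FINISHED")


class ToyProcess:
    """Hypothetical process that notifies listeners on each state change."""

    def __init__(self):
        self._listeners = []
        self.state = "CREATED"

    def add_listener(self, listener):
        self._listeners.append(listener)

    def _enter(self, state):
        self.state = state
        for listener in self._listeners:
            # Only call the hook if the listener defines one for this state.
            handler = getattr(listener, f"on_{state.lower()}", None)
            if handler is not None:
                handler(self)

    def execute(self):
        self._enter("RUNNING")
        self._enter("FINISHED")


toy_listener = ToyListener()
toy_process = ToyProcess()
toy_process.add_listener(toy_listener)
toy_process.execute()
print(toy_listener.seen)  # ['RUNNING', 'FINISHED']
```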
Asynchronicity¶
Processes run asynchronously on the asyncio event loop, with step_until_terminated the higher level async function inside of the execute method that drives the execution.
Note that each process will always run its steps synchronously and that Continue commands will not relinquish the control of the event loop.
In the following section on WorkChains we will see how to run steps that do relinquish the event loop control, allowing for multiple processes to run asynchronously.
async def async_fn():
    print("async_fn start")
    await asyncio.sleep(.01)
    print("async_fn end")

class NamedProcess(plumpy.Process):

    @classmethod
    def define(cls, spec: plumpy.ProcessSpec):
        super().define(spec)
        spec.input('name')

    def run(self):
        print(self.inputs.name, "run")
        return plumpy.Continue(self.continue_fn)

    def continue_fn(self):
        print(self.inputs.name, "continued")

process1 = NamedProcess({"name": "process1"})
process2 = NamedProcess({"name": "process2"})

async def execute():
    await asyncio.gather(
        async_fn(),
        process1.step_until_terminated(),
        process2.step_until_terminated()
    )

plumpy.get_event_loop().run_until_complete(execute())
async_fn start
process1 run
process1 continued
process2 run
process2 continued
async_fn end
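The ordering in the output above follows from how asyncio schedules coroutines: a coroutine only gives other tasks a chance to run when it awaits something that suspends. A small stdlib-only sketch, with hypothetical helper names `no_yield` and `yields`, written for a plain Python script (it uses `asyncio.run`, which expects no event loop to be running already):

```python
import asyncio

log = []


async def no_yield(name):
    # No await inside the loop: both steps run back to back.
    for i in range(2):
        log.append(f"{name} step{i}")


async def yields(name):
    for i in range(2):
        log.append(f"{name} step{i}")
        # Suspending here lets the other task run its next step.
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(no_yield("a"), no_yield("b"))
    sequential = list(log)
    log.clear()
    await asyncio.gather(yields("c"), yields("d"))
    interleaved = list(log)
    return sequential, interleaved


sequential, interleaved = asyncio.run(main())
print(sequential)   # ['a step0', 'a step1', 'b step0', 'b step1']
print(interleaved)  # ['c step0', 'd step0', 'c step1', 'd step1']
```

The first pair behaves like processes chained with Continue commands, while the second pair behaves like steps that hand control back to the event loop.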
Pausing and playing¶
Pausing a process will create an action to await a pause Future before the start of the next step, i.e. it will still finish the current step before awaiting.
When awaiting the pause Future the event loop control is relinquished, and so other processes are free to start.
Note
Pausing a process does not change its state and is different to the concept of waiting.
The pause Future can be marked as done, and thus the process continued, via the play method.
If play is called before the end of the step during which the process was paused, it will cancel the pause action.
Below is a toy example, but this concept becomes more useful when using a Controller, as discussed later.
class SimpleProcess(plumpy.Process):

    def run(self):
        print(self.get_name())

class PauseProcess(plumpy.Process):

    def run(self):
        print(f"{self.get_name()}: pausing")
        self.pause()
        print(f"{self.get_name()}: continue step")
        return plumpy.Continue(self.next_step)

    def next_step(self):
        print(f"{self.get_name()}: next step")

pause_proc = PauseProcess()
simple_proc = SimpleProcess()

async def play(proc):
    while True:
        if proc.paused:
            print(f"{proc.get_name()}: playing (state={proc.state.name})")
            proc.play()
            break

async def execute():
    return await asyncio.gather(
        pause_proc.step_until_terminated(),
        simple_proc.step_until_terminated(),
        play(pause_proc),
    )

outputs = plumpy.get_event_loop().run_until_complete(execute())
PauseProcess: pausing
PauseProcess: continue step
SimpleProcess
PauseProcess: playing (state=RUNNING)
PauseProcess: next step
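The pause and play mechanism can be approximated with a bare asyncio Future. This is a sketch of the concept only, assuming a plain Python script; the `worker`, `other` and `player` coroutines are hypothetical and stand in for the paused process, an unrelated process and the caller of play.

```python
import asyncio

events = []


async def worker(pause_future):
    events.append("step 1")
    # Awaiting the pending future relinquishes control until it is resolved.
    await pause_future
    events.append("step 2")


async def other():
    events.append("other ran")


async def player(pause_future):
    events.append("play")
    # Marking the future as done lets the worker continue.
    pause_future.set_result(True)


async def main():
    loop = asyncio.get_running_loop()
    pause_future = loop.create_future()
    await asyncio.gather(worker(pause_future), other(), player(pause_future))


asyncio.run(main())
print(events)  # ['step 1', 'other ran', 'play', 'step 2']
```

As in the plumpy example, the unrelated coroutine gets to run while the worker is held at the pause point.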
WorkChains¶
Introduction¶
The WorkChain allows for richer logic when running a process as a set of discrete steps (also known as instructions).
The steps are defined in the outline(), which provides a succinct summary of the logical steps that the workchain will perform.
class SimpleWorkChain(plumpy.WorkChain):

    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.outline(
            cls.step1,
            cls.step2,
        )

    def step1(self):
        print('step1')

    def step2(self):
        print('step2')

workchain = SimpleWorkChain()
output = workchain.execute()
step1
step2
Conditionals¶
The outline can contain more than just a list of steps.
It allows for “conditionals” which should return a truthy value to indicate if the nested steps should be run.
For example, using the if_ conditional:
class IfWorkChain(plumpy.WorkChain):

    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.input('run', valid_type=bool)

        spec.outline(
            plumpy.if_(cls.if_step)(
                cls.conditional_step
            )
        )

    def if_step(self):
        print(' if')
        return self.inputs.run

    def conditional_step(self):
        print(' conditional')

workchain = IfWorkChain({"run": False})
print('execute False')
output = workchain.execute()

workchain = IfWorkChain({"run": True})
print('execute True')
output = workchain.execute()
execute False
if
execute True
if
conditional
The context and while conditional¶
The while_ conditional allows steps to be run multiple times.
To achieve this we need to define and iterate an internal variable.
It is important that these variables are saved on the WorkChain.ctx attribute (an AttributesDict), since this can be persisted in between steps as we will discuss later.
Important
Arbitrary WorkChain instance attributes will not be persisted.
Internal variables should always be saved to self.ctx.
class WhileWorkChain(plumpy.WorkChain):

    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.input('steps', valid_type=int, default=3)

        spec.outline(
            cls.init_step,
            plumpy.while_(cls.while_step)(
                cls.conditional_step
            )
        )

    def init_step(self):
        self.ctx.iterator = 0

    def while_step(self):
        self.ctx.iterator += 1
        return (self.ctx.iterator <= self.inputs.steps)

    def conditional_step(self):
        print('step', self.ctx.iterator)

workchain = WhileWorkChain()
output = workchain.execute()
step 1
step 2
step 3
Interstep processes and asynchronicity¶
By default, each “non-terminal” step will return a Continue command, to progress to the next step (as a RUNNING state).
In terms of asynchronicity, this means that no step will relinquish control to the asyncio event loop.
By using the to_context method though, we can add “awaitable” objects.
If the step adds any of these objects, then the workchain is transitioned into a WAITING state until the awaitables are complete, at which point the workchain moves on to the next step.
The following example shows how to add an arbitrary asynchronous function and a Process as awaitables. You could even add nested WorkChains with their own interstep processes.
async def awaitable_func(msg):
    await asyncio.sleep(.01)
    print(msg)
    return True

class InternalProcess(plumpy.Process):

    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.input('name', valid_type=str, default='process')
        spec.output('value')

    def run(self):
        print(self.inputs.name)
        self.out('value', 'value')

class InterstepWorkChain(plumpy.WorkChain):

    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.input('name', valid_type=str, default='workchain')
        spec.input('process', valid_type=bool, default=False)
        spec.input('awaitable', valid_type=bool, default=False)
        spec.outline(
            cls.step1,
            cls.step2,
            cls.step3,
        )

    def step1(self):
        print(self.inputs.name, 'step1')

    def step2(self):
        print(self.inputs.name, 'step2')
        time.sleep(.01)

        if self.inputs.awaitable:
            self.to_context(
                awaitable=asyncio.ensure_future(
                    awaitable_func(f'{self.inputs.name} step2 awaitable'),
                    loop=self.loop
                )
            )
        if self.inputs.process:
            self.to_context(
                process=self.launch(
                    InternalProcess,
                    inputs={'name': f'{self.inputs.name} step2 process'})
            )

    def step3(self):
        print(self.inputs.name, 'step3')
        print(f" ctx={self.ctx}")
Without awaitables, we can see that each workchain runs to completion before the next workchain starts.
wkchain1 = InterstepWorkChain({'name': 'wkchain1'})
wkchain2 = InterstepWorkChain({'name': 'wkchain2'})

async def execute():
    return await asyncio.gather(
        wkchain1.step_until_terminated(),
        wkchain2.step_until_terminated()
    )

output = plumpy.get_event_loop().run_until_complete(execute())
wkchain1 step1
wkchain1 step2
wkchain1 step3
ctx=AttributesDict()
wkchain2 step1
wkchain2 step2
wkchain2 step3
ctx=AttributesDict()
By adding an interstep process we see that the workchain, in the WAITING state, relinquishes control of the event loop for other workchains to start.
The outputs of interstep awaitables are saved on the workchain’s context for later use.
wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'process': True})
wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'process': True})

async def execute():
    return await asyncio.gather(
        wkchain1.step_until_terminated(),
        wkchain2.step_until_terminated()
    )

output = plumpy.get_event_loop().run_until_complete(execute())
wkchain1 step1
wkchain1 step2
wkchain2 step1
wkchain2 step2
wkchain1 step2 process
wkchain2 step2 process
wkchain1 step3
ctx=AttributesDict(process={'value': 'value'})
wkchain2 step3
ctx=AttributesDict(process={'value': 'value'})
wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'awaitable': True})
wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'awaitable': True})

async def execute():
    return await asyncio.gather(
        wkchain1.step_until_terminated(),
        wkchain2.step_until_terminated()
    )

output = plumpy.get_event_loop().run_until_complete(execute())
wkchain1 step1
wkchain1 step2
wkchain2 step1
wkchain2 step2
wkchain1 step2 awaitable
wkchain2 step2 awaitable
wkchain1 step3
ctx=AttributesDict(awaitable=True)
wkchain2 step3
ctx=AttributesDict(awaitable=True)
Although the steps of individual workchains always progress synchronously, the order of interleaving of steps from different workchains is non-deterministic.
wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'process': True, 'awaitable': True})
wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'process': True, 'awaitable': True})

async def execute():
    return await asyncio.gather(
        wkchain1.step_until_terminated(),
        wkchain2.step_until_terminated()
    )

output = plumpy.get_event_loop().run_until_complete(execute())
wkchain1 step1
wkchain1 step2
wkchain2 step1
wkchain2 step2
wkchain1 step2 process
wkchain2 step2 process
wkchain1 step2 awaitable
wkchain2 step2 awaitable
wkchain1 step3
ctx=AttributesDict(awaitable=True, process={'value': 'value'})
wkchain2 step3
ctx=AttributesDict(awaitable=True, process={'value': 'value'})
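The pattern of waiting on named awaitables and then storing their results under the same names can be sketched with asyncio alone. The helper `run_step_with_awaitables`, the `fetch` coroutine and the plain dict used as a context are hypothetical and only illustrate the idea; they are not how plumpy implements to_context.

```python
import asyncio


async def fetch(value, delay):
    # Stand-in for any asynchronous piece of work.
    await asyncio.sleep(delay)
    return value


async def run_step_with_awaitables(ctx, **awaitables):
    """Wait for every awaitable, then store each result under its key."""
    results = await asyncio.gather(*awaitables.values())
    for key, result in zip(awaitables.keys(), results):
        ctx[key] = result


async def main():
    toy_ctx = {}
    await run_step_with_awaitables(
        toy_ctx,
        first=fetch("a", 0.02),
        second=fetch("b", 0.01),
    )
    return toy_ctx


toy_ctx = asyncio.run(main())
print(toy_ctx)  # {'first': 'a', 'second': 'b'}
```

`asyncio.gather` returns results in the order the awaitables were passed, regardless of which finishes first, so each result lands under the intended key.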
Persistence¶
A process can be saved as a checkpoint by a Persister implementation, recording enough information to fully recreate the current instance state.
Out-of-the-box implemented persisters are the InMemoryPersister and PicklePersister.
Below is a toy implementation, but they are primarily used as part of the Controller functionality described later.
persister = plumpy.InMemoryPersister()

class PersistWorkChain(plumpy.WorkChain):

    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.outline(
            cls.init_step,
            cls.step2,
            cls.step3,
        )

    def __repr__(self):
        return f"PersistWorkChain(ctx={self.ctx})"

    def init_step(self):
        self.ctx.step = 1
        persister.save_checkpoint(self, 'init')

    def step2(self):
        self.ctx.step += 1
        persister.save_checkpoint(self, 'step2')

    def step3(self):
        self.ctx.step += 1
        persister.save_checkpoint(self, 'step3')

workchain = PersistWorkChain()
workchain.execute()
workchain
PersistWorkChain(ctx=AttributesDict(step=3))
The persister now contains three checkpoints:
persister.get_checkpoints()
[PersistedCheckpoint(pid=UUID('0175da80-5715-4067-a9db-1a7a7ee01eec'), tag='init'),
PersistedCheckpoint(pid=UUID('0175da80-5715-4067-a9db-1a7a7ee01eec'), tag='step2'),
PersistedCheckpoint(pid=UUID('0175da80-5715-4067-a9db-1a7a7ee01eec'), tag='step3')]
Checkpoints can be accessed via the process's PID and the given checkpoint tag. This returns a Bundle, which can be used to recreate the workchain instance state at the point of the checkpoint:
workchain_checkpoint = persister.load_checkpoint(workchain.pid, 'step2').unbundle()
workchain_checkpoint
PersistWorkChain(ctx=AttributesDict(step=2))
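Conceptually, a persister stores a snapshot of the state keyed by process ID and tag, so that later changes do not affect what was saved. The `ToyPersister` below is a hypothetical, in-memory sketch of that idea using deep copies; its method signatures are assumptions for this example and differ from plumpy's persisters, which bundle the process instance itself.

```python
import copy
import uuid


class ToyPersister:
    """Hypothetical in-memory store of state snapshots, keyed by (pid, tag)."""

    def __init__(self):
        self._checkpoints = {}

    def save_checkpoint(self, pid, tag, state):
        # Deep copy so later mutation of `state` leaves the snapshot intact.
        self._checkpoints[(pid, tag)] = copy.deepcopy(state)

    def load_checkpoint(self, pid, tag):
        return copy.deepcopy(self._checkpoints[(pid, tag)])

    def get_tags(self, pid):
        return [tag for (saved_pid, tag) in self._checkpoints if saved_pid == pid]


toy_pid = uuid.uuid4()
toy_persister = ToyPersister()

state = {"step": 0}
for tag in ("init", "step2", "step3"):
    state["step"] += 1
    toy_persister.save_checkpoint(toy_pid, tag, state)

restored = toy_persister.load_checkpoint(toy_pid, "step2")
print(toy_persister.get_tags(toy_pid))  # ['init', 'step2', 'step3']
print(restored)                         # {'step': 2}
print(state)                            # {'step': 3}
```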
Communicators and Controllers¶
kiwipy communicators can send messages to subscribers, to launch or control processes.
Messages take one of three forms:
- Tasks are one-to-many messages.
  This means that you send out a task to a queue and there can be zero or more subscribers attached, one of which will process the task when it is ready.
  The result of the task can optionally be delivered to the sender.
- A Remote Procedure Call (RPC) is one-to-one.
  This is used when you want to call a particular remote function/method and (usually) expect an immediate response.
  For example, imagine asking a remote process to pause itself.
  Here you would make an RPC and wait to get confirmation that it has, indeed, paused.
- Broadcasters send messages to zero or more consumers.
  These are fire-and-forget calls that broadcast a message to anyone who is listening.
  Consumers may optionally apply a filter to only receive messages that match some criteria.
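The three message forms can be contrasted with a small synchronous toy. `ToyCommunicator` is a hypothetical class written for this illustration: its method names loosely echo the vocabulary used here, but the signatures, the round-robin task dispatch and the direct return values are assumptions, not kiwipy's API (which returns futures and can run over a message broker).

```python
from collections import deque


class ToyCommunicator:
    """Hypothetical, synchronous illustration of task, RPC and broadcast."""

    def __init__(self):
        self._task_subscribers = deque()
        self._rpc_subscribers = {}
        self._broadcast_subscribers = []

    def add_task_subscriber(self, fn):
        self._task_subscribers.append(fn)

    def add_rpc_subscriber(self, identifier, fn):
        self._rpc_subscribers[identifier] = fn

    def add_broadcast_subscriber(self, fn, subject_filter=None):
        self._broadcast_subscribers.append((fn, subject_filter))

    def task_send(self, task):
        # Exactly one subscriber handles each task (round-robin here).
        worker = self._task_subscribers[0]
        self._task_subscribers.rotate(-1)
        return worker(task)

    def rpc_send(self, identifier, msg):
        # One-to-one: addressed to a single subscriber by identifier.
        return self._rpc_subscribers[identifier](msg)

    def broadcast_send(self, body, subject=None):
        # Fire and forget: every subscriber whose filter matches is called.
        for fn, subject_filter in self._broadcast_subscribers:
            if subject_filter is None or subject_filter == subject:
                fn(body)


comm = ToyCommunicator()
comm.add_task_subscriber(lambda task: f"worker1 did {task}")
comm.add_task_subscriber(lambda task: f"worker2 did {task}")
comm.add_rpc_subscriber("proc-1", lambda msg: {"reply_to": msg})

heard = []
comm.add_broadcast_subscriber(lambda body: heard.append(("all", body)))
comm.add_broadcast_subscriber(
    lambda body: heard.append(("pause-only", body)), subject_filter="pause"
)

task_results = [comm.task_send("job-a"), comm.task_send("job-b")]
rpc_reply = comm.rpc_send("proc-1", "status")
comm.broadcast_send("stop everything", subject="kill")
comm.broadcast_send("take a break", subject="pause")

print(task_results)  # ['worker1 did job-a', 'worker2 did job-b']
print(rpc_reply)     # {'reply_to': 'status'}
print(heard)
```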
As a simple example, a process can take a communicator on initiation. On process creation, this will add the process to the communicator as an RPC and broadcast subscriber. The communicator can then send RPC messages via a process's PID (as a string) or broadcast to all subscribed processes. Four message types are available:
- STATUS (RPC only)
- KILL
- PAUSE
- PLAY
Important
On process termination/close it will remove the process subscriptions to the communicator.
communicator = kiwipy.LocalCommunicator()

class SimpleProcess(plumpy.Process):
    pass

process = SimpleProcess(communicator=communicator)

pprint(communicator.rpc_send(str(process.pid), plumpy.STATUS_MSG).result())
{'ctime': 1649438757.413114,
'paused': False,
'process_string': '<SimpleProcess> (ProcessState.CREATED)',
'state': 'ProcessState.CREATED'}
A ProcessLauncher can be subscribed to a communicator, for process launch and continue tasks.
Process controllers can then wrap a communicator, to provide an improved interface.
class ControlledWorkChain(plumpy.WorkChain):

    @classmethod
    def define(cls, spec):
        super().define(spec)
        spec.input('steps', valid_type=int, default=10)
        spec.output('result', valid_type=int)

        spec.outline(
            cls.init_step,
            plumpy.while_(cls.while_step)(cls.loop_step),
            cls.final_step
        )

    def init_step(self):
        self.ctx.iterator = 0

    def while_step(self):
        return (self.ctx.iterator <= self.inputs.steps)

    def loop_step(self):
        self.ctx.iterator += 1

    def final_step(self):
        self.out('result', self.ctx.iterator)

loop_communicator = plumpy.wrap_communicator(kiwipy.LocalCommunicator())
loop_communicator.add_task_subscriber(plumpy.ProcessLauncher())
controller = plumpy.RemoteProcessController(loop_communicator)

wkchain = ControlledWorkChain(communicator=loop_communicator)

async def run_wait():
    return await controller.launch_process(ControlledWorkChain)

async def run_nowait():
    return await controller.launch_process(ControlledWorkChain, nowait=True)

print(plumpy.get_event_loop().run_until_complete(run_wait()))
print(plumpy.get_event_loop().run_until_complete(run_nowait()))
{'result': 11}
3f43527d-4ca6-403b-935b-b6fb94624bc0
more to come…