{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tutorial"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This Jupyter Notebook provides a tutorial for the features of plumpy.\n",
"\n",
"To use, create an environment with:\n",
"\n",
"```console\n",
"$ conda create -n plumpy-tutorial -c conda-forge \"plumpy>=0.18\" jupyterlab\n",
"$ conda activate plumpy-tutorial\n",
"```\n",
"\n",
"then open JupyterLab in the notebook folder:\n",
"\n",
"```console\n",
"$ jupyter lab\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"from pprint import pprint\n",
"import time\n",
"\n",
"import kiwipy\n",
"import plumpy\n",
"\n",
"# this is required because jupyter is already running an event loop\n",
"plumpy.set_event_loop_policy()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plumpy is a library used to create and control long-running workflows.\n",
"\n",
"The library consists of a number of key components, which we shall cover:\n",
"\n",
"The {py:class}`~plumpy.processes.Process`\n",
": To run a user defined action, with well defined inputs and outputs.\n",
"\n",
"The {py:class}`~plumpy.workchains.WorkChain`\n",
": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n",
"\n",
"The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n",
": To control the process or workchain throughout its lifetime."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Processes"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The {py:class}`~plumpy.processes.Process` is the minimal component of the plumpy workflow.\n",
"It utilises the concept of a [finite state machine](https://en.wikipedia.org/wiki/Finite-state_machine) to run a user defined action, transitioning between a set of states:\n",
"\n",
":::{figure-md}\n",
"\n",
"\n",
"Process state transitions\n",
":::"
]
},
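{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough, stdlib-only illustration of the state-machine idea (not plumpy's implementation), the sketch below encodes the allowed transitions as a table and rejects anything else.\n",
"The state names mirror the process states used by plumpy, but the transition table and the `ToyStateMachine` class are assumptions made for illustration.\n",
"\n",
"```python\n",
"from enum import Enum\n",
"\n",
"\n",
"class State(Enum):\n",
"    CREATED = 'created'\n",
"    RUNNING = 'running'\n",
"    WAITING = 'waiting'\n",
"    FINISHED = 'finished'\n",
"    EXCEPTED = 'excepted'\n",
"    KILLED = 'killed'\n",
"\n",
"\n",
"# Assumed transition table, for illustration only\n",
"TRANSITIONS = {\n",
"    State.CREATED: {State.RUNNING, State.EXCEPTED, State.KILLED},\n",
"    State.RUNNING: {State.RUNNING, State.WAITING, State.FINISHED, State.EXCEPTED, State.KILLED},\n",
"    State.WAITING: {State.RUNNING, State.WAITING, State.FINISHED, State.EXCEPTED, State.KILLED},\n",
"    State.FINISHED: set(),\n",
"    State.EXCEPTED: set(),\n",
"    State.KILLED: set(),\n",
"}\n",
"\n",
"\n",
"class ToyStateMachine:\n",
"    # Hypothetical minimal FSM that only allows transitions listed in TRANSITIONS\n",
"\n",
"    def __init__(self):\n",
"        self.state = State.CREATED\n",
"        self.history = [self.state]\n",
"\n",
"    def transition_to(self, new_state):\n",
"        if new_state not in TRANSITIONS[self.state]:\n",
"            raise ValueError(f'{self.state.name} -> {new_state.name} is not allowed')\n",
"        self.state = new_state\n",
"        self.history.append(new_state)\n",
"\n",
"\n",
"machine = ToyStateMachine()\n",
"machine.transition_to(State.RUNNING)\n",
"machine.transition_to(State.FINISHED)\n",
"print([s.name for s in machine.history])  # ['CREATED', 'RUNNING', 'FINISHED']\n",
"```\n",
"\n",
"Terminal states have no outgoing transitions in this sketch, so a finished machine cannot be restarted."
]
},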
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The simplest process implements a `run` method and can be executed with the `execute` method:"
]
},
{
"cell_type": "code",
"execution_count": 73,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CREATED\n",
"RUNNING\n",
"FINISHED\n",
"Success True\n",
"Result None\n"
]
}
],
"source": [
"class SimpleProcess(plumpy.Process):\n",
"\n",
"    def run(self):\n",
"        print(self.state.name)\n",
"\n",
"process = SimpleProcess()\n",
"print(process.state.name)\n",
"process.execute()\n",
"print(process.state.name)\n",
"print(\"Success\", process.is_successful)\n",
"print(\"Result\", process.result())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Defining inputs and outputs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To integrate the process into a more complex workflow, you can define its inputs and outputs with a {py:class}`~plumpy.process_spec.ProcessSpec`.\n",
"\n",
"```{important}\n",
"`define` is a class method and should always call the `super` method.\n",
"```\n",
"\n",
"The process specification creates {py:class}`~plumpy.ports.Port` objects, which can have help strings, defaults and validators, and which can be nested in a {py:class}`~plumpy.ports.PortNamespace`."
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'inputs': {'_attrs': {'default': (),\n",
" 'dynamic': False,\n",
" 'help': None,\n",
" 'required': 'True',\n",
" 'valid_type': 'None'},\n",
" 'input1': {'help': 'A help string',\n",
" 'name': 'input1',\n",
" 'required': 'True',\n",
" 'valid_type': \"<class 'str'>\"},\n",
" 'input2': {'_attrs': {'default': (),\n",
" 'dynamic': False,\n",
" 'help': None,\n",
" 'required': 'True',\n",
" 'valid_type': 'None'},\n",
" 'input2a': {'name': 'input2a', 'required': 'True'},\n",
" 'input2b': {'default': 'default',\n",
" 'name': 'input2b',\n",
" 'required': 'False'}}},\n",
" 'outputs': {'_attrs': {'default': (),\n",
" 'dynamic': False,\n",
" 'help': None,\n",
" 'required': 'True',\n",
" 'valid_type': 'None'},\n",
" 'output1': {'name': 'output1', 'required': 'True'},\n",
" 'output2': {'_attrs': {'default': (),\n",
" 'dynamic': False,\n",
" 'help': None,\n",
" 'required': 'True',\n",
" 'valid_type': 'None'},\n",
" 'output2a': {'name': 'output2a', 'required': 'True'},\n",
" 'output2b': {'name': 'output2b', 'required': 'True'}}}}\n"
]
},
{
"data": {
"text/plain": [
"{'output1': 'my input',\n",
" 'output2': {'output2a': 'other input', 'output2b': 'default'}}"
]
},
"execution_count": 52,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"class SpecProcess(plumpy.Process):\n",
"\n",
"    @classmethod\n",
"    def define(cls, spec: plumpy.ProcessSpec):\n",
"        super().define(spec)\n",
"        spec.input('input1', valid_type=str, help='A help string')\n",
"        spec.output('output1')\n",
"\n",
"        spec.input_namespace('input2')\n",
"        spec.input('input2.input2a')\n",
"        spec.input('input2.input2b', default='default')\n",
"\n",
"        spec.output_namespace('output2')\n",
"        spec.output('output2.output2a')\n",
"        spec.output('output2.output2b')\n",
"\n",
"    def run(self):\n",
"        self.out('output1', self.inputs.input1)\n",
"        self.out('output2.output2a', self.inputs.input2.input2a)\n",
"        self.out('output2.output2b', self.inputs.input2.input2b)\n",
"\n",
"pprint(SpecProcess.spec().get_description())\n",
"process = SpecProcess(inputs={\n",
"    'input1': 'my input',\n",
"    'input2': {'input2a': 'other input'}\n",
"})\n",
"process.execute()\n",
"process.outputs"
]
},
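{
"cell_type": "markdown",
"metadata": {},
"source": [
"The dotted port names used above, such as `'output2.output2a'`, end up as nested dictionaries in `process.outputs`.\n",
"The hypothetical helper `set_nested` below sketches that mapping in plain Python; it only reproduces the shape of the result and is not how plumpy implements namespaces.\n",
"\n",
"```python\n",
"def set_nested(outputs, dotted_key, value):\n",
"    # Walk (and create) one dict level per namespace in the dotted key\n",
"    *namespaces, port = dotted_key.split('.')\n",
"    target = outputs\n",
"    for name in namespaces:\n",
"        target = target.setdefault(name, {})\n",
"    target[port] = value\n",
"\n",
"\n",
"nested_outputs = {}\n",
"set_nested(nested_outputs, 'output1', 'my input')\n",
"set_nested(nested_outputs, 'output2.output2a', 'other input')\n",
"set_nested(nested_outputs, 'output2.output2b', 'default')\n",
"print(nested_outputs)\n",
"# {'output1': 'my input', 'output2': {'output2a': 'other input', 'output2b': 'default'}}\n",
"```"
]
},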
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Commands and actions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `run` method can also return a {py:class}`~plumpy.process_states.Command`, to tell the process what action to perform next.\n",
"By default the command is `plumpy.Stop(result=None, successful=True)`, but you can also, for example, return a `Continue` command with another function to run:"
]
},
{
"cell_type": "code",
"execution_count": 211,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"running\n",
"continuing\n",
"ProcessState.KILLED\n",
"I was killed\n"
]
},
{
"data": {
"text/plain": [
"False"
]
},
"execution_count": 211,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"class ContinueProcess(plumpy.Process):\n",
"\n",
"    def run(self):\n",
"        print(\"running\")\n",
"        return plumpy.Continue(self.continue_fn)\n",
"\n",
"    def continue_fn(self):\n",
"        print(\"continuing\")\n",
"        # message is stored in the process status\n",
"        return plumpy.Kill(\"I was killed\")\n",
"\n",
"process = ContinueProcess()\n",
"try:\n",
"    process.execute()\n",
"except plumpy.KilledError:\n",
"    pass\n",
"\n",
"print(process.state)\n",
"print(process.status)\n",
"process.is_successful"
]
},
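{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the idea of commands more concrete, here is a plain-Python sketch of a driver loop that keeps calling whichever function the previous step asked for.\n",
"`Continue`, `Stop` and `Kill` here are hypothetical local stand-ins (dataclasses), not the plumpy classes, and `drive` only approximates how a process might dispatch them.\n",
"\n",
"```python\n",
"from dataclasses import dataclass\n",
"from typing import Any, Callable\n",
"\n",
"\n",
"@dataclass\n",
"class Continue:\n",
"    fn: Callable[[], Any]\n",
"\n",
"\n",
"@dataclass\n",
"class Stop:\n",
"    result: Any = None\n",
"    successful: bool = True\n",
"\n",
"\n",
"@dataclass\n",
"class Kill:\n",
"    msg: str = ''\n",
"\n",
"\n",
"def drive(first_step):\n",
"    # Keep running steps until one returns a terminal command; None means Stop()\n",
"    command = Continue(first_step)\n",
"    while isinstance(command, Continue):\n",
"        command = command.fn()\n",
"        if command is None:\n",
"            command = Stop()\n",
"    if isinstance(command, Kill):\n",
"        return 'KILLED', command.msg\n",
"    return 'FINISHED', command.result\n",
"\n",
"\n",
"calls = []\n",
"\n",
"\n",
"def run():\n",
"    calls.append('run')\n",
"    return Continue(continue_fn)\n",
"\n",
"\n",
"def continue_fn():\n",
"    calls.append('continue_fn')\n",
"    return Kill('I was killed')\n",
"\n",
"\n",
"print(drive(run))  # ('KILLED', 'I was killed')\n",
"```\n",
"\n",
"As with the default behaviour described above, a step that returns nothing is treated like a successful `Stop` in this sketch."
]
},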
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Listeners"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"By defining and adding a {py:class}`~plumpy.process_listener.ProcessListener` to a process, we can add functions which are triggered when the process enters a particular state."
]
},
{
"cell_type": "code",
"execution_count": 175,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CREATED\n",
"RUNNING\n",
"WAITING\n",
"RUNNING\n",
"FINISHED\n"
]
}
],
"source": [
"class WaitListener(plumpy.ProcessListener):\n",
"\n",
"    def on_process_running(self, process):\n",
"        print(process.state.name)\n",
"\n",
"    def on_process_waiting(self, process):\n",
"        print(process.state.name)\n",
"        process.resume()\n",
"\n",
"class WaitProcess(plumpy.Process):\n",
"\n",
"    def run(self):\n",
"        return plumpy.Wait(self.resume_fn)\n",
"\n",
"    def resume_fn(self):\n",
"        return plumpy.Stop(None, True)\n",
"\n",
"process = WaitProcess()\n",
"print(process.state.name)\n",
"\n",
"listener = WaitListener()\n",
"process.add_process_listener(listener)\n",
"\n",
"process.execute()\n",
"print(process.state.name)"
]
},
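{
"cell_type": "markdown",
"metadata": {},
"source": [
"Listeners are an instance of the observer pattern.\n",
"The hypothetical sketch below uses a single generic `on_state_changed` hook for brevity, whereas plumpy's `ProcessListener` offers one hook per state (such as `on_process_running` above); `ToyProcess` and `RecordingListener` are illustrative names only.\n",
"\n",
"```python\n",
"class RecordingListener:\n",
"    # Hypothetical listener that records every transition it is notified about\n",
"\n",
"    def __init__(self):\n",
"        self.seen = []\n",
"\n",
"    def on_state_changed(self, process, old_state, new_state):\n",
"        self.seen.append((old_state, new_state))\n",
"\n",
"\n",
"class ToyProcess:\n",
"    # Hypothetical process that notifies its listeners after each transition\n",
"\n",
"    def __init__(self):\n",
"        self.state = 'CREATED'\n",
"        self._listeners = []\n",
"\n",
"    def add_listener(self, listener):\n",
"        self._listeners.append(listener)\n",
"\n",
"    def transition_to(self, new_state):\n",
"        old_state, self.state = self.state, new_state\n",
"        for listener in self._listeners:\n",
"            listener.on_state_changed(self, old_state, new_state)\n",
"\n",
"\n",
"toy_process = ToyProcess()\n",
"recorder = RecordingListener()\n",
"toy_process.add_listener(recorder)\n",
"for state in ('RUNNING', 'WAITING', 'RUNNING', 'FINISHED'):\n",
"    toy_process.transition_to(state)\n",
"print(recorder.seen)\n",
"```"
]
},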
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Asynchronicity"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Processes run asynchronously on the [asyncio event loop](https://docs.python.org/3/library/asyncio-eventloop.html),\n",
"with `step_until_terminated` being the higher-level async function, called inside the `execute` method, that drives the execution.\n",
"\n",
"Note that each process will always run its steps synchronously, and that `Continue` commands will not relinquish control of the event loop.\n",
"In the following section on `WorkChain`s, we will see how to run steps that do relinquish control of the event loop, allowing multiple processes to run asynchronously."
]
},
{
"cell_type": "code",
"execution_count": 95,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"async_fn start\n",
"process1 run\n",
"process1 continued\n",
"process2 run\n",
"process2 continued\n",
"async_fn end\n"
]
}
],
"source": [
"async def async_fn():\n",
"    print(\"async_fn start\")\n",
"    await asyncio.sleep(.01)\n",
"    print(\"async_fn end\")\n",
"\n",
"class NamedProcess(plumpy.Process):\n",
"\n",
"    @classmethod\n",
"    def define(cls, spec: plumpy.ProcessSpec):\n",
"        super().define(spec)\n",
"        spec.input('name')\n",
"\n",
"    def run(self):\n",
"        print(self.inputs.name, \"run\")\n",
"        return plumpy.Continue(self.continue_fn)\n",
"\n",
"    def continue_fn(self):\n",
"        print(self.inputs.name, \"continued\")\n",
"\n",
"process1 = NamedProcess({\"name\": \"process1\"})\n",
"process2 = NamedProcess({\"name\": \"process2\"})\n",
"\n",
"async def execute():\n",
"    await asyncio.gather(\n",
"        async_fn(),\n",
"        process1.step_until_terminated(),\n",
"        process2.step_until_terminated()\n",
"    )\n",
"\n",
"plumpy.get_event_loop().run_until_complete(execute())"
]
},
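{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same scheduling behaviour can be reproduced with plain `asyncio`, independently of plumpy: code between two `await` points runs without interruption, so other coroutines only get a turn when the running one awaits something.\n",
"This is a stdlib-only sketch for intuition; run it as a script (inside Jupyter, use `await main()` instead of `asyncio.run`).\n",
"\n",
"```python\n",
"import asyncio\n",
"\n",
"log = []\n",
"\n",
"\n",
"async def synchronous_steps(name):\n",
"    # No await between the two steps, so no other task can run in between\n",
"    log.append(f'{name} step1')\n",
"    log.append(f'{name} step2')\n",
"\n",
"\n",
"async def yielding_steps(name):\n",
"    log.append(f'{name} step1')\n",
"    await asyncio.sleep(0)  # relinquish control to the event loop\n",
"    log.append(f'{name} step2')\n",
"\n",
"\n",
"async def main():\n",
"    await asyncio.gather(synchronous_steps('a'), synchronous_steps('b'))\n",
"    await asyncio.gather(yielding_steps('c'), yielding_steps('d'))\n",
"\n",
"\n",
"asyncio.run(main())\n",
"print(log)\n",
"# ['a step1', 'a step2', 'b step1', 'b step2',\n",
"#  'c step1', 'd step1', 'c step2', 'd step2']\n",
"```"
]
},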
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Pausing and playing"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Pausing a process will create an action to await a pause `Future` **before the start of the next step**, i.e. it will still finish the current step before awaiting.\n",
"While awaiting the pause `Future`, control of the event loop is relinquished, so other processes are free to run.\n",
"\n",
"```{note}\n",
"Pausing a process does not change its state and is different from the concept of waiting.\n",
"```\n",
"\n",
"The pause `Future` can be marked as done, and the process thus continued, *via* the `play` method.\n",
"If `play` is called before the end of the step during which the process was paused, it will cancel the pause action.\n",
"\n",
"Below is a toy example, but this concept becomes more useful when using a `Controller`, as discussed later."
]
},
{
"cell_type": "code",
"execution_count": 212,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"PauseProcess: pausing\n",
"PauseProcess: continue step\n",
"SimpleProcess\n",
"PauseProcess: playing (state=RUNNING)\n",
"PauseProcess: next step\n"
]
}
],
"source": [
"class SimpleProcess(plumpy.Process):\n",
"\n",
"    def run(self):\n",
"        print(self.get_name())\n",
"\n",
"class PauseProcess(plumpy.Process):\n",
"\n",
"    def run(self):\n",
"        print(f\"{self.get_name()}: pausing\")\n",
"        self.pause()\n",
"        print(f\"{self.get_name()}: continue step\")\n",
"        return plumpy.Continue(self.next_step)\n",
"\n",
"    def next_step(self):\n",
"        print(f\"{self.get_name()}: next step\")\n",
"\n",
"pause_proc = PauseProcess()\n",
"simple_proc = SimpleProcess()\n",
"\n",
"async def play(proc):\n",
"    while True:\n",
"        if proc.paused:\n",
"            print(f\"{proc.get_name()}: playing (state={proc.state.name})\")\n",
"            proc.play()\n",
"            break\n",
"        # yield to the event loop, so this polling loop cannot block the processes\n",
"        await asyncio.sleep(0)\n",
"\n",
"async def execute():\n",
"    return await asyncio.gather(\n",
"        pause_proc.step_until_terminated(),\n",
"        simple_proc.step_until_terminated(),\n",
"        play(pause_proc),\n",
"    )\n",
"\n",
"outputs = plumpy.get_event_loop().run_until_complete(execute())"
]
},
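{
"cell_type": "markdown",
"metadata": {},
"source": [
"The pause mechanism described above can be approximated with an `asyncio.Future`: before each step, the runner awaits any pending pause future (yielding to the event loop), and `play` resolves it.\n",
"`ToyPausable`, `step1`, `step2` and `player` are hypothetical names for this stdlib-only sketch, which only mirrors the behaviour described in the text.\n",
"\n",
"```python\n",
"import asyncio\n",
"\n",
"\n",
"class ToyPausable:\n",
"    # Hypothetical sketch of pause/play between steps; not plumpy's implementation\n",
"\n",
"    def __init__(self, steps):\n",
"        self.steps = list(steps)\n",
"        self._pause_future = None\n",
"\n",
"    def pause(self):\n",
"        # Only takes effect before the next step starts\n",
"        if self._pause_future is None:\n",
"            self._pause_future = asyncio.get_running_loop().create_future()\n",
"\n",
"    def play(self):\n",
"        # Resolve the pending pause; if the step loop has not reached it yet,\n",
"        # dropping the future means the pause is effectively cancelled\n",
"        if self._pause_future is not None:\n",
"            if not self._pause_future.done():\n",
"                self._pause_future.set_result(None)\n",
"            self._pause_future = None\n",
"\n",
"    async def run(self):\n",
"        for step in self.steps:\n",
"            if self._pause_future is not None:\n",
"                await self._pause_future  # control returns to the event loop\n",
"            step(self)\n",
"\n",
"\n",
"log = []\n",
"\n",
"\n",
"def step1(proc):\n",
"    log.append('step1')\n",
"    proc.pause()\n",
"\n",
"\n",
"def step2(proc):\n",
"    log.append('step2')\n",
"\n",
"\n",
"async def player(proc):\n",
"    log.append('player: play')\n",
"    proc.play()\n",
"\n",
"\n",
"async def main():\n",
"    proc = ToyPausable([step1, step2])\n",
"    await asyncio.gather(proc.run(), player(proc))\n",
"\n",
"\n",
"asyncio.run(main())\n",
"print(log)  # ['step1', 'player: play', 'step2']\n",
"```"
]
},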
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## WorkChains"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Introduction"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The {py:class}`~plumpy.workchains.WorkChain` provides richer control flow for running a process as a set of discrete steps (also known as instructions).\n",
"\n",
"The steps are defined in the {py:meth}`~plumpy.workchains.WorkChainSpec.outline`, which gives a succinct summary of the logical steps that the workchain will perform."
]
},
{
"cell_type": "code",
"execution_count": 110,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"step1\n",
"step2\n"
]
}
],
"source": [
"class SimpleWorkChain(plumpy.WorkChain):\n",
"    @classmethod\n",
"    def define(cls, spec):\n",
"        super().define(spec)\n",
"        spec.outline(\n",
"            cls.step1,\n",
"            cls.step2,\n",
"        )\n",
"\n",
"    def step1(self):\n",
"        print('step1')\n",
"\n",
"    def step2(self):\n",
"        print('step2')\n",
"\n",
"workchain = SimpleWorkChain()\n",
"output = workchain.execute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Conditionals"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `outline` can contain more than just a list of steps.\n",
"It allows for \"conditionals\" which should return a truthy value to indicate if the nested steps should be run.\n",
"\n",
"For example, using the `if_` conditional:"
]
},
{
"cell_type": "code",
"execution_count": 109,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"execute False\n",
" if\n",
"execute True\n",
" if\n",
" conditional\n"
]
}
],
"source": [
"class IfWorkChain(plumpy.WorkChain):\n",
"    @classmethod\n",
"    def define(cls, spec):\n",
"        super().define(spec)\n",
"        spec.input('run', valid_type=bool)\n",
"\n",
"        spec.outline(\n",
"            plumpy.if_(cls.if_step)(\n",
"                cls.conditional_step\n",
"            )\n",
"        )\n",
"\n",
"    def if_step(self):\n",
"        print(' if')\n",
"        return self.inputs.run\n",
"\n",
"    def conditional_step(self):\n",
"        print(' conditional')\n",
"\n",
"workchain = IfWorkChain({\"run\": False})\n",
"print('execute False')\n",
"output = workchain.execute()\n",
"\n",
"workchain = IfWorkChain({\"run\": True})\n",
"print('execute True')\n",
"output = workchain.execute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### The context and while conditional"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `while_` conditional allows steps to be run multiple times.\n",
"To achieve this we need to define and iterate an internal variable.\n",
"It is important that these variables are saved on the `WorkChain.ctx` attribute (an {py:class}`~plumpy.utils.AttributesDict`), since this can be persisted in between steps as we will discuss later.\n",
"\n",
"```{important}\n",
"Arbitrary WorkChain instance attributes will not be persisted.\n",
"Internal variables should always be saved to `self.ctx`.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 149,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"step 1\n",
"step 2\n",
"step 3\n"
]
}
],
"source": [
"class WhileWorkChain(plumpy.WorkChain):\n",
"    @classmethod\n",
"    def define(cls, spec):\n",
"        super().define(spec)\n",
"        spec.input('steps', valid_type=int, default=3)\n",
"\n",
"        spec.outline(\n",
"            cls.init_step,\n",
"            plumpy.while_(cls.while_step)(\n",
"                cls.conditional_step\n",
"            )\n",
"        )\n",
"\n",
"    def init_step(self):\n",
"        self.ctx.iterator = 0\n",
"\n",
"    def while_step(self):\n",
"        self.ctx.iterator += 1\n",
"        return (self.ctx.iterator <= self.inputs.steps)\n",
"\n",
"    def conditional_step(self):\n",
"        print('step', self.ctx.iterator)\n",
"\n",
"workchain = WhileWorkChain()\n",
"output = workchain.execute()"
]
},
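{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see how conditionals and a shared context fit together, here is a heavily simplified outline interpreter in plain Python that mirrors `WhileWorkChain` above.\n",
"The `If`/`While` classes and `run_outline` are hypothetical and say nothing about how plumpy represents or persists its outline; the point is that all loop state lives on `ctx`, not in local variables.\n",
"\n",
"```python\n",
"from types import SimpleNamespace\n",
"\n",
"\n",
"class If:\n",
"    # Hypothetical outline node: run `body` once if `condition(ctx)` is truthy\n",
"    def __init__(self, condition, *body):\n",
"        self.condition = condition\n",
"        self.body = body\n",
"\n",
"\n",
"class While(If):\n",
"    # Hypothetical outline node: repeat `body` while `condition(ctx)` is truthy\n",
"    pass\n",
"\n",
"\n",
"def run_outline(outline, ctx):\n",
"    for instruction in outline:\n",
"        if isinstance(instruction, While):  # checked first, since While subclasses If\n",
"            while instruction.condition(ctx):\n",
"                run_outline(instruction.body, ctx)\n",
"        elif isinstance(instruction, If):\n",
"            if instruction.condition(ctx):\n",
"                run_outline(instruction.body, ctx)\n",
"        else:\n",
"            instruction(ctx)  # a plain step\n",
"\n",
"\n",
"printed = []\n",
"\n",
"\n",
"def init_step(ctx):\n",
"    ctx.iterator = 0\n",
"\n",
"\n",
"def while_step(ctx):\n",
"    ctx.iterator += 1\n",
"    return ctx.iterator <= ctx.steps\n",
"\n",
"\n",
"def conditional_step(ctx):\n",
"    printed.append(f'step {ctx.iterator}')\n",
"\n",
"\n",
"ctx = SimpleNamespace(steps=3)\n",
"run_outline([init_step, While(while_step, conditional_step)], ctx)\n",
"print(printed)  # ['step 1', 'step 2', 'step 3']\n",
"```"
]
},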
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Interstep processes and asynchronicity"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"By default, each \"non-terminal\" step will return a `Continue` command, to progress to the next step (as a `RUNNING` state).\n",
"In terms of asynchronicity, this means that no step will relinquish control to the asyncio event loop.\n",
"\n",
"Using the `to_context` method, however, we can add \"awaitable\" objects.\n",
"If the step adds any of these objects, then the workchain is transitioned into a `WAITING` state until the awaitables are complete, at which point the workchain moves on to the next step.\n",
"\n",
"The following example shows how to add an arbitrary asynchronous function and a `Process` as awaitables. You could even add nested `WorkChain`s with their own interstep processes."
]
},
{
"cell_type": "code",
"execution_count": 152,
"metadata": {},
"outputs": [],
"source": [
"async def awaitable_func(msg):\n",
"    await asyncio.sleep(.01)\n",
"    print(msg)\n",
"    return True\n",
"\n",
"\n",
"class InternalProcess(plumpy.Process):\n",
"\n",
"    @classmethod\n",
"    def define(cls, spec):\n",
"        super().define(spec)\n",
"        spec.input('name', valid_type=str, default='process')\n",
"        spec.output('value')\n",
"\n",
"    def run(self):\n",
"        print(self.inputs.name)\n",
"        self.out('value', 'value')\n",
"\n",
"\n",
"class InterstepWorkChain(plumpy.WorkChain):\n",
"\n",
"    @classmethod\n",
"    def define(cls, spec):\n",
"        super().define(spec)\n",
"        spec.input('name', valid_type=str, default='workchain')\n",
"        spec.input('process', valid_type=bool, default=False)\n",
"        spec.input('awaitable', valid_type=bool, default=False)\n",
"        spec.outline(\n",
"            cls.step1,\n",
"            cls.step2,\n",
"            cls.step3,\n",
"        )\n",
"\n",
"    def step1(self):\n",
"        print(self.inputs.name, 'step1')\n",
"\n",
"    def step2(self):\n",
"        print(self.inputs.name, 'step2')\n",
"        time.sleep(.01)\n",
"\n",
"        if self.inputs.awaitable:\n",
"            self.to_context(\n",
"                awaitable=asyncio.ensure_future(\n",
"                    awaitable_func(f'{self.inputs.name} step2 awaitable'),\n",
"                    loop=self.loop\n",
"                )\n",
"            )\n",
"        if self.inputs.process:\n",
"            self.to_context(\n",
"                process=self.launch(\n",
"                    InternalProcess,\n",
"                    inputs={'name': f'{self.inputs.name} step2 process'})\n",
"            )\n",
"\n",
"    def step3(self):\n",
"        print(self.inputs.name, 'step3')\n",
"        print(f\" ctx={self.ctx}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Without awaitables, we can see that each workchain runs to completion before the next workchain starts."
]
},
{
"cell_type": "code",
"execution_count": 151,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"wkchain1 step1\n",
"wkchain1 step2\n",
"wkchain1 step3\n",
" ctx=AttributesDict()\n",
"wkchain2 step1\n",
"wkchain2 step2\n",
"wkchain2 step3\n",
" ctx=AttributesDict()\n"
]
}
],
"source": [
"wkchain1 = InterstepWorkChain({'name': 'wkchain1'})\n",
"wkchain2 = InterstepWorkChain({'name': 'wkchain2'})\n",
"\n",
"async def execute():\n",
"    return await asyncio.gather(\n",
"        wkchain1.step_until_terminated(),\n",
"        wkchain2.step_until_terminated()\n",
"    )\n",
"\n",
"output = plumpy.get_event_loop().run_until_complete(execute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"By adding an interstep process we see that the workchain, in the `WAITING` state, relinquishes control of the event loop for other workchains to start.\n",
"\n",
"The outputs of interstep awaitables are saved on the workchain's context for later use."
]
},
{
"cell_type": "code",
"execution_count": 143,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"wkchain1 step1\n",
"wkchain1 step2\n",
"wkchain2 step1\n",
"wkchain2 step2\n",
"wkchain1 step2 process\n",
"wkchain2 step2 process\n",
"wkchain1 step3\n",
" ctx=AttributesDict(process={'value': 'value'})\n",
"wkchain2 step3\n",
" ctx=AttributesDict(process={'value': 'value'})\n"
]
}
],
"source": [
"wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'process': True})\n",
"wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'process': True})\n",
"\n",
"async def execute():\n",
"    return await asyncio.gather(\n",
"        wkchain1.step_until_terminated(),\n",
"        wkchain2.step_until_terminated()\n",
"    )\n",
"\n",
"output = plumpy.get_event_loop().run_until_complete(execute())"
]
},
{
"cell_type": "code",
"execution_count": 144,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"wkchain1 step1\n",
"wkchain1 step2\n",
"wkchain2 step1\n",
"wkchain2 step2\n",
"wkchain1 step2 awaitable\n",
"wkchain2 step2 awaitable\n",
"wkchain1 step3\n",
" ctx=AttributesDict(awaitable=True)\n",
"wkchain2 step3\n",
" ctx=AttributesDict(awaitable=True)\n"
]
}
],
"source": [
"wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'awaitable': True})\n",
"wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'awaitable': True})\n",
"\n",
"async def execute():\n",
"    return await asyncio.gather(\n",
"        wkchain1.step_until_terminated(),\n",
"        wkchain2.step_until_terminated()\n",
"    )\n",
"\n",
"output = plumpy.get_event_loop().run_until_complete(execute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Although the steps of individual workchains always progress synchronously, the order in which steps from different workchains are interleaved is non-deterministic."
]
},
{
"cell_type": "code",
"execution_count": 159,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"wkchain1 step1\n",
"wkchain1 step2\n",
"wkchain2 step1\n",
"wkchain2 step2\n",
"wkchain1 step2 process\n",
"wkchain2 step2 process\n",
"wkchain1 step2 awaitable\n",
"wkchain1 step3\n",
" ctx=AttributesDict(awaitable=True, process={'value': 'value'})\n",
"wkchain2 step2 awaitable\n",
"wkchain2 step3\n",
" ctx=AttributesDict(awaitable=True, process={'value': 'value'})\n"
]
}
],
"source": [
"wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'process': True, 'awaitable': True})\n",
"wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'process': True, 'awaitable': True})\n",
"\n",
"async def execute():\n",
"    return await asyncio.gather(\n",
"        wkchain1.step_until_terminated(),\n",
"        wkchain2.step_until_terminated()\n",
"    )\n",
"\n",
"output = plumpy.get_event_loop().run_until_complete(execute())"
]
},
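{
"cell_type": "markdown",
"metadata": {},
"source": [
"The waiting behaviour can be sketched in plain `asyncio`: a step registers named awaitables, the runner gathers them (yielding to the event loop while they are pending), and stores each result in the context under its name before moving on.\n",
"`ToyWorkChain`, its `to_context` method and `fetch` are hypothetical names that only mimic the behaviour described in this section.\n",
"\n",
"```python\n",
"import asyncio\n",
"\n",
"\n",
"class ToyWorkChain:\n",
"    # Hypothetical sketch of to_context-style waiting; not plumpy's implementation\n",
"\n",
"    def __init__(self, name, steps, log):\n",
"        self.name = name\n",
"        self.steps = steps\n",
"        self.log = log\n",
"        self.ctx = {}\n",
"        self._awaitables = {}\n",
"\n",
"    def to_context(self, **awaitables):\n",
"        self._awaitables.update(awaitables)\n",
"\n",
"    async def run(self):\n",
"        for step in self.steps:\n",
"            step(self)\n",
"            if self._awaitables:\n",
"                # Waiting: other tasks run until every registered awaitable is done\n",
"                names = list(self._awaitables)\n",
"                results = await asyncio.gather(*self._awaitables.values())\n",
"                self.ctx.update(zip(names, results))\n",
"                self._awaitables.clear()\n",
"\n",
"\n",
"async def fetch(tag):\n",
"    await asyncio.sleep(0.01)\n",
"    return f'{tag} done'\n",
"\n",
"\n",
"def step1(wc):\n",
"    wc.log.append(f'{wc.name} step1')\n",
"    wc.to_context(result=fetch(wc.name))\n",
"\n",
"\n",
"def step2(wc):\n",
"    wc.log.append(f'{wc.name} step2 ctx={wc.ctx}')\n",
"\n",
"\n",
"async def main():\n",
"    log = []\n",
"    chains = [ToyWorkChain(name, [step1, step2], log) for name in ('wc1', 'wc2')]\n",
"    await asyncio.gather(*(wc.run() for wc in chains))\n",
"    return log\n",
"\n",
"\n",
"log = asyncio.run(main())\n",
"print(log[:2])  # ['wc1 step1', 'wc2 step1']\n",
"```\n",
"\n",
"As in the plumpy examples above, both chains start before either awaitable finishes, while the order of the second steps depends on when each awaitable completes."
]
},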
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Persistence"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A process can be saved as a checkpoint by a {py:class}`~plumpy.persistence.Persister` implementation,\n",
"recording enough information to fully recreate the current instance state.\n",
"The persisters implemented out of the box are the {py:class}`~plumpy.persistence.InMemoryPersister` and the {py:class}`~plumpy.persistence.PicklePersister`.\n",
"\n",
"Below is a toy example; in practice, persisters are primarily used as part of the `Controller` functionality described later."
]
},
{
"cell_type": "code",
"execution_count": 163,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"PersistWorkChain(ctx=AttributesDict(step=3))"
]
},
"execution_count": 163,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"persister = plumpy.InMemoryPersister()\n",
"\n",
"class PersistWorkChain(plumpy.WorkChain):\n",
"\n",
"    @classmethod\n",
"    def define(cls, spec):\n",
"        super().define(spec)\n",
"        spec.outline(\n",
"            cls.init_step,\n",
"            cls.step2,\n",
"            cls.step3,\n",
"        )\n",
"\n",
"    def __repr__(self):\n",
"        return f\"PersistWorkChain(ctx={self.ctx})\"\n",
"\n",
"    def init_step(self):\n",
"        self.ctx.step = 1\n",
"        persister.save_checkpoint(self, 'init')\n",
"\n",
"    def step2(self):\n",
"        self.ctx.step += 1\n",
"        persister.save_checkpoint(self, 'step2')\n",
"\n",
"    def step3(self):\n",
"        self.ctx.step += 1\n",
"        persister.save_checkpoint(self, 'step3')\n",
"\n",
"workchain = PersistWorkChain()\n",
"workchain.execute()\n",
"workchain"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The persister now contains three checkpoints:"
]
},
{
"cell_type": "code",
"execution_count": 164,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[PersistedCheckpoint(pid=UUID('9ceea019-8aef-4f23-aaf1-d31ca2a16773'), tag='init'),\n",
" PersistedCheckpoint(pid=UUID('9ceea019-8aef-4f23-aaf1-d31ca2a16773'), tag='step2'),\n",
" PersistedCheckpoint(pid=UUID('9ceea019-8aef-4f23-aaf1-d31ca2a16773'), tag='step3')]"
]
},
"execution_count": 164,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"persister.get_checkpoints()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Checkpoints can be accessed *via* the process's PID and the given checkpoint tag. This returns a `Bundle`, which can be used to recreate the workchain instance state at the point of the checkpoint:"
]
},
{
"cell_type": "code",
"execution_count": 168,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"PersistWorkChain(ctx=AttributesDict(step=2))"
]
},
"execution_count": 168,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"workchain_checkpoint = persister.load_checkpoint(workchain.pid, 'step2').unbundle()\n",
"workchain_checkpoint"
]
},
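{
"cell_type": "markdown",
"metadata": {},
"source": [
"The checkpoint idea itself can be illustrated without plumpy: snapshot enough state to rebuild the object, and key each snapshot by process id and tag.\n",
"`ToyPersister` and `Counter` are hypothetical, and pickling `__dict__` is a simplification chosen for this sketch; it is not how plumpy builds its `Bundle`.\n",
"\n",
"```python\n",
"import pickle\n",
"import uuid\n",
"\n",
"\n",
"class ToyPersister:\n",
"    # Hypothetical sketch: pickled attribute snapshots keyed by (pid, tag)\n",
"\n",
"    def __init__(self):\n",
"        self._checkpoints = {}\n",
"\n",
"    def save_checkpoint(self, obj, tag):\n",
"        self._checkpoints[(obj.pid, tag)] = pickle.dumps(obj.__dict__)\n",
"\n",
"    def load_checkpoint(self, cls, pid, tag):\n",
"        obj = cls.__new__(cls)  # skip __init__, then restore the saved attributes\n",
"        obj.__dict__.update(pickle.loads(self._checkpoints[(pid, tag)]))\n",
"        return obj\n",
"\n",
"    def get_checkpoints(self):\n",
"        return list(self._checkpoints)\n",
"\n",
"\n",
"class Counter:\n",
"    def __init__(self):\n",
"        self.pid = uuid.uuid4()\n",
"        self.ctx = {'step': 0}\n",
"\n",
"    def step(self, persister, tag):\n",
"        self.ctx['step'] += 1\n",
"        persister.save_checkpoint(self, tag)\n",
"\n",
"\n",
"toy_persister = ToyPersister()\n",
"counter = Counter()\n",
"for tag in ('init', 'step2', 'step3'):\n",
"    counter.step(toy_persister, tag)\n",
"\n",
"restored = toy_persister.load_checkpoint(Counter, counter.pid, 'step2')\n",
"print(restored.ctx)  # {'step': 2}\n",
"```\n",
"\n",
"Because each snapshot is serialised at save time, later changes to the live object do not leak into earlier checkpoints."
]
},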
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Communicators and Controllers"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`kiwipy` communicators can send messages to subscribers, to launch or control processes.\n",
"Messages take one of three forms:\n",
"\n",
"Tasks are one-to-many messages.\n",
": This means that you send out a task to a queue and there can be zero or more subscribers attached, one of which will process the task when it is ready.\n",
": The result of the task can optionally be delivered to the sender.\n",
"\n",
"A Remote Procedure Call (RPC) is one-to-one.\n",
": This is used when you want to call a particular remote function/method and (usually) expect an immediate response.\n",
": For example, imagine asking a remote process to pause itself.\n",
": Here you would make an RPC and wait to get confirmation that it has, indeed, paused.\n",
"\n",
"Broadcasters send messages to zero or more consumers.\n",
": These are fire-and-forget calls that broadcast a message to anyone who is listening.\n",
": Consumers may optionally apply a filter to only receive messages that match some criteria."
]
},
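{
"cell_type": "markdown",
"metadata": {},
"source": [
"The three patterns differ mainly in who receives a message and whether a reply comes back.\n",
"`ToyCommunicator` is a hypothetical, synchronous, in-process sketch (no broker, queues or futures) meant only to contrast them; it is not kiwipy's API.\n",
"\n",
"```python\n",
"class ToyCommunicator:\n",
"    # Hypothetical, synchronous sketch of the three messaging patterns\n",
"\n",
"    def __init__(self):\n",
"        self.task_subscribers = []\n",
"        self.pending_tasks = []\n",
"        self.rpc_subscribers = {}\n",
"        self.broadcast_subscribers = []\n",
"\n",
"    def task_send(self, task):\n",
"        # One of possibly many subscribers handles the task; with none\n",
"        # attached, the task stays queued until one appears\n",
"        if not self.task_subscribers:\n",
"            self.pending_tasks.append(task)\n",
"            return None\n",
"        return self.task_subscribers[0](task)\n",
"\n",
"    def rpc_send(self, recipient_id, msg):\n",
"        # Only the named recipient handles the call, and its reply is returned\n",
"        return self.rpc_subscribers[recipient_id](msg)\n",
"\n",
"    def broadcast_send(self, msg, subject=None):\n",
"        # Fire and forget: every listener whose filter accepts the subject is called\n",
"        for callback, subject_filter in self.broadcast_subscribers:\n",
"            if subject_filter is None or subject_filter(subject):\n",
"                callback(msg)\n",
"\n",
"\n",
"comm = ToyCommunicator()\n",
"received = []\n",
"\n",
"comm.task_subscribers.append(lambda task: f'handled {task}')\n",
"comm.rpc_subscribers['proc-1'] = lambda msg: f'proc-1 got {msg}'\n",
"comm.broadcast_subscribers.append((lambda msg: received.append(('all', msg)), None))\n",
"comm.broadcast_subscribers.append(\n",
"    (lambda msg: received.append(('state', msg)), lambda subject: subject == 'state_changed')\n",
")\n",
"\n",
"print(comm.task_send('launch'))          # handled launch\n",
"print(comm.rpc_send('proc-1', 'pause'))  # proc-1 got pause\n",
"comm.broadcast_send('hello', subject='other')\n",
"print(received)                          # [('all', 'hello')]\n",
"```"
]
},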
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a simple example, a process can take a communicator on instantiation.\n",
"On creation, the process is added to the communicator as an RPC and broadcast subscriber;\n",
"the communicator can then send RPC messages *via* the process's PID (as a string), or broadcast messages to all subscribed processes.\n",
"Four message types are available:\n",
"\n",
"- `STATUS` (RPC only)\n",
"- `KILL`\n",
"- `PAUSE`\n",
"- `PLAY`\n",
"\n",
"```{important}\n",
"On termination or close, the process removes its subscriptions from the communicator.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 250,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'ctime': 1610495402.981895,\n",
" 'paused': False,\n",
" 'process_string': ' (ProcessState.CREATED)',\n",
" 'state': <ProcessState.CREATED: 'created'>,\n",
" 'state_info': 'ProcessState.CREATED'}\n"
]
}
],
"source": [
"communicator = kiwipy.LocalCommunicator()\n",
"\n",
"class SimpleProcess(plumpy.Process):\n",
"    pass\n",
"\n",
"process = SimpleProcess(communicator=communicator)\n",
"\n",
"pprint(communicator.rpc_send(str(process.pid), plumpy.STATUS_MSG).result())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A `ProcessLauncher` can be subscribed to a communicator to handle process launch and continue tasks.\n",
"Process controllers can then wrap a communicator to provide a higher-level interface."
]
},
{
"cell_type": "code",
"execution_count": 332,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'result': 11}\n",
"659b8b63-42a9-44a7-978f-07c6a2887f7e\n"
]
}
],
"source": [
"class ControlledWorkChain(plumpy.WorkChain):\n",
"\n",
"    @classmethod\n",
"    def define(cls, spec):\n",
"        super().define(spec)\n",
"        spec.input('steps', valid_type=int, default=10)\n",
"        spec.output('result', valid_type=int)\n",
"\n",
"        spec.outline(\n",
"            cls.init_step,\n",
"            plumpy.while_(cls.while_step)(cls.loop_step),\n",
"            cls.final_step\n",
"        )\n",
"\n",
"    def init_step(self):\n",
"        self.ctx.iterator = 0\n",
"\n",
"    def while_step(self):\n",
"        return (self.ctx.iterator <= self.inputs.steps)\n",
"\n",
"    def loop_step(self):\n",
"        self.ctx.iterator += 1\n",
"\n",
"    def final_step(self):\n",
"        self.out('result', self.ctx.iterator)\n",
"\n",
"loop_communicator = plumpy.wrap_communicator(kiwipy.LocalCommunicator())\n",
"loop_communicator.add_task_subscriber(plumpy.ProcessLauncher())\n",
"controller = plumpy.RemoteProcessController(loop_communicator)\n",
"\n",
"wkchain = ControlledWorkChain(communicator=loop_communicator)\n",
"\n",
"async def run_wait():\n",
"    return await controller.launch_process(ControlledWorkChain)\n",
"\n",
"async def run_nowait():\n",
"    return await controller.launch_process(ControlledWorkChain, nowait=True)\n",
"\n",
"print(plumpy.get_event_loop().run_until_complete(run_wait()))\n",
"print(plumpy.get_event_loop().run_until_complete(run_nowait()))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"more to come..."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}