{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Tutorial" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This Jupyter Notebook provides a tutorial for the features of plumpy.\n", "\n", "To use, create an environment with:\n", "\n", "```console\n", "$ conda create -n plumpy-tutorial plumpy>=0.18 jupyterlab\n", "$ conda activate plumpy-tutorial\n", "```\n", "\n", "and open jupyterlab in the notebook folder:\n", "\n", "```console\n", "$ jupyter lab\n", "```" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "from pprint import pprint\n", "import time\n", "\n", "import kiwipy\n", "import plumpy\n", "\n", "# this is required because jupyter is already running an event loop\n", "plumpy.set_event_loop_policy()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plumpy is a library used to create and control long-running workflows.\n", "\n", "The library consists of a number of key components, that we will shall cover:\n", "\n", "The {py:class}`~plumpy.processes.Process`\n", ": To run a user defined action, with well defined inputs and outputs.\n", "\n", "The {py:class}`~plumpy.workchains.WorkChain`\n", ": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n", "\n", "The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n", ": To control the process or workchain throughout its lifetime." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Processes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The {py:class}`~plumpy.processes.Process` is the minimal component of the plumpy workflow.\n", "It utilises the concept of a [finite state machine](https://en.wikipedia.org/wiki/Finite-state_machine) to run a user defined action, transitioning between a set of states:\n", "\n", ":::{figure-md}\n", "\n", "\n", "Process state transitions\n", ":::" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The simplest process, implements a `run` method and can be executed with the `execute` method:" ] }, { "cell_type": "code", "execution_count": 73, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CREATED\n", "RUNNING\n", "FINISHED\n", "Success True\n", "Result None\n" ] } ], "source": [ "class SimpleProcess(plumpy.Process):\n", "\n", " def run(self):\n", " print(self.state.name)\n", " \n", "process = SimpleProcess()\n", "print(process.state.name)\n", "process.execute()\n", "print(process.state.name)\n", "print(\"Success\", process.is_successful)\n", "print(\"Result\", process.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Defining inputs and outputs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To integrate the process into a more complex workflow, you can define an input and output {py:class}`~plumpy.process_spec.ProcessSpec`.\n", "\n", "```{important}\n", "`define` is a class method and should always call the `super` method.\n", "```\n", "\n", "The process specification creates {py:class}`~plumpy.ports.Port`, which can have help strings, defaults and validators, and can be nested in {py:class}`~plumpy.ports.PortNamespace`." 
] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'inputs': {'_attrs': {'default': (),\n", " 'dynamic': False,\n", " 'help': None,\n", " 'required': 'True',\n", " 'valid_type': 'None'},\n", " 'input1': {'help': 'A help string',\n", " 'name': 'input1',\n", " 'required': 'True',\n", " 'valid_type': \"\"},\n", " 'input2': {'_attrs': {'default': (),\n", " 'dynamic': False,\n", " 'help': None,\n", " 'required': 'True',\n", " 'valid_type': 'None'},\n", " 'input2a': {'name': 'input2a', 'required': 'True'},\n", " 'input2b': {'default': 'default',\n", " 'name': 'input2b',\n", " 'required': 'False'}}},\n", " 'outputs': {'_attrs': {'default': (),\n", " 'dynamic': False,\n", " 'help': None,\n", " 'required': 'True',\n", " 'valid_type': 'None'},\n", " 'output1': {'name': 'output1', 'required': 'True'},\n", " 'output2': {'_attrs': {'default': (),\n", " 'dynamic': False,\n", " 'help': None,\n", " 'required': 'True',\n", " 'valid_type': 'None'},\n", " 'output2a': {'name': 'output2a', 'required': 'True'},\n", " 'output2b': {'name': 'output2b', 'required': 'True'}}}}\n" ] }, { "data": { "text/plain": [ "{'output1': 'my input',\n", " 'output2': {'output2a': 'other input', 'output2b': 'default'}}" ] }, "execution_count": 52, "metadata": {}, "output_type": "execute_result" } ], "source": [ "class SpecProcess(plumpy.Process):\n", " \n", " @classmethod\n", " def define(cls, spec: plumpy.ProcessSpec):\n", " super().define(spec)\n", " spec.input('input1', valid_type=str, help='A help string')\n", " spec.output('output1')\n", " \n", " spec.input_namespace('input2')\n", " spec.input('input2.input2a')\n", " spec.input('input2.input2b', default='default')\n", " \n", " spec.output_namespace('output2')\n", " spec.output('output2.output2a')\n", " spec.output('output2.output2b')\n", "\n", " def run(self):\n", " self.out('output1', self.inputs.input1)\n", " self.out('output2.output2a', 
self.inputs.input2.input2a)\n", " self.out('output2.output2b', self.inputs.input2.input2b)\n", " \n", "pprint(SpecProcess.spec().get_description())\n", "process = SpecProcess(inputs={\n", " 'input1': 'my input',\n", " 'input2': {'input2a': 'other input'}\n", "})\n", "process.execute()\n", "process.outputs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Commands and actions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `run` method can also return a {py:class}`~plumpy.process_states.Command`, to tell the process what action to perform next.\n", "By default, the command is `plumpy.Stop(result=None, successful=True)`, but you can also, for example, return a `Continue` command with another function to run:" ] }, { "cell_type": "code", "execution_count": 211, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "running\n", "continuing\n", "ProcessState.KILLED\n", "I was killed\n" ] }, { "data": { "text/plain": [ "False" ] }, "execution_count": 211, "metadata": {}, "output_type": "execute_result" } ], "source": [ "class ContinueProcess(plumpy.Process):\n", "\n", "    def run(self):\n", "        print(\"running\")\n", "        return plumpy.Continue(self.continue_fn)\n", "\n", "    def continue_fn(self):\n", "        print(\"continuing\")\n", "        # message is stored in the process status\n", "        return plumpy.Kill(\"I was killed\")\n", "\n", "process = ContinueProcess()\n", "try:\n", "    process.execute()\n", "except plumpy.KilledError:\n", "    pass\n", "\n", "print(process.state)\n", "print(process.status)\n", "process.is_successful" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Listeners" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By defining and adding a {py:class}`~plumpy.process_listener.ProcessListener` to a process, we can add functions which are triggered when the process enters a particular state."
] }, { "cell_type": "code", "execution_count": 175, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CREATED\n", "RUNNING\n", "WAITING\n", "RUNNING\n", "FINISHED\n" ] } ], "source": [ "class WaitListener(plumpy.ProcessListener):\n", "\n", " def on_process_running(self, process):\n", " print(process.state.name)\n", "\n", " def on_process_waiting(self, process):\n", " print(process.state.name)\n", " process.resume()\n", "\n", "class WaitProcess(plumpy.Process):\n", "\n", " def run(self):\n", " return plumpy.Wait(self.resume_fn)\n", " \n", " def resume_fn(self):\n", " return plumpy.Stop(None, True)\n", "\n", "process = WaitProcess()\n", "print(process.state.name)\n", "\n", "listener = WaitListener()\n", "process.add_process_listener(listener)\n", "\n", "process.execute()\n", "print(process.state.name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Asynchronicity" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Processes run asynchronously on the [asyncio event loop](https://docs.python.org/3/library/asyncio-eventloop.html),\n", "with `step_until_terminated` the higher level async function inside of the `execute` method that drives the execution.\n", "\n", "Note that each process will always run its steps synchronously and that `Continue` commands will not relinquish the control of the event loop.\n", "In the following section on `WorkChain`s we will see how to run steps that do relinquish the event loop control, allowing for multiple processes to run asynchronously." 
] }, { "cell_type": "code", "execution_count": 95, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "async_fn start\n", "process1 run\n", "process1 continued\n", "process2 run\n", "process2 continued\n", "async_fn end\n" ] } ], "source": [ "async def async_fn():\n", " print(\"async_fn start\")\n", " await asyncio.sleep(.01)\n", " print(\"async_fn end\")\n", "\n", "class NamedProcess(plumpy.Process):\n", " \n", " @classmethod\n", " def define(cls, spec: plumpy.ProcessSpec):\n", " super().define(spec)\n", " spec.input('name')\n", "\n", " def run(self):\n", " print(self.inputs.name, \"run\")\n", " return plumpy.Continue(self.continue_fn)\n", "\n", " def continue_fn(self):\n", " print(self.inputs.name, \"continued\")\n", "\n", "process1 = NamedProcess({\"name\": \"process1\"})\n", "process2 = NamedProcess({\"name\": \"process2\"})\n", "\n", "async def execute():\n", " await asyncio.gather(\n", " async_fn(),\n", " process1.step_until_terminated(),\n", " process2.step_until_terminated()\n", " )\n", "\n", "plumpy.get_event_loop().run_until_complete(execute())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pausing and playing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pausing a process will create an action to await a pause `Future` **before the start of the next step**, i.e. 
it will still finish the current step before awaiting.\n", "When awaiting the pause `Future`, control of the event loop is relinquished, and so other processes are free to start.\n", "\n", "```{note}\n", "Pausing a process does not change its state and is different to the concept of waiting.\n", "```\n", "\n", "The pause `Future` can be marked as done, and thus the process continued, *via* the use of the `play` method.\n", "If `play` is called before the end of the step during which the process was paused, it will cancel the pause action.\n", "\n", "Below is a toy example, but this concept becomes more useful when using a `Controller`, as discussed later." ] }, { "cell_type": "code", "execution_count": 212, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "PauseProcess: pausing\n", "PauseProcess: continue step\n", "SimpleProcess\n", "PauseProcess: playing (state=RUNNING)\n", "PauseProcess: next step\n" ] } ], "source": [ "class SimpleProcess(plumpy.Process):\n", "\n", "    def run(self):\n", "        print(self.get_name())\n", "\n", "class PauseProcess(plumpy.Process):\n", "\n", "    def run(self):\n", "        print(f\"{self.get_name()}: pausing\")\n", "        self.pause()\n", "        print(f\"{self.get_name()}: continue step\")\n", "        return plumpy.Continue(self.next_step)\n", "\n", "    def next_step(self):\n", "        print(f\"{self.get_name()}: next step\")\n", "\n", "pause_proc = PauseProcess()\n", "simple_proc = SimpleProcess()\n", "\n", "async def play(proc):\n", "    while True:\n", "        if proc.paused:\n", "            print(f\"{proc.get_name()}: playing (state={proc.state.name})\")\n", "            proc.play()\n", "            break\n", "\n", "async def execute():\n", "    return await asyncio.gather(\n", "        pause_proc.step_until_terminated(),\n", "        simple_proc.step_until_terminated(),\n", "        play(pause_proc),\n", "    )\n", "\n", "outputs = plumpy.get_event_loop().run_until_complete(execute())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## WorkChains" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "### Introduction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The {py:class}`~plumpy.workchains.WorkChain` allows for improved logic, when it comes to running a process as a set of discrete steps (also known as instructions).\n", "\n", "The set of steps are defined in the {py:meth}`~plumpy.workchains.WorkChainSpec.outline`, which defines a succinct summary of the logical steps that the workchain will perform." ] }, { "cell_type": "code", "execution_count": 110, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "step1\n", "step2\n" ] } ], "source": [ "class SimpleWorkChain(plumpy.WorkChain):\n", " @classmethod\n", " def define(cls, spec):\n", " super().define(spec)\n", " spec.outline(\n", " cls.step1,\n", " cls.step2,\n", " )\n", "\n", " def step1(self):\n", " print('step1')\n", "\n", " def step2(self):\n", " print('step2')\n", " \n", "workchain = SimpleWorkChain()\n", "output = workchain.execute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Conditionals" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `outline` can contain more than just a list of steps.\n", "It allows for \"conditionals\" which should return a truthy value to indicate if the nested steps should be run.\n", "\n", "For example, using the `if_` conditional." 
] }, { "cell_type": "code", "execution_count": 109, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "execute False\n", " if\n", "execute True\n", " if\n", " conditional\n" ] } ], "source": [ "class IfWorkChain(plumpy.WorkChain):\n", " @classmethod\n", " def define(cls, spec):\n", " super().define(spec)\n", " spec.input('run', valid_type=bool)\n", "\n", " spec.outline(\n", " plumpy.if_(cls.if_step)(\n", " cls.conditional_step\n", " )\n", " )\n", "\n", " def if_step(self):\n", " print(' if')\n", " return self.inputs.run\n", "\n", " def conditional_step(self):\n", " print(' conditional')\n", " \n", "workchain = IfWorkChain({\"run\": False})\n", "print('execute False')\n", "output = workchain.execute()\n", "\n", "workchain = IfWorkChain({\"run\": True})\n", "print('execute True')\n", "output = workchain.execute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### The context and while conditional" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `while_` conditional allows steps to be run multiple times.\n", "To achieve this we need to define and iterate an internal variable.\n", "It is important that these variables are saved on the `WorkChain.ctx` attribute (an {py:class}`~plumpy.utils.AttributesDict`), since this can be persisted in between steps as we will discuss later.\n", "\n", "```{important}\n", "Arbitrary WorkChain instance attributes will not be persisted.\n", "Internal variables should always be saved to `self.ctx`.\n", "```" ] }, { "cell_type": "code", "execution_count": 149, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "step 1\n", "step 2\n", "step 3\n" ] } ], "source": [ "class WhileWorkChain(plumpy.WorkChain):\n", " @classmethod\n", " def define(cls, spec):\n", " super().define(spec)\n", " spec.input('steps', valid_type=int, default=3)\n", "\n", " spec.outline(\n", " cls.init_step,\n", " plumpy.while_(cls.while_step)(\n", " cls.conditional_step\n", 
" )\n", " )\n", " \n", " def init_step(self):\n", " self.ctx.iterator = 0\n", "\n", " def while_step(self):\n", " self.ctx.iterator += 1\n", " return (self.ctx.iterator <= self.inputs.steps)\n", "\n", " def conditional_step(self):\n", " print('step', self.ctx.iterator)\n", " \n", "workchain = WhileWorkChain()\n", "output = workchain.execute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Interstep processes and asynchronicity" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By default, each \"non-terminal\" step will return a `Continue` command, to progress to the next step (as a `RUNNING` state).\n", "In terms of asynchronicity, this means that no step will relinquish control to the asyncio event loop.\n", "\n", "By using the `to_context` method though, we can add \"awaitable\" objects.\n", "If the step adds any of these objects, then the workchain is transitioned into a `WAITING` state until the awaitables are complete, at which point the workchain moves on to the next step.\n", "\n", "The following example shows how to add an arbitrary asychronous function and a `Process` as awaitables. You could even add nested `WrokChain` with their own interstep processes." 
] }, { "cell_type": "code", "execution_count": 152, "metadata": {}, "outputs": [], "source": [ "async def awaitable_func(msg):\n", " await asyncio.sleep(.01)\n", " print(msg)\n", " return True\n", " \n", "\n", "class InternalProcess(plumpy.Process):\n", "\n", " @classmethod\n", " def define(cls, spec):\n", " super().define(spec)\n", " spec.input('name', valid_type=str, default='process')\n", " spec.output('value')\n", "\n", " def run(self):\n", " print(self.inputs.name)\n", " self.out('value', 'value')\n", "\n", "\n", "class InterstepWorkChain(plumpy.WorkChain):\n", "\n", " @classmethod\n", " def define(cls, spec):\n", " super().define(spec)\n", " spec.input('name', valid_type=str, default='workchain')\n", " spec.input('process', valid_type=bool, default=False)\n", " spec.input('awaitable', valid_type=bool, default=False)\n", " spec.outline(\n", " cls.step1,\n", " cls.step2,\n", " cls.step3,\n", " )\n", " \n", " def step1(self):\n", " print(self.inputs.name, 'step1')\n", "\n", " def step2(self):\n", " print(self.inputs.name, 'step2')\n", " time.sleep(.01)\n", " \n", " if self.inputs.awaitable:\n", " self.to_context(\n", " awaitable=asyncio.ensure_future(\n", " awaitable_func(f'{self.inputs.name} step2 awaitable'),\n", " loop=self.loop\n", " )\n", " )\n", " if self.inputs.process:\n", " self.to_context(\n", " process=self.launch(\n", " InternalProcess, \n", " inputs={'name': f'{self.inputs.name} step2 process'})\n", " )\n", "\n", " def step3(self):\n", " print(self.inputs.name, 'step3')\n", " print(f\" ctx={self.ctx}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Without awaitables, we can see that each workchain runs to completion before the next workchain starts." 
] }, { "cell_type": "code", "execution_count": 151, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "wkchain1 step1\n", "wkchain1 step2\n", "wkchain1 step3\n", " ctx=AttributesDict()\n", "wkchain2 step1\n", "wkchain2 step2\n", "wkchain2 step3\n", " ctx=AttributesDict()\n" ] } ], "source": [ "wkchain1 = InterstepWorkChain({'name': 'wkchain1'})\n", "wkchain2 = InterstepWorkChain({'name': 'wkchain2'})\n", "\n", "async def execute():\n", " return await asyncio.gather(\n", " wkchain1.step_until_terminated(),\n", " wkchain2.step_until_terminated()\n", " )\n", "\n", "output = plumpy.get_event_loop().run_until_complete(execute())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By adding an interstep process we see that the workchain, in the `WAITING` state, relinquishes control of the event loop for other workchains to start.\n", "\n", "The outputs of interstep awaitables are saved on on the workchain's context for later use." ] }, { "cell_type": "code", "execution_count": 143, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "wkchain1 step1\n", "wkchain1 step2\n", "wkchain2 step1\n", "wkchain2 step2\n", "wkchain1 step2 process\n", "wkchain2 step2 process\n", "wkchain1 step3\n", " ctx=AttributesDict(process={'value': 'value'})\n", "wkchain2 step3\n", " ctx=AttributesDict(process={'value': 'value'})\n" ] } ], "source": [ "wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'process': True})\n", "wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'process': True})\n", "\n", "async def execute():\n", " return await asyncio.gather(\n", " wkchain1.step_until_terminated(),\n", " wkchain2.step_until_terminated()\n", " )\n", "\n", "output = plumpy.get_event_loop().run_until_complete(execute())" ] }, { "cell_type": "code", "execution_count": 144, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "wkchain1 step1\n", "wkchain1 step2\n", "wkchain2 step1\n", "wkchain2 
step2\n", "wkchain1 step2 awaitable\n", "wkchain2 step2 awaitable\n", "wkchain1 step3\n", " ctx=AttributesDict(awaitable=True)\n", "wkchain2 step3\n", " ctx=AttributesDict(awaitable=True)\n" ] } ], "source": [ "wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'awaitable': True})\n", "wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'awaitable': True})\n", "\n", "async def execute():\n", " return await asyncio.gather(\n", " wkchain1.step_until_terminated(),\n", " wkchain2.step_until_terminated()\n", " )\n", "\n", "output = plumpy.get_event_loop().run_until_complete(execute())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Although the steps of individual workchains always progress synchronusly, the order of interleaving of steps from different workchains is non-deterministic." ] }, { "cell_type": "code", "execution_count": 159, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "wkchain1 step1\n", "wkchain1 step2\n", "wkchain2 step1\n", "wkchain2 step2\n", "wkchain1 step2 process\n", "wkchain2 step2 process\n", "wkchain1 step2 awaitable\n", "wkchain1 step3\n", " ctx=AttributesDict(awaitable=True, process={'value': 'value'})\n", "wkchain2 step2 awaitable\n", "wkchain2 step3\n", " ctx=AttributesDict(awaitable=True, process={'value': 'value'})\n" ] } ], "source": [ "wkchain1 = InterstepWorkChain({'name': 'wkchain1', 'process': True, 'awaitable': True})\n", "wkchain2 = InterstepWorkChain({'name': 'wkchain2', 'process': True, 'awaitable': True})\n", "\n", "async def execute():\n", " return await asyncio.gather(\n", " wkchain1.step_until_terminated(),\n", " wkchain2.step_until_terminated()\n", " )\n", "\n", "output = plumpy.get_event_loop().run_until_complete(execute())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Persistence" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A process can be saved as a checkpoint by a {py:class}`~plumpy.persistence.Persister` implementation,\n", "recording 
enough information to fully recreate the current instance state.\n", "Persisters provided out of the box are the {py:class}`~plumpy.persistence.InMemoryPersister` and {py:class}`~plumpy.persistence.PicklePersister`.\n", "\n", "Below is a toy example; persisters are primarily used as part of the `Controller` functionality described later." ] }, { "cell_type": "code", "execution_count": 163, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PersistWorkChain(ctx=AttributesDict(step=3))" ] }, "execution_count": 163, "metadata": {}, "output_type": "execute_result" } ], "source": [ "persister = plumpy.InMemoryPersister()\n", "\n", "class PersistWorkChain(plumpy.WorkChain):\n", "\n", "    @classmethod\n", "    def define(cls, spec):\n", "        super().define(spec)\n", "        spec.outline(\n", "            cls.init_step,\n", "            cls.step2,\n", "            cls.step3,\n", "        )\n", "\n", "    def __repr__(self):\n", "        return f\"PersistWorkChain(ctx={self.ctx})\"\n", "\n", "    def init_step(self):\n", "        self.ctx.step = 1\n", "        persister.save_checkpoint(self, 'init')\n", "\n", "    def step2(self):\n", "        self.ctx.step += 1\n", "        persister.save_checkpoint(self, 'step2')\n", "\n", "    def step3(self):\n", "        self.ctx.step += 1\n", "        persister.save_checkpoint(self, 'step3')\n", "\n", "workchain = PersistWorkChain()\n", "workchain.execute()\n", "workchain" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The persister now contains three checkpoints:" ] }, { "cell_type": "code", "execution_count": 164, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[PersistedCheckpoint(pid=UUID('9ceea019-8aef-4f23-aaf1-d31ca2a16773'), tag='init'),\n", " PersistedCheckpoint(pid=UUID('9ceea019-8aef-4f23-aaf1-d31ca2a16773'), tag='step2'),\n", " PersistedCheckpoint(pid=UUID('9ceea019-8aef-4f23-aaf1-d31ca2a16773'), tag='step3')]" ] }, "execution_count": 164, "metadata": {}, "output_type": "execute_result" } ], "source": [ "persister.get_checkpoints()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ 
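"Conceptually, a persister maps a process ID and tag to a serialised snapshot of the instance state. A standard-library sketch of the same idea (not the plumpy API, which stores `Bundle` objects):\n",
"\n",
"```python\n",
"import pickle\n",
"\n",
"class ToyPersister:\n",
"    \"\"\"Store pickled state snapshots keyed by (pid, tag).\"\"\"\n",
"\n",
"    def __init__(self):\n",
"        self._checkpoints = {}\n",
"\n",
"    def save_checkpoint(self, pid, tag, state):\n",
"        self._checkpoints[(pid, tag)] = pickle.dumps(state)\n",
"\n",
"    def load_checkpoint(self, pid, tag):\n",
"        return pickle.loads(self._checkpoints[(pid, tag)])\n",
"\n",
"persister_sketch = ToyPersister()\n",
"persister_sketch.save_checkpoint('pid-1', 'step2', {'step': 2})\n",
"assert persister_sketch.load_checkpoint('pid-1', 'step2') == {'step': 2}\n",
"```\n",
"\n",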
"Checkpoints can be accessed *via* the processes PID and the given checkpoint tag. This returns a `Bundle`, which can be used to recreate the workchain instance state at the point of the checkpoint:" ] }, { "cell_type": "code", "execution_count": 168, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PersistWorkChain(ctx=AttributesDict(step=2))" ] }, "execution_count": 168, "metadata": {}, "output_type": "execute_result" } ], "source": [ "workchain_checkpoint = persister.load_checkpoint(workchain.pid, 'step2').unbundle()\n", "workchain_checkpoint" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Communicators and Controllers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`kiwipy` communicators can send messages to subscribers, to launch or control processes.\n", "Messages take one of three forms:\n", "\n", "Tasks are one to many messages.\n", ": This means that you sent out a task to a queue and there can be zero or more subscribers attached one of which will process the task when it is ready.\n", ": The result of the task can optionally be delivered to the sender.\n", "\n", "A Remote Procedure Call (RPC) is one-to-one.\n", ": This is used when you want to call a particular remote function/method and (usually) expect an immediate response.\n", ": For example imagine asking a remote process to pause itself.\n", ": Here you would make a RPC and wait to get confirmation that it has, indeed, paused.\n", "\n", "\n", "Broadcasters send messages to to zero or more consumers.\n", ": These are fire and forget calls that broadcast a message to anyone who is listening.\n", ": Consumers may optionally apply a filter to only receive messages that match some criteria." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a simple example, a process can take a communicator on initiation.\n", "On process creation, this will add the process to the communicator as an RPC and broadcast subscriber,\n", "the communicator can then send RPC messages *via* a processes PID (as a string) or broadcast to all subscribed processes.\n", "Four message types are available:\n", "\n", "- `STATUS` (RPC only)\n", "- `KILL`\n", "- `PAUSE`\n", "- `PLAY`\n", "\n", "\n", "```{important}\n", "On process termination/close it will remove the process subscriptions to the communicator.\n", "```" ] }, { "cell_type": "code", "execution_count": 250, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'ctime': 1610495402.981895,\n", " 'paused': False,\n", " 'process_string': ' (ProcessState.CREATED)',\n", " 'state': ,\n", " 'state_info': 'ProcessState.CREATED'}\n" ] } ], "source": [ "communicator = kiwipy.LocalCommunicator()\n", "\n", "class SimpleProcess(plumpy.Process):\n", " pass\n", "\n", "process = SimpleProcess(communicator=communicator)\n", "\n", "pprint(communicator.rpc_send(str(process.pid), plumpy.STATUS_MSG).result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A `ProcessLauncher` can be subscribed to a communicator, for process launch and continue tasks.\n", "Process controllers can then wrap a communicator, to provide an improved interface." 
] }, { "cell_type": "code", "execution_count": 332, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'result': 11}\n", "659b8b63-42a9-44a7-978f-07c6a2887f7e\n" ] } ], "source": [ "class ControlledWorkChain(plumpy.WorkChain):\n", "\n", " @classmethod\n", " def define(cls, spec):\n", " super().define(spec)\n", " spec.input('steps', valid_type=int, default=10)\n", " spec.output('result', valid_type=int)\n", "\n", " spec.outline(\n", " cls.init_step,\n", " plumpy.while_(cls.while_step)(cls.loop_step),\n", " cls.final_step\n", " )\n", " \n", " def init_step(self):\n", " self.ctx.iterator = 0\n", "\n", " def while_step(self):\n", " return (self.ctx.iterator <= self.inputs.steps)\n", " \n", " def loop_step(self):\n", " self.ctx.iterator += 1\n", "\n", " def final_step(self):\n", " self.out('result', self.ctx.iterator)\n", "\n", "loop_communicator = plumpy.wrap_communicator(kiwipy.LocalCommunicator())\n", "loop_communicator.add_task_subscriber(plumpy.ProcessLauncher())\n", "controller = plumpy.RemoteProcessController(loop_communicator)\n", "\n", "wkchain = ControlledWorkChain(communicator=loop_communicator)\n", " \n", "async def run_wait():\n", " return await controller.launch_process(ControlledWorkChain)\n", "\n", "async def run_nowait():\n", " return await controller.launch_process(ControlledWorkChain, nowait=True)\n", "\n", "print(plumpy.get_event_loop().run_until_complete(run_wait()))\n", "print(plumpy.get_event_loop().run_until_complete(run_nowait()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "more to come..." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.8" } }, "nbformat": 4, "nbformat_minor": 4 }