
OpenFlow Spec

OpenFlow is an open standard for defining "Flows". Flows are directed graphs in which every node represents a step of computation. In other words, OpenFlow is a declarative language for chaining scripts.

Windmill is the open-source reference implementation of OpenFlow, providing a UI to build flows and a highly scalable executor. However, everyone is welcome to build upon it, develop new UIs that target OpenFlow, or create new executors.

Flows can be shared and showcased on the WindmillHub. To see an OpenFlow in practice, go to WindmillHub, pick a flow (e.g. Whenever an HN message contains a mention, publish it to slack), then open Flow -> JSON.

OpenFlow

We provide an OpenAPI/Swagger definition file for the spec; it is hosted in the GitHub repository.

We will use a TypeScript equivalent of the OpenAPI definition for ease of readability throughout the rest of the document.

OpenFlow is portable and its root object is defined as follows:

type OpenFlow = {
  summary: string;
  description?: string;
  value: FlowValue;
  schema?: any;
};

It contains a one-line summary, an optional description, and an optional schema: the JSON Schema that constrains the JSON the flow takes as input. FlowValue is where the logic of the flow is actually defined.
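To make the root object concrete, here is a minimal sketch of a one-step OpenFlow document written as a TypeScript value. The flow itself is hypothetical, the types are trimmed-down local copies of the spec types, and the name flow_input for the flow's input inside a JavaScript transform is an assumption, since this section does not name it.

```typescript
// Trimmed-down local copies of the spec types so the sketch is self-contained.
type StaticTransform = { type: "static"; value?: unknown };
type JavascriptTransform = { type: "javascript"; expr: string };
type RawScript = { type: "rawscript"; content: string; language: "deno" | "python3" };
type FlowModule = {
  summary?: string;
  input_transforms: Record<string, StaticTransform | JavascriptTransform>;
  value: RawScript;
};
type OpenFlow = {
  summary: string;
  description?: string;
  value: { modules: FlowModule[] };
  schema?: unknown;
};

// Hypothetical one-step flow that greets the name passed as flow input.
const helloFlow: OpenFlow = {
  summary: "Say hello",
  description: "Greets the name given as input",
  // JSON Schema constraining the flow input: an object with a required string `name`.
  schema: {
    type: "object",
    properties: { name: { type: "string" } },
    required: ["name"],
  },
  value: {
    modules: [
      {
        summary: "Build greeting",
        input_transforms: {
          // ASSUMPTION: the flow input is exposed as `flow_input` in JavaScript transforms.
          name: { type: "javascript", expr: "flow_input.name" },
        },
        value: {
          type: "rawscript",
          language: "deno",
          content: "export async function main(name: string) { return `Hello ${name}`; }",
        },
      },
    ],
  },
};

// Serialize the document to JSON, the portable form of an OpenFlow.
const json = JSON.stringify(helloFlow, null, 2);
console.log(json);
```

Because an OpenFlow is plain JSON, any tool that can produce this shape can target the spec, independently of the Windmill UI.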

FlowValue

type FlowValue = {
  modules: Array<FlowModule>;
  failure_module?: FlowModule;
  retry?: {
    constant?: {
      attempts: number; // integer
      seconds: number; // integer
    };
    exponential?: {
      attempts: number; // integer
      multiplier: number; // integer
      seconds: number; // integer
    };
  };
};

A flow is a sequence of modules plus an optional failure module that is triggered to handle a failure at any point in the flow (think try/catch in programming languages).

Retry sets the retry policy for the flow. It is optional and is reset on every successful run:

  • constant: retry up to attempts times, waiting seconds between retries.
  • exponential: retry up to attempts times, with an exponentially increasing backoff delay (controlled by multiplier and seconds) between retries.

If all the retries are exhausted, the failure module, if any, is called.
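The sketch below turns a retry object into the list of delays (in seconds) an executor would wait before each retry. The exact exponential formula is not given in this section: the code assumes the n-th retry waits multiplier * seconds^n, and it also assumes that when both policies are set, the constant retries happen first. Treat both as illustrative assumptions rather than Windmill's documented behavior.

```typescript
// Mirrors FlowValue.retry, with `integer` written as `number`.
type ConstantRetry = { attempts: number; seconds: number };
type ExponentialRetry = { attempts: number; multiplier: number; seconds: number };
type Retry = { constant?: ConstantRetry; exponential?: ExponentialRetry };

// Constant policy: every retry waits the same number of seconds.
function constantDelays(p: ConstantRetry): number[] {
  return Array.from({ length: p.attempts }, () => p.seconds);
}

// Exponential policy (ASSUMED formula): the n-th retry (n = 1, 2, ...)
// waits multiplier * seconds^n, so the delay grows geometrically.
function exponentialDelays(p: ExponentialRetry): number[] {
  return Array.from({ length: p.attempts }, (_, i) => p.multiplier * p.seconds ** (i + 1));
}

// ASSUMPTION: if both policies are set, the constant retries run first.
function retrySchedule(retry: Retry): number[] {
  return [
    ...(retry.constant ? constantDelays(retry.constant) : []),
    ...(retry.exponential ? exponentialDelays(retry.exponential) : []),
  ];
}

const delays = retrySchedule({
  constant: { attempts: 2, seconds: 5 },
  exponential: { attempts: 3, multiplier: 1, seconds: 2 },
});
console.log(delays); // [ 5, 5, 2, 4, 8 ]
```

Once the schedule is used up without a successful run, control would pass to failure_module, if one is defined.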

FlowModule

An OpenFlow module is defined as follows:

type FlowModule = {
  summary?: string;
  input_transforms: Record<string, StaticTransform | JavascriptTransform>;
  value: RawScript | PathScript | ForloopFlow;
  stop_after_if?: { expr: string; skip_if_stopped: boolean };
  suspend?: number; // non-negative integer
};

type StaticTransform = {
  type: "static";
  value?: any;
};

type JavascriptTransform = {
  type: "javascript";
  expr: string;
};

// Either kind of input transform (used by ForloopFlow.iterator).
type InputTransform = StaticTransform | JavascriptTransform;

type RawScript = {
  type: "rawscript";
  content: string;
  language: "deno" | "python3";
  path?: string;
};

type PathScript = {
  type: "script";
  path: string;
};

type ForloopFlow = {
  type: "forloopflow";
  modules: Array<FlowModule>;
  iterator: InputTransform;
  skip_failures?: boolean;
};
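Because every module value carries a literal type field, the value union is a discriminated union: code that consumes a flow can switch on type and let TypeScript narrow to the right shape. The describeModule helper below is a hypothetical illustration of that pattern, and the script path f/examples/notify is made up.

```typescript
type InputTransform = { type: "static"; value?: unknown } | { type: "javascript"; expr: string };
type RawScript = { type: "rawscript"; content: string; language: "deno" | "python3"; path?: string };
type PathScript = { type: "script"; path: string };
type ForloopFlow = {
  type: "forloopflow";
  modules: FlowModule[];
  iterator: InputTransform;
  skip_failures?: boolean;
};
type FlowModule = {
  summary?: string;
  input_transforms: Record<string, InputTransform>;
  value: RawScript | PathScript | ForloopFlow;
};

// Hypothetical helper: `type` discriminates the union, so each case sees a narrowed `v`.
function describeModule(module: FlowModule): string {
  const v = module.value;
  switch (v.type) {
    case "rawscript":
      return `inline ${v.language} script`;
    case "script":
      return `script at ${v.path}`;
    case "forloopflow":
      return `loop over ${v.modules.length} nested module(s)`;
  }
}

// A for-loop whose body calls one (hypothetical) script by path.
const loop: FlowModule = {
  input_transforms: {},
  value: {
    type: "forloopflow",
    iterator: { type: "javascript", expr: "result" },
    modules: [{ input_transforms: {}, value: { type: "script", path: "f/examples/notify" } }],
  },
};

console.log(describeModule(loop)); // loop over 1 nested module(s)
```

Since ForloopFlow nests further FlowModule values, a flow is effectively a tree of modules that the same switch can walk recursively.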

Input transforms

Modules contain input_transforms, a mapping from fields (i.e. the inputs of the module) to either a static JSON value or a raw JavaScript expression.

input_transforms is how you pipe previous steps, variables, or resources into the inputs of your script/module. Since it is actual JavaScript (albeit restricted: for example, fetch is limited to getting secrets and variables), it is very flexible.

One interesting pattern this allows is composing complex strings directly in the transform: you could compose an email body or a SQL query with string interpolation, populated with the results of previous steps. The Windmill Editor makes this easy with the property picker:

Prop picker
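As an illustration of how a module's inputs are resolved, the sketch below evaluates a mix of static and JavaScript transforms, including the string-interpolation pattern above. evalTransforms is a hypothetical toy evaluator, not Windmill's sandboxed runtime, and the variable name previous_result is an assumption chosen for the example.

```typescript
type StaticTransform = { type: "static"; value?: unknown };
type JavascriptTransform = { type: "javascript"; expr: string };
type InputTransform = StaticTransform | JavascriptTransform;

// Toy evaluator (NOT Windmill's runtime): static values are copied as-is, and each
// JavaScript expression runs with the keys of `scope` bound as local variables.
function evalTransforms(
  transforms: Record<string, InputTransform>,
  scope: Record<string, unknown>,
): Record<string, unknown> {
  const names = Object.keys(scope);
  const values = names.map((n) => scope[n]);
  const args: Record<string, unknown> = {};
  for (const [field, t] of Object.entries(transforms)) {
    args[field] =
      t.type === "static" ? t.value : new Function(...names, `return (${t.expr});`)(...values);
  }
  return args;
}

// Hypothetical transforms composing an email body from the previous step's result.
const transforms: Record<string, InputTransform> = {
  to: { type: "static", value: "ops@example.com" },
  body: {
    type: "javascript",
    // ASSUMPTION: the previous step's output is bound as `previous_result`.
    expr: "`Found ${previous_result.length} new mentions: ${previous_result.join(', ')}`",
  },
};

const args = evalTransforms(transforms, { previous_result: ["hn-1", "hn-2"] });
console.log(args.body); // Found 2 new mentions: hn-1, hn-2
```

Keeping static values and expressions in one map lets the executor resolve every input in a single pass just before the module runs.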

Conditional stop after

There is also the optional stop_after_if object (suspend, described below, is a separate optional field of FlowModule):

type StopAfterIf = {
  expr: string;
  skip_if_stopped: boolean;
};

If present:

  • stop_after_if.expr: A JavaScript expression, evaluated with the module's result as input, that decides whether the flow should stop after this step. This is useful to stop a flow that watches for changes when there are none.
  • stop_after_if.skip_if_stopped: Flags runs stopped this way as skipped. This is useful for flows triggered very often to watch for changes, since you may want to ignore the runs that were skipped.
  • suspend (a separate optional field of FlowModule): a non-negative integer that sets the number of events (resume messages) required before the flow proceeds to the next step. This is useful for inserting user input during a flow's execution, such as approving or rejecting a flow in response to an email or Slack notification sent by a job in the flow.
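The stop_after_if semantics can be sketched as a small decision function: evaluate expr with result in scope, then either continue, stop, or stop and mark the run as skipped. afterStep and the StepOutcome labels are hypothetical names for illustration; the real executor's statuses may differ.

```typescript
type StopAfterIf = { expr: string; skip_if_stopped: boolean };
type StepOutcome = "continue" | "stopped" | "skipped";

// Hypothetical helper (not Windmill code): evaluates `expr` with the step's
// `result` in scope and decides what happens to the flow run.
function afterStep(result: unknown, rule?: StopAfterIf): StepOutcome {
  if (!rule) return "continue";
  const shouldStop = Boolean(new Function("result", `return (${rule.expr});`)(result));
  if (!shouldStop) return "continue";
  return rule.skip_if_stopped ? "skipped" : "stopped";
}

// Watcher pattern: stop when the previous step found no new items.
const stopRule: StopAfterIf = { expr: "result.length === 0", skip_if_stopped: true };
console.log(afterStep([], stopRule)); // skipped
console.log(afterStep(["item"], stopRule)); // continue
```

With skip_if_stopped set, the frequent "nothing changed" runs of a watcher can be filtered out of the run history.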

Resume messages are sent to https://app.windmill.dev/w/<WORKSPACE>/jobs/resume/<JOB_ID> as POST or GET requests.
Requests must carry a JSON payload. For POST requests, send it as the request body with a Content-Type: application/json header; for GET requests, pass it base64url-encoded as the value of the payload query parameter (?payload=${base64url_encoded_json}).
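The two request shapes can be sketched as follows, assuming Node 18+ (for Buffer's base64url encoding, which omits padding, and the global fetch). The helper names are hypothetical, the workspace and job id are placeholders, and any authentication the endpoint may require is not covered here.

```typescript
// Builds the resume endpoint URL for a suspended job.
function resumeUrl(base: string, workspace: string, jobId: string): string {
  return `${base}/w/${encodeURIComponent(workspace)}/jobs/resume/${encodeURIComponent(jobId)}`;
}

// GET variant: the JSON payload, base64url-encoded, goes in the `payload` query parameter.
function resumeGetUrl(url: string, payload: unknown): string {
  const encoded = Buffer.from(JSON.stringify(payload), "utf8").toString("base64url");
  return `${url}?payload=${encoded}`;
}

// POST variant: the JSON payload is the request body, with a JSON content type.
async function resumeWithPost(url: string, payload: unknown) {
  return fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  });
}

const endpoint = resumeUrl("https://app.windmill.dev", "demo", "JOB_ID");
console.log(resumeGetUrl(endpoint, { approved: true }));
// To actually send it: await resumeWithPost(endpoint, { approved: true });
```

The GET form is handy for links embedded in emails or chat messages, where the recipient can only click a URL.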

When enough resume messages are received, the next job starts with its input_transforms evaluated with two notable variables in scope:

  • resume: the payload from the most recent resume message received.
  • resumes: a list of the payloads from all resume messages received, in the order they were received, with the most recent at the end.
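For example, a step after a module with suspend set to 2 could read both approvals through resume and resumes. The payload shape and field names below are hypothetical, and the evaluation uses a toy stand-in for the executor that simply binds the two variables.

```typescript
type JavascriptTransform = { type: "javascript"; expr: string };

// Hypothetical payloads from two approvers, in the order they were received.
const resumes = [
  { approver: "alice", approved: true },
  { approver: "bob", approved: false },
];
const resume = resumes[resumes.length - 1]; // the most recent payload

// Transforms for the next step (field names are illustrative).
const transforms: Record<string, JavascriptTransform> = {
  last_decision: { type: "javascript", expr: "resume.approved" },
  all_approved: { type: "javascript", expr: "resumes.every((r) => r.approved)" },
  approvers: { type: "javascript", expr: "resumes.map((r) => r.approver)" },
};

// Toy evaluation with `resume` and `resumes` in scope (not Windmill's runtime).
const args = Object.fromEntries(
  Object.entries(transforms).map(([field, t]) => [
    field,
    new Function("resume", "resumes", `return (${t.expr});`)(resume, resumes),
  ]),
);
console.log(args);
```

Using resumes rather than resume matters whenever more than one event is required, since resume only exposes the last one.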

Alternatively, a job can be cancelled immediately by a request to a similar endpoint at ../jobs/cancel/... In this case, the flow quits with the cancellation payload as its result, without retrying, running further steps, or running the failure module.

Value

Now let's see how the module value itself is defined.

There are currently three kinds of modules (as of version 1.35.0):

  • rawscript: Embed a full Deno or Python script inside the flow. Useful for ad-hoc scripts.
  • script: Refer to an existing script by its path (including Hub scripts, via the hub/ prefix).
  • forloopflow: Iterate over a list and run the nested modules once per element. The list is built by evaluating the iterator expression, in which result is the result of the previous module.

In Windmill, most for-loops simply use result as the iterator, which requires the previous step to return a list. Flows triggered inside other flows take the embedding flow's inputs as their inputs, extended with iter.value and iter.index: respectively, the value being iterated over and its index.
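The for-loop semantics can be simulated in a few lines: evaluate the iterator with result bound to the previous module's output, then run the body once per element with the embedding flow's inputs plus iter. runForloop is a hypothetical helper; it runs iterations sequentially and ignores skip_failures, which a real executor would need to handle.

```typescript
type Iter = { value: unknown; index: number };

// Hypothetical simulation of a forloopflow: evaluates the iterator expression with
// `result` bound to the previous module's result, then runs `body` once per element,
// passing the embedding flow's inputs extended with `iter`.
function runForloop<R>(
  iteratorExpr: string,
  previousResult: unknown,
  flowInput: Record<string, unknown>,
  body: (input: Record<string, unknown> & { iter: Iter }) => R,
): R[] {
  const items = new Function("result", `return (${iteratorExpr});`)(previousResult);
  if (!Array.isArray(items)) throw new Error("iterator must evaluate to a list");
  return items.map((value, index) => body({ ...flowInput, iter: { value, index } }));
}

// Typical case: the iterator is just `result`, and the previous step returned a list.
const out = runForloop("result", ["a", "b"], { channel: "#alerts" }, (input) =>
  `${input.channel} ${input.iter.index}:${input.iter.value}`,
);
console.log(out); // [ '#alerts 0:a', '#alerts 1:b' ]
```

Because the iterator is an expression, it can also reshape the previous result (for example, filter it) before the loop starts.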

Et voilà, we have completed our tour of OpenFlow.