Using workflows

Install the workflows library using pip:

pip install dag-workflows

Configuring your project

To use workflows, you need a folder for your DAGs, containing one sub-folder per DAG. Each sub-folder holds the files that represent the individual tasks within that DAG. You can also add a tests folder inside a DAG sub-folder, and a config file in the top-level folder. For example:

dags/
  trader_bot/
    tests/
      test_get_stock_prices.py
    calculate_trades.sql
    get_stock_prices.py
    workflow.yaml
  config.py

The workflow.yaml file should contain metadata about that DAG:

description: Calculate something

The config.py file tells workflows where to look for your DAGs, and should set a module variable DAGS:

import os
import workflows.dags

# register the DAGs folder
DAGS = workflows.dags.register_directory(os.path.dirname(__file__))
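
To make the folder convention concrete, here is a minimal sketch that walks a DAGs folder and lists the task files in each DAG sub-folder. It is not how workflows itself discovers DAGs; discover_dags and TASK_SUFFIXES are hypothetical names, and treating workflow.yaml as metadata rather than a task is an assumption based on the layout above.

```python
import tempfile
from pathlib import Path

# Assumption: task files are the .sql, .py and .yaml files in a DAG folder.
TASK_SUFFIXES = {".sql", ".py", ".yaml"}


def discover_dags(dags_root):
    """Map each DAG sub-folder name to the sorted task file names inside it."""
    found = {}
    for dag_dir in sorted(Path(dags_root).iterdir()):
        if not dag_dir.is_dir():
            continue  # skips top-level files such as config.py
        found[dag_dir.name] = sorted(
            f.name
            for f in dag_dir.iterdir()
            # sub-folders such as tests/ and the workflow.yaml metadata file are skipped
            if f.is_file() and f.suffix in TASK_SUFFIXES and f.name != "workflow.yaml"
        )
    return found


# Recreate the example layout in a temporary directory and inspect it.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "dags"
    (root / "trader_bot" / "tests").mkdir(parents=True)
    for name in ("calculate_trades.sql", "get_stock_prices.py", "workflow.yaml"):
        (root / "trader_bot" / name).touch()
    (root / "trader_bot" / "tests" / "test_get_stock_prices.py").touch()
    (root / "config.py").touch()
    layout = discover_dags(root)

print(layout)
```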

Writing tasks and DAGs

The individual task files can be .sql, .py, or .yaml files, each with a YAML header that defines the task. For example, a SQL task looks like this:

/*
description: Make a table
type: sql
inputs: []
 */
CREATE TABLE IF NOT EXISTS example AS
SELECT 'test' AS name, 1 AS value;

A Python task should define a run function, like this:

"""
description: Copy a file
type: python
inputs: []
"""
import shutil

def run(context):
    shutil.copy('fileA', 'fileB')

For more information on the different task types, see Task types.
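
The two examples above carry their header in different places: a leading block comment for SQL and the module docstring for Python. The sketch below shows one way to pull that header text out using only the standard library. extract_header is a hypothetical helper, not part of workflows, and it covers only the .sql and .py cases shown here.

```python
import ast
import re
import textwrap


def extract_header(source, suffix):
    """Return the raw YAML header text of a task file, or None if it has none."""
    if suffix == ".py":
        # The header is the module docstring; get_docstring strips blank edges.
        return ast.get_docstring(ast.parse(source))
    if suffix == ".sql":
        # The header is the first /* ... */ comment at the top of the file.
        match = re.match(r"\s*/\*(.*?)\*/", source, flags=re.DOTALL)
        if match is None:
            return None
        return textwrap.dedent(match.group(1)).strip()
    raise ValueError(f"unsupported task file type: {suffix}")


sql_source = """/*
description: Make a table
type: sql
inputs: []
 */
CREATE TABLE IF NOT EXISTS example AS
SELECT 'test' AS name, 1 AS value;
"""

py_source = '''"""
description: Copy a file
type: python
inputs: []
"""
import shutil

def run(context):
    shutil.copy('fileA', 'fileB')
'''

sql_header = extract_header(sql_source, ".sql")
py_header = extract_header(py_source, ".py")
print(sql_header)
print(py_header)
```

The returned text can then be handed to a YAML parser, for example yaml.safe_load from PyYAML, to obtain the description, type, and inputs fields.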

Defining inputs and outputs

The task metadata YAML has fields for inputs and outputs. The inputs field defines which tasks are upstream and must run before this task. It is a list of task names. A task from a different DAG is written as the DAG name, followed by a :, followed by the task name. For example:

inputs:
- task1.py
- dag2:task1.py
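
As an illustration of how input references imply an execution order, the sketch below resolves each reference to a (dag, task) pair and sorts the tasks so that upstream tasks come first. This is not the scheduler workflows uses; qualify, the limits DAG, and load_limits.sql are hypothetical, and graphlib requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter


def qualify(ref, current_dag):
    """Turn 'task1.py' or 'dag2:task1.py' into a (dag, task) pair."""
    dag, sep, task = ref.partition(":")
    # Without a ':' prefix the reference points at a task in the same DAG.
    return (dag, task) if sep else (current_dag, ref)


# Hypothetical inputs lists, keyed by (dag, task).
inputs_by_task = {
    ("trader_bot", "get_stock_prices.py"): [],
    ("trader_bot", "calculate_trades.sql"): [
        "get_stock_prices.py",
        "limits:load_limits.sql",
    ],
    ("limits", "load_limits.sql"): [],
}

# Map every task to the set of upstream tasks it depends on.
graph = {
    key: {qualify(ref, key[0]) for ref in refs}
    for key, refs in inputs_by_task.items()
}

# static_order() yields each task only after all of its upstream tasks.
order = list(TopologicalSorter(graph).static_order())
print(order)
```

The relative order of tasks that do not depend on each other is not fixed; only the upstream-before-downstream constraint is guaranteed.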

The outputs field defines what the task creates. workflows provides some output types, and you can also write your own. Outputs are useful for running validation against the task results and for providing testing helpers.

For more information on the different output types, see Output types.

Executing DAGs

You can run your DAGs from the command line. The interface is:

workflows <config module> run <dag name> [-s start_task -e end_task -o only_task]

For example:

workflows example_project.config run trader_bot
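
If you launch runs from a script, the command line can be assembled programmatically. The sketch below only builds the argument list from the interface shown above; build_run_command is a hypothetical helper, and it assumes that each of the -s, -e, and -o flags can be given on its own and takes a task name as written in an inputs list.

```python
import shlex


def build_run_command(config_module, dag_name, start=None, end=None, only=None):
    """Build the argument list for 'workflows <config module> run <dag name>'."""
    cmd = ["workflows", config_module, "run", dag_name]
    # Append only the flags that were requested.
    for flag, value in (("-s", start), ("-e", end), ("-o", only)):
        if value is not None:
            cmd += [flag, value]
    return cmd


cmd = build_run_command(
    "example_project.config", "trader_bot", only="calculate_trades.sql"
)
print(shlex.join(cmd))
```

The resulting list can be passed to subprocess.run(cmd, check=True) to execute the run.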

Writing tests

If you are using pytest and Django, there is a workflows test fixture that makes writing tests easier. It uses pytest-django to set up your test database, and then provides four helper methods:

  • setup_task_inputs: creates tables for upstream tasks and populates them with data

  • execute_task: runs a single task and returns the rows created

  • execute_dag: runs multiple tasks within a DAG

  • execute_sql: executes a SQL statement and returns the rows, if any

For example:

from example_project import config

def test_calculate_trades(workflows):
    task = config.DAGS["trader_bot"].get_task("calculate_trades.sql")
    initial_data = {
        "stock_prices": [{"date": "2020-01-01", "close": 34}, {"date": "2020-01-02", "close": 43}],
        "limits": [{"max_bid": 100}],
    }
    workflows.setup_task_inputs(task, initial_data)
    results = workflows.execute_task(task)
    assert len(results["new_trades"]) == 2