Using workflows¶
Install the workflows library using pip:
pip install dag-workflows
Configuring your project¶
To use workflows, you need a folder for your DAGs. It should contain one sub-folder for each individual DAG. These sub-folders contain files representing the individual tasks within that DAG. You can also put a tests folder inside your DAG, and a config file in the top-level folder. For example:
dags/
    trader_bot/
        tests/
            test_get_stock_prices.py
        calculate_trades.sql
        get_stock_prices.py
        workflow.yaml
    config.py
The workflow.yaml file should contain metadata about that DAG:
description: Calculate something
The config.py file tells workflows where to look for your DAGs, and should set a module-level
variable named DAGS:
import os
import workflows.dags
# register the DAGs folder
DAGS = workflows.dags.register_directory(os.path.dirname(__file__))
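To make the folder layout concrete, the following standard-library sketch lists each DAG sub-folder and the task files inside it. It is not how workflows.dags.register_directory is implemented; discover_dags and TASK_SUFFIXES are hypothetical names used only for illustration, and skipping folders that start with "." or "_" is an assumption.

```python
from pathlib import Path

# Task file types named in this guide.
TASK_SUFFIXES = {".sql", ".py", ".yaml"}


def discover_dags(dags_dir):
    """Map each DAG sub-folder name to a sorted list of its task file names.

    Hypothetical helper for illustration; not part of the workflows library.
    """
    found = {}
    for dag_dir in sorted(Path(dags_dir).iterdir()):
        # Skip plain files such as config.py and folders such as __pycache__.
        if not dag_dir.is_dir() or dag_dir.name.startswith((".", "_")):
            continue
        found[dag_dir.name] = sorted(
            f.name
            for f in dag_dir.iterdir()
            if f.is_file()  # excludes the tests/ sub-folder
            and f.suffix in TASK_SUFFIXES
            and f.name != "workflow.yaml"  # DAG metadata, not a task
        )
    return found
```

Run against the example layout above, this would return a dictionary with a single key, trader_bot, listing calculate_trades.sql and get_stock_prices.py.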
Writing tasks and DAGs¶
The individual task files can be .sql, .py, or .yaml files with a YAML header that defines
the task. A SQL task, for example, looks like this:
/*
description: Make a table
type: sql
inputs: []
*/
CREATE TABLE IF NOT EXISTS example AS
SELECT 'test' AS name, 1 AS value;
A Python task should define a run function, like this:
"""
description: Copy a file
type: python
inputs: []
"""
import shutil


def run(context):
    shutil.copy('fileA', 'fileB')
For more information on the different task types, see Task types.
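The two header styles shown above (a leading /* ... */ comment in SQL, a module docstring in Python) can be pulled out of a task file with a few lines of standard-library code. This is a minimal sketch, not the library's own parser: extract_header and parse_flat_header are hypothetical helpers, and the flat key: value split is a stand-in for a real YAML parser, so values such as [] are left as unparsed strings.

```python
import re

# SQL tasks keep the header in a leading /* ... */ comment,
# Python tasks in the module docstring ("""...""").
HEADER_PATTERNS = {
    ".sql": re.compile(r"\A\s*/\*(.*?)\*/", re.DOTALL),
    ".py": re.compile(r'\A\s*"""(.*?)"""', re.DOTALL),
}


def extract_header(source, suffix):
    """Return the raw header text of a task file, or None if there is none."""
    match = HEADER_PATTERNS[suffix].match(source)
    return match.group(1).strip() if match else None


def parse_flat_header(header):
    """Split simple 'key: value' lines; values stay as unparsed strings."""
    fields = {}
    for line in header.splitlines():
        key, sep, value = line.partition(":")
        if sep:
            fields[key.strip()] = value.strip()
    return fields
```

For anything beyond flat fields, such as a multi-line inputs list, a proper YAML parser would be needed in place of parse_flat_header.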
Defining inputs and outputs¶
The task metadata YAML has fields for inputs and outputs. The inputs field defines which tasks are upstream
and should run before this task. It is a list of task names. A task from a different DAG is
prefixed with the DAG name and a : before the task name. For example:
inputs:
- task1.py
- dag2:task1.py
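As an illustration of how an inputs list translates into a run order, the sketch below splits each entry into a DAG name and a task name, then orders the tasks of one DAG with the standard-library graphlib module (Python 3.9+). It is an assumption-laden sketch rather than the library's scheduler: split_input and run_order are hypothetical helpers, and inputs that point at another DAG are simply left out of the ordering.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def split_input(ref, current_dag):
    """Return (dag_name, task_name) for one entry in an inputs list."""
    dag, sep, task = ref.partition(":")
    # No ":" means the task lives in the current DAG.
    return (dag, task) if sep else (current_dag, ref)


def run_order(dag_name, task_inputs):
    """Order one DAG's tasks so that every same-DAG input comes first."""
    graph = {}
    for task, inputs in task_inputs.items():
        upstream = [split_input(ref, dag_name) for ref in inputs]
        # Inputs from other DAGs are left out of this DAG's ordering.
        graph[task] = {name for dag, name in upstream if dag == dag_name}
    return list(TopologicalSorter(graph).static_order())


# Assumed dependencies, for illustration only.
order = run_order(
    "trader_bot",
    {
        "get_stock_prices.py": [],
        "calculate_trades.sql": ["get_stock_prices.py", "dag2:task1.py"],
    },
)
```

TopologicalSorter raises graphlib.CycleError if the inputs form a cycle, which is a convenient way to catch a task that ends up depending on itself.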
The outputs field defines what is created by the task. workflows provides some output types,
but you can also write your own. Outputs are useful for running validation against the task results and
for providing testing helpers.
For more information on the different output types, see Output types.
Executing DAGs¶
You can run your DAGs from the command line. The interface is:
workflows <config module> run <dag name> [-s start_task -e end_task -o only_task]
For example:
workflows example_project.config run trader_bot
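If you launch runs from a script rather than a shell, the same command can be assembled as an argument list. The sketch below only mirrors the interface shown above; build_run_command is a hypothetical helper, and how the -s, -e and -o options behave beyond naming a start, end, or single task is not covered here.

```python
def build_run_command(config_module, dag_name, start=None, end=None, only=None):
    """Build the argv list for `workflows <config module> run <dag name>`.

    Hypothetical helper; it only assembles the flags shown in this guide.
    """
    command = ["workflows", config_module, "run", dag_name]
    for flag, value in (("-s", start), ("-e", end), ("-o", only)):
        if value is not None:
            command += [flag, value]
    return command
```

The resulting list can be passed to subprocess.run(command, check=True), assuming the workflows executable is on your PATH.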
Writing tests¶
If you are using pytest and Django, there is a workflows test fixture that makes writing tests easier.
It uses pytest-django to set up your test database, and then provides four helper methods:
setup_task_inputs: creates tables for upstream tasks and populates them with data
execute_task: runs a single task and returns the rows created
execute_dag: runs multiple tasks within a DAG
execute_sql: executes a SQL statement and returns the rows, if any
For example:
from example_project import config


def test_calculate_trades(workflows):
    task = config.DAGS["trader_bot"].get_task("calculate_trades.sql")
    initial_data = {
        "stock_prices": [{"date": "2020-01-01", "close": 34}, {"date": "2020-01-02", "close": 43}],
        "limits": [{"max_bid": 100}],
    }
    workflows.setup_task_inputs(task, initial_data)
    results = workflows.execute_task(task)
    assert len(results["new_trades"]) == 2