Integrations

Hooks

You can register event hooks that run before and after DAGs and tasks execute. An event hook should accept four positional arguments: the event, the DAG, the task and the exception. The last two default to None:

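from typing import Optional
from workflows import bases  # assumption: the bases module lives at workflows.bases

def my_hook(event: bases.Event, dag: bases.DAG, task: Optional[bases.Task] = None, exc: Optional[Exception] = None):
    ...  # do something; task and exc default to None

DAGS.register_hook(my_hook)  # DAGS as returned by dags.register_directory()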
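A hook is an ordinary function, so you can exercise it by calling it directly. The sketch below is a hypothetical failure recorder that ignores any event that carries no exception. To keep it self-contained, it uses plain strings as stand-ins for the bases.Event, bases.DAG and bases.Task objects. The event names "dag_started" and "task_failed" are made up for illustration.

```python
from typing import Any, List, Optional

failures: List[str] = []


def record_failures(event: Any, dag: Any, task: Any = None, exc: Optional[Exception] = None) -> None:
    """Hypothetical hook: record only events that carry an exception."""
    if exc is None:
        return  # nothing failed, so there is nothing to record
    # task is optional, so fall back to the DAG alone when it is missing
    where = f"{dag}:{task}" if task is not None else f"{dag}"
    failures.append(f"{event} in {where}: {exc!r}")


# Stand-in calls (strings instead of bases objects); workflows would make these calls itself.
record_failures("dag_started", "trader_bot")
record_failures("task_failed", "trader_bot", "fetch_prices", ValueError("no data"))
print(failures)
```

The hook returns early rather than assuming a task or exception is present, because both arguments are optional in the hook signature.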

Slack hook

There is a convenience function for creating a hook that sends a Slack notification when a task fails. It accepts two arguments: the webhook URL to use and the message to send. The message can include three format-string variables, dag, task and exception, which will be substituted into the message:

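import os

from workflows import dags, hooks

DAGS = dags.register_directory('dags')
DAGS.register_hook(hooks.make_slack_notification_hook(
    os.environ['SLACK_WEBHOOK'],
    # Plain string: {dag}, {task} and {exception} are left for the hook to fill in.
    "Task {dag}:{task} failed with exception: {exception}. "
    # f-string: the dashboard URL is filled in immediately, when this module runs.
    f"Go to {os.environ['APP_DASHBOARD']} to see the full log output."
))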
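The message argument mixes a plain string with an f-string, and the two parts are expanded at different times. The sketch below shows this with a hypothetical dashboard URL in place of os.environ['APP_DASHBOARD']. For illustration, it assumes the hook fills in the placeholders with str.format; the library's actual substitution mechanism may differ.

```python
# Hypothetical URL standing in for os.environ['APP_DASHBOARD'].
dashboard = "https://dashboard.example.com"

# Adjacent string literals are concatenated; only the f-string part is expanded now.
template = (
    "Task {dag}:{task} failed with exception: {exception}. "
    f"Go to {dashboard} to see the full log output."
)
print(template)  # {dag}, {task} and {exception} are still literal placeholders

# Assumption: the hook substitutes the placeholders roughly like this when a task fails.
message = template.format(dag="trader_bot", task="fetch_prices", exception=ValueError("no data"))
print(message)
```

Under that assumption, a literal { or } in the dashboard URL would be read as a placeholder, so such characters would need to be doubled ({{ and }}).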

Executors

There are multiple ways to deploy workflows on your system. This section covers deployments that do not use the workflows scheduler and therefore require no external database.

To use the workflows scheduler, see Scheduler.

Local

Workflows comes with a local, command-line executor that runs tasks one at a time, in an order determined by the DAG dependency graph. If a task fails, the entire execution stops, even if other tasks that don't depend on the failed task could still run. This is the workflows <config> run <dag> command we have been using in the examples. The local executor can be triggered by cron or another system.
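The run-in-order, stop-on-failure behaviour described above can be illustrated with a topological sort. The sketch below is a hypothetical illustration, not the workflows executor's code. It uses the standard library's graphlib (Python 3.9+), and the task names and the run_sequentially helper are made up for the example.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable, Dict, List, Set


def run_sequentially(deps: Dict[str, Set[str]], tasks: Dict[str, Callable[[], None]]) -> List[str]:
    """Run tasks one at a time in dependency order, stopping at the first failure.

    deps maps each task name to the set of task names it depends on.
    Returns the names of the tasks that completed before the run stopped.
    Raises graphlib.CycleError if the dependencies contain a cycle.
    """
    completed: List[str] = []
    for name in TopologicalSorter(deps).static_order():
        try:
            tasks[name]()
        except Exception as exc:
            # Skip every remaining task, including ones that don't depend on the failed task.
            print(f"{name} failed with {exc!r}; stopping the run")
            break
        completed.append(name)
    return completed


def fail() -> None:
    raise RuntimeError("boom")


deps = {"fetch": set(), "clean": {"fetch"}, "report": {"clean"}, "notify": {"fetch"}}
tasks = {"fetch": lambda: None, "clean": fail, "report": lambda: None, "notify": lambda: None}
completed = run_sequentially(deps, tasks)
```

Here report never runs because it depends on clean. notify depends only on fetch, so it runs only if the sort happens to place it before clean. A parallel scheduler could still run it after the failure.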

Airflow

Workflows can be integrated with Airflow by creating an Airflow task for each workflows task. The example below creates the tasks dynamically inside an Airflow DAG file:

import datetime

import airflow
import airflow.operators.python
from workflows.executor import execute_dag, get_sorted_tasks

from example_project import config

dag = airflow.DAG(
    'trader_bot',
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False
)
workflow = config.DAGS['trader_bot']

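# get_sorted_tasks returns the tasks sorted by dependency, so every upstream
# operator already exists in dag.task_dict by the time it is looked up below.
for task in get_sorted_tasks(workflow.graph):
    # Each Airflow task runs exactly one workflows task (start == end).
    operator = airflow.operators.python.PythonOperator(
        task_id=task.name,
        dag=dag,
        python_callable=execute_dag,
        op_kwargs={
            "dag": workflow,
            "start": task.name,
            "end": task.name
        }
    )
    # Recreate the workflows dependency edges between the Airflow tasks.
    for upstream in task.inputs:
        if ":" in upstream:
            continue  # belongs to a different DAG
        operator.set_upstream(dag.task_dict[upstream])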

Kubernetes

For a lightweight integration with Kubernetes, you can run a DAG using a Job or CronJob resource:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: my-workflow
spec:
  schedule: '30 8 * * *'
  jobTemplate:
    spec:
      template:
        spec:
          # Job pods must use Never or OnFailure; the default (Always) is not allowed.
          restartPolicy: Never
          containers:
          - name: app
            image: your_image:latest
            command:
            - workflows
            - your_project.config
            - run
            - your_dag

There is also a Kubernetes scheduler implementation that creates a Kubernetes Pod for each task, allowing tasks to run in parallel. See Scheduler.

Django

Workflows can be used with database libraries such as SQLAlchemy and the Django ORM. These libraries can simplify database operations but they are not required.

The workflows scheduler uses Django models internally, but Django is not installed unless you request the scheduler add-on. To use Django in your tasks, call django.setup() in your config.py file before the individual task files are loaded, like this:

import os

import django

# Configure Django before any task files that import models are loaded.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_app.settings")
django.setup()

Then, import and use your models in your Python task files.