Integrations
Hooks
You can register event hooks that run before and after DAGs and tasks execute. A hook is called with four positional arguments. task and exc default to None because not every event involves a task or an exception:
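from typing import Optional
from workflows import bases  # assumed module path for Event, DAG and Task
def my_hook(event: bases.Event, dag: bases.DAG, task: Optional[bases.Task] = None, exc: Optional[Exception] = None):
    ...  # <do something>, e.g. log the event or send a notification
DAGS.register_hook(my_hook)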
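As a minimal sketch, here is a hook that only reports failures. It relies on nothing beyond the signature above. It ignores event, because the concrete event values are not listed in this section. It treats a non-None exc as a failure and reads task.name, the same attribute the Airflow example uses. The function name log_failures and the logger name are hypothetical.

```python
import logging
from typing import Any, Optional

# Hypothetical logger name; use whatever logger your project already configures.
logger = logging.getLogger("my_project.workflow_hooks")


def log_failures(event: Any, dag: Any, task: Optional[Any] = None,
                 exc: Optional[Exception] = None) -> Optional[str]:
    """Log a warning when a hook call carries an exception; ignore everything else.

    `event` is not inspected because the concrete event values are not listed here.
    Returns the logged message (or None) so the behaviour is easy to check.
    """
    if exc is None:
        return None  # no failure attached to this event
    task_name = getattr(task, "name", "<no task>")  # task may be None, e.g. for DAG-level events
    message = f"{dag}:{task_name} failed: {exc!r}"
    logger.warning(message)
    return message
```

Register it like any other hook, with DAGS.register_hook(log_failures).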
Slack hook
There is a convenience function, hooks.make_slack_notification_hook, that builds a hook which sends a Slack notification when a task fails. It takes two arguments: the webhook URL to post to and the message to send. The message may contain three format fields, {dag}, {task} and {exception}, which are substituted when the notification is sent:
import os
from workflows import dags, hooks
DAGS = dags.register_directory('dags')
DAGS.register_hook(hooks.make_slack_notification_hook(
    os.environ['SLACK_WEBHOOK'],
    # {dag}, {task} and {exception} are filled in by the hook; the f-string part is evaluated right away
    "Task {dag}:{task} failed with exception: {exception}. "
    f"Go to {os.environ['APP_DASHBOARD']} to see the full log output."
))
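The message template above mixes two kinds of substitution, and the sketch below shows what happens to each. It assumes, without confirmation from this section, that the hook fills the fields with str.format using the keyword names dag, task and exception. render is a hypothetical helper, and the dashboard URL is a placeholder.

```python
import os

# Placeholder so the sketch runs anywhere; a real deployment sets APP_DASHBOARD itself.
os.environ.setdefault("APP_DASHBOARD", "https://dashboard.example.com")

# Adjacent string literals are concatenated. Only the f-prefixed half is
# interpolated, and that happens immediately; {dag}, {task} and {exception}
# in the plain half stay as literal placeholders for the hook to fill in later.
template = (
    "Task {dag}:{task} failed with exception: {exception}. "
    f"Go to {os.environ['APP_DASHBOARD']} to see the full log output."
)


def render(template: str, dag: str, task: str, exception: BaseException) -> str:
    """Fill the placeholders, assuming the hook uses str.format with these keyword names."""
    return template.format(dag=dag, task=task, exception=exception)


print(render(template, "trader_bot", "fetch_prices", ValueError("timeout")))
```

Under this assumption, a literal { or } in the dashboard URL would be read as a format field when the template is filled. Such a character would therefore need to be doubled ({{ or }}).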
Executors
There are multiple ways to deploy workflows on your system. This section covers deployments that do not use the workflows scheduler and therefore require no external database.
To use the workflows scheduler, see Scheduler.
Local
Workflows comes with a local command-line executor that runs tasks one at a time, in an order derived from the DAG's dependency graph. If a task fails, the whole run stops, even if other tasks that do not depend on the failed task could still run. This is the workflows <config> run <dag> command used in the earlier examples. To run it on a schedule, trigger it from cron or another scheduling system.
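The one-task-at-a-time, fail-fast behavior can be modeled with the standard library's graphlib (Python 3.9+). This is a simplified sketch, not the workflows executor. run_sequentially is a hypothetical name, and dependencies are given as a plain dict. The function also returns the names of the tasks that completed rather than reporting the error.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set


def run_sequentially(deps: Dict[str, Set[str]], tasks: Dict[str, Callable[[], None]]) -> List[str]:
    """Run tasks one at a time in dependency order and stop at the first failure.

    deps maps each task name to the names of the tasks it depends on.
    Returns the names of the tasks that finished before the run stopped.
    """
    completed: List[str] = []
    for name in TopologicalSorter(deps).static_order():
        try:
            tasks[name]()
        except Exception:
            # Fail fast: nothing else runs, not even tasks that are
            # independent of the one that failed.
            break
        completed.append(name)
    return completed
```

Consider deps = {"fetch": set(), "clean": {"fetch"}, "archive": {"fetch"}}. Here a failure in clean can stop the run before archive starts, even though archive only needs fetch.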
Airflow
Workflows can be integrated with Airflow by creating one Airflow task for each workflows task. The example below creates the tasks dynamically inside an Airflow DAG file:
import datetime
import airflow
import airflow.operators.python
from workflows.executor import execute_dag, get_sorted_tasks
from example_project import config
dag = airflow.DAG(
    'trader_bot',
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False
)
workflow = config.DAGS['trader_bot']
for task in get_sorted_tasks(workflow.graph):
    operator = airflow.operators.python.PythonOperator(
        task_id=task.name,
        dag=dag,
        python_callable=execute_dag,
        op_kwargs={
            "dag": workflow,
            "start": task.name,
            "end": task.name
        }
    )
    for upstream in task.inputs:
        if ":" in upstream:
            continue  # belongs to a different DAG
        operator.set_upstream(dag.task_dict[upstream])
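The loop above relies on two things. First, get_sorted_tasks must yield each task after its upstreams, so that dag.task_dict[upstream] already exists. Second, inputs containing a colon refer to a task in another DAG. Here is a minimal, Airflow-free sketch of that wiring using graphlib (Python 3.9+). It assumes the "other_dag:task" naming implied by the colon check. wire and local_upstreams are hypothetical helpers, not part of workflows.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def local_upstreams(inputs: List[str]) -> List[str]:
    """Drop inputs written as "other_dag:task"; they belong to a different DAG."""
    return [name for name in inputs if ":" not in name]


def wire(task_inputs: Dict[str, List[str]]) -> Dict[str, Set[str]]:
    """Visit tasks in dependency order and record each task's same-DAG upstreams.

    Like the Airflow loop, each upstream is looked up in `created` when its
    downstream task is wired; the topological order guarantees it is there.
    """
    graph = {name: set(local_upstreams(inputs)) for name, inputs in task_inputs.items()}
    created: Dict[str, Set[str]] = {}
    for name in TopologicalSorter(graph).static_order():
        for upstream in graph[name]:
            if upstream not in created:  # the Airflow loop would hit a KeyError here
                raise KeyError(f"{upstream!r} must be created before {name!r}")
        created[name] = graph[name]
    return created
```

With an unsorted iteration order, a task listed before its upstream would fail the lookup. Sorting first is what makes the single pass safe.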
Kubernetes
For a lightweight integration with Kubernetes, you can run a DAG using a Job or CronJob resource:
apiVersion: batch/v1
kind: CronJob
metadata:
  name: my-workflow
spec:
  schedule: '30 8 * * *'  # every day at 08:30
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never  # Job pods must use Never or OnFailure
          containers:
            - name: app
              image: your_image:latest
              command:
                - workflows
                - your_project.config
                - run
                - your_dag
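With several DAGs, writing one CronJob per DAG by hand gets repetitive. Here is a hedged sketch of generating the manifest in Python instead. cronjob_manifest is a hypothetical helper, and the image, module and DAG names are placeholders. The output is JSON, which kubectl apply -f accepts as well as YAML.

```python
import json
from typing import Any, Dict


def cronjob_manifest(dag_name: str, schedule: str, image: str, config_module: str) -> Dict[str, Any]:
    """Return a CronJob manifest that runs `workflows <config_module> run <dag_name>`."""
    pod_spec = {
        "restartPolicy": "Never",  # Job pods must use Never or OnFailure
        "containers": [{
            "name": "app",
            "image": image,
            "command": ["workflows", config_module, "run", dag_name],
        }],
    }
    return {
        "apiVersion": "batch/v1",
        "kind": "CronJob",
        # Resource names must be lowercase DNS-style names, so underscores are replaced.
        "metadata": {"name": dag_name.lower().replace("_", "-")},
        "spec": {
            "schedule": schedule,
            "jobTemplate": {"spec": {"template": {"spec": pod_spec}}},
        },
    }


if __name__ == "__main__":
    manifest = cronjob_manifest("trader_bot", "30 8 * * *", "your_image:latest", "your_project.config")
    print(json.dumps(manifest, indent=2))
```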
There is also a Kubernetes scheduler implementation that creates a Kubernetes Pod for each task, allowing tasks to run in parallel. See Scheduler.
Django
Workflows can be used with database libraries such as SQLAlchemy and the Django ORM. These libraries can simplify database operations, but they are not required.
The workflows scheduler uses Django models internally, but Django is not installed unless you request the scheduler add-on. To use Django in your tasks, run the Django setup in config.py before the individual task files are loaded, like this:
import os
import django
# Must run before any task module that imports Django models is loaded
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_app.settings")
django.setup()
Then, import and use your models in your Python task files.