Scheduler
Workflows includes an optional scheduler. The scheduler is a long-running process that
starts DAGs based on the crontab-like schedule in their workflow.yaml. For example,
this schedule runs the DAG every minute:
description: Example DAG that runs every minute
schedule: '* * * * *'
The scheduler uses the cronitor library to parse cron expressions. The website crontab.guru is handy when writing them.
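To make the five-field format concrete, here is a minimal, standard-library-only sketch of matching a cron expression against a point in time. It is not how the scheduler or its cron library works internally; `field_matches` and `cron_matches` are hypothetical helper names, and only `*`, single numbers, ranges, and comma-separated lists are handled.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Return True if one cron field accepts the given value.

    Supports '*', single numbers, 'a-b' ranges and comma-separated lists.
    Step values such as '*/5' are deliberately left out of this sketch.
    """
    for part in field.split(","):
        if part == "*":
            return True
        if "-" in part:
            low, high = part.split("-")
            if int(low) <= value <= int(high):
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Check a five-field cron expression against a datetime.

    Simplification: all five fields must match. Real cron treats the two
    day fields specially when both are restricted.
    """
    minute, hour, day, month, weekday = expression.split()
    # cron numbers weekdays 0-6 starting on Sunday; isoweekday() gives 1-7
    # starting on Monday, so Sunday (7) wraps around to 0.
    cron_weekday = moment.isoweekday() % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day)
        and field_matches(month, moment.month)
        and field_matches(weekday, cron_weekday)
    )
```

With this sketch, `'* * * * *'` matches any minute, while an expression such as `'0 9 * * 1-5'` matches only 09:00 on Monday through Friday.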
The scheduler manages DAG runs using two database models, and as a result requires a
PostgreSQL database to function. The environment variable DATABASE_URL must be
populated with a connection string in the form
postgres://<username>:<password>@<host>:<port>/<db>. For example, if PostgreSQL is
running locally with trusted Linux authentication, you can create a database for
workflows to use and start the scheduler like this:
createdb workflows
export DATABASE_URL=postgres://localhost/workflows
workflows <config> schedule worker
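If it helps to check a connection string before starting the scheduler, the following sketch splits a URL of the form shown above into its parts using only the standard library. `parse_database_url` is a hypothetical helper written for illustration and is not part of workflows; accepting the `postgresql` scheme alongside `postgres` is also an assumption.

```python
from urllib.parse import urlsplit


def parse_database_url(url: str) -> dict:
    """Split postgres://<username>:<password>@<host>:<port>/<db> into parts.

    Components that are absent from the URL come back as None
    (or an empty string for the database name).
    """
    parts = urlsplit(url)
    if parts.scheme not in ("postgres", "postgresql"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # The path is '/<db>'; drop the leading slash.
        "name": parts.path.lstrip("/"),
    }
```

For the local example above, `parse_database_url("postgres://localhost/workflows")` yields host `localhost` and database name `workflows`, with the user, password, and port left as `None`.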
The scheduler uses the Django ORM to interact with the database. On every start,
it runs any new database migrations. The two models used are dag_run and task_run,
which track the execution status of DAGs and tasks respectively. The scheduler creates
a new dag_run when the cron schedule time is reached. If a scheduled time was missed
while the scheduler was not running, one (and only one) run is created.
The scheduler then iterates through the DAG runs with status running, considering only
the earliest run for each individual DAG. This means parallel executions of the same
DAG are not possible. For each task in the DAG for which all the input tasks within
the same DAG were successful, the scheduler creates a task_run record.
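The selection logic described above can be sketched in plain Python as follows. This is an illustration only, not the scheduler's code: the `DagRun` dataclass, the `scheduled` field, the `"success"` status string, and both function names are assumptions made for the example (only the `running` status is named in this document).

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class DagRun:
    # Hypothetical stand-in for a dag_run record.
    dag: str
    scheduled: datetime
    status: str


def earliest_running(runs):
    """Pick the earliest run with status 'running' for each DAG."""
    chosen = {}
    for run in runs:
        if run.status != "running":
            continue
        current = chosen.get(run.dag)
        if current is None or run.scheduled < current.scheduled:
            chosen[run.dag] = run
    return chosen


def ready_tasks(inputs, statuses):
    """Return tasks without a task run whose input tasks all succeeded.

    inputs maps a task name to the names of its input tasks;
    statuses maps a task name to the status of its existing task run.
    """
    return [
        task
        for task, upstream in inputs.items()
        if task not in statuses
        and all(statuses.get(name) == "success" for name in upstream)
    ]
```

Because only one run per DAG is ever returned by `earliest_running`, a later run of the same DAG is not considered until the earlier one leaves the running state, which is why parallel executions of one DAG do not occur.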
The scheduler then calls a task manager to start and monitor task executions. The oldest
task run is started first, and failed tasks are retried two times.
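As a rough sketch of the ordering and retry behavior, assuming that "retried two times" means up to three attempts in total: `oldest_first` and `run_with_retries` are hypothetical names, and the `created` key stands in for whatever timestamp the task_run model actually stores.

```python
def oldest_first(task_runs):
    """Order task runs so the one created earliest is started first.

    Each task run is a dict with a comparable 'created' value.
    """
    return sorted(task_runs, key=lambda run: run["created"])


def run_with_retries(task, max_retries=2):
    """Call task() until it succeeds or the retries are used up.

    Returns (attempts_made, succeeded). With max_retries=2 a task is
    attempted at most three times: the first try plus two retries.
    """
    attempts = 0
    while attempts <= max_retries:
        attempts += 1
        try:
            task()
        except Exception:
            continue  # failed attempt; loop again if retries remain
        return attempts, True
    return attempts, False
```

A task that fails on every attempt is reported as `(3, False)` under this reading, and one that succeeds immediately as `(1, True)`.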
Two task manager implementations are included, worker and kubernetes, each providing a
different method of execution. Users can write additional implementations by subclassing
the TaskManager abstract base class and implementing the required methods. When starting
the scheduler, the desired task manager is specified as the third argument on the command
line. For example:
workflows <config> schedule <worker|kubernetes>
Worker Task Manager
The worker task manager executes tasks in a sub-process. Each worker executes only one task at a time, but multiple workers (i.e. scheduler instances) can be run to provide the desired concurrency.
Kubernetes Task Manager
The kubernetes task manager executes tasks by scheduling pods onto a Kubernetes cluster. It uses the Kubernetes API via the official Python library to do so.
Several configuration options control how pods are created. See
managers.Kubernetes for full details on how the pod is created.
- kube_in_cluster
If true, the manager will use the Kubernetes API of the cluster it is running inside. If false, the manager will use a Kubernetes config file to determine which cluster API to connect to.
- kube_namespace
The kubernetes namespace to schedule the pod into.
- kube_image
The container image and tag to use in the pod.
- kube_command
The command, if any, to provide to the pod container. The arguments will always be
workflows <config> run <dag> -o <task>. This can be used to wrap the execution, for example to retrieve secrets using secrets-init.
- kube_env
Dictionary of environment variables to set in the pod container.
- kube_<requests/limits>_<cpu/memory>
Set the CPU or memory requests or limits to control container resource usage.
To specify these in your config.py, set them on the DagRegistry like this:
import os

DAGS = dags.register_directory(os.path.dirname(__file__))
DAGS.worker_config = {
    # KUBERNETES_PORT is set inside a cluster, so this is true only in-cluster.
    "kube_in_cluster": bool(os.environ.get("KUBERNETES_PORT")),
    "kube_image": "my_project:latest",
    "kube_env": {
        "SLACK_WEBHOOK": "https://some/secret/url",
    },
}
Pod settings can be overridden at the DAG and task levels if they differ from the registry’s
global setting. To set them there, provide the key in the DAG workflow.yaml file or the task
YAML header like this:
worker_config:
  kube_limits_cpu: 200m
  kube_limits_memory: 1Gi
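One way to picture the override behavior is as a layered dictionary merge, with the resource keys then grouped into the nested `requests`/`limits` shape that Kubernetes container specs use. The sketch below is an assumption about the semantics, not the manager's actual code: both function names are hypothetical, task settings are assumed to take precedence over DAG settings, and the merge is shallow, so a nested value such as `kube_env` would be replaced as a whole.

```python
def effective_worker_config(registry, dag=None, task=None):
    """Layer DAG-level and then task-level settings over the registry's.

    Returns a new dict; the inputs are left unmodified.
    """
    merged = dict(registry)
    merged.update(dag or {})
    merged.update(task or {})
    return merged


def pod_resources(config):
    """Group kube_<requests|limits>_<cpu|memory> keys into a nested dict."""
    resources = {}
    for kind in ("requests", "limits"):
        for resource in ("cpu", "memory"):
            value = config.get(f"kube_{kind}_{resource}")
            if value is not None:
                resources.setdefault(kind, {})[resource] = value
    return resources
```

Applied to the example above, a DAG-level `worker_config` with `kube_limits_cpu: 200m` and `kube_limits_memory: 1Gi` would replace any registry-wide values for those two keys while leaving settings such as `kube_image` untouched.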
The kubernetes manager needs permission to create pods. A common pattern is to grant permissions to the default service account in a namespace used for running the scheduler:
kubectl create role edit-pods -n <namespace> --verb='*' --resource=pods
kubectl create rolebinding edit-pods-binding -n <namespace> --role=edit-pods --serviceaccount=<namespace>:default
Lastly, run the scheduler as a deployment in your kubernetes cluster. The command is:
workflows <config> schedule kubernetes
Manual triggers
A DAG run can be triggered manually, outside of a schedule, using Python or via the command line.
The Python API takes the dag object as an argument and requires the creator as a keyword argument.
Parameters may also be passed as a keyword argument, but are not required:
from workflows import scheduler
scheduler.trigger_dag(dag, creator="some process")
DAGs can also be triggered via the CLI, with optional parameters to be stored:
workflows <config> trigger <dag> [-p VAR=1 ...]
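The `-p VAR=1` form suggests simple name/value pairs. As an illustration of that shape only, here is a sketch of turning such pairs into a dictionary; `parse_parameters` is a hypothetical helper, and how the real CLI parses and stores these values (including whether values stay strings) is not specified here.

```python
def parse_parameters(pairs):
    """Turn ['VAR=1', 'NAME=value'] into {'VAR': '1', 'NAME': 'value'}.

    Only the first '=' separates name from value, so values may
    themselves contain '='. Values are kept as strings.
    """
    parameters = {}
    for pair in pairs:
        name, separator, value = pair.partition("=")
        if not separator or not name:
            raise ValueError(f"expected NAME=VALUE, got {pair!r}")
        parameters[name] = value
    return parameters
```

Rejecting malformed pairs early, as this sketch does, gives a clear error at trigger time rather than a confusing failure inside a task.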