Scheduler

Workflows includes an optional scheduler: a long-running process that starts DAGs according to the crontab-like schedule in each DAG's workflow.yaml. For example, this schedule runs the DAG every minute:

description: Example DAG that runs every minute
schedule: '* * * * *'

The scheduler uses the croniter library to parse cron expressions. The website crontab.guru is handy when writing them.

The scheduler manages DAG runs using two database models and therefore requires a PostgreSQL database. The environment variable DATABASE_URL must contain a connection string of the form postgres://<username>:<password>@<host>:<port>/<db>. For example, if PostgreSQL is running locally with trusted Linux auth, you can create a database for workflows and start the scheduler like this:

createdb workflows
export DATABASE_URL=postgres://localhost/workflows
workflows <config> schedule worker
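Before starting the scheduler, it can help to check that the connection string splits into the parts you expect. The following is a minimal sketch using only the Python standard library. It is not how workflows itself reads DATABASE_URL, and the helper name describe_database_url is hypothetical.

```python
import os
from urllib.parse import urlparse


def describe_database_url(url):
    """Split a postgres:// connection string into its components."""
    parts = urlparse(url)
    if parts.scheme not in ("postgres", "postgresql"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return {
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # The path component is "/<db>", so drop the leading slash.
        "db": parts.path.lstrip("/"),
    }


if __name__ == "__main__":
    # Falls back to the local example URL when DATABASE_URL is unset.
    url = os.environ.get("DATABASE_URL", "postgres://localhost/workflows")
    info = describe_database_url(url)
    print(info["host"], info["port"], info["db"])
```

Components missing from the URL, such as the username or port in the local example, come back as None.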

The scheduler uses the Django ORM to interact with the database. On every start, it runs any new database migrations. The two models are dag_run and task_run, which track the execution status of DAGs and tasks respectively. The scheduler creates a new dag_run when the cron schedule time is reached. If one or more scheduled times were missed while the scheduler was not running, one (and only one) run is created.
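The catch-up rule can be illustrated with a small sketch. It is not the scheduler's actual code: a fixed interval stands in for the cron expression, the function name runs_to_create is hypothetical, and attributing the single run to the most recent missed slot is an assumption, since the text only states that exactly one run is created.

```python
from datetime import datetime, timedelta


def runs_to_create(last_scheduled, now, interval):
    """Return the scheduled times for which a dag_run should be created.

    Every slot after last_scheduled up to now is due, but only one is
    returned, so a long outage yields a single run rather than a backlog.
    """
    missed = []
    slot = last_scheduled + interval
    while slot <= now:
        missed.append(slot)
        slot += interval
    # Assumption: keep the most recent missed slot and drop the rest.
    return missed[-1:]


if __name__ == "__main__":
    last = datetime(2024, 1, 1, 0, 0)
    # The scheduler was down for five and a half minutes.
    now = datetime(2024, 1, 1, 0, 5, 30)
    print(runs_to_create(last, now, timedelta(minutes=1)))
```

Five one-minute slots were missed in this example, yet the returned list contains a single entry.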

The scheduler then iterates through the DAG runs with status running, considering only the earliest run for each individual DAG. This means parallel executions of the same DAG are not possible. For each task in the DAG for which all the input tasks within the same DAG were successful, the scheduler creates a task_run record. The scheduler then calls a task manager to start and monitor task executions. The oldest task run is started first, and failed tasks are retried two times.
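The selection logic described above can be sketched in plain Python. This is an illustration under stated assumptions, not the scheduler's implementation: the DagRun class, the function names, and the "success" status string are hypothetical, and real records live in the dag_run and task_run tables.

```python
from dataclasses import dataclass


@dataclass
class DagRun:
    dag: str
    started: int  # ordering key; a timestamp in practice
    status: str = "running"


def earliest_running(runs):
    """Pick the earliest run with status running for each DAG."""
    earliest = {}
    for run in sorted(runs, key=lambda r: r.started):
        if run.status == "running":
            # setdefault keeps the first (earliest) run seen per DAG.
            earliest.setdefault(run.dag, run)
    return list(earliest.values())


def ready_tasks(inputs, statuses):
    """Return tasks whose inputs all succeeded and that have no task_run yet.

    inputs maps a task name to its upstream task names within the same DAG.
    statuses maps a task name to the status of its existing task_run.
    """
    return [
        task
        for task, upstream in inputs.items()
        if task not in statuses
        and all(statuses.get(dep) == "success" for dep in upstream)
    ]


if __name__ == "__main__":
    runs = [DagRun("a", 2), DagRun("a", 1), DagRun("b", 3)]
    print([(r.dag, r.started) for r in earliest_running(runs)])

    inputs = {"extract": [], "transform": ["extract"], "load": ["transform"]}
    print(ready_tasks(inputs, {"extract": "success"}))
```

Because only the earliest running run per DAG is considered, a second run of DAG "a" waits until the first one finishes.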

Two task manager implementations are included, each providing a different method of execution. Users can write additional implementations by subclassing the TaskManager abstract base class and implementing the required methods. When starting the scheduler, specify the desired task manager as the third argument on the command line. The two included managers are worker and kubernetes. For example:

workflows <config> schedule <worker|kubernetes>

Worker Task Manager

The worker task manager executes tasks in a sub-process. Each worker executes only one task at a time, but multiple workers (i.e. scheduler instances) can be run to provide the desired concurrency.
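The one-task-at-a-time behaviour follows naturally from running each task as a blocking child process. The sketch below shows that general pattern with the standard library. It is not the worker manager's code: the function name run_in_subprocess is hypothetical and the command is a stand-in for a real task.

```python
import subprocess
import sys


def run_in_subprocess(argv, timeout=None):
    """Run one command in a child process and report whether it succeeded."""
    completed = subprocess.run(
        argv, capture_output=True, text=True, timeout=timeout
    )
    # A zero exit code is treated as success; anything else as failure.
    return completed.returncode == 0, completed.stdout


if __name__ == "__main__":
    # Stand-in command; a real worker would launch the task itself.
    ok, output = run_in_subprocess([sys.executable, "-c", "print('task done')"])
    print(ok, output.strip())
```

The call blocks until the child exits, so one process running this loop handles a single task at a time, and concurrency comes from starting more processes.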

Kubernetes Task Manager

The kubernetes task manager executes tasks by scheduling pods onto a Kubernetes cluster, using the Kubernetes API via the official Python library.

Several configuration options control how pods are created. See managers.Kubernetes for full details.

kube_in_cluster

If true, the manager uses the Kubernetes API of the cluster it is running in. If false, the manager uses a Kubernetes config file to determine which cluster API to connect to.

kube_namespace

The Kubernetes namespace to schedule the pod into.

kube_image

The container image and tag to use in the pod.

kube_command

The command, if any, to provide to the pod container. The arguments are always workflows <config> run <dag> -o <task>. This can be used to wrap the execution, for example to retrieve secrets using secrets-init.

kube_env

Dictionary of environment variables to set in the pod container.

kube_<requests/limits>_<cpu/memory>

Set the CPU or memory requests or limits to control container resource usage.

To specify these in your config.py, set them on the DagRegistry like this:

DAGS = dags.register_directory(os.path.dirname(__file__))
DAGS.worker_config = {
    "kube_in_cluster": bool(os.environ.get("KUBERNETES_PORT")),
    "kube_image": "my_project:latest",
    "kube_env": {
        "SLACK_WEBHOOK": "https://some/secret/url",
    },
}

Pod settings can be overridden at the DAG and task levels if they differ from the registry’s global setting. To set them there, provide the key in the DAG workflow.yaml file or the task YAML header like this:

worker_config:
    kube_limits_cpu: 200m
    kube_limits_memory: 1Gi
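The precedence between the registry, DAG, and task levels can be pictured as a layered dictionary merge. The sketch below is an assumption about the behaviour, not the library's code: the function name resolve_worker_config is hypothetical, and the merge is shallow, so a nested value such as kube_env is replaced as a whole rather than combined key by key.

```python
def resolve_worker_config(registry, dag=None, task=None):
    """Merge worker_config dictionaries, with the most specific level last."""
    merged = {}
    for level in (registry, dag or {}, task or {}):
        # Later levels overwrite keys set by earlier, more general ones.
        merged.update(level)
    return merged


if __name__ == "__main__":
    registry = {"kube_image": "my_project:latest", "kube_limits_cpu": "100m"}
    dag = {"kube_limits_cpu": "200m", "kube_limits_memory": "1Gi"}
    task = {"kube_limits_memory": "2Gi"}
    print(resolve_worker_config(registry, dag, task))
```

Keys that appear only at the registry level, such as kube_image here, pass through unchanged to every pod.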

The kubernetes manager needs permission to create pods. A common pattern is to grant permissions to the default service account in a namespace used for running the scheduler:

kubectl create role edit-pods -n <namespace> --verb='*' --resource=pods
kubectl create rolebinding edit-pods-binding -n <namespace> --role=edit-pods --serviceaccount=<namespace>:default

Lastly, run the scheduler as a deployment in your Kubernetes cluster. The command is:

workflows <config> schedule kubernetes

Manual Triggers

A DAG run can be triggered manually, outside of a schedule, using Python or the command line. The Python API takes the dag object as its argument and requires the creator as a keyword. Parameters may also be passed as a keyword, but are not required:

from workflows import scheduler

scheduler.trigger_dag(dag, creator="some process")

DAGs can also be triggered via the CLI, with optional parameters to be stored:

workflows <config> trigger <dag> [-p VAR=1 ...]
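To show how VAR=1 style options can become a parameters dictionary, here is a small argparse sketch. It is not the workflows CLI: the parser, the parse_parameters helper, and the choices to repeat -p for each pair and to keep values as strings are all assumptions made for illustration.

```python
import argparse


def parse_parameters(pairs):
    """Turn ["VAR=1", "NAME=x"] into {"VAR": "1", "NAME": "x"}."""
    params = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected KEY=VALUE, got {pair!r}")
        params[key] = value
    return params


# Hypothetical stand-in for the trigger sub-command.
parser = argparse.ArgumentParser(prog="trigger-sketch")
parser.add_argument("dag")
parser.add_argument(
    "-p", dest="parameters", action="append", default=[], metavar="VAR=VALUE"
)

# Parse an explicit argument list rather than the real command line.
args = parser.parse_args(["example_dag", "-p", "VAR=1", "-p", "DATE=2024-01-01"])
print(args.dag, parse_parameters(args.parameters))
```

Splitting on the first equals sign only means values may themselves contain "=".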