Task types

Workflows includes a number of built-in task types, which are described in more detail on this page. You can also define custom task types. For an introduction to writing tasks, see Writing tasks and DAGs.

All tasks must define the following fields in their YAML:

type

The snake_case version of the task class name; for example, BigQuerySql is referenced as big_query_sql.

description

Documentation describing what the task does.

inputs

A list of other tasks that must run before this task. Tasks in other DAGs are referenced with the DAG name as a prefix, for example other_dag:task1.py
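As a rough illustration of the two naming conventions above, the sketch below converts a class name to its snake_case type and splits a cross-DAG input reference. Both helper names (to_snake_case, split_input_ref) are hypothetical and are not part of Workflows; the library's own conversion and parsing may differ.

```python
import re


def to_snake_case(class_name: str) -> str:
    """Hypothetical helper: put "_" before each inner capital, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()


def split_input_ref(ref: str, current_dag: str) -> tuple:
    """Hypothetical helper: split "dag:task" into (dag, task).

    A reference without a DAG prefix is assumed to belong to current_dag.
    """
    dag, sep, task = ref.partition(":")
    if not sep:
        return current_dag, ref
    return dag, task
```

With these helpers, to_snake_case("BigQuerySql") gives big_query_sql, and split_input_ref("other_dag:task1.py", "my_dag") gives the pair ("other_dag", "task1.py").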

The following are optional fields. Task types can also define additional fields.

outputs

A list of output objects, each of which defines the output type, name, etc. See Output types.

parameters

A list of environment variables required by the task. At runtime this list is merged with the task’s class_parameters to make a dict mapping parameter names to environment variable values (i.e. a subset of os.environ), which is passed to the Task.execute() method. If a parameter is not provided by an environment variable at runtime, Workflows raises an exception before starting execution.
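The merge-and-check behaviour described for parameters could look roughly like the following. This is a sketch under assumptions, not the Workflows implementation: the function name resolve_parameters, the exception type, and the ordering of the merged list are all hypothetical.

```python
import os


def resolve_parameters(parameters, class_parameters, environ=None):
    """Hypothetical helper: map each required name to its environment value."""
    environ = os.environ if environ is None else environ
    # Merge the class-level list with the YAML list, dropping duplicates
    # while keeping the original order.
    required = list(dict.fromkeys([*class_parameters, *parameters]))
    # Fail before any task code runs if something is missing.
    missing = [name for name in required if name not in environ]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in required}
```

The returned dict contains only the requested names, so unrelated environment variables never reach the task.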

Sql

The Sql task type executes the SQL in the file against the database specified by the environment variable DATABASE_URL. The SQL is executed in a transaction, along with any validation queries on the outputs, so if there is an error the transaction is rolled back and no changes are committed.
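The commit-or-roll-back pattern can be sketched with any DB-API connection. The example below uses sqlite3 as a stand-in so that it runs without a server; the function name run_sql_task and the convention that the validation query returns a single truthy value are assumptions for illustration only.

```python
import sqlite3


def run_sql_task(conn, sql: str, validation_sql: str) -> None:
    """Run sql, then a validation query, committing only if both succeed."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        # Assumption: the validation query returns one truthy value on success.
        cursor.execute(validation_sql)
        (ok,) = cursor.fetchone()
        if not ok:
            raise ValueError("output validation failed")
        conn.commit()
    except Exception:
        # Any failure undoes the changes made by sql.
        conn.rollback()
        raise
```

Because validation runs before the commit, a failed check leaves the database exactly as it was before the task started.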

Python

The Python task type executes the function named run in your module. The module is imported with importlib at runtime. You can import other modules and you can test your Python code using standard techniques.

A single parameter, context, is passed to your run function. It is a dictionary of the environment variables that were listed in the parameters field. If DATABASE_URL was in the parameters list, an additional key, conn, is included in the context, holding a database connection with an open transaction.
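A task module for this type might look like the sketch below. The table name example_output is hypothetical, and the code assumes that DATABASE_URL is listed under parameters (so that conn is present) and that the transaction is committed by the task after run returns.

```python
def run(context: dict) -> None:
    """Entry point that the Python task type is described as calling."""
    # "conn" is present only when DATABASE_URL is listed under parameters.
    conn = context["conn"]
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS example_output (id INTEGER)")
    cursor.execute("INSERT INTO example_output (id) VALUES (1)")
    # No commit here: the open transaction is assumed to be committed by the
    # task itself once validation has passed.
```

Since run is a plain function taking a dict, it can be unit tested by passing a hand-built context with a test database connection.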

BigQuerySql

The BigQuerySql task runs the provided SQL in BigQuery. Unlike the regular Sql task, there is no concept of a transaction in BigQuery, so changes are not rolled back if an exception occurs. This task requires the environment variable GCP_PROJECT, which determines the project the query is billed against and the default table search path. The task needs to run in an environment where Google application default credentials can be used for authentication. To configure credentials, run:

gcloud auth application-default login
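For orientation, running a query against BigQuery from Python might look roughly like this, assuming the google-cloud-bigquery client library is installed. The function name run_bigquery_sql is hypothetical and this is not the Workflows implementation; it only shows the GCP_PROJECT check followed by a query billed to that project.

```python
import os


def run_bigquery_sql(sql: str, environ=None):
    """Hypothetical helper: run sql in the project named by GCP_PROJECT."""
    environ = os.environ if environ is None else environ
    project = environ.get("GCP_PROJECT")
    if not project:
        raise RuntimeError("GCP_PROJECT must be set to run BigQuery SQL")
    # Imported lazily so the check above works without the client library.
    from google.cloud import bigquery

    # Uses application default credentials from the environment.
    client = bigquery.Client(project=project)
    job = client.query(sql)
    # Blocks until the query finishes; there is no transaction to roll back.
    return job.result()
```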

Writing your own

To make additional task types, subclass the workflows.bases.Task class and implement the execute() method. Your execute method should call validate; this is done inside execute so additional context like an open database transaction can be passed to the output classes.

The task can define additional dataclass attributes to be specified in the YAML, but they must be optional attributes with default values because of the way Python’s dataclasses work. To make a value effectively required, you can instead check it in a __post_init__ method. The task can define required environment variables by setting the class_parameters attribute.
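The sketch below shows one way these pieces could fit together. To keep it runnable on its own, it defines a small stand-in Task base class rather than importing workflows.bases.Task; the stand-in's fields, the Echo task, its message attribute, and the ECHO_PREFIX variable are all hypothetical, as is the choice to declare class_parameters as a class-level tuple.

```python
import dataclasses
import typing


@dataclasses.dataclass
class Task:
    """Stand-in for workflows.bases.Task, for illustration only."""

    type: str
    description: str
    inputs: list
    outputs: list = dataclasses.field(default_factory=list)
    parameters: list = dataclasses.field(default_factory=list)

    def validate(self, context: dict) -> None:
        """The real base class validates outputs; this stub does nothing."""


@dataclasses.dataclass
class Echo(Task):
    """Hypothetical task type that prints a message."""

    # Environment variables this task type always requires (assumed shape).
    class_parameters: typing.ClassVar[tuple] = ("ECHO_PREFIX",)

    # Must have a default because the base class already has defaulted fields.
    message: typing.Optional[str] = None

    def __post_init__(self):
        # Enforce the "required" attribute at construction time instead.
        if self.message is None:
            raise ValueError("message must be set in the YAML")

    def execute(self, context: dict):
        print(f"{context['ECHO_PREFIX']}{self.message}")
        self.validate(context)
```

Declaring message without a default would fail at class definition time, because a dataclass field without a default cannot follow fields that have one.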

For .sql files, the text of the file is stored in an attribute named sql. For .py files, the run function defined in the module is stored in an attribute named run.

This is a simplified implementation of the Python task type:

import dataclasses
import typing

from workflows import bases

@dataclasses.dataclass
class Python(bases.Task):
    """A Python transform. Calls the run method."""

    run: typing.Callable

    def execute(self, context: dict):
        self.run(context)
        self.validate(context)

Changing the database engine

The function workflows.tasks.get_db_connection receives the DATABASE_URL connection string and returns a DB-API compatible connection object. By default it uses the psycopg2 library to connect to a PostgreSQL database. You can replace the function to use a different database engine like this:

import sqlite3

from workflows import tasks

tasks.get_db_connection = lambda database_url: sqlite3.connect(database_url)