Task types
Workflows includes several built-in task types, described below. You can also define custom task types. For an introduction to writing tasks, see Writing tasks and DAGs.
All tasks must define the following fields in their YAML:
- type
The snake-case version of the task class name. For example, BigQuerySql is referenced as big_query_sql.
- description
Documentation describing what the task does.
- inputs
A list of other tasks that must run before this task. Tasks in other DAGs are referenced by prefixing them with the DAG name, like other_dag:task1.py.
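To make the naming and reference conventions above concrete, here is a minimal sketch of how a type value and an inputs entry could be interpreted. The helper names task_type_name and parse_input are hypothetical, and the simple CamelCase-splitting rule is an assumption; the library's own conversion may treat edge cases (such as runs of capital letters) differently.

```python
import re
from typing import Tuple


def task_type_name(class_name: str) -> str:
    """Convert a CamelCase class name to the snake-case type value.

    Hypothetical helper: inserts an underscore before every capital letter
    except the first one, then lowercases the result.
    """
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()


def parse_input(ref: str, current_dag: str) -> Tuple[str, str]:
    """Split an inputs entry into (dag_name, task_name).

    Hypothetical helper: entries without a "dag:" prefix are assumed to
    refer to a task in the current DAG.
    """
    if ":" in ref:
        dag, task = ref.split(":", 1)
        return dag, task
    return current_dag, ref


print(task_type_name("BigQuerySql"))                # big_query_sql
print(parse_input("other_dag:task1.py", "my_dag"))  # ('other_dag', 'task1.py')
```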
The following are optional fields. Task types can also define additional fields.
- outputs
A list of output objects, each of which defines the output type, name, etc. See Output types.
- parameters
A list of environment variables required by the task. At runtime this list is merged with the task class's class_parameters, and the combined names are used to build a dict mapping each parameter to its environment variable value (i.e. a subset of os.environ). This dict is passed to the Task.execute() method. If any parameter is not set as an environment variable at runtime, workflows raises an exception before execution starts.
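The merge-and-lookup behaviour described for parameters can be sketched as follows. The function name build_context, the REPORT_DATE variable, and the choice of KeyError are assumptions for illustration; the documentation only says that an exception is raised before execution starts.

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def build_context(
    parameters: Iterable[str],
    class_parameters: Iterable[str],
    environ: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Merge YAML parameters with class_parameters and read their values.

    Hypothetical helper. Raises KeyError naming every missing variable
    before any task code would run.
    """
    env = os.environ if environ is None else environ
    # Merge the two lists, keeping order and dropping duplicates.
    names = list(dict.fromkeys([*parameters, *class_parameters]))
    missing = [name for name in names if name not in env]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    # Only the requested variables end up in the context.
    return {name: env[name] for name in names}


ctx = build_context(
    ["REPORT_DATE"],
    ["DATABASE_URL"],
    {"REPORT_DATE": "2024-01-31", "DATABASE_URL": "postgresql://localhost/db", "HOME": "/root"},
)
print(ctx)  # {'REPORT_DATE': '2024-01-31', 'DATABASE_URL': 'postgresql://localhost/db'}
```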
Sql
The SQL task type executes the SQL in the file against the database specified by the DATABASE_URL
environment variable. The SQL runs in a single transaction together with any validation queries on
the outputs, so if any statement or validation fails, the transaction is rolled back and no changes are committed.
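A minimal sketch of this transaction behaviour, using the standard-library sqlite3 module as a stand-in for the default PostgreSQL connection. The helper run_sql_task, passing the SQL as a list of statements, and the convention that a validation query passes when its first column is truthy are all assumptions for illustration.

```python
import sqlite3
from typing import Iterable


def run_sql_task(conn: sqlite3.Connection,
                 statements: Iterable[str],
                 validation_queries: Iterable[str] = ()) -> None:
    """Run statements and validation queries in one transaction.

    Hypothetical helper. Expects a connection opened with isolation_level=None,
    so BEGIN/COMMIT/ROLLBACK are issued explicitly here.
    """
    conn.execute("BEGIN")
    try:
        for statement in statements:
            conn.execute(statement)
        for query in validation_queries:
            row = conn.execute(query).fetchone()
            if row is None or not row[0]:
                raise ValueError(f"validation failed: {query}")
    except Exception:
        conn.execute("ROLLBACK")  # discard every change made since BEGIN
        raise
    conn.execute("COMMIT")


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE orders (id INTEGER)")
run_sql_task(conn, ["INSERT INTO orders VALUES (1)"], ["SELECT COUNT(*) = 1 FROM orders"])
try:
    # The insert succeeds, but validation then sees 2 rows and fails.
    run_sql_task(conn, ["INSERT INTO orders VALUES (2)"], ["SELECT COUNT(*) = 1 FROM orders"])
except ValueError:
    pass
print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 1
```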
Python
The Python task type calls the function named run in your module. The module is imported with
importlib at runtime. Your module can import other modules, and you can test it using
standard techniques.
A single argument, context, is passed to your run function. It is a dictionary of the environment
variables listed in the parameters field. If DATABASE_URL is in the parameters list, context also
contains a key conn holding a database connection in an open transaction.
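For illustration, a module for the Python task type might look like the following. The table names and the MIN_AMOUNT parameter are hypothetical. The ? placeholder is sqlite3 syntax, used so the sketch runs with the standard library; psycopg2, the default driver, uses %s instead. Because the connection arrives in an open transaction, this sketch assumes committing is left to the task runner and does not commit itself.

```python
import sqlite3


def run(context: dict) -> None:
    """Copy orders above a threshold into big_orders (hypothetical transform)."""
    # Environment variable values arrive as strings.
    threshold = float(context["MIN_AMOUNT"])
    # conn is only present when DATABASE_URL is listed in parameters.
    conn = context["conn"]
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO big_orders (id, amount) "
        "SELECT id, amount FROM orders WHERE amount > ?",
        (threshold,),
    )
    # No commit here: the open transaction is assumed to be managed by the runner.


# Local test with an in-memory database standing in for DATABASE_URL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
conn.execute("CREATE TABLE big_orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 5.0), (2, 50.0)])
run({"MIN_AMOUNT": "10", "conn": conn})
print(conn.execute("SELECT id FROM big_orders").fetchall())  # [(2,)]
```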
BigQuerySql
The BigQuery SQL task runs the provided SQL in BigQuery. Unlike the regular SQL task, it does not run
the SQL inside a transaction, so changes are not rolled back if an exception occurs.
This task requires the GCP_PROJECT environment variable, which determines the project the query is
billed against and the default table search path. The task must run in an environment where
Google application default credentials are available for authentication. To configure credentials, run:
gcloud auth application-default login
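A rough sketch of what a BigQuery SQL task does, written against the google-cloud-bigquery client library (an assumption; the task's actual implementation may differ). bigquery.Client(project=...) picks up application default credentials automatically and runs the job in, and bills it to, that project. The client_factory argument is a hypothetical hook so the sketch can be exercised without Google Cloud access.

```python
import os
from typing import Callable, Mapping, Optional


def run_bigquery_sql(sql: str,
                     environ: Optional[Mapping[str, str]] = None,
                     client_factory: Optional[Callable] = None):
    """Run sql in BigQuery, billed to GCP_PROJECT (hypothetical helper)."""
    env = os.environ if environ is None else environ
    project = env.get("GCP_PROJECT")
    if not project:
        raise KeyError("GCP_PROJECT must be set")
    if client_factory is None:
        # Imported lazily so the sketch loads without the library installed.
        from google.cloud import bigquery
        client_factory = bigquery.Client
    # Credentials come from application default credentials; the project sets
    # billing and the default project for unqualified dataset.table names.
    client = client_factory(project=project)
    job = client.query(sql)
    # No transaction: statements that already ran stay applied if this raises.
    return job.result()
```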
Writing your own
To make additional task types, subclass the workflows.bases.Task class and implement the execute()
method. Your execute method should call self.validate(context); validation happens inside execute so
that additional context, such as an open database transaction, can be passed to the output classes.
The task can define additional dataclass attributes to be specified in the YAML, but they must be
optional (have a default value): fields inherited from the base class come first, and a dataclass
field without a default cannot follow one with a default. To make such an attribute required in
practice, check its value in a __post_init__ method. The task can declare required environment
variables by setting the class_parameters attribute.
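The sketch below shows the dataclass restriction in isolation. It uses a stand-in base class with a defaulted field (an assumption about workflows.bases.Task; the class names are hypothetical) to show the TypeError raised for a required subclass field, and the defaulted-plus-__post_init__ workaround.

```python
import dataclasses
import typing


@dataclasses.dataclass
class BaseTask:
    """Stand-in for a base class whose fields have defaults (assumption)."""
    description: str = ""


# A required field after an inherited defaulted field fails at class creation.
try:
    @dataclasses.dataclass
    class Broken(BaseTask):
        table: str
except TypeError as exc:
    broken_error = exc
else:
    broken_error = None


@dataclasses.dataclass
class Fixed(BaseTask):
    table: typing.Optional[str] = None  # optional as far as dataclasses care...

    def __post_init__(self):
        # ...but enforced when the task object is created.
        if self.table is None:
            raise ValueError("table is required")


print(type(broken_error).__name__)  # TypeError
print(Fixed(table="orders").table)  # orders
```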
For .sql files, the text of the file is stored in an attribute named sql. For .py files,
the run function defined in the module is stored in an attribute named run.
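One way file contents could be turned into the sql and run attributes is sketched below, using importlib.util to load a .py file by path. The helper name load_task_body is hypothetical, and the real loader may import modules by name instead of by path.

```python
import importlib.util
import pathlib
from typing import Any, Dict


def load_task_body(path: str) -> Dict[str, Any]:
    """Return {"sql": text} for .sql files or {"run": fn} for .py files."""
    file = pathlib.Path(path)
    if file.suffix == ".sql":
        return {"sql": file.read_text()}
    if file.suffix == ".py":
        spec = importlib.util.spec_from_file_location(file.stem, str(file))
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the module's top-level code
        return {"run": module.run}
    raise ValueError(f"unsupported task file: {path}")
```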
This is a simplified implementation of the Python task type:
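import dataclasses
import typing

from workflows import bases


@dataclasses.dataclass
class Python(bases.Task):
    """A Python transform. Calls the run function loaded from the module."""
    # Optional with a default, as required for attributes added by subclasses.
    run: typing.Optional[typing.Callable] = None

    def execute(self, context: dict):
        # Run the user's function, then validate outputs in the same context
        # (for example, inside the same open transaction).
        self.run(context)
        self.validate(context)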
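Putting these rules together, a custom task type might look like the sketch below. So that it runs on its own, the snippet defines a minimal stand-in for workflows.bases.Task (an assumption; in real code, subclass bases.Task instead). The WriteFile task and its OUTPUT_DIR parameter are hypothetical.

```python
import dataclasses
import os
import typing


@dataclasses.dataclass
class Task:
    """Minimal stand-in for workflows.bases.Task (hypothetical)."""
    description: str = ""

    def validate(self, context: dict) -> None:
        """The real base class validates outputs; this stand-in does nothing."""


@dataclasses.dataclass
class WriteFile(Task):
    """Hypothetical task type: write message to filename inside OUTPUT_DIR."""
    filename: typing.Optional[str] = None
    message: str = ""
    # Plain class attribute (no annotation), so it is not a dataclass field.
    class_parameters = ["OUTPUT_DIR"]

    def __post_init__(self):
        # Optional for dataclasses, but required for this task to make sense.
        if not self.filename:
            raise ValueError("WriteFile requires a filename")

    def execute(self, context: dict):
        path = os.path.join(context["OUTPUT_DIR"], self.filename)
        with open(path, "w") as f:
            f.write(self.message)
        # Validate inside execute, as described above.
        self.validate(context)
```

Following the snake-case convention, this task would be referenced in YAML as type: write_file.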
Changing the database engine
The function workflows.tasks.get_db_connection receives the DATABASE_URL connection
string and returns a DB-API compatible connection object. By default it uses the psycopg2 library
to connect to a PostgreSQL database. To use a different database engine, replace the function, for
example:
import sqlite3
from workflows import tasks
# sqlite3.connect expects a file path (or ":memory:"), so DATABASE_URL must hold a path here.
tasks.get_db_connection = lambda database_url: sqlite3.connect(database_url)
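The one-line override passes DATABASE_URL straight to sqlite3.connect, which expects a file path. If you prefer to keep a URL-style value, a small adapter can strip the scheme first. The helper names and the sqlite:/// prefix convention (borrowed from SQLAlchemy-style URLs) are assumptions; this is not part of workflows.

```python
import sqlite3

SQLITE_PREFIX = "sqlite:///"


def sqlite_path(database_url: str) -> str:
    """Map a sqlite:/// URL to a path sqlite3 understands (hypothetical helper).

    sqlite:///relative.db  -> relative.db
    sqlite:////abs/file.db -> /abs/file.db
    sqlite:///:memory:     -> :memory:
    Anything without the prefix is treated as a plain path.
    """
    if database_url.startswith(SQLITE_PREFIX):
        return database_url[len(SQLITE_PREFIX):]
    return database_url


def sqlite_connection(database_url: str) -> sqlite3.Connection:
    """Open a sqlite3 connection from a path or sqlite:/// URL."""
    return sqlite3.connect(sqlite_path(database_url))
```

To use it, assign tasks.get_db_connection = sqlite_connection after from workflows import tasks. In general, replacing a module attribute only affects code that looks the function up through the module at call time; a name bound earlier with from workflows.tasks import get_db_connection keeps pointing at the original function.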