dbt Cloud Operators¶
These operators can execute dbt Cloud jobs, poll for status of a currently-executing job, and download run artifacts locally.
Each of the operators can be tied to a specific dbt Cloud Account in two ways:
- Explicitly provide the Account ID (via the account_id parameter) to the operator.
- Specify the dbt Cloud Account in the Airflow Connection. The operators will fall back to this account automatically if the Account ID is not passed to the operator.
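For illustration, the two approaches might look like the following sketch; dbt_cloud_default is the provider's default connection ID, and the account and job IDs here reuse the placeholder values from the examples later on this page.
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Account ID passed explicitly to the operator.
run_with_explicit_account = DbtCloudRunJobOperator(
    task_id="run_with_explicit_account",
    dbt_cloud_conn_id="dbt_cloud_default",
    account_id=106277,
    job_id=48617,
)

# No account_id supplied; the operator falls back to the account configured
# on the Airflow Connection referenced by dbt_cloud_conn_id.
run_with_connection_account = DbtCloudRunJobOperator(
    task_id="run_with_connection_account",
    dbt_cloud_conn_id="dbt_cloud_default",
    job_id=48617,
)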
Trigger a dbt Cloud Job¶
Use the DbtCloudRunJobOperator to trigger a run of a dbt Cloud job. By default, the operator periodically checks the status of the executed job every check_interval seconds until the job terminates with a successful status or reaches the timeout length of execution time. This functionality is controlled by the wait_for_termination parameter. Alternatively, wait_for_termination can be set to False to perform an asynchronous wait (typically paired with the DbtCloudJobRunSensor). Setting wait_for_termination to False is a good approach for long-running dbt Cloud jobs.
The deferrable parameter, together with wait_for_termination, controls whether the job status is polled on the worker or deferred to the Triggerer.
When wait_for_termination is True and deferrable is False, the operator submits the job and polls for its status on the worker. This keeps the worker slot occupied until the job execution is done.
When wait_for_termination is True and deferrable is True, the operator submits the job and defers to the Triggerer. This releases the worker slot while the job is running, leading to savings in resource utilization.
When wait_for_termination is False and deferrable is False, the operator only submits the job; the job status can then be tracked with the DbtCloudJobRunSensor.
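As a sketch of the deferrable path described above (not part of the example DAG), a run that waits for termination on the Triggerer might look like this; the job ID and intervals reuse the placeholder values from the examples below.
trigger_job_run_deferred = DbtCloudRunJobOperator(
    task_id="trigger_job_run_deferred",
    job_id=48617,
    wait_for_termination=True,
    deferrable=True,
    check_interval=10,
    timeout=300,
)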
While schema_override and steps_override are explicit, optional parameters for the DbtCloudRunJobOperator, custom run configurations can also be passed to the operator using the additional_run_config dictionary. This parameter can be used to initialize additional runtime configurations or overrides for the job run such as threads_override, generate_docs_override, git_branch, etc. For a complete list of the other configurations that can be used at runtime, reference the API documentation.
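For illustration only, combining the explicit override parameters with additional_run_config might look like the sketch below; the schema name, dbt commands, and git branch are placeholder values rather than part of the example DAG.
trigger_job_run_with_overrides = DbtCloudRunJobOperator(
    task_id="trigger_job_run_with_overrides",
    job_id=48617,
    schema_override="analytics_ci",  # placeholder target schema
    steps_override=["dbt seed", "dbt run"],  # placeholder dbt commands
    additional_run_config={
        "generate_docs_override": True,
        "git_branch": "main",  # placeholder branch name
    },
)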
The examples below demonstrate how to instantiate DbtCloudRunJobOperator tasks with synchronous and asynchronous waiting for run termination, respectively. Note that the account_id for the operators is referenced within the default_args of the example DAG.
trigger_job_run1 = DbtCloudRunJobOperator(
task_id="trigger_job_run1",
job_id=48617,
check_interval=10,
timeout=300,
)
This next example also shows how to pass in custom runtime configuration (in this case for threads_override) via the additional_run_config dictionary.
trigger_job_run2 = DbtCloudRunJobOperator(
task_id="trigger_job_run2",
job_id=48617,
wait_for_termination=False,
additional_run_config={"threads_override": 8},
)
Poll for status of a dbt Cloud Job run¶
Use the DbtCloudJobRunSensor to periodically retrieve the status of a dbt Cloud job run and check whether the run has succeeded. This sensor provides all of the same functionality available with the BaseSensorOperator.
In the example below, the run_id value comes from the output of a previous DbtCloudRunJobOperator task by utilizing the .output property exposed for all operators. Also note that the account_id for the task is referenced within the default_args of the example DAG.
job_run_sensor = DbtCloudJobRunSensor(
task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)
You can also use deferrable mode with this sensor if you would like to free up the worker slot while the sensor is running.
job_run_sensor_deferred = DbtCloudJobRunSensor(
    task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)
Poll for status of a dbt Cloud Job run asynchronously¶
Note
DbtCloudJobRunAsyncSensor is deprecated and will be removed in a future release. Please use DbtCloudJobRunSensor in deferrable mode instead.
job_run_async_sensor = DbtCloudJobRunAsyncSensor(
task_id="job_run_async_sensor", run_id=trigger_job_run2.output, timeout=20
)
Download run artifacts¶
Use the DbtCloudGetJobRunArtifactOperator to download dbt-generated artifacts for a dbt Cloud job run. The specified path value should be rooted at the target/ directory. Typical artifacts include manifest.json, catalog.json, and run_results.json, but other artifacts such as raw SQL of models or sources.json can also be downloaded.
For more information on dbt Cloud artifacts, reference this documentation.
get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)
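Similarly, fetching the manifest for the same run might look like the sketch below; manifest.json is one of the typical artifacts named above, and this task is not part of the example DAG.
get_manifest_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_manifest_artifact", run_id=trigger_job_run1.output, path="manifest.json"
)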
List jobs¶
Use the DbtCloudListJobsOperator to list all jobs tied to a specified dbt Cloud account. The account_id must be supplied either through the connection or as a parameter to the task.
If a project_id is supplied, only jobs pertaining to this project ID will be retrieved.
For more information on dbt Cloud list jobs, reference this documentation.
list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)