DatabricksCreateJobsOperator¶
Use the DatabricksCreateJobsOperator to create (or reset) a Databricks job. This operator relies on past XComs to remember the job_id that was created, so that repeated calls with this operator will update the existing job rather than creating new ones. When paired with the DatabricksRunNowOperator, all runs will fall under the same job within the Databricks UI.
Using the Operator¶
There are three ways to instantiate this operator. In the first way, you can take the JSON payload that you typically use to call the api/2.1/jobs/create endpoint and pass it directly to our DatabricksCreateJobsOperator through the json parameter. With this approach you get full control over the underlying payload to the Jobs REST API, including execution of Databricks jobs with multiple tasks, but it's harder to detect errors because of the lack of type checking.
The second way to accomplish the same thing is to use the named parameters of the DatabricksCreateJobsOperator directly. Note that there is exactly one named parameter for each top-level parameter in the api/2.1/jobs/create endpoint.
The third way is to use both the json parameter AND the named parameters. They will be merged together. If there are conflicts during the merge, the named parameters will take precedence and override the top-level json keys.
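For example, the following sketch illustrates the precedence rule; the job names and notebook path are illustrative placeholders and the payload is abbreviated, so treat it as a sketch of the merge semantics rather than a complete job specification.
# Sketch of combining the json parameter with named parameters.
# The "name" named parameter takes precedence over json_payload["name"].
from airflow.providers.databricks.operators.databricks import DatabricksCreateJobsOperator

json_payload = {
    "name": "name-from-json",  # placeholder; overridden by the name= argument below
    "tasks": [
        {
            "task_key": "test",
            "notebook_task": {"notebook_path": "/Shared/test"},
        },
    ],
}

jobs_create_merged = DatabricksCreateJobsOperator(
    task_id="jobs_create_merged",
    json=json_payload,
    name="name-from-named-parameter",  # wins over json_payload["name"] on conflict
)
Because the named parameter wins, the resulting job is named name-from-named-parameter.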
Currently the named parameters that DatabricksCreateJobsOperator supports are:
- name
- tags
- tasks
- job_clusters
- email_notifications
- webhook_notifications
- timeout_seconds
- schedule
- max_concurrent_runs
- git_source
- access_control_list
Examples¶
Specifying parameters as JSON¶
An example usage of the DatabricksCreateJobsOperator is as follows:
# Example of using the JSON parameter to initialize the operator.
from airflow.providers.databricks.operators.databricks import DatabricksCreateJobsOperator

job = {
    "tasks": [
        {
            "task_key": "test",
            "job_cluster_key": "job_cluster",
            "notebook_task": {
                "notebook_path": "/Shared/test",
            },
        },
    ],
    "job_clusters": [
        {
            "job_cluster_key": "job_cluster",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
        },
    ],
}

jobs_create_json = DatabricksCreateJobsOperator(task_id="jobs_create_json", json=job)
Using named parameters¶
You can also use named parameters to initialize the operator and create the job.
# Example of using the named parameters to initialize the operator.
tasks = [
    {
        "task_key": "test",
        "job_cluster_key": "job_cluster",
        "notebook_task": {
            "notebook_path": "/Shared/test",
        },
    },
]
job_clusters = [
    {
        "job_cluster_key": "job_cluster",
        "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
    },
]

jobs_create_named = DatabricksCreateJobsOperator(
    task_id="jobs_create_named", tasks=tasks, job_clusters=job_clusters
)
Pairing with DatabricksRunNowOperator¶
You can use the job_id that is returned by the DatabricksCreateJobsOperator in the return_value XCom as an argument to the DatabricksRunNowOperator to run the job.
# Example of using the DatabricksRunNowOperator after creating a job with DatabricksCreateJobsOperator.
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

run_now = DatabricksRunNowOperator(
    task_id="run_now", job_id="{{ ti.xcom_pull(task_ids='jobs_create_named') }}"
)

jobs_create_named >> run_now
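The snippets above are fragments of a complete DAG file. As a minimal sketch of one way to wire the named-parameters example and run_now together, assuming tasks and job_clusters are the lists defined earlier, and using placeholder values for dag_id, start date, and schedule:
# Minimal sketch of a DAG wrapping the create-and-run pattern shown above.
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksCreateJobsOperator,
    DatabricksRunNowOperator,
)

# tasks and job_clusters as defined in the named-parameters example above.

with DAG(
    dag_id="example_databricks_jobs",  # placeholder dag_id
    start_date=datetime(2024, 1, 1),  # placeholder start date
    schedule=None,  # Airflow 2.4+ keyword; older versions use schedule_interval
    catchup=False,
) as dag:
    jobs_create_named = DatabricksCreateJobsOperator(
        task_id="jobs_create_named", tasks=tasks, job_clusters=job_clusters
    )
    run_now = DatabricksRunNowOperator(
        task_id="run_now", job_id="{{ ti.xcom_pull(task_ids='jobs_create_named') }}"
    )
    jobs_create_named >> run_now
Both operators use the default Databricks connection unless a different databricks_conn_id is passed.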