DatabricksCreateJobsOperator¶
使用 DatabricksCreateJobsOperator
來建立(或重設)Databricks 任務。此運算子依賴過去的 XCom 來記住已建立的 job_id
,以便使用此運算子重複呼叫將更新現有的任務,而不是建立新的任務。當與 DatabricksRunNowOperator 配對使用時,所有執行都會歸在 Databricks UI 中的同一個任務下。
使用運算子¶
有三種方法可以實例化此運算子。第一種方法是,您可以取得通常用於呼叫 api/2.1/jobs/create
端點的 JSON 酬載,並透過 json
參數直接將其傳遞給我們的 DatabricksCreateJobsOperator
。使用這種方法,您可以完全控制 Jobs REST API 的底層酬載,包括執行具有多個任務的 Databricks 任務,但由於缺少類型檢查,因此更難偵測錯誤。
完成相同事情的第二種方法是直接使用 DatabricksCreateJobsOperator
的具名參數。請注意,api/2.1/jobs/create
端點中的每個頂層參數都恰好有一個具名參數。
第三種方法是同時使用 json 參數和具名參數。它們將合併在一起。如果在合併期間發生衝突,則具名參數將優先並覆寫頂層 json
鍵。
- 目前
DatabricksCreateJobsOperator
支援的具名參數為 name
description
tags
tasks
job_clusters
email_notifications
webhook_notifications
notification_settings
timeout_seconds
schedule
max_concurrent_runs
git_source
access_control_list
範例¶
將參數指定為 JSON¶
DatabricksCreateJobsOperator 的範例用法如下
# Example of using the JSON parameter to initialize the operator.
job = {
"tasks": [
{
"task_key": "test",
"job_cluster_key": "job_cluster",
"notebook_task": {
"notebook_path": "/Shared/test",
},
},
],
"job_clusters": [
{
"job_cluster_key": "job_cluster",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
],
}
jobs_create_json = DatabricksCreateJobsOperator(task_id="jobs_create_json", json=job)
使用具名參數¶
您也可以使用具名參數來初始化運算子並執行任務。
# Example of using the named parameters to initialize the operator.
tasks = [
{
"task_key": "test",
"job_cluster_key": "job_cluster",
"notebook_task": {
"notebook_path": "/Shared/test",
},
},
]
job_clusters = [
{
"job_cluster_key": "job_cluster",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
]
jobs_create_named = DatabricksCreateJobsOperator(
task_id="jobs_create_named", tasks=tasks, job_clusters=job_clusters
)
與 DatabricksRunNowOperator 配對使用¶
您可以使用 DatabricksCreateJobsOperator 在 return_value XCom 中傳回的 job_id
作為 DatabricksRunNowOperator 的引數來執行任務。
# Example of using the DatabricksRunNowOperator after creating a job with DatabricksCreateJobsOperator.
run_now = DatabricksRunNowOperator(
task_id="run_now", job_id="{{ ti.xcom_pull(task_ids='jobs_create_named') }}"
)
jobs_create_named >> run_now