DatabricksSubmitRunOperator¶
使用 DatabricksSubmitRunOperator
通過 Databricks api/2.1/jobs/runs/submit API 端點提交新的 Databricks 任務。
使用運算子¶
有三種方法可以實例化此運算子。第一種方法是採用您通常用於呼叫 api/2.1/jobs/runs/submit
端點的 JSON 酬載,並通過 json
參數直接傳遞給我們的 DatabricksSubmitRunOperator
。通過這種方法,您可以完全控制底層酬載到 Jobs REST API,包括執行具有多個任務的 Databricks 任務,但由於缺乏類型檢查,因此更難檢測錯誤。
json = {
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {
"notebook_path": "/Users/airflow@example.com/PrepareData",
},
}
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", json=json)
完成相同操作的第二種方法是直接使用 DatabricksSubmitRunOperator
的命名參數。請注意,runs/submit
端點中的每個頂層參數都有一個命名參數。使用命名參數時,您必須指定以下內容
任務規格 - 應為以下之一
spark_jar_task
- JAR 任務的主類別和參數notebook_task
- 任務的筆記本路徑和參數spark_python_task
- python 檔案路徑和用於執行 python 檔案的參數spark_submit_task
- 執行spark-submit
命令所需的參數pipeline_task
- 執行 Delta Live Tables 管線所需的參數dbt_task
- 執行 dbt 專案所需的參數
叢集規格 - 應為以下之一:*
new_cluster
- 將在其上執行此任務的新叢集的規格 *existing_cluster_id
- 將在其上執行此任務的現有叢集的 IDpipeline_task
- 可能指pipeline_id
或pipeline_name
如果在同時提供 json 參數和命名參數的情況下,它們將合併在一起。如果在合併期間發生衝突,則命名參數將優先並覆蓋頂層 json
鍵。
- 目前
DatabricksSubmitRunOperator
支援的命名參數為 spark_jar_task
notebook_task
spark_python_task
spark_submit_task
pipeline_task
dbt_task
git_source
new_cluster
existing_cluster_id
libraries
run_name
timeout_seconds
new_cluster = {"spark_version": "10.1.x-scala2.12", "num_workers": 2}
notebook_task = {
"notebook_path": "/Users/airflow@example.com/PrepareData",
}
notebook_run = DatabricksSubmitRunOperator(
task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
)
另一種方法是使用 param tasks 傳遞物件陣列以實例化此運算子。此處用於調用 api/2.1/jobs/runs/submit
端點的 tasks 參數值通過 DatabricksSubmitRunOperator
中的 tasks
參數傳遞。您可以傳遞任務陣列並提交一次性執行,而不是調用單個任務。
tasks = [
{
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {"notebook_path": "/Users/airflow@example.com/PrepareData"},
}
]
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", tasks=tasks)
範例¶
以 JSON 格式指定參數¶
DatabricksSubmitRunOperator 的範例用法如下
# Example of using the JSON parameter to initialize the operator.
new_cluster = {
"spark_version": "9.1.x-scala2.12",
"node_type_id": "r3.xlarge",
"aws_attributes": {"availability": "ON_DEMAND"},
"num_workers": 8,
}
notebook_task_params = {
"new_cluster": new_cluster,
"notebook_task": {
"notebook_path": "/Users/airflow@example.com/PrepareData",
},
}
notebook_task = DatabricksSubmitRunOperator(task_id="notebook_task", json=notebook_task_params)
使用命名參數¶
您也可以使用命名參數來初始化運算子並執行任務。
# Example of using the named parameters of DatabricksSubmitRunOperator
# to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
task_id="spark_jar_task",
new_cluster=new_cluster,
spark_jar_task={"main_class_name": "com.example.ProcessData"},
libraries=[{"jar": "dbfs:/lib/etl-0.1.jar"}],
)
DatabricksSubmitRunDeferrableOperator¶
DatabricksSubmitRunOperator
運算子的可延遲版本。
它允許更有效地利用 Airflow 工作人員,使用 Airflow 2.2.0 中引入的新功能