DatabricksSubmitRunOperator

使用 DatabricksSubmitRunOperator 通過 Databricks api/2.1/jobs/runs/submit API 端點提交新的 Databricks 任務。

使用運算子

有三種方法可以實例化此運算子。第一種方法是採用您通常用於呼叫 api/2.1/jobs/runs/submit 端點的 JSON 酬載,並通過 json 參數直接傳遞給我們的 DatabricksSubmitRunOperator。通過這種方法,您可以完全控制底層酬載到 Jobs REST API,包括執行具有多個任務的 Databricks 任務,但由於缺乏類型檢查,因此更難檢測錯誤。

json = {
    "new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
    "notebook_task": {
        "notebook_path": "/Users/airflow@example.com/PrepareData",
    },
}
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", json=json)

完成相同操作的第二種方法是直接使用 DatabricksSubmitRunOperator 的命名參數。請注意,runs/submit 端點中的每個頂層參數都有一個命名參數。使用命名參數時,您必須指定以下內容

  • 任務規格 - 應為以下之一

    • spark_jar_task - JAR 任務的主類別和參數

    • notebook_task - 任務的筆記本路徑和參數

    • spark_python_task - python 檔案路徑和用於執行 python 檔案的參數

    • spark_submit_task - 執行 spark-submit 命令所需的參數

    • pipeline_task - 執行 Delta Live Tables 管線所需的參數

    • dbt_task - 執行 dbt 專案所需的參數

  • 叢集規格 - 應為以下之一:* new_cluster - 將在其上執行此任務的新叢集的規格 * existing_cluster_id - 將在其上執行此任務的現有叢集的 ID

  • pipeline_task - 可能指 pipeline_idpipeline_name

如果在同時提供 json 參數命名參數的情況下,它們將合併在一起。如果在合併期間發生衝突,則命名參數將優先並覆蓋頂層 json 鍵。

目前 DatabricksSubmitRunOperator 支援的命名參數為
  • spark_jar_task

  • notebook_task

  • spark_python_task

  • spark_submit_task

  • pipeline_task

  • dbt_task

  • git_source

  • new_cluster

  • existing_cluster_id

  • libraries

  • run_name

  • timeout_seconds

new_cluster = {"spark_version": "10.1.x-scala2.12", "num_workers": 2}
notebook_task = {
    "notebook_path": "/Users/airflow@example.com/PrepareData",
}
notebook_run = DatabricksSubmitRunOperator(
    task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
)

另一種方法是使用 param tasks 傳遞物件陣列以實例化此運算子。此處用於調用 api/2.1/jobs/runs/submit 端點的 tasks 參數值通過 DatabricksSubmitRunOperator 中的 tasks 參數傳遞。您可以傳遞任務陣列並提交一次性執行,而不是調用單個任務。

tasks = [
    {
        "new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
        "notebook_task": {"notebook_path": "/Users/airflow@example.com/PrepareData"},
    }
]
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", tasks=tasks)

範例

以 JSON 格式指定參數

DatabricksSubmitRunOperator 的範例用法如下

tests/system/databricks/example_databricks.py[原始碼]

    # Example of using the JSON parameter to initialize the operator.
    new_cluster = {
        "spark_version": "9.1.x-scala2.12",
        "node_type_id": "r3.xlarge",
        "aws_attributes": {"availability": "ON_DEMAND"},
        "num_workers": 8,
    }

    notebook_task_params = {
        "new_cluster": new_cluster,
        "notebook_task": {
            "notebook_path": "/Users/airflow@example.com/PrepareData",
        },
    }

    notebook_task = DatabricksSubmitRunOperator(task_id="notebook_task", json=notebook_task_params)

使用命名參數

您也可以使用命名參數來初始化運算子並執行任務。

tests/system/databricks/example_databricks.py[原始碼]

    # Example of using the named parameters of DatabricksSubmitRunOperator
    # to initialize the operator.
    spark_jar_task = DatabricksSubmitRunOperator(
        task_id="spark_jar_task",
        new_cluster=new_cluster,
        spark_jar_task={"main_class_name": "com.example.ProcessData"},
        libraries=[{"jar": "dbfs:/lib/etl-0.1.jar"}],
    )

DatabricksSubmitRunDeferrableOperator

DatabricksSubmitRunOperator 運算子的可延遲版本。

它允許更有效地利用 Airflow 工作人員,使用 Airflow 2.2.0 中引入的新功能

這個條目有幫助嗎?