DatabricksWorkflowTaskGroup

使用 DatabricksWorkflowTaskGroup 以啟動和監控 Databricks 筆記本作業,作為 Airflow 任務。此任務群組啟動 Databricks 工作流程 並從中執行筆記本作業,相較於在 DatabricksWorkflowTaskGroup 之外執行 DatabricksNotebookOperator,可降低 75% 的成本 (通用運算為 $0.40/DBU,Jobs 運算為 $0.07/DBU)。

在 Airflow 中定義 Databricks 工作流程有幾個優勢

撰寫介面

透過 Databricks (基於 Web 且具有 Databricks UI)

透過 Airflow (使用 Airflow DAG 的程式碼)

工作流程運算定價

筆記本程式碼在原始碼控制中

工作流程結構在原始碼控制中

從頭重試

重試單一任務

工作流程中的任務群組

從其他 DAG 觸發工作流程

工作流程層級參數

範例

DatabricksWorkflowTaskGroup 的 DAG 範例

tests/system/databricks/example_databricks_workflow.py[原始碼]

    task_group = DatabricksWorkflowTaskGroup(
        group_id=f"test_workflow_{USER}_{GROUP_ID}",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_clusters=job_cluster_spec,
        notebook_params={"ts": "{{ ts }}"},
        notebook_packages=[
            {
                "pypi": {
                    "package": "simplejson==3.18.0",  # Pin specification version of a package like this.
                    "repo": "https://pypi.org/simple",  # You can specify your required Pypi index here.
                }
            },
        ],
        extra_job_params={
            "email_notifications": {
                "on_start": [DATABRICKS_NOTIFICATION_EMAIL],
            },
        },
    )
    with task_group:
        notebook_1 = DatabricksNotebookOperator(
            task_id="workflow_notebook_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Shared/Notebook_1",
            notebook_packages=[{"pypi": {"package": "Faker"}}],
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
            execution_timeout=timedelta(seconds=600),
        )

        notebook_2 = DatabricksNotebookOperator(
            task_id="workflow_notebook_2",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Shared/Notebook_2",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
            notebook_params={"foo": "bar", "ds": "{{ ds }}"},
        )

        task_operator_nb_1 = DatabricksTaskOperator(
            task_id="nb_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            job_cluster_key="Shared_job_cluster",
            task_config={
                "notebook_task": {
                    "notebook_path": "/Shared/Notebook_1",
                    "source": "WORKSPACE",
                },
                "libraries": [
                    {"pypi": {"package": "Faker"}},
                ],
            },
        )

        sql_query = DatabricksTaskOperator(
            task_id="sql_query",
            databricks_conn_id=DATABRICKS_CONN_ID,
            task_config={
                "sql_task": {
                    "query": {
                        "query_id": QUERY_ID,
                    },
                    "warehouse_id": WAREHOUSE_ID,
                }
            },
        )

        notebook_1 >> notebook_2 >> task_operator_nb_1 >> sql_query

透過此範例,Airflow 將產生一個名為 <dag_name>.test_workflow_<USER>_<GROUP_ID> 的作業,該作業將執行任務 notebook_1,然後執行 notebook_2。如果作業尚不存在,則將在 Databricks 工作區中建立該作業。如果作業已存在,它將會更新以符合 DAG 中定義的工作流程。

下圖顯示 Airflow UI 中產生的 Databricks 工作流程 (基於上述範例)

../_images/databricks_workflow_task_group_airflow_graph_view.png

下方描繪了從 Airflow DAG 觸發的執行在 Databricks UI 中對應的 Databricks 工作流程

../_images/workflow_run_databricks_graph_view.png

為了盡量減少更新衝突,我們建議您盡可能將參數保留在 DatabricksWorkflowTaskGroupnotebook_params 中,而不是 DatabricksNotebookOperator 中。這是因為 DatabricksWorkflowTaskGroup 中的任務是在作業觸發時傳入的,不會修改作業定義。

此條目是否有幫助?