跨 DAG 依賴關係

當兩個 DAG 具有依賴關係時,值得考慮將它們合併為單一 DAG,這樣通常更容易理解。Airflow 也為同一個 DAG 上的任務提供更好的依賴關係視覺化呈現。然而,有時將所有相關任務放在同一個 DAG 上並不實際。例如

  • 兩個 DAG 可能有不同的排程。例如,每週 DAG 可能有任務依賴於每日 DAG 上的其他任務。

  • 不同的團隊負責不同的 DAG,但這些 DAG 具有一些跨 DAG 依賴關係。

  • 任務可能依賴於同一個 DAG 上的另一個任務,但針對不同的 execution_date(資料間隔的開始)。

  • 針對在不同時間執行的任務使用 execution_delta,例如 execution_delta=timedelta(hours=1) 以檢查比目前任務早 1 小時執行的任務。

ExternalTaskSensor 可用於建立跨不同 DAG 的此類依賴關係。當與 ExternalTaskMarker 一起使用時,清除相依任務也可以跨不同 DAG 進行。

ExternalTaskSensor

使用 ExternalTaskSensor 使 DAG 上的任務等待不同 DAG 上針對特定 execution_date 的另一個任務。

ExternalTaskSensor 也提供選項來設定遠端 DAG 上的任務是否成功或失敗,透過 allowed_statesfailed_states 參數。

airflow/example_dags/example_external_task_marker_dag.py[原始碼]

child_task1 = ExternalTaskSensor(
    task_id="child_task1",
    external_dag_id=parent_dag.dag_id,
    external_task_id=parent_task.task_id,
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)

此外,對於此操作,您可以使用 deferrable 模式的 sensor

tests/system/core/example_external_task_parent_deferrable.py[原始碼]

external_task_sensor = ExternalTaskSensor(
    task_id="parent_task_sensor",
    external_task_id="child_task",
    external_dag_id="child_dag",
    deferrable=True,
)

具有 task_group 依賴關係的 ExternalTaskSensor

此外,我們也可以使用 ExternalTaskSensor 使 DAG 上的任務等待不同 DAG 上針對特定 execution_date 的另一個 task_group

airflow/example_dags/example_external_task_marker_dag.py[原始碼]

child_task2 = ExternalTaskSensor(
    task_id="child_task2",
    external_dag_id=parent_dag.dag_id,
    external_task_group_id="parent_dag_task_group_id",
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)

ExternalTaskMarker

如果希望每當清除 parent_dag 上的 parent_task 時,也應該清除 child_dag 上針對特定 execution_datechild_task1,則應使用 ExternalTaskMarker。請注意,只有在使用者清除 parent_task 時選擇「遞迴」時,才會清除 child_task1

airflow/example_dags/example_external_task_marker_dag.py[原始碼]

parent_task = ExternalTaskMarker(
    task_id="parent_task",
    external_dag_id="example_external_task_marker_child",
    external_task_id="child_task1",
)

這個條目對您有幫助嗎?