回調函數

日誌記錄和監控的一個重要組成部分是使用任務回調函數,以便在給定任務或給定 DAG 中所有任務的狀態發生變化時採取行動。例如,您可能希望在某些任務失敗時發出警報,或者讓 DAG 中的最後一個任務在成功時調用回調函數。

注意

回調函數僅在任務狀態因 worker 執行而更改時才會被調用。 因此,通過命令列介面 (CLI) 或使用者介面 (UI) 設定的任務變更不會執行回調函數。

警告

回調函數在任務完成後執行。回調函數中的錯誤將顯示在排程器日誌中,而不是任務日誌中。預設情況下,排程器日誌不會顯示在 UI 中,而是可以在 $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log 中找到

回調函數類型

有五種類型的任務事件可以觸發回調函數

名稱

描述

on_success_callback

當任務成功時調用

on_failure_callback

當任務失敗時調用

sla_miss_callback

當任務錯過其定義的 SLA 時調用

on_retry_callback

當任務準備重試時調用

on_execute_callback

在任務開始執行前立即調用。

on_skipped_callback

當任務執行中且引發 AirflowSkipException 時調用。明確地說,如果任務由於 DAG 中的先前分支決策或導致執行跳過的觸發規則而未開始執行,以至於從未排程任務執行,則不會調用此回調函數。

範例

在以下範例中,任何任務中的失敗都會調用 task_failure_alert 函數,而最後一個任務中的成功會調用 dag_success_alert 函數

import datetime
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

注意

從 Airflow 2.6.0 開始,回調函數現在支援回調函數列表,允許使用者指定多個函數在所需的事件中執行。只需在定義 DAG/任務回調函數時,將回調函數列表傳遞給回調函數參數即可:例如 on_failure_callback=[callback_func_1, callback_func_2]

完整的 context 中可用變數列表,請參閱文件程式碼

這個條目對您有幫助嗎?