回調函數¶
日誌記錄和監控的一個重要組成部分是使用任務回調函數,以便在給定任務或給定 DAG 中所有任務的狀態發生變化時採取行動。例如,您可能希望在某些任務失敗時發出警報,或者讓 DAG 中的最後一個任務在成功時調用回調函數。
警告
回調函數在任務完成後執行。回調函數中的錯誤將顯示在排程器日誌中,而不是任務日誌中。預設情況下,排程器日誌不會顯示在 UI 中,而是可以在 $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log
中找到
回調函數類型¶
有五種類型的任務事件可以觸發回調函數
名稱 |
描述 |
---|---|
|
當任務成功時調用 |
|
當任務失敗時調用 |
|
當任務錯過其定義的 SLA 時調用 |
|
當任務準備重試時調用 |
|
在任務開始執行前立即調用。 |
|
當任務執行中且引發 AirflowSkipException 時調用。明確地說,如果任務由於 DAG 中的先前分支決策或導致執行跳過的觸發規則而未開始執行,以至於從未排程任務執行,則不會調用此回調函數。 |
範例¶
在以下範例中,任何任務中的失敗都會調用 task_failure_alert
函數,而最後一個任務中的成功會調用 dag_success_alert
函數
import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
def task_failure_alert(context):
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
def dag_success_alert(context):
print(f"DAG has succeeded, run_id: {context['run_id']}")
with DAG(
dag_id="example_callback",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
dagrun_timeout=datetime.timedelta(minutes=60),
catchup=False,
on_success_callback=None,
on_failure_callback=task_failure_alert,
tags=["example"],
):
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
task1 >> task2 >> task3
注意
從 Airflow 2.6.0 開始,回調函數現在支援回調函數列表,允許使用者指定多個函數在所需的事件中執行。只需在定義 DAG/任務回調函數時,將回調函數列表傳遞給回調函數參數即可:例如 on_failure_callback=[callback_func_1, callback_func_2]