Airflow 的 Listener 外掛程式¶
Airflow 具有允許新增 listener 以使用外掛程式監控和追蹤任務狀態的功能。
這是一個簡單的 Airflow listener 外掛程式範例,有助於追蹤任務狀態並收集關於任務、DAG 執行和 DAG 的實用元數據資訊。
這是一個 Airflow 的外掛程式範例,允許建立 Airflow 的 listener 外掛程式。此外掛程式透過使用 SQLAlchemy 的事件機制運作。它監看表格層級的任務實例狀態變更並觸發事件。這將會針對所有 DAG 中的所有任務發出通知。
在這個外掛程式中,物件參考衍生自基底類別 airflow.plugins_manager.AirflowPlugin
。
Listener 外掛程式在底層使用 pluggy 應用程式。Pluggy 是一個為 Pytest 建立的外掛程式管理和 hook 呼叫應用程式。Pluggy 啟用函數 hooking,因此它允許使用您自己的自訂功能在該 hooking 上建構「可插拔」系統。
- 使用此外掛程式,可以監聽以下事件
任務實例處於執行中狀態。
任務實例處於成功狀態。
任務實例處於失敗狀態。
DAG 執行處於執行中狀態。
DAG 執行處於成功狀態。
DAG 執行處於失敗狀態。
在 airflow job、scheduler 或 backfilljob 等事件開始之前
在 airflow job、scheduler 或 backfilljob 等事件停止之前
Listener 註冊¶
具有 listener 物件的物件參考的 listener 外掛程式會註冊為 airflow 外掛程式的一部分。以下是我們實作新 listener 的骨架
from airflow.plugins_manager import AirflowPlugin
# This is the listener file created where custom code to monitor is added over hookimpl
import listener
class MetadataCollectionPlugin(AirflowPlugin):
name = "MetadataCollectionPlugin"
listeners = [listener]
接下來,我們可以檢查新增到 listener
中的程式碼,並查看每個 listener 的實作方法。實作完成後,listener 部分會在所有 DAG 的所有任務執行期間執行
作為參考,以下是 listener.py
類別中的外掛程式碼,顯示資料庫中的表格清單
此範例監聽任務實例何時處於執行中狀態
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
"""
This method is called when task state changes to RUNNING.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that is running its dag_run,
task and dag information.
"""
print("Task instance is in running state")
print(" Previous state of the Task instance:", previous_state)
state: TaskInstanceState = task_instance.state
name: str = task_instance.task_id
start_date = task_instance.start_date
dagrun = task_instance.dag_run
dagrun_status = dagrun.state
task = task_instance.task
if TYPE_CHECKING:
assert task
dag = task.dag
dag_name = None
if dag:
dag_name = dag.dag_id
print(f"Current task name:{name} state:{state} start_date:{start_date}")
print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")
類似地,可以實作在 task_instance 成功和失敗後進行監聽的程式碼。
此範例監聽 DAG 執行何時變更為失敗狀態
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to FAILED.
"""
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
external_trigger = dag_run.external_trigger
print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
print(f"Failed with message: {msg}")
類似地,可以實作在 dag_run 成功和執行中狀態後進行監聽的程式碼。
新增 listener 實作所需的 listener 外掛程式檔案會作為 Airflow 外掛程式的一部分新增到 $AIRFLOW_HOME/plugins/
資料夾中,並在 Airflow 啟動期間載入。