Airflow 的 Listener 外掛程式

Airflow 具有允許新增 listener 以使用外掛程式監控和追蹤任務狀態的功能。

這是一個簡單的 Airflow listener 外掛程式範例,有助於追蹤任務狀態並收集關於任務、DAG 執行和 DAG 的實用元數據資訊。

這是一個 Airflow 的外掛程式範例,允許建立 Airflow 的 listener 外掛程式。此外掛程式透過使用 SQLAlchemy 的事件機制運作。它監看表格層級的任務實例狀態變更並觸發事件。這將會針對所有 DAG 中的所有任務發出通知。

在這個外掛程式中,物件參考衍生自基底類別 airflow.plugins_manager.AirflowPlugin

Listener 外掛程式在底層使用 pluggy 應用程式。Pluggy 是一個為 Pytest 建立的外掛程式管理和 hook 呼叫應用程式。Pluggy 啟用函數 hooking,因此它允許使用您自己的自訂功能在該 hooking 上建構「可插拔」系統。

使用此外掛程式,可以監聽以下事件
  • 任務實例處於執行中狀態。

  • 任務實例處於成功狀態。

  • 任務實例處於失敗狀態。

  • DAG 執行處於執行中狀態。

  • DAG 執行處於成功狀態。

  • DAG 執行處於失敗狀態。

  • 在 airflow job、scheduler 或 backfilljob 等事件開始之前

  • 在 airflow job、scheduler 或 backfilljob 等事件停止之前

Listener 註冊

具有 listener 物件的物件參考的 listener 外掛程式會註冊為 airflow 外掛程式的一部分。以下是我們實作新 listener 的骨架

from airflow.plugins_manager import AirflowPlugin

# This is the listener file created where custom code to monitor is added over hookimpl
import listener


class MetadataCollectionPlugin(AirflowPlugin):
    name = "MetadataCollectionPlugin"
    listeners = [listener]

接下來,我們可以檢查新增到 listener 中的程式碼,並查看每個 listener 的實作方法。實作完成後,listener 部分會在所有 DAG 的所有任務執行期間執行

作為參考,以下是 listener.py 類別中的外掛程式碼,顯示資料庫中的表格清單

此範例監聽任務實例何時處於執行中狀態

airflow/example_dags/plugins/event_listener.py[原始碼]

@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to RUNNING.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that is running its dag_run,
    task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    state: TaskInstanceState = task_instance.state
    name: str = task_instance.task_id
    start_date = task_instance.start_date

    dagrun = task_instance.dag_run
    dagrun_status = dagrun.state

    task = task_instance.task

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name} state:{state} start_date:{start_date}")
    print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")


類似地,可以實作在 task_instance 成功和失敗後進行監聽的程式碼。

此範例監聽 DAG 執行何時變更為失敗狀態

airflow/example_dags/plugins/event_listener.py[原始碼]

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run  in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    external_trigger = dag_run.external_trigger

    print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
    print(f"Failed with message: {msg}")


類似地,可以實作在 dag_run 成功和執行中狀態後進行監聽的程式碼。

新增 listener 實作所需的 listener 外掛程式檔案會作為 Airflow 外掛程式的一部分新增到 $AIRFLOW_HOME/plugins/ 資料夾中,並在 Airflow 啟動期間載入。

這個條目是否有幫助?