Listeners(監聽器)

您可以編寫監聽器,讓 Airflow 在事件發生時通知您。Pluggy 為這些監聽器提供技術支持。

警告

監聽器是 Airflow 的進階功能。它們並未與其運行的 Airflow 組件隔離,並且可能會減慢速度,或在某些情況下拖垮您的 Airflow 實例。因此,在編寫監聽器時應格外小心。

Airflow 支援以下事件的通知

生命週期事件

  • on_starting(啟動中)

  • before_stopping(停止前)

生命週期事件讓您可以對 Airflow Job(作業)的啟動和停止事件做出反應,例如 SchedulerJob(排程器作業)或 BackfillJob(回填作業)。

DagRun 狀態變更事件

DagRun 變更狀態時,會發生 DagRun 狀態變更事件。

  • on_dag_run_running(DagRun 執行中)

airflow/example_dags/plugins/event_listener.py[原始碼]

@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run  in running state")
    queued_at = dag_run.queued_at
    dag_hash_info = dag_run.dag_hash

    print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")


  • on_dag_run_success(DagRun 成功)

airflow/example_dags/plugins/event_listener.py[原始碼]

@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date

    print(f"Dag run start:{start_date} end:{end_date}")


  • on_dag_run_failed(DagRun 失敗)

airflow/example_dags/plugins/event_listener.py[原始碼]

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run  in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    external_trigger = dag_run.external_trigger

    print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
    print(f"Failed with message: {msg}")


TaskInstance 狀態變更事件

TaskInstance 變更狀態時,會發生 TaskInstance 狀態變更事件。您可以使用這些事件來對 LocalTaskJob 狀態變更做出反應。

  • on_task_instance_running(TaskInstance 執行中)

airflow/example_dags/plugins/event_listener.py[原始碼]

@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to RUNNING.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that is running its dag_run,
    task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    state: TaskInstanceState = task_instance.state
    name: str = task_instance.task_id
    start_date = task_instance.start_date

    dagrun = task_instance.dag_run
    dagrun_status = dagrun.state

    task = task_instance.task

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name} state:{state} start_date:{start_date}")
    print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")


  • on_task_instance_success(TaskInstance 成功)

airflow/example_dags/plugins/event_listener.py[原始碼]

@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to SUCCESS.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has succeeded its
    dag_run, task and dag information.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)

    dag_id = task_instance.dag_id
    hostname = task_instance.hostname
    operator = task_instance.operator

    dagrun = task_instance.dag_run
    queued_at = dagrun.queued_at
    print(f"Dag name:{dag_id} queued_at:{queued_at}")
    print(f"Task hostname:{hostname} operator:{operator}")


  • on_task_instance_failed(TaskInstance 失敗)

airflow/example_dags/plugins/event_listener.py[原始碼]

@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
    """
    This method is called when task state changes to FAILED.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has failed its dag_run,
    task and dag information.
    """
    print("Task instance in failure state")

    start_date = task_instance.start_date
    end_date = task_instance.end_date
    duration = task_instance.duration

    dagrun = task_instance.dag_run

    task = task_instance.task

    if TYPE_CHECKING:
        assert task

    dag = task.dag

    print(f"Task start:{start_date} end:{end_date} duration:{duration}")
    print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
    if error:
        print(f"Failure caused by {error}")


Dataset 事件

  • on_dataset_created(Dataset 已建立)

  • on_dataset_changed(Dataset 已變更)

當 Dataset 管理操作執行時,會發生 Dataset 事件。

DAG 導入錯誤事件

  • on_new_dag_import_error(新的 DAG 導入錯誤)

  • on_existing_dag_import_error(現有的 DAG 導入錯誤)

當 dag 處理器在 Dag 程式碼中發現導入錯誤並更新元數據資料庫表時,會發生 Dag 導入錯誤事件。

這是一個實驗性功能

用法

要建立監聽器

  • import airflow.listeners.hookimpl

  • 為您想要產生通知的事件實作 hookimpls

Airflow 將規範定義為 hookspec。您的實作必須接受與 hookspec 中定義的相同命名參數。如果您沒有使用與 hookspec 相同的參數,當您嘗試使用您的外掛程式時,Pluggy 會拋出錯誤。但您不需要實作每個方法。許多監聽器僅實作一個方法或方法子集。

若要將監聽器包含在您的 Airflow 安裝中,請將其作為 Airflow 外掛程式 的一部分包含進來。

監聽器 API 旨在跨所有 DAG 和所有運算子調用。您無法監聽特定 DAG 產生的事件。對於這種行為,請嘗試 on_success_callbackpre_execute 等方法。這些為特定的 DAG 作者或運算子建立者提供回呼。日誌和 print() 調用將作為監聽器的一部分處理。

相容性注意事項

監聽器介面可能會隨著時間推移而變更。我們正在使用 pluggy 規範,這表示為舊版介面編寫的監聽器實作應向前相容於未來版本的 Airflow。

但是,反之則不保證,因此如果您的監聽器是針對較新版本的介面實作的,則可能無法與舊版本的 Airflow 搭配使用。如果您目標是單一版本的 Airflow,這不是問題,因為您可以將您的實作調整為您使用的 Airflow 版本,但如果您正在編寫可用於不同版本 Airflow 的外掛程式或擴充功能,則這很重要。

例如,如果在介面中新增了一個新欄位(例如 2.10.0 版本中 on_task_instance_failed 方法中的 error 欄位),則監聽器實作將無法處理事件物件中不存在該欄位的情況,並且此類監聽器僅適用於 Airflow 2.10.0 及更高版本。

為了實作與多個 Airflow 版本相容的監聽器,包括使用較新 Airflow 版本中新增的功能和欄位,您應該檢查使用的 Airflow 版本並使用較新版本的介面實作,但對於舊版本的 Airflow,您應該使用舊版本的介面。

例如,如果您想要實作一個使用 on_task_instance_failed 中的 error 欄位的監聽器,您應該使用如下程式碼

from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl

airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):

    class ClassBasedListener:
        ...

        @hookimpl
        def on_task_instance_failed(
            self, previous_state, task_instance, error: None | str | BaseException, session
        ):
            # Handle error case here
            pass

else:

    class ClassBasedListener:  # type: ignore[no-redef]
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, session):
            # Handle no error case here
            pass

自 2.8.0 版本引入監聽器介面以來,介面變更列表

Airflow 版本

受影響的方法

變更

2.10.0

on_task_instance_failed(TaskInstance 失敗)

介面中新增了錯誤欄位

此條目是否有幫助?