Listeners(監聽器)¶
您可以編寫監聽器,讓 Airflow 在事件發生時通知您。Pluggy 為這些監聽器提供技術支持。
警告
監聽器是 Airflow 的進階功能。它們並未與其運行的 Airflow 組件隔離,並且可能會減慢速度,或在某些情況下拖垮您的 Airflow 實例。因此,在編寫監聽器時應格外小心。
Airflow 支援以下事件的通知
生命週期事件¶
on_starting(啟動中)
before_stopping(停止前)
生命週期事件讓您可以對 Airflow Job
(作業)的啟動和停止事件做出反應,例如 SchedulerJob
(排程器作業)或 BackfillJob
(回填作業)。
DagRun 狀態變更事件¶
當 DagRun
變更狀態時,會發生 DagRun 狀態變更事件。
on_dag_run_running(DagRun 執行中)
@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to RUNNING.
"""
print("Dag run in running state")
queued_at = dag_run.queued_at
dag_hash_info = dag_run.dag_hash
print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")
on_dag_run_success(DagRun 成功)
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to SUCCESS.
"""
print("Dag run in success state")
start_date = dag_run.start_date
end_date = dag_run.end_date
print(f"Dag run start:{start_date} end:{end_date}")
on_dag_run_failed(DagRun 失敗)
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to FAILED.
"""
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
external_trigger = dag_run.external_trigger
print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
print(f"Failed with message: {msg}")
TaskInstance 狀態變更事件¶
當 TaskInstance
變更狀態時,會發生 TaskInstance 狀態變更事件。您可以使用這些事件來對 LocalTaskJob
狀態變更做出反應。
on_task_instance_running(TaskInstance 執行中)
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
"""
This method is called when task state changes to RUNNING.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that is running its dag_run,
task and dag information.
"""
print("Task instance is in running state")
print(" Previous state of the Task instance:", previous_state)
state: TaskInstanceState = task_instance.state
name: str = task_instance.task_id
start_date = task_instance.start_date
dagrun = task_instance.dag_run
dagrun_status = dagrun.state
task = task_instance.task
if TYPE_CHECKING:
assert task
dag = task.dag
dag_name = None
if dag:
dag_name = dag.dag_id
print(f"Current task name:{name} state:{state} start_date:{start_date}")
print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")
on_task_instance_success(TaskInstance 成功)
@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
"""
This method is called when task state changes to SUCCESS.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that has succeeded its
dag_run, task and dag information.
"""
print("Task instance in success state")
print(" Previous state of the Task instance:", previous_state)
dag_id = task_instance.dag_id
hostname = task_instance.hostname
operator = task_instance.operator
dagrun = task_instance.dag_run
queued_at = dagrun.queued_at
print(f"Dag name:{dag_id} queued_at:{queued_at}")
print(f"Task hostname:{hostname} operator:{operator}")
on_task_instance_failed(TaskInstance 失敗)
@hookimpl
def on_task_instance_failed(
previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
"""
This method is called when task state changes to FAILED.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that has failed its dag_run,
task and dag information.
"""
print("Task instance in failure state")
start_date = task_instance.start_date
end_date = task_instance.end_date
duration = task_instance.duration
dagrun = task_instance.dag_run
task = task_instance.task
if TYPE_CHECKING:
assert task
dag = task.dag
print(f"Task start:{start_date} end:{end_date} duration:{duration}")
print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
if error:
print(f"Failure caused by {error}")
Dataset 事件¶
on_dataset_created(Dataset 已建立)
on_dataset_changed(Dataset 已變更)
當 Dataset 管理操作執行時,會發生 Dataset 事件。
DAG 導入錯誤事件¶
on_new_dag_import_error(新的 DAG 導入錯誤)
on_existing_dag_import_error(現有的 DAG 導入錯誤)
當 dag 處理器在 Dag 程式碼中發現導入錯誤並更新元數據資料庫表時,會發生 Dag 導入錯誤事件。
這是一個實驗性功能。
用法¶
要建立監聽器
import
airflow.listeners.hookimpl
為您想要產生通知的事件實作
hookimpls
Airflow 將規範定義為 hookspec。您的實作必須接受與 hookspec 中定義的相同命名參數。如果您沒有使用與 hookspec 相同的參數,當您嘗試使用您的外掛程式時,Pluggy 會拋出錯誤。但您不需要實作每個方法。許多監聽器僅實作一個方法或方法子集。
若要將監聽器包含在您的 Airflow 安裝中,請將其作為 Airflow 外掛程式 的一部分包含進來。
監聽器 API 旨在跨所有 DAG 和所有運算子調用。您無法監聽特定 DAG 產生的事件。對於這種行為,請嘗試 on_success_callback
和 pre_execute
等方法。這些為特定的 DAG 作者或運算子建立者提供回呼。日誌和 print()
調用將作為監聽器的一部分處理。
相容性注意事項¶
監聽器介面可能會隨著時間推移而變更。我們正在使用 pluggy
規範,這表示為舊版介面編寫的監聽器實作應向前相容於未來版本的 Airflow。
但是,反之則不保證,因此如果您的監聽器是針對較新版本的介面實作的,則可能無法與舊版本的 Airflow 搭配使用。如果您目標是單一版本的 Airflow,這不是問題,因為您可以將您的實作調整為您使用的 Airflow 版本,但如果您正在編寫可用於不同版本 Airflow 的外掛程式或擴充功能,則這很重要。
例如,如果在介面中新增了一個新欄位(例如 2.10.0 版本中 on_task_instance_failed
方法中的 error
欄位),則監聽器實作將無法處理事件物件中不存在該欄位的情況,並且此類監聽器僅適用於 Airflow 2.10.0 及更高版本。
為了實作與多個 Airflow 版本相容的監聽器,包括使用較新 Airflow 版本中新增的功能和欄位,您應該檢查使用的 Airflow 版本並使用較新版本的介面實作,但對於舊版本的 Airflow,您應該使用舊版本的介面。
例如,如果您想要實作一個使用 on_task_instance_failed
中的 error
欄位的監聽器,您應該使用如下程式碼
from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl
airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):
class ClassBasedListener:
...
@hookimpl
def on_task_instance_failed(
self, previous_state, task_instance, error: None | str | BaseException, session
):
# Handle error case here
pass
else:
class ClassBasedListener: # type: ignore[no-redef]
...
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, session):
# Handle no error case here
pass
自 2.8.0 版本引入監聽器介面以來,介面變更列表
Airflow 版本 |
受影響的方法 |
變更 |
---|---|---|
2.10.0 |
|
介面中新增了錯誤欄位 |