可延遲運算子 & 觸發器¶
標準的 運算子 和 感測器 在整個執行期間,即使處於閒置狀態,也會佔用完整的工作節點插槽。例如,如果您只有 100 個工作節點插槽可用於執行任務,並且您有 100 個 DAG 正在等待目前正在執行但閒置的感測器,那麼您無法執行任何其他操作 - 即使您的整個 Airflow 集群基本上處於閒置狀態。reschedule
感測器的模式解決了部分問題,它允許感測器僅以固定的間隔執行,但它不夠靈活,並且僅允許使用時間作為恢復執行的原因,而不是其他條件。
這就是可延遲運算子的用武之地。當運算子沒有其他事情可做,只能等待時,它可以暫停自身並通過延遲來釋放工作節點以供其他進程使用。當運算子延遲時,執行會轉移到觸發器,運算子指定的觸發器將在此處運行。觸發器可以執行運算子所需的輪詢或等待。然後,當觸發器完成輪詢或等待時,它會發送信號讓運算子恢復執行。在執行的延遲階段,由於工作已卸載到觸發器,因此任務不再佔用工作節點插槽,您將有更多可用工作負載容量。預設情況下,處於延遲狀態的任務不會佔用資源池插槽。如果您希望它們佔用,您可以通過編輯相關的資源池來更改此設定。
觸發器是小型的非同步 Python 程式碼片段,旨在在單個 Python 進程中運行。由於它們是非同步的,因此它們可以全部有效地共存在 triggerer Airflow 元件中。
此流程如何運作的概觀
任務實例(正在運行的運算子)到達必須等待其他操作或條件的點,並使用綁定到事件的觸發器延遲自身以恢復它。這釋放了工作節點以運行其他任務。
新的觸發器實例由 Airflow 註冊,並由觸發器進程拾取。
觸發器運行直到它觸發,此時其來源任務由排程器重新排程。
排程器將任務排隊以在工作節點上恢復。
您可以作為 DAG 作者使用預先編寫的可延遲運算子,也可以編寫自己的運算子。但是,編寫它們需要它們滿足某些設計標準。
使用可延遲運算子¶
如果您想使用 Airflow 隨附的預先編寫的可延遲運算子,例如 TimeSensorAsync
,那麼您只需要完成兩個步驟
確保您的 Airflow 安裝至少運行一個
triggerer
進程,以及正常的scheduler
在您的 DAG 中使用可延遲運算子/感測器
Airflow 會自動處理和實作您的延遲流程。
如果您正在升級現有的 DAG 以使用可延遲運算子,Airflow 包含 API 相容的感測器變體,例如 TimeSensorAsync
用於 TimeSensor
。將這些變體添加到您的 DAG 中,即可使用可延遲運算子,而無需進行其他更改。
請注意,您無法從自訂 PythonOperator 或 TaskFlow Python 函數內部使用延遲功能。延遲僅適用於傳統的、基於類別的運算子。
撰寫可延遲運算子¶
撰寫可延遲運算子時,需要考慮以下要點
您的運算子必須使用觸發器延遲自身。您可以使用核心 Airflow 中包含的觸發器,也可以編寫自訂的觸發器。
您的運算子在延遲期間將被停止並從其工作節點中移除,並且不會自動保留狀態。您可以通過指示 Airflow 在特定方法處恢復運算子或傳遞某些 kwargs 來保留狀態。
您可以多次延遲,並且可以在運算子執行重要工作之前或之後延遲。或者,如果滿足某些條件,您可以延遲。例如,如果系統沒有立即的回應。延遲完全在您的控制之下。
任何運算子都可以延遲;其類別上不需要特殊標記,並且不限於感測器。
如果您想添加一個同時支援可延遲和不可延遲模式的運算子或感測器,建議添加
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)
到運算子的__init__
方法中,並使用它來決定是否在可延遲模式下運行運算子。您可以通過operator
區段中的default_deferrable
,為所有支援在可延遲和不可延遲模式之間切換的運算子和感測器配置deferrable
的預設值。以下是一個同時支援兩種模式的感測器範例。
import time
from datetime import timedelta
from typing import Any
from airflow.configuration import conf
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def __init__(
self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs
) -> None:
super().__init__(**kwargs)
self.deferrable = deferrable
def execute(self, context: Context) -> None:
if self.deferrable:
self.defer(
trigger=TimeDeltaTrigger(timedelta(hours=1)),
method_name="execute_complete",
)
else:
time.sleep(3600)
def execute_complete(
self,
context: Context,
event: dict[str, Any] | None = None,
) -> None:
# We have no more work to do here. Mark as complete.
return
撰寫觸發器¶
觸發器被編寫為從 BaseTrigger
繼承的類別,並實作三個方法
__init__
:一種從實例化它的運算子接收引數的方法。自 2.10.0 以來,我們能夠直接從預定義的觸發器開始任務執行。為了利用此功能,__init__
中的所有引數都必須是可序列化的。run
:一種非同步方法,它運行其邏輯並產生一個或多個TriggerEvent
實例作為非同步產生器。serialize
:傳回重建此觸發器所需的資訊,以 classpath 的元組和要傳遞給__init__
的關鍵字引數的形式。
此範例顯示了基本觸發器的結構,這是 Airflow 的 DateTimeTrigger
的簡化版本
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
class DateTimeTrigger(BaseTrigger):
def __init__(self, moment):
super().__init__()
self.moment = moment
def serialize(self):
return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
async def run(self):
while self.moment > timezone.utcnow():
await asyncio.sleep(1)
yield TriggerEvent(self.moment)
程式碼範例顯示了幾件事
__init__
和serialize
被編寫為一對。觸發器在由運算子作為其延遲請求的一部分提交時實例化一次,然後序列化並在運行觸發器的任何觸發器進程上重新實例化。run
方法被宣告為async def
,因為它必須是非同步的,並且使用asyncio.sleep
而不是常規的time.sleep
(因為那會阻塞進程)。當它發出事件時,它會將
self.moment
打包在其中,因此如果此觸發器在多個主機上冗餘運行,則可以對事件進行重複資料刪除。
觸發器可以根據您的需要盡可能複雜或簡單,前提是它們滿足設計約束。它們可以以高可用性方式運行,並在運行觸發器的主機之間自動分發。我們鼓勵您避免在觸發器中使用任何形式的持久狀態。觸發器應從其 __init__
中獲取它們所需的一切,以便它們可以被序列化並自由移動。
如果您不熟悉編寫非同步 Python,請在編寫 run()
方法時非常小心。Python 的非同步模型意味著,如果程式碼在執行阻塞操作時沒有正確地 await
,則可能會阻塞整個進程。Airflow 會嘗試檢測進程阻塞程式碼,並在觸發器日誌中警告您何時發生這種情況。您可以通過在編寫觸發器時設定變數 PYTHONASYNCIODEBUG=1
來啟用 Python 的額外檢查,以確保您正在編寫非阻塞程式碼。在執行檔案系統調用時要格外小心,因為如果底層檔案系統是網路支援的,則可能會阻塞。
在編寫自己的觸發器時,需要注意一些設計約束
run
方法必須是非同步的(使用 Python 的 asyncio),並且在執行阻塞操作時始終正確地await
。run
必須yield
其 TriggerEvents,而不是傳回它們。如果它在產生至少一個事件之前傳回,Airflow 將認為這是一個錯誤,並使任何等待它的任務實例失敗。如果它拋出異常,Airflow 也會使任何依賴的任務實例失敗。您應該假設觸發器實例可以運行多次。如果發生網路分割,並且 Airflow 在隔離的機器上重新啟動觸發器,則可能會發生這種情況。因此,您必須注意副作用。例如,您可能不想使用觸發器來插入資料庫列。
如果您的觸發器旨在發出多個事件(目前不受支援),則每個發出的事件必須包含一個有效負載,如果觸發器在多個位置運行,則可以使用該有效負載來消除事件重複資料。如果您只觸發一個事件並且不需要將資訊傳回給運算子,您可以將有效負載設定為
None
。觸發器可能會突然從一個觸發器服務中移除,並在新的觸發器服務上啟動。例如,如果子網路發生變更並且導致網路分割,或者如果發生部署。如果需要,您可以實作
cleanup
方法,該方法始終在run
之後調用,無論觸發器是否乾淨地退出或其他情況。為了使對觸發器的任何變更都反映出來,每次修改觸發器時都需要重新啟動觸發器。
注意
目前,觸發器僅使用到它們的第一個事件,因為它們僅用於恢復延遲任務,並且任務在第一個事件觸發後恢復。但是,Airflow 計劃在未來允許從觸發器啟動 DAG,屆時多事件觸發器將更有用。
觸發器中的敏感資訊¶
自 Airflow 2.9.0 起,觸發器 kwargs 在儲存到資料庫之前會被序列化和加密。這意味著您傳遞給觸發器的任何敏感資訊都將以加密形式儲存在資料庫中,並在從資料庫讀取時解密。
觸發延遲¶
如果您想在運算子中的任何位置觸發延遲,您可以調用 self.defer(trigger, method_name, kwargs, timeout)
。這會為 Airflow 引發一個特殊的例外。引數為
trigger
:您要延遲到的觸發器的實例。它將被序列化到資料庫中。method_name
:您希望 Airflow 在恢復時調用的運算子上的方法名稱。kwargs
:(可選)在調用方法時要傳遞給方法的其他關鍵字引數。預設值為{}
。timeout
:(可選)一個 timedelta,指定延遲逾時時間,超過此時間延遲將失敗,並且任務實例將失敗。預設值為None
,表示沒有逾時。
以下是感測器如何觸發延遲的基本範例
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def execute(self, context: Context) -> None:
self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
當您選擇延遲時,您的運算子將在該點停止執行,並從其目前的工作節點中移除。不會保留任何狀態,例如局部變數或在 self
上設定的屬性。當您的運算子恢復時,它會作為其新實例恢復。您可以將狀態從運算子的舊實例傳遞到新實例的唯一方法是使用 method_name
和 kwargs
。
當您的運算子恢復時,Airflow 會將 context
物件和 event
物件添加到傳遞給 method_name
方法的 kwargs 中。此 event
物件包含來自恢復您的運算子的觸發器事件的有效負載。根據觸發器,這可能對您的運算子很有用,例如它是狀態碼或用於獲取結果的 URL。或者,它可能是不重要的資訊,例如日期時間。但是,您的 method_name
方法必須接受 context
和 event
作為關鍵字引數。
如果您的運算子從其第一個 execute()
方法(在它是新的時候)或由 method_name
指定的後續方法傳回,它將被視為已完成並完成執行。
如果您希望您的運算子只有一個進入點,您可以將 method_name
設定為 execute
,但它也必須接受 event
作為可選的關鍵字引數。
讓我們更深入地了解上面的 WaitOneHourSensor
範例。此感測器只是觸發器的精簡包裝。它延遲到觸發器,並指定一個不同的方法在觸發器觸發時返回。當它立即返回時,它將感測器標記為成功。
self.defer
調用引發 TaskDeferred
例外,因此它可以在您的運算子程式碼中的任何位置工作,即使在 execute()
內部多層調用中也是如此。您也可以手動引發 TaskDeferred
,它使用與 self.defer
相同的引數。
運算子的 execution_timeout
是從總運行時間確定的,而不是延遲之間的單獨執行。這意味著,如果設定了 execution_timeout
,則運算子可能會在其延遲期間或在延遲後運行時失敗,即使它僅恢復了幾秒鐘。
從任務開始觸發延遲¶
2.10.0 版本新增功能。
如果您想直接將任務延遲到觸發器,而無需進入工作節點,您可以將類別級別屬性 start_from_trigger
設定為 True
,並向可延遲運算子添加具有 StartTriggerArgs
物件的類別級別屬性 start_trigger_args
,其中包含以下 4 個屬性
trigger_cls
:觸發器類別的可匯入路徑。trigger_kwargs
:在初始化trigger_cls
時要傳遞給它的關鍵字引數。請注意,所有引數都需要是可序列化的。這是此功能的主要限制。next_method
:您希望 Airflow 在恢復時調用的運算子上的方法名稱。next_kwargs
:在調用next_method
時要傳遞給它的其他關鍵字引數。timeout
:(可選)一個 timedelta,指定延遲逾時時間,超過此時間延遲將失敗,並且任務實例將失敗。預設值為None
,表示沒有逾時。
當延遲是 execute
方法唯一要執行的操作時,這特別有用。以下是先前範例的基本改進。在先前的範例中,我們使用了 DateTimeTrigger
,它採用引數 delta
,類型為 datetime.timedelta
,它是不可序列化的。因此,我們需要建立一個新的觸發器,其中包含可序列化的引數。
from __future__ import annotations
import datetime
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
class HourDeltaTrigger(DateTimeTrigger):
def __init__(self, hours: int):
moment = timezone.utcnow() + datetime.timedelta(hours=hours)
super().__init__(moment=moment)
在感測器部分,我們需要提供 HourDeltaTrigger
的路徑作為 trigger_cls
。
from __future__ import annotations
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
start_from_trigger = True
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
start_from_trigger
和 trigger_kwargs
也可以在實例級別進行修改,以獲得更靈活的配置。
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitTwoHourSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
def __init__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
super().__init__(*args, **kwargs)
self.start_trigger_args.trigger_kwargs = {"hours": 2}
self.start_from_trigger = True
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
映射任務的初始化階段在排程器將它們提交到執行器之後發生。因此,此功能提供的動態任務映射支援有限,並且其用法與標準實務不同。為了啟用動態任務映射支援,您需要在 __init__
方法中定義 start_from_trigger
和 trigger_kwargs
。請注意,您不需要同時定義它們才能使用此功能,但您需要使用完全相同的參數名稱。例如,如果您將引數定義為 t_kwargs
並將此值分配給 self.start_trigger_args.trigger_kwargs
,則它不會有任何效果。當映射 start_from_trigger
設定為 True 的任務時,將跳過整個 __init__
方法。排程器將使用提供的 start_from_trigger
和 trigger_kwargs
從 partial
和 expand
(如果未提供,則回退到類別屬性中的那些)來確定是否以及如何將任務提交到執行器或觸發器。請注意,XCom 值在此階段不會被解析。
觸發器完成執行後,任務可能會被發送回工作節點以執行 next_method
,或者任務實例可能會直接結束。(請參閱 從觸發器結束延遲任務)如果任務被發送回工作節點,則 __init__
方法中的引數仍然會在執行 next_method
之前生效,但它們不會影響觸發器的執行。
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitHoursSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
def __init__(
self,
*args: list[Any],
trigger_kwargs: dict[str, Any] | None,
start_from_trigger: bool,
**kwargs: dict[str, Any],
) -> None:
# This whole method will be skipped during dynamic task mapping.
super().__init__(*args, **kwargs)
self.start_trigger_args.trigger_kwargs = trigger_kwargs
self.start_from_trigger = start_from_trigger
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
這將擴展為 2 個任務,它們的“hours”引數分別設定為 1 和 2。
WaitHoursSensor.partial(task_id="wait_for_n_hours", start_from_trigger=True).expand(
trigger_kwargs=[{"hours": 1}, {"hours": 2}]
)
從觸發器結束延遲任務¶
2.10.0 版本新增功能。
如果您想直接從觸發器結束任務,而無需進入工作節點,您可以指定實例級別屬性 end_from_trigger
以及可延遲運算子的屬性,如上所述。這可以節省啟動新工作節點所需的一些資源。
觸發器可以有兩個選項:它們可以將執行發送回工作節點,也可以直接結束任務實例。如果觸發器本身結束任務實例,則 method_name
無關緊要,可以是 None
。否則,請提供在任務中恢復執行時應使用的 method_name
。
class WaitFiveHourSensorAsync(BaseSensorOperator):
# this sensor always exits from trigger.
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.end_from_trigger = True
def execute(self, context: Context) -> NoReturn:
self.defer(
method_name=None,
trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
)
TaskSuccessEvent
和 TaskFailureEvent
是可用於直接結束任務實例的兩個事件。這會將任務標記為狀態 task_instance_state
,並在適用時可選地推送 xcom。以下是如何使用這些事件的範例
class WaitFiveHourTrigger(BaseTrigger):
def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
super().__init__()
self.duration = duration
self.end_from_trigger = end_from_trigger
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"your_module.WaitFiveHourTrigger",
{"duration": self.duration, "end_from_trigger": self.end_from_trigger},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
if self.end_from_trigger:
yield TaskSuccessEvent()
else:
yield TriggerEvent({"duration": self.duration})
在上面的範例中,如果 end_from_trigger
設定為 True
,觸發器將通過產生 TaskSuccessEvent
直接結束任務實例。否則,它將使用運算子中指定的方法恢復任務實例。
注意
僅當未為可延遲運算子整合監聽器時,從觸發器退出才有效。目前,當可延遲運算子的 end_from_trigger
屬性設定為 True
並且整合了監聽器時,它會在解析期間引發例外,以指示此限制。在編寫自訂觸發器時,請確保如果從外掛程式添加了監聽器,則觸發器未設定為直接結束任務實例。如果觸發器的作者將 end_from_trigger
屬性更改為不同的屬性,則 DAG 解析不會引發任何例外,並且依賴於此任務的監聽器將無法運作。此限制將在未來的版本中解決。
高可用性¶
觸發器旨在在高可用性 (HA) 架構中運作。如果您想運行高可用性設定,請在多個主機上運行多個 triggerer
副本。與 scheduler
非常相似,它們會自動與正確的鎖定和 HA 共存。
根據觸發器正在執行的工作量,您可以在單個 triggerer
主機上容納數百到數萬個觸發器。預設情況下,每個 triggerer
都有 1000 個觸發器的容量,它可以嘗試一次運行。您可以使用 --capacity
引數更改可以同時運行的觸發器數量。如果您嘗試運行的觸發器數量超過了所有 triggerer
進程的總容量,則某些觸發器的運行將被延遲,直到其他觸發器完成。
Airflow 嘗試一次只在一個位置運行觸發器,並維護對目前正在運行的所有 triggerer
的心跳訊號。如果 triggerer
停止運作,或與 Airflow 資料庫運行的網路分割,Airflow 會自動重新排程該主機上的觸發器以在其他地方運行。Airflow 會等待(2.1 * triggerer.job_heartbeat_sec
)秒,等待機器重新出現,然後再重新排程觸發器。
這意味著觸發器有可能(但不太可能)在多個位置同時運行。但是,此行為已設計到觸發器契約中,並且是預期行為。當觸發器在多個位置同時運行時,Airflow 會消除重複的事件,因此此過程對您的運算子是透明的。
請注意,您運行的每個額外 triggerer
都會導致與資料庫建立額外的持久連線。
感測器中 Mode='reschedule' 與 Deferrable=True 的差異¶
在 Airflow 中,感測器等待滿足特定條件後才繼續執行下游任務。感測器有兩個選項用於管理閒置期:mode='reschedule'
和 deferrable=True
。由於 mode='reschedule'
是 Airflow 中 BaseSensorOperator 特有的參數,因此它允許感測器在條件未滿足時重新排程自身。'deferrable=True'
是一些運算子使用的慣例,用於指示任務可以稍後重試(或延遲),但它不是 Airflow 中的內建參數或模式。重試任務的實際行為因特定的運算子實作而異。
mode='reschedule' |
deferrable=True |
---|---|
持續重新排程自身,直到滿足條件 |
閒置時暫停執行,條件變更時恢復 |
資源使用率較高(重複執行) |
資源使用率較低(閒置時暫停,釋放工作節點插槽) |
預期條件會隨時間變更(例如,檔案建立) |
等待外部事件或資源(例如,API 回應) |
用於重新排程的內建功能 |
需要自訂邏輯來延遲任務並處理外部變更 |