airflow.providers.standard.sensors.date_time

模組內容

類別

StartTriggerArgs

從觸發器啟動任務執行所需的參數。

DateTimeSensor

等待直到指定日期時間。

DateTimeSensorAsync

等待直到指定日期時間發生。

class airflow.providers.standard.sensors.date_time.StartTriggerArgs[原始碼]

從觸發器啟動任務執行所需的參數。

trigger_cls: str[原始碼]
next_method: str[原始碼]
trigger_kwargs: dict[str, Any] | None[原始碼]
next_kwargs: dict[str, Any] | None[原始碼]
timeout: datetime.timedelta | None[原始碼]
class airflow.providers.standard.sensors.date_time.DateTimeSensor(*, target_time, **kwargs)[原始碼]

基底類別: airflow.sensors.base.BaseSensorOperator

等待直到指定日期時間。

此感測器的主要優點是對 target_time 的冪等性。它可以處理 TimeSensorTimeDeltaSensor 不適用的某些情況。

範例 1 :

如果任務需要等待每個 logical_date 的上午 11 點。使用 TimeSensorTimeDeltaSensor,所有在凌晨 1 點啟動的回填任務都必須等待 10 小時。這是沒有必要的,例如,{{ ds }} = '1970-01-01' 的回填任務不需要等待,因為 1970-01-01T11:00:00 已經過了。

範例 2 :

如果 DAG 計劃每天 23:00 運行,但其中一項任務需要在隔天 01:00 運行,使用 TimeSensor 將立即返回 True,因為 23:00 > 01:00。相反地,我們可以這樣做

DateTimeSensor(
    task_id="wait_for_0100",
    target_time="{{ data_interval_end.tomorrow().replace(hour=1) }}",
)
參數

target_time (str | datetime.datetime) – 工作成功的日期時間。(已套用 Jinja 模板)

template_fields: collections.abc.Sequence[str] = ('target_time',)[原始碼]
poke(context)[原始碼]

覆寫以衍生此類別。

class airflow.providers.standard.sensors.date_time.DateTimeSensorAsync(*, start_from_trigger=False, end_from_trigger=False, trigger_kwargs=None, **kwargs)[原始碼]

基底類別: DateTimeSensor

等待直到指定日期時間發生。

延遲自身以避免在等待時佔用 Worker 插槽。它是 DateTimeSensor 的直接替換。

參數
  • target_time – 工作成功的日期時間。(已套用 Jinja 模板)

  • start_from_trigger (bool) – 直接從觸發器啟動任務,而無需進入 Worker。

  • trigger_kwargs (dict[str, Any] | None) – 當 start_from_trigger 設定為 True 以進行動態任務映射時,傳遞給觸發器的關鍵字參數。此參數在標準用法中未使用。

  • end_from_trigger (bool) – 直接從觸發器結束任務,而無需進入 Worker。

start_trigger_args[原始碼]
start_from_trigger = False[原始碼]
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與渲染 Jinja 模板時相同的字典。

請參閱 get_template_context 以取得更多上下文。

execute_complete(context, event=None)[原始碼]

當觸發器觸發時處理事件並立即返回。

此條目是否有幫助?