airflow.triggers.base

模組內容

類別

StartTriggerArgs

從觸發器啟動任務執行所需的引數。

BaseTrigger

所有觸發器的基礎類別。

TriggerEvent

觸發器在其條件滿足時可以觸發的事件。

TaskSuccessEvent

產生此事件以成功結束任務。

TaskFailedEvent

產生此事件以失敗結束任務。

TaskSkippedEvent

產生此事件以 'skipped' 狀態結束任務。

屬性

log

airflow.triggers.base.log[source]
class airflow.triggers.base.StartTriggerArgs[source]

從觸發器啟動任務執行所需的引數。

trigger_cls: str[source]
next_method: str[source]
trigger_kwargs: dict[str, Any] | None[source]
next_kwargs: dict[str, Any] | None[source]
timeout: datetime.timedelta | None[source]
class airflow.triggers.base.BaseTrigger(**kwargs)[source]

Bases: abc.ABC, airflow.utils.log.logging_mixin.LoggingMixin

所有觸發器的基礎類別。

觸發器可以存在於兩種情境中

  • 在 Operator 內部,當它傳遞給 TaskDeferred 時

  • 在觸發器 worker 中主動執行

我們在兩種情況下都使用相同的類別,並依賴所有 Trigger 類別都能夠傳回引數(可以使用 Airflow-JSON 編碼),以便它們可以在其他地方重新實例化。

abstract serialize()[source]

傳回重建此 Trigger 所需的資訊。

返回

(類別路徑,重新實例化所需的關鍵字引數)的元組。

返回類型

tuple[str, dict[str, Any]]

abstract async run()[source]

在非同步環境中執行觸發器。

每當觸發器想要觸發事件時,它應該產生一個 Event,如果完成則傳回 None。因此,單一事件觸發器應產生然後立即傳回。

如果它產生,則很可能很快就會恢復,但可能不會(例如,如果工作負載正在移至另一個觸發器程序,或者多事件觸發器用於單事件任務延遲)。

在任何一種情況下,Trigger 類別都應假設它們將被持久化,然後依賴於在不再需要它們時呼叫 cleanup()。

async cleanup()[source]

清理觸發器。

當不再需要觸發器並且要從活動觸發器程序中移除時呼叫。

此方法遵循 async/await 模式,以允許在觸發器主事件迴圈中執行清理。清理方法引發的例外情況會被忽略,因此如果您希望能夠偵錯它們並收到清理方法失敗的通知,您應該使用 try/except 區塊包裝您的程式碼並適當地處理它(以非同步相容的方式)。

__repr__()[source]

傳回 repr(self)。

class airflow.triggers.base.TriggerEvent(payload)[source]

觸發器在其條件滿足時可以觸發的事件。

事件必須具有唯一識別值,該值在觸發器運行的任何地方都相同;這是為了確保如果同一個觸發器在兩個位置(出於 HA 原因)運行,我們可以對其事件進行去重。

__repr__()[source]

傳回 repr(self)。

__eq__(other)[source]

傳回 self==value。

handle_submit(*, task_instance, session=NEW_SESSION)[source]

處理給定任務實例的提交事件。

此函數設定任務實例的下一個方法和下一個 kwargs,以及其狀態為排程中。它還將事件的 payload 新增到任務的 kwargs 中。

參數
  • task_instance (airflow.models.TaskInstance) – 要處理提交事件的任務實例。

  • session (sqlalchemy.orm.Session) – 用於資料庫回呼接收器的 session。

class airflow.triggers.base.TaskSuccessEvent(*, xcoms=None, **kwargs)[source]

Bases: BaseTaskEndEvent

產生此事件以成功結束任務。

task_instance_state[source]
class airflow.triggers.base.TaskFailedEvent(*, xcoms=None, **kwargs)[source]

Bases: BaseTaskEndEvent

產生此事件以失敗結束任務。

task_instance_state[source]
class airflow.triggers.base.TaskSkippedEvent(*, xcoms=None, **kwargs)[source]

Bases: BaseTaskEndEvent

產生此事件以 'skipped' 狀態結束任務。

task_instance_state[source]

此條目是否有幫助?