airflow.providers.apache.kafka.triggers.await_message

模組內容

類別

AwaitMessageTrigger

一個等待符合特定條件的訊息到達 Kafka 的觸發器。

class airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger(topics, apply_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5)[原始碼]

繼承自: airflow.triggers.base.BaseTrigger

一個等待符合特定條件的訊息到達 Kafka 的觸發器。

此觸發器的消費者行為如下: - 輪詢 Kafka 主題以獲取訊息,如果沒有訊息返回,則休眠 - 使用提供的可調用物件處理訊息並提交訊息偏移量

  • 如果可調用物件返回任何數據,則引發帶有返回數據的 TriggerEvent

  • 否則繼續下一個訊息

參數
  • kafka_config_id (str) – 要使用的連線物件,預設為 “kafka_default”

  • topics (collections.abc.Sequence[str]) – 應該在其中搜尋訊息的主題(或主題正則表達式)

  • apply_function (str) – 應用於訊息以確定匹配條件的函數位置。(以 Python 點表示法作為字串)

  • apply_function_args (collections.abc.Sequence[Any] | None) – 應用於可調用物件的一組參數,預設為 None

  • apply_function_kwargs (dict[Any, Any] | None) – 應用於可調用物件的一組關鍵字參數,預設為 None,預設為 None

  • poll_timeout (float) – Kafka 客戶端在從輪詢請求返回到 Kafka 之前應等待多長時間(秒),預設為 1

  • poll_interval (float) – 觸發器在達到 Kafka 日誌末尾後應休眠多長時間(秒),預設為 5

serialize()[原始碼]

返回重建此觸發器所需的信息。

返回

(類別路徑,重新實例化所需的關鍵字參數)的元組。

返回類型

tuple[str, dict[str, Any]]

async run()[原始碼]

在異步上下文中運行觸發器。

每當觸發器想要觸發事件時,它應該產生一個 Event,如果完成,則返回 None。單事件觸發器因此應該產生然後立即返回。

如果它產生,則很可能它會很快恢復,但可能不會(例如,如果工作負載正在移動到另一個觸發器進程,或者多事件觸發器正在用於單事件任務延遲)。

在任何情況下,Trigger 類別都應假設它們將被持久化,然後依賴於在不再需要它們時調用 cleanup()。

此條目是否有幫助?