airflow.providers.apache.kafka.operators.consume

模組內容

類別

ConsumeFromTopicOperator

一個從 Kafka 主題 (或多個主題) 消費並處理訊息的 operator。

屬性

VALID_COMMIT_CADENCE

airflow.providers.apache.kafka.operators.consume.VALID_COMMIT_CADENCE[原始碼]
class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[原始碼]

基底類別: airflow.models.BaseOperator

一個從 Kafka 主題 (或多個主題) 消費並處理訊息的 operator。

此 operator 建立一個 Kafka 消費者,從叢集讀取一批訊息,並使用使用者提供的可呼叫函式處理這些訊息。消費者將持續批量讀取,直到到達日誌結尾或達到最大訊息數。

參數
  • kafka_config_id (str) – 要使用的連線物件,預設為 “kafka_default”

  • topics (str | collections.abc.Sequence[str]) – 消費者應訂閱的主題或正則表達式模式列表。

  • apply_function (Callable[Ellipsis, Any] | str | None) – 應一次應用於一個提取項目的函式。dag 檔案名稱(執行函式)和函式名稱以 . 分隔。

  • apply_function_batch (Callable[Ellipsis, Any] | str | None) – 應應用於一批提取訊息的函式。不能與 apply_function 一起使用。適用於交易型工作負載,在對訊息執行操作之前或之後可能會呼叫昂貴的任務。

  • apply_function_args (collections.abc.Sequence[Any] | None) – 應應用於可呼叫物件的其他引數,預設為 None

  • apply_function_kwargs (dict[Any, Any] | None) – 應應用於可呼叫物件的其他關鍵字引數,預設為 None

  • commit_cadence (str | None) – 消費者應何時提交偏移量 (“never”, “end_of_batch”, “end_of_operator”),預設為 “end_of_operator”;如果為 end_of_operator,則 commit() 會根據 max_messages 引數呼叫。提交會在 operator 針對 operator 中的最大訊息數處理完 apply_function 方法後進行。如果為 end_of_batch,則 commit() 會根據 max_batch_size 引數呼叫。提交會在每個批次由 apply_function 方法處理完批次中的所有訊息後進行。如果為 never,則會呼叫 close() 而不呼叫 commit() 方法。

  • max_messages (int | None) – operator 應從 Kafka 讀取的最大訊息總數,預設為 None,表示讀取到主題結尾。

  • max_batch_size (int) – 消費者在輪詢時應讀取的最大訊息數,預設為 1000

  • poll_timeout (float) – Kafka 消費者應等待多久才確定沒有更多訊息可用,預設為 60

另請參閱

如需更多關於如何使用此 operator 的資訊,請查看指南: ConsumeFromTopicOperator

BLUE = '#ffefeb'[原始碼]
ui_color[原始碼]
template_fields = ('topics', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[原始碼]
execute(context)[原始碼]

在建立 operator 時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多 context。

此條目是否有幫助?