airflow.providers.apache.kafka.operators.consume
¶
模組內容¶
類別¶
一個從 Kafka 主題 (或多個主題) 消費並處理訊息的 operator。 |
屬性¶
- class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[原始碼]¶
基底類別:
airflow.models.BaseOperator
一個從 Kafka 主題 (或多個主題) 消費並處理訊息的 operator。
此 operator 建立一個 Kafka 消費者,從叢集讀取一批訊息,並使用使用者提供的可呼叫函式處理這些訊息。消費者將持續批量讀取,直到到達日誌結尾或達到最大訊息數。
- 參數
kafka_config_id (str) – 要使用的連線物件,預設為 “kafka_default”
topics (str | collections.abc.Sequence[str]) – 消費者應訂閱的主題或正則表達式模式列表。
apply_function (Callable[Ellipsis, Any] | str | None) – 應一次應用於一個提取項目的函式。dag 檔案名稱(執行函式)和函式名稱以 . 分隔。
apply_function_batch (Callable[Ellipsis, Any] | str | None) – 應應用於一批提取訊息的函式。不能與 apply_function 一起使用。適用於交易型工作負載,在對訊息執行操作之前或之後可能會呼叫昂貴的任務。
apply_function_args (collections.abc.Sequence[Any] | None) – 應應用於可呼叫物件的其他引數,預設為 None
apply_function_kwargs (dict[Any, Any] | None) – 應應用於可呼叫物件的其他關鍵字引數,預設為 None
commit_cadence (str | None) – 消費者應何時提交偏移量 (“never”, “end_of_batch”, “end_of_operator”),預設為 “end_of_operator”;如果為 end_of_operator,則 commit() 會根據 max_messages 引數呼叫。提交會在 operator 針對 operator 中的最大訊息數處理完 apply_function 方法後進行。如果為 end_of_batch,則 commit() 會根據 max_batch_size 引數呼叫。提交會在每個批次由 apply_function 方法處理完批次中的所有訊息後進行。如果為 never,則會呼叫 close() 而不呼叫 commit() 方法。
max_messages (int | None) – operator 應從 Kafka 讀取的最大訊息總數,預設為 None,表示讀取到主題結尾。
max_batch_size (int) – 消費者在輪詢時應讀取的最大訊息數,預設為 1000
poll_timeout (float) – Kafka 消費者應等待多久才確定沒有更多訊息可用,預設為 60
另請參閱
如需更多關於如何使用此 operator 的資訊,請查看指南: ConsumeFromTopicOperator