Apache Kafka Operators¶
ConsumeFromTopicOperator¶
一個從一個或多個 Kafka 主題消費並處理訊息的 operator。此 operator 建立一個 Kafka Consumer,從叢集讀取一批訊息,並使用使用者提供的可呼叫 apply_function
處理它們。Consumer 將持續分批讀取,直到到達日誌末端或達到最大訊息讀取數量 (max_messages
)。
關於參數定義,請查看 ConsumeFromTopicOperator
。
使用此 operator¶
t2 = ConsumeFromTopicOperator(
kafka_config_id="t2",
task_id="consume_from_topic",
topics=["test_1"],
apply_function="example_dag_hello_kafka.consumer_function",
apply_function_kwargs={"prefix": "consumed:::"},
commit_cadence="end_of_batch",
max_messages=10,
max_batch_size=2,
)
參考¶
更多資訊,請參閱 Apache Kafka Consumer 文件。
ProduceToTopicOperator¶
一個將訊息生產到 Kafka 主題的 operator。此 operator 將生產由使用者提供的 producer_function
建立的鍵/值對訊息。
關於參數定義,請查看 ProduceToTopicOperator
。
使用此 operator¶
t1 = ProduceToTopicOperator(
kafka_config_id="t1-3",
task_id="produce_to_topic",
topic="test_1",
producer_function="example_dag_hello_kafka.producer_function",
)
參考¶
更多資訊,請參閱 Apache Kafka Producer 文件。