Apache Kafka Operators

ConsumeFromTopicOperator

一個從一個或多個 Kafka 主題消費並處理訊息的 operator。此 operator 建立一個 Kafka Consumer,從叢集讀取一批訊息,並使用使用者提供的可呼叫 apply_function 處理它們。Consumer 將持續分批讀取,直到到達日誌末端或達到最大訊息讀取數量 (max_messages)。

關於參數定義,請查看 ConsumeFromTopicOperator

使用此 operator

tests/system/apache/kafka/example_dag_hello_kafka.py[原始碼]

t2 = ConsumeFromTopicOperator(
    kafka_config_id="t2",
    task_id="consume_from_topic",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.consumer_function",
    apply_function_kwargs={"prefix": "consumed:::"},
    commit_cadence="end_of_batch",
    max_messages=10,
    max_batch_size=2,
)

參考

更多資訊,請參閱 Apache Kafka Consumer 文件

ProduceToTopicOperator

一個將訊息生產到 Kafka 主題的 operator。此 operator 將生產由使用者提供的 producer_function 建立的鍵/值對訊息。

關於參數定義,請查看 ProduceToTopicOperator

使用此 operator

tests/system/apache/kafka/example_dag_hello_kafka.py[原始碼]

t1 = ProduceToTopicOperator(
    kafka_config_id="t1-3",
    task_id="produce_to_topic",
    topic="test_1",
    producer_function="example_dag_hello_kafka.producer_function",
)

參考

更多資訊,請參閱 Apache Kafka Producer 文件

此條目是否有幫助?