Apache Kafka 感測器

AwaitMessageSensor

一個感測器,會延遲直到特定訊息發布到 Kafka 主題。此感測器將建立一個消費者,從 Kafka 主題讀取訊息,直到找到符合在 apply_function 參數中定義的條件的訊息。如果 apply_function 返回任何資料,則會引發 TriggerEvent,且 AwaitMessageSensor 會成功完成。

有關參數定義,請參閱 AwaitMessageSensor

使用感測器

tests/system/apache/kafka/example_dag_hello_kafka.py[原始碼]

t5 = AwaitMessageSensor(
    kafka_config_id="t5",
    task_id="awaiting_message",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.await_function",
    xcom_push_key="retrieved_message",
)

參考

如需更多資訊,請參閱 Apache Kafka Consumer 文件

AwaitMessageTriggerFunctionSensor

與上述的 AwaitMessageSensor 類似,此感測器將延遲,直到它從 Kafka 主題消費符合其 apply_function 條件的訊息。一旦遇到正面事件,AwaitMessageTriggerFunctionSensor 將觸發提供給 event_triggered_function 的可調用物件。之後,感測器將再次延遲,繼續消費訊息。

有關參數定義,請參閱 AwaitMessageTriggerFunctionSensor

使用感測器

tests/system/apache/kafka/example_dag_event_listener.py[原始碼]

listen_for_message = AwaitMessageTriggerFunctionSensor(
    kafka_config_id="fizz_buzz_2",
    task_id="listen_for_message",
    topics=["fizz_buzz"],
    apply_function="example_dag_event_listener.await_function",
    event_triggered_function=wait_for_event,
)

參考

如需更多資訊,請參閱 Apache Kafka Consumer 文件

此條目是否有幫助?