Apache Kafka 感測器¶
AwaitMessageSensor¶
一個感測器,會延遲直到特定訊息發布到 Kafka 主題。此感測器將建立一個消費者,從 Kafka 主題讀取訊息,直到找到符合在 apply_function
參數中定義的條件的訊息。如果 apply_function
返回任何資料,則會引發 TriggerEvent
,且 AwaitMessageSensor
會成功完成。
有關參數定義,請參閱 AwaitMessageSensor
。
使用感測器¶
t5 = AwaitMessageSensor(
kafka_config_id="t5",
task_id="awaiting_message",
topics=["test_1"],
apply_function="example_dag_hello_kafka.await_function",
xcom_push_key="retrieved_message",
)
參考¶
如需更多資訊,請參閱 Apache Kafka Consumer 文件。
AwaitMessageTriggerFunctionSensor¶
與上述的 AwaitMessageSensor
類似,此感測器將延遲,直到它從 Kafka 主題消費符合其 apply_function
條件的訊息。一旦遇到正面事件,AwaitMessageTriggerFunctionSensor
將觸發提供給 event_triggered_function
的可調用物件。之後,感測器將再次延遲,繼續消費訊息。
有關參數定義,請參閱 AwaitMessageTriggerFunctionSensor
。
使用感測器¶
listen_for_message = AwaitMessageTriggerFunctionSensor(
kafka_config_id="fizz_buzz_2",
task_id="listen_for_message",
topics=["fizz_buzz"],
apply_function="example_dag_event_listener.await_function",
event_triggered_function=wait_for_event,
)
參考¶
如需更多資訊,請參閱 Apache Kafka Consumer 文件。