Google Cloud PubSub 運算子¶
Google Cloud Pub/Sub 是一項全託管的即時訊息服務,可讓您在獨立應用程式之間傳送和接收訊息。您可以運用 Cloud Pub/Sub 的彈性來解耦託管在 Google Cloud 或網際網路上其他位置的系統和組件。
發布者應用程式可以將訊息傳送到主題,而其他應用程式可以訂閱該主題以接收訊息。透過解耦發送者和接收者,Google Cloud PubSub 允許開發人員在獨立編寫的應用程式之間進行通訊。
先決條件任務¶
若要使用這些運算子,您必須執行幾項操作
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'如需詳細資訊,請參閱 安裝。
建立 PubSub 主題¶
PubSub 主題是一個具名的資源,訊息發布者會將訊息傳送到該資源。 PubSubCreateTopicOperator
運算子會建立主題。
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
)
建立 PubSub 訂閱¶
Subscription
是一個具名的資源,代表來自單一特定主題的訊息流,將傳遞到訂閱應用程式。 PubSubCreateSubscriptionOperator
運算子會建立訂閱。
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
)
發布 PubSub 訊息¶
Message
是資料和(選擇性)屬性的組合,發布者將其傳送到主題,最終傳遞給訂閱者。 PubSubPublishMessageOperator
運算子將發布訊息。
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=PROJECT_ID,
topic=TOPIC_ID,
messages=[MESSAGE, MESSAGE],
)
從 PubSub 訂閱提取訊息¶
PubSubPullSensor
感測器從 PubSub 訂閱提取訊息,並透過 XCom 傳遞它們。
subscription = subscribe_task.output
pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
此外,對於此操作,您可以使用可延遲模式的感測器
pull_messages_async = PubSubPullSensor(
task_id="pull_messages_async",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
deferrable=True,
)
pull_messages_operator = PubSubPullOperator(
task_id="pull_messages_operator",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
若要從 XCom 提取訊息,請使用 BashOperator
。
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
刪除 PubSub 訂閱¶
PubSubDeleteSubscriptionOperator
運算子會刪除訂閱。
unsubscribe_task = PubSubDeleteSubscriptionOperator(
task_id="unsubscribe_task",
project_id=PROJECT_ID,
subscription=subscription,
)
刪除 PubSub 主題¶
PubSubDeleteTopicOperator
運算子會刪除主題。
delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)