Google Cloud PubSub 運算子

Google Cloud Pub/Sub 是一項全託管的即時訊息服務,可讓您在獨立應用程式之間傳送和接收訊息。您可以運用 Cloud Pub/Sub 的彈性來解耦託管在 Google Cloud 或網際網路上其他位置的系統和組件。

發布者應用程式可以將訊息傳送到主題,而其他應用程式可以訂閱該主題以接收訊息。透過解耦發送者和接收者,Google Cloud PubSub 允許開發人員在獨立編寫的應用程式之間進行通訊。

先決條件任務

若要使用這些運算子,您必須執行幾項操作

建立 PubSub 主題

PubSub 主題是一個具名的資源,訊息發布者會將訊息傳送到該資源。 PubSubCreateTopicOperator 運算子會建立主題。

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]

    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
    )

建立 PubSub 訂閱

Subscription 是一個具名的資源,代表來自單一特定主題的訊息流,將傳遞到訂閱應用程式。 PubSubCreateSubscriptionOperator 運算子會建立訂閱。

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
    )

發布 PubSub 訊息

Message 是資料和(選擇性)屬性的組合,發布者將其傳送到主題,最終傳遞給訂閱者。 PubSubPublishMessageOperator 運算子將發布訊息。

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]

    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        messages=[MESSAGE, MESSAGE],
    )

從 PubSub 訂閱提取訊息

PubSubPullSensor 感測器從 PubSub 訂閱提取訊息,並透過 XCom 傳遞它們。

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]

    subscription = subscribe_task.output

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

此外,對於此操作,您可以使用可延遲模式的感測器

tests/system/google/cloud/pubsub/example_pubsub_deferrable.py[原始碼]

pull_messages_async = PubSubPullSensor(
    task_id="pull_messages_async",
    ack_messages=True,
    project_id=PROJECT_ID,
    subscription=subscription,
    deferrable=True,
)

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]


    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

若要從 XCom 提取訊息,請使用 BashOperator

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)

刪除 PubSub 訂閱

PubSubDeleteSubscriptionOperator 運算子會刪除訂閱。

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]

    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=PROJECT_ID,
        subscription=subscription,
    )

刪除 PubSub 主題

PubSubDeleteTopicOperator 運算子會刪除主題。

tests/system/google/cloud/pubsub/example_pubsub.py[原始碼]

    delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)

參考

如需更多資訊,請查看

此條目是否有幫助?