airflow.providers.google.cloud.sensors.pubsub

此模組包含 Google PubSub 感測器。

模組內容

類別

PubSubPullSensor

從 PubSub 訂閱中提取訊息並透過 XCom 傳遞。

exception airflow.providers.google.cloud.sensors.pubsub.PubSubMessageTransformException[原始碼]

繼承自: airflow.exceptions.AirflowException

當訊息無法轉換 PubSub 接收格式時引發。

class airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor(*, project_id, subscription, max_messages=5, return_immediately=True, ack_messages=False, gcp_conn_id='google_cloud_default', messages_callback=None, impersonation_chain=None, poke_interval=10.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[原始碼]

繼承自: airflow.sensors.base.BaseSensorOperator

從 PubSub 訂閱中提取訊息並透過 XCom 傳遞。

總是等待至少一個訊息從訂閱中返回。

另請參閱

有關如何使用此運算子的更多資訊,請查看指南: 從 PubSub 訂閱中提取訊息

另請參閱

如果您不想等待至少一個訊息到來,請改用運算子: PubSubPullOperator

此感測器運算子將從指定的 PubSub 訂閱中提取最多 max_messages 條訊息。當訂閱返回訊息時,poke 方法的條件將被滿足,並且訊息將從運算子返回並透過 XCom 傳遞給下游任務。

如果 ack_messages 設定為 True,則訊息將在返回之前立即確認,否則,下游任務將負責確認它們。

如果您想要一個非阻塞任務,不需要等待訊息,請改用 PubSubPullOperator

project_idsubscription 是範本化的,因此您可以在其中使用變數。

參數
  • project_id (str) – 訂閱的 Google Cloud 專案 ID (已範本化)

  • subscription (str) – Pub/Sub 訂閱名稱。請勿包含完整的訂閱路徑。

  • max_messages (int) – 每個 PubSub pull 請求要檢索的最大訊息數

  • return_immediately (bool) – 如果此欄位設定為 true,即使在 Pull 回應中沒有可返回的訊息,系統也會立即回應。否則,系統可能會等待(在有限的時間內)直到至少有一條訊息可用,而不是返回沒有訊息。警告:不建議將此欄位設定為 true,因為它會對 Pull 操作的效能產生不利影響。我們建議使用者不要設定此欄位。

  • ack_messages (bool) – 如果為 True,則每個訊息將立即確認,而不是由任何下游任務確認

  • gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。

  • messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可選)用於處理接收訊息的回調函數。其傳回值將儲存到 XCom。如果您正在提取大型訊息,您可能需要提供自訂的回調函數。如果未提供,預設實作將使用 google.protobuf.json_format.MessageToDict 函數將 ReceivedMessage 物件轉換為可 JSON 序列化的字典。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬,或鏈式帳戶列表,用於取得列表中最後一個帳戶的 access_token,該帳戶將在請求中被模擬。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶(已範本化)。

  • deferrable (bool) – 以可延遲模式執行感測器

template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[原始碼]
ui_color : '#ff7f50'[原始碼]
poke(context)[原始碼]

在衍生此類別時覆寫。

execute(context)[原始碼]

Airflow 在 worker 上執行此方法,如果 deferrable 為 True,則使用觸發器延遲。

execute_complete(context, event)[原始碼]

如果提供了 messages_callback,則執行它;否則,立即返回觸發器事件訊息。

此條目是否有幫助?