airflow.providers.google.cloud.sensors.pubsub
¶
此模組包含 Google PubSub 感測器。
模組內容¶
類別¶
從 PubSub 訂閱中提取訊息並透過 XCom 傳遞。 |
- exception airflow.providers.google.cloud.sensors.pubsub.PubSubMessageTransformException[原始碼]¶
繼承自:
airflow.exceptions.AirflowException
當訊息無法轉換 PubSub 接收格式時引發。
- class airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor(*, project_id, subscription, max_messages=5, return_immediately=True, ack_messages=False, gcp_conn_id='google_cloud_default', messages_callback=None, impersonation_chain=None, poke_interval=10.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[原始碼]¶
繼承自:
airflow.sensors.base.BaseSensorOperator
從 PubSub 訂閱中提取訊息並透過 XCom 傳遞。
總是等待至少一個訊息從訂閱中返回。
另請參閱
有關如何使用此運算子的更多資訊,請查看指南: 從 PubSub 訂閱中提取訊息
另請參閱
如果您不想等待至少一個訊息到來,請改用運算子:
PubSubPullOperator
此感測器運算子將從指定的 PubSub 訂閱中提取最多
max_messages
條訊息。當訂閱返回訊息時,poke 方法的條件將被滿足,並且訊息將從運算子返回並透過 XCom 傳遞給下游任務。如果
ack_messages
設定為 True,則訊息將在返回之前立即確認,否則,下游任務將負責確認它們。如果您想要一個非阻塞任務,不需要等待訊息,請改用
PubSubPullOperator
。project_id
和subscription
是範本化的,因此您可以在其中使用變數。- 參數
project_id (str) – 訂閱的 Google Cloud 專案 ID (已範本化)
subscription (str) – Pub/Sub 訂閱名稱。請勿包含完整的訂閱路徑。
max_messages (int) – 每個 PubSub pull 請求要檢索的最大訊息數
return_immediately (bool) – 如果此欄位設定為 true,即使在
Pull
回應中沒有可返回的訊息,系統也會立即回應。否則,系統可能會等待(在有限的時間內)直到至少有一條訊息可用,而不是返回沒有訊息。警告:不建議將此欄位設定為true
,因為它會對Pull
操作的效能產生不利影響。我們建議使用者不要設定此欄位。ack_messages (bool) – 如果為 True,則每個訊息將立即確認,而不是由任何下游任務確認
gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。
messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可選)用於處理接收訊息的回調函數。其傳回值將儲存到 XCom。如果您正在提取大型訊息,您可能需要提供自訂的回調函數。如果未提供,預設實作將使用 google.protobuf.json_format.MessageToDict 函數將 ReceivedMessage 物件轉換為可 JSON 序列化的字典。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬,或鏈式帳戶列表,用於取得列表中最後一個帳戶的 access_token,該帳戶將在請求中被模擬。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶(已範本化)。
deferrable (bool) – 以可延遲模式執行感測器
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[原始碼]¶