airflow.providers.google.cloud.hooks.pubsub

此模組包含 Google Pub/Sub Hook。

模組內容

類別

PubSubHook

用於存取 Google Pub/Sub 的 Hook。

PubSubAsyncHook

用於取得 Google Cloud PubSub 非同步 Hook 的類別。

exception airflow.providers.google.cloud.hooks.pubsub.PubSubException[原始碼]

基礎類別: Exception

Exception 的別名。

class airflow.providers.google.cloud.hooks.pubsub.PubSubHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, enable_message_ordering=False, **kwargs)[原始碼]

基礎類別: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

用於存取 Google Pub/Sub 的 Hook。

執行動作的 Google Cloud 專案由 gcp_conn_id 參考的連線中嵌入的專案決定。

get_conn()[原始碼]

取得 Google Cloud Pub/Sub 的連線。

返回值

Google Cloud Pub/Sub client 物件。

返回類型

google.cloud.pubsub_v1.PublisherClient

subscriber_client()[原始碼]

建立 SubscriberClient。

返回值

Google Cloud Pub/Sub client 物件。

返回類型

google.cloud.pubsub_v1.SubscriberClient

publish(topic, messages, project_id=PROVIDE_PROJECT_ID)[原始碼]

發布訊息到 Pub/Sub 主題。

參數
  • topic (str) – 要發布訊息的 Pub/Sub 主題;請勿包含 projects/{project}/topics/ 前綴。

  • messages (list[dict]) – 要發布的訊息;如果訊息中的 data 欄位已設定,則應為位元組字串 (utf-8 編碼) https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage

  • project_id (str) – (選填) 要在其中發布訊息的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

create_topic(topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

建立 Pub/Sub 主題,如果主題尚不存在。

參數
  • topic (str) – 要建立的 Pub/Sub 主題名稱;請勿包含 projects/{project}/topics/ 前綴。

  • project_id (str) – (選填) 要在其中建立主題的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

  • fail_if_exists (bool) – 如果設定,則在主題已存在時引發例外。

  • labels (dict[str, str] | None) – 客戶端指定的標籤;請參閱 https://cloud.google.com/pubsub/docs/labels

  • message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – 限制訊息可儲存的 Google Cloud 區域集合的策略。如果不存在,則不生效任何限制。Union[dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]

  • kms_key_name (str | None) – 用於保護對此主題上發布的訊息之存取權的 Cloud KMS CryptoKey 資源名稱。預期格式為 projects/*/locations/*/keyRings/*/cryptoKeys/*。

  • schema_settings (dict | google.cloud.pubsub_v1.types.SchemaSettings) – (選填) 用於驗證針對現有結構描述發布的訊息之設定。預期格式為 projects/*/schemas/*。

  • message_retention_duration (str | None) – (選填) 指出在訊息發布到主題後保留訊息的最短持續時間。預期格式為秒數,最多九位小數,並以 ‘s’ 結尾。範例:“3.5s”。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果指定 None,則不會重試請求。

  • timeout (float | None) – (選填) 等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他 metadata。

delete_topic(topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, retry=DEFAULT, timeout=None, metadata=())[原始碼]

刪除 Pub/Sub 主題,如果主題存在。

參數
  • topic (str) – 要刪除的 Pub/Sub 主題名稱;請勿包含 projects/{project}/topics/ 前綴。

  • project_id (str) – (選填) 要在其中刪除主題的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

  • fail_if_not_exists (bool) – 如果設定,則在主題不存在時引發例外。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果指定 None,則不會重試請求。

  • timeout (float | None) – (選填) 等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他 metadata。

create_subscription(topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

建立 Pub/Sub 訂閱,如果訂閱尚不存在。

參數
  • topic (str) – 訂閱將綁定以建立的 Pub/Sub 主題名稱;請勿包含 projects/{project}/subscriptions/ 前綴。

  • project_id (str) – (選填) 訂閱將綁定的主題之 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

  • subscription (str | None) – Pub/Sub 訂閱名稱。如果為空,將使用 uuid 模組產生隨機名稱。

  • subscription_project_id (str | None) – 將在其中建立訂閱的 Google Cloud 專案 ID。如果未指定,將使用 project_id

  • ack_deadline_secs (int) – 訂閱者必須確認從訂閱中提取的每則訊息之秒數。

  • fail_if_exists (bool) – 如果設定,則在主題已存在時引發例外。

  • push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – 如果此訂閱使用 push 傳遞,則此欄位用於設定它。空的 pushConfig 表示訂閱者將使用 API 方法提取和確認訊息。

  • retain_acked_messages (bool | None) – 指出是否保留已確認的訊息。如果為 true,即使訊息已確認,訊息也不會從訂閱的待辦項目中移除,直到它們超出 message_retention_duration 期限。如果您想要 Seek 到時間戳記,則必須為 true。

  • message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – 從訊息發布時起,在訂閱的待辦項目中保留未確認訊息多久。如果 retain_acked_messages 為 true,則這也會設定已確認訊息的保留期限,並因此設定 Seek 可以回溯多遠的時間。預設為 7 天。不能超過 7 天或少於 10 分鐘。

  • labels (dict[str, str] | None) – 客戶端指定的標籤;請參閱 https://cloud.google.com/pubsub/docs/labels

  • enable_message_ordering (bool) – 如果為 true,則在 PubsubMessage 中以相同 ordering_key 發布的訊息將以 Pub/Sub 系統接收它們的順序傳遞給訂閱者。否則,它們可能會以任何順序傳遞。

  • expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – 指定此訂閱到期條件的策略。只要任何連線的訂閱者成功地從訂閱中取用訊息或正在對訂閱發出操作,訂閱即被視為有效。如果未設定 expiration_policy,則將使用 ttl 為 31 天的預設策略。expiration_policy.ttl 的最小允許值為 1 天。

  • filter – 以 Cloud Pub/Sub 篩選器語言撰寫的運算式。如果非空,則只有 attributes 欄位符合篩選器的 PubsubMessages 會在此訂閱上傳遞。如果為空,則不會篩選掉任何訊息。

  • dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – 指定在此訂閱中 dead-letter 訊息條件的策略。如果未設定 dead_letter_policy,則停用 dead-letter 功能。

  • retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – 指定 Pub/Sub 如何重試此訂閱的訊息傳遞之策略。如果未設定,則套用預設重試策略。這通常表示將盡快為健康的訂閱者重試訊息。RetryPolicy 將在特定訊息的 NACK 或確認期限超出事件時觸發。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果指定 None,則不會重試請求。

  • timeout (float | None) – (選填) 等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他 metadata。

返回值

訂閱名稱,如果未提供 subscription 參數,則此名稱將為系統產生的值。

返回類型

str

delete_subscription(subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, retry=DEFAULT, timeout=None, metadata=())[原始碼]

刪除 Pub/Sub 訂閱,如果訂閱存在。

參數
  • subscription (str) – 要刪除的 Pub/Sub 訂閱名稱;請勿包含 projects/{project}/subscriptions/ 前綴。

  • project_id (str) – (選填) 訂閱所在的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

  • fail_if_not_exists (bool) – 如果設定,則在主題不存在時引發例外。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果指定 None,則不會重試請求。

  • timeout (float | None) – (選填) 等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他 metadata。

pull(subscription, max_messages, project_id=PROVIDE_PROJECT_ID, return_immediately=False, retry=DEFAULT, timeout=None, metadata=())[原始碼]

從 Pub/Sub 訂閱中提取最多 max_messages 則訊息。

參數
  • subscription (str) – 要從中提取訊息的 Pub/Sub 訂閱名稱;請勿包含 ‘projects/{project}/subscriptions/’ 前綴。

  • max_messages (int) – 從 Pub/Sub API 返回的最大訊息數。

  • project_id (str) – (選填) 訂閱所在的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

  • return_immediately (bool) – 如果設定,則在沒有可用訊息時,Pub/Sub API 將立即返回。否則,請求將會封鎖一段未公開但有界限的時間。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果指定 None,則不會重試請求。

  • timeout (float | None) – (選填) 等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他 metadata。

返回值

Pub/Sub ReceivedMessage 物件的列表,每個物件都包含 ackId 屬性和 message 屬性,其中包括 base64 編碼的訊息內容。請參閱 https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.ReceivedMessage

返回類型

list[google.cloud.pubsub_v1.types.ReceivedMessage]

acknowledge(subscription, project_id, ack_ids=None, messages=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

確認與 Pub/Sub 訂閱中 ack_ids 相關聯的訊息。

參數
  • subscription (str) – 要刪除的 Pub/Sub 訂閱名稱;請勿包含 ‘projects/{project}/subscriptions/’ 前綴。

  • ack_ids (list[str] | None) – 來自先前 pull 回應的 ReceivedMessage ackIds 列表。與 messages 參數互斥。

  • messages (list[google.cloud.pubsub_v1.types.ReceivedMessage] | None) – 要確認的 ReceivedMessage 物件列表。與 ack_ids 參數互斥。

  • project_id (str) – (選填) 訂閱所在的 Google Cloud 專案名稱或 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果指定 None,則不會重試請求。

  • timeout (float | None) – (選填) 等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他 metadata。

class airflow.providers.google.cloud.hooks.pubsub.PubSubAsyncHook(project_id=PROVIDE_PROJECT_ID, **kwargs)[原始碼]

基礎類別: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

用於取得 Google Cloud PubSub 非同步 Hook 的類別。

sync_hook_class[原始碼]
async acknowledge(subscription, project_id, ack_ids=None, messages=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

確認與 Pub/Sub 訂閱中 ack_ids 相關聯的訊息。

參數
  • subscription (str) – 要刪除的 Pub/Sub 訂閱名稱;請勿包含 ‘projects/{project}/subscriptions/’ 前綴。

  • ack_ids (list[str] | None) – 來自先前 pull 回應的 ReceivedMessage ackIds 列表。與 messages 參數互斥。

  • messages (list[google.cloud.pubsub_v1.types.ReceivedMessage] | None) – 要確認的 ReceivedMessage 物件列表。與 ack_ids 參數互斥。

  • project_id (str) – (選填) 訂閱所在的 Google Cloud 專案名稱或 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果未指定,請求將不會重試。

  • timeout (float | None) – (選填) 等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他 metadata。

async pull(subscription, max_messages, project_id=PROVIDE_PROJECT_ID, return_immediately=False, retry=DEFAULT, timeout=None, metadata=())[source]

從 Pub/Sub 訂閱中提取最多 max_messages 則訊息。

參數
  • subscription (str) – 要從中提取訊息的 Pub/Sub 訂閱名稱;請勿包含 ‘projects/{project}/subscriptions/’ 前綴。

  • max_messages (int) – 從 Pub/Sub API 返回的最大訊息數。

  • project_id (str) – (選填) 訂閱所在的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。

  • return_immediately (bool) – 如果設定,則在沒有可用訊息時,Pub/Sub API 將立即返回。否則,請求將會封鎖一段未公開但有界限的時間。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果未指定,請求將不會重試。

  • timeout (float | None) – (選填) 等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他 metadata。

返回值

Pub/Sub ReceivedMessage 物件的列表,每個物件都包含 ackId 屬性和 message 屬性,其中包括 base64 編碼的訊息內容。請參閱 https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.ReceivedMessage

返回類型

list[google.cloud.pubsub_v1.types.ReceivedMessage]

這篇文章對您有幫助嗎?