airflow.providers.google.cloud.operators.pubsub
¶
此模組包含 Google PubSub 運算子。
模組內容¶
類別¶
建立 PubSub 主題。 |
|
建立 PubSub 訂閱。 |
|
刪除 PubSub 主題。 |
|
刪除 PubSub 訂閱。 |
|
發布訊息至 PubSub 主題。 |
|
從 PubSub 訂閱提取訊息,並透過 XCom 傳遞它們。 |
- class airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, gcp_conn_id='google_cloud_default', labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]¶
基底類別:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
建立 PubSub 主題。
參閱
關於如何使用此運算子的更多資訊,請查看指南: 建立 PubSub 主題
預設情況下,如果主題已存在,此運算子不會導致 DAG 失敗。
with DAG("successful DAG") as dag: create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic_again = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic >> create_topic_again
可以將運算子配置為在主題已存在時失敗。
with DAG("failing DAG") as dag: create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic_again = PubSubCreateTopicOperator( project_id="my-project", topic="my_new_topic", fail_if_exists=True ) create_topic >> create_topic_again
project_id
和topic
都是範本化的,因此您可以在其值中使用 Jinja 範本。- 參數
project_id (str) – (選填) Google Cloud 專案 ID,將在其中建立主題。如果設定為 None 或遺失,則會使用來自 Google Cloud 連線的預設 project_id。
topic (str) – 要建立的主題。請勿包含完整的主題路徑。換句話說,請僅提供
{topic}
,而不是projects/{project}/topics/{topic}
。(已範本化)gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。
labels (dict[str, str] | None) – 客戶端指定的標籤;請參閱 https://cloud.google.com/pubsub/docs/labels
message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – 限制發布到主題的訊息可以儲存的 Google Cloud 區域集合的策略。如果不存在,則不會生效任何限制。Union[dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]
kms_key_name (str | None) – Cloud KMS CryptoKey 的資源名稱,用於保護對此主題上發布的訊息的存取。預期格式為
projects/*/locations/*/keyRings/*/cryptoKeys/*
。retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果未指定 None,則不會重試請求。
timeout (float | None) – (選填) 等待請求完成的秒數。請注意,如果指定了 retry,則逾時適用於每次個別嘗試。
metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他元數據。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(已範本化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator(*, topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, gcp_conn_id='google_cloud_default', push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]¶
基底類別:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
建立 PubSub 訂閱。
參閱
關於如何使用此運算子的更多資訊,請查看指南: 建立 PubSub 訂閱
預設情況下,訂閱將在
project_id
中建立。如果指定了subscription_project_id
且 Google Cloud 憑證允許,則可以在與其主題不同的專案中建立訂閱。預設情況下,如果訂閱已存在,此運算子不會導致 DAG 失敗。但是,主題必須存在於專案中。
with DAG("successful DAG") as dag: create_subscription = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription_again = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription >> create_subscription_again
可以將運算子配置為在訂閱已存在時失敗。
with DAG("failing DAG") as dag: create_subscription = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription_again = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription", fail_if_exists=True ) create_subscription >> create_subscription_again
最後,訂閱不是必需的。如果未傳遞,運算子將為訂閱名稱產生通用唯一識別碼。
with DAG("DAG") as dag: PubSubCreateSubscriptionOperator(project_id="my-project", topic="my-topic")
project_id
、topic
、subscription
、subscription_project_id
和impersonation_chain
都是範本化的,因此您可以在其值中使用 Jinja 範本。- 參數
project_id (str) – (選填) Google Cloud 專案 ID,主題存在於其中。如果設定為 None 或遺失,則會使用來自 Google Cloud 連線的預設 project_id。
topic (str) – 要建立的主題。請勿包含完整的主題路徑。換句話說,請僅提供
{topic}
,而不是projects/{project}/topics/{topic}
。(已範本化)subscription (str | None) – Pub/Sub 訂閱名稱。如果為空,將使用 uuid 模組產生隨機名稱
subscription_project_id (str | None) – Google Cloud 專案 ID,訂閱將在其中建立。如果為空,將使用
topic_project
。ack_deadline_secs (int) – 訂閱者必須確認從訂閱中提取的每則訊息的秒數
gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。
push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – 如果此訂閱使用推送傳遞,則此欄位用於配置它。空的
pushConfig
表示訂閱者將使用 API 方法提取和確認訊息。retain_acked_messages (bool | None) – 指示是否保留已確認的訊息。如果為 true,則即使訊息已被確認,訊息也不會從訂閱的待辦項目中清除,直到它們超出
message_retention_duration
視窗。如果您想要 Seek 到時間戳記,則這必須為 true。message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – 從訊息發布的那一刻起,在訂閱的待辦項目中保留未確認訊息的時間長度。如果
retain_acked_messages
為 true,則這也會配置已確認訊息的保留,因此配置Seek
可以回溯多遠的時間。預設值為 7 天。不能超過 7 天或少於 10 分鐘。labels (dict[str, str] | None) – 客戶端指定的標籤;請參閱 https://cloud.google.com/pubsub/docs/labels
enable_message_ordering (bool) – 如果為 true,則在 PubsubMessage 中使用相同 ordering_key 發布的訊息將按照 Pub/Sub 系統接收它們的順序傳遞給訂閱者。否則,它們可能會以任何順序傳遞。
expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – 一項策略,指定此訂閱到期的條件。只要任何連線的訂閱者成功地從訂閱中取用訊息或正在對訂閱發出操作,訂閱就會被視為有效。如果未設定 expiration_policy,則將使用 ttl 為 31 天的預設策略。expiration_policy.ttl 的最小允許值為 1 天。
filter – 以 Cloud Pub/Sub 過濾器語言撰寫的運算式。如果為非空值,則只有屬性欄位符合過濾器的 PubsubMessages 才會在此訂閱上傳遞。如果為空值,則不會過濾掉任何訊息。
dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – 一項策略,指定在此訂閱中死信訊息的條件。如果未設定 dead_letter_policy,則會停用死信處理。
retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – 一項策略,指定 Pub/Sub 如何重試此訂閱的訊息傳遞。如果未設定,則會套用預設重試策略。這通常表示對於健康的訂閱者,訊息將盡快重試。RetryPolicy 將在給定訊息的 NACK 或確認期限超出事件時觸發。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果未指定 None,則不會重試請求。
timeout (float | None) – (選填) 等待請求完成的秒數。請注意,如果指定了 retry,則逾時適用於每次個別嘗試。
metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他元數據。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(已範本化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'subscription', 'subscription_project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]¶
基底類別:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
刪除 PubSub 主題。
參閱
關於如何使用此運算子的更多資訊,請查看指南: 刪除 PubSub 主題
預設情況下,如果主題不存在,此運算子不會導致 DAG 失敗。
with DAG("successful DAG") as dag: PubSubDeleteTopicOperator(project_id="my-project", topic="non_existing_topic")
可以將運算子配置為在主題不存在時失敗。
with DAG("failing DAG") as dag: PubSubDeleteTopicOperator( project_id="my-project", topic="non_existing_topic", fail_if_not_exists=True, )
project_id
和topic
都是範本化的,因此您可以在其值中使用 Jinja 範本。- 參數
project_id (str) – (選填) 要在其中工作的 Google Cloud 專案 ID(已範本化)。如果設定為 None 或遺失,則會使用來自 Google Cloud 連線的預設 project_id。
topic (str) – 要刪除的主題。請勿包含完整的主題路徑。換句話說,請僅提供
{topic}
,而不是projects/{project}/topics/{topic}
。(已範本化)fail_if_not_exists (bool) – 如果為 True 且主題不存在,則任務失敗
gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果未指定 None,則不會重試請求。
timeout (float | None) – (選填) 等待請求完成的秒數。請注意,如果指定了 retry,則逾時適用於每次個別嘗試。
metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他元數據。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(已範本化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator(*, subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]¶
基底類別:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
刪除 PubSub 訂閱。
參閱
關於如何使用此運算子的更多資訊,請查看指南: 刪除 PubSub 訂閱
預設情況下,如果訂閱不存在,此運算子不會導致 DAG 失敗。
with DAG("successful DAG") as dag: PubSubDeleteSubscriptionOperator(project_id="my-project", subscription="non-existing")
可以將運算子配置為在訂閱已存在時失敗。
with DAG("failing DAG") as dag: PubSubDeleteSubscriptionOperator( project_id="my-project", subscription="non-existing", fail_if_not_exists=True, )
project_id
和subscription
都是範本化的,因此您可以在其值中使用 Jinja 範本。- 參數
project_id (str) – (選填) 要在其中工作的 Google Cloud 專案 ID(已範本化)。如果設定為 None 或遺失,則會使用來自 Google Cloud 連線的預設 project_id。
subscription (str) – 要刪除的訂閱。請勿包含完整的訂閱路徑。換句話說,請僅提供
{subscription}
,而不是projects/{project}/subscription/{subscription}
。(已範本化)fail_if_not_exists (bool) – 如果為 True 且訂閱不存在,則任務失敗
gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (選填) 用於重試請求的重試物件。如果未指定 None,則不會重試請求。
timeout (float | None) – (選填) 等待請求完成的秒數。請注意,如果指定了 retry,則逾時適用於每次個別嘗試。
metadata (collections.abc.Sequence[tuple[str, str]]) – (選填) 提供給方法的其他元數據。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(已範本化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator(*, topic, messages, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', enable_message_ordering=False, impersonation_chain=None, **kwargs)[source]¶
基底類別:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
發布訊息至 PubSub 主題。
參閱
如需更多關於如何使用此運算子的資訊,請參閱指南:發布 PubSub 訊息
每個任務都會將所有提供的訊息發布到單一 Google Cloud 專案中的相同主題。如果主題不存在,此任務將會失敗。
m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}} m2 = {"data": b"Knock, knock"} m3 = {"attributes": {"foo": ""}} m4 = {"data": b"Who's there?", "attributes": {"ordering_key": "knock_knock"}} t1 = PubSubPublishMessageOperator( project_id="my-project", topic="my_topic", messages=[m1, m2, m3], create_topic=True, dag=dag, ) t2 = PubSubPublishMessageOperator( project_id="my-project", topic="my_topic", messages=[m4], create_topic=True, enable_message_ordering=True, dag=dag, )
project_id
、topic
和messages
皆已範本化,因此您可以在其值中使用 Jinja 範本。- 參數
project_id (str) – (選填) 要在其中工作的 Google Cloud 專案 ID(已範本化)。如果設定為 None 或遺失,則會使用來自 Google Cloud 連線的預設 project_id。
topic (str) – 要發布訊息的主題。請勿包含完整主題路徑。換句話說,請僅提供
{topic}
,而不是projects/{project}/topics/{topic}
。(已範本化)messages (list) – 要發布到主題的訊息清單。每個訊息都是一個字典,包含以下一或多個鍵值對應:* ‘data’:位元組字串 (utf-8 編碼) * ‘attributes’:{‘key1’: ‘value1’, …}。每個訊息必須至少包含一個非空的 ‘data’ 值或至少包含一個鍵的屬性字典(已範本化)。請參閱 https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。
enable_message_ordering (bool) – 若為 true,則在 PubsubMessage 中使用相同 ordering_key 發布的訊息,將會按照 Pub/Sub 系統接收的順序傳遞給訂閱者。否則,它們可能會以任何順序傳遞。預設值為 False。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(已範本化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'messages', 'enable_message_ordering', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator(*, project_id, subscription, max_messages=5, ack_messages=False, messages_callback=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
基底類別:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
從 PubSub 訂閱提取訊息,並透過 XCom 傳遞它們。
如果佇列是空的,則會傳回空清單 - 永遠不會等待訊息。如果您確實需要等待,請改用
airflow.providers.google.cloud.sensors.PubSubPullSensor
。參閱
如需更多關於如何使用此運算子和 PubSubPullSensor 的資訊,請參閱指南:從 PubSub 訂閱中拉取訊息
此運算子將從指定的 PubSub 訂閱中拉取最多
max_messages
則訊息。當訂閱傳回訊息時,訊息將會立即從運算子傳回,並通過 XCom 傳遞以供下游任務使用。如果
ack_messages
設定為 True,訊息將會在傳回之前立即應答,否則,下游任務將負責應答這些訊息。project_id
和subscription
皆已範本化,因此您可以在其值中使用 Jinja 範本。- 參數
project_id (str) – 訂閱的 Google Cloud 專案 ID(已範本化)
subscription (str) – Pub/Sub 訂閱名稱。請勿包含完整訂閱路徑。
max_messages (int) – 每次 PubSub 拉取請求要檢索的最大訊息數
ack_messages (bool) – 若為 True,則每個訊息將會立即應答,而不是由任何下游任務應答
gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。
messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (選用)用於處理接收訊息的回呼函數。其回傳值將會儲存到 XCom。如果您正在拉取大型訊息,您可能需要提供自訂回呼函數。如果未提供,預設實作將會使用 google.protobuf.json_format.MessageToDict 函數將 ReceivedMessage 物件轉換為可序列化為 JSON 的字典。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(已範本化)。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[source]¶