airflow.providers.google.cloud.operators.dataproc

此模組包含 Google Dataproc 運算子。

模組內容

類別

PreemptibilityType

包含適用於叢集每個次要工作節點的 Preemptibility 可能類型值。

InstanceSelection

定義機器類型和機器類型所屬的等級。

InstanceFlexibilityPolicy

Instance Flexibility Policy 允許混合 VM 形狀和佈建模型。

ClusterGenerator

建立新的 Dataproc 叢集。

DataprocCreateClusterOperator

在 Google Cloud Dataproc 上建立新的叢集。

DataprocScaleClusterOperator

在 Google Cloud Dataproc 上擴展或縮減叢集。

DataprocDeleteClusterOperator

刪除專案中的叢集。

DataprocStartClusterOperator

啟動專案中的叢集。

DataprocStopClusterOperator

停止專案中的叢集。

DataprocJobBaseOperator

在 DataProc 上啟動任務的運算子基底類別。

DataprocCreateWorkflowTemplateOperator

建立新的工作流程範本。

DataprocInstantiateWorkflowTemplateOperator

在 Google Cloud Dataproc 上實例化 WorkflowTemplate。

DataprocInstantiateInlineWorkflowTemplateOperator

在 Google Cloud Dataproc 上實例化 Inline WorkflowTemplate。

DataprocSubmitJobOperator

將任務提交到叢集。

DataprocUpdateClusterOperator

更新專案中的叢集。

DataprocDiagnoseClusterOperator

診斷專案中的叢集。

DataprocCreateBatchOperator

建立批次工作負載。

DataprocDeleteBatchOperator

刪除批次工作負載資源。

DataprocGetBatchOperator

取得批次工作負載資源表示法。

DataprocListBatchesOperator

列出批次工作負載。

DataprocCancelOperationOperator

取消批次工作負載資源。

class airflow.providers.google.cloud.operators.dataproc.PreemptibilityType[原始碼]

基底: enum.Enum

包含適用於叢集每個次要工作節點的 Preemptibility 可能類型值。

PREEMPTIBLE = 'PREEMPTIBLE'[原始碼]
SPOT = 'SPOT'[原始碼]
PREEMPTIBILITY_UNSPECIFIED = 'PREEMPTIBILITY_UNSPECIFIED'[原始碼]
NON_PREEMPTIBLE = 'NON_PREEMPTIBLE'[原始碼]
class airflow.providers.google.cloud.operators.dataproc.InstanceSelection[原始碼]

定義機器類型和機器類型所屬的等級。

google.cloud.dataproc.v1#google.cloud.dataproc.v1.InstanceFlexibilityPolicy.InstanceSelection 的表示法。

參數
  • machine_types – 完整機器類型名稱,例如 “n1-standard-16”。

  • rank – 此執行個體選取的偏好順序。數字越小表示偏好順序越高。Dataproc 將首先嘗試根據優先順序最高的機器類型建立 VM,並根據可用性回退到下一個順序。具有相同優先順序的機器類型和執行個體選取具有相同的偏好順序。

machine_types: list[str][原始碼]
rank: int = 0[原始碼]
class airflow.providers.google.cloud.operators.dataproc.InstanceFlexibilityPolicy[原始碼]

Instance Flexibility Policy 允許混合 VM 形狀和佈建模型。

google.cloud.dataproc.v1#google.cloud.dataproc.v1.InstanceFlexibilityPolicy 的表示法。

參數

instance_selection_list – 執行個體選取選項的清單,群組在建立新的 VM 時將使用這些選項。

instance_selection_list: list[InstanceSelection][原始碼]
class airflow.providers.google.cloud.operators.dataproc.ClusterGenerator(project_id, num_workers=None, min_num_workers=None, zone=None, network_uri=None, subnetwork_uri=None, internal_ip_only=None, tags=None, storage_bucket=None, init_actions_uris=None, init_action_timeout='10m', metadata=None, custom_image=None, custom_image_project_id=None, custom_image_family=None, image_version=None, autoscaling_policy=None, properties=None, optional_components=None, num_masters=1, master_machine_type='n1-standard-4', master_disk_type='pd-standard', master_disk_size=1024, master_accelerator_type=None, master_accelerator_count=None, worker_machine_type='n1-standard-4', worker_disk_type='pd-standard', worker_disk_size=1024, worker_accelerator_type=None, worker_accelerator_count=None, num_preemptible_workers=0, preemptibility=PreemptibilityType.PREEMPTIBLE.value, service_account=None, service_account_scopes=None, idle_delete_ttl=None, auto_delete_time=None, auto_delete_ttl=None, customer_managed_key=None, enable_component_gateway=False, driver_pool_size=0, driver_pool_id=None, secondary_worker_instance_flexibility_policy=None, secondary_worker_accelerator_type=None, secondary_worker_accelerator_count=None, **kwargs)[原始碼]

建立新的 Dataproc 叢集。

參數
  • cluster_name – 要建立的 DataProc 叢集名稱。(已套用範本)

  • project_id (str) – 要在其中建立叢集的 Google Cloud 專案 ID。(已套用範本)

  • num_workers (int | None) – 要啟動的工作節點數量。如果設定為零,將以單一節點模式啟動叢集

  • min_num_workers (int | None) – 要建立的主要工作執行個體的最小數量。如果從 num_workers 中建立的 VM 數量超過 min_num_workers,則將刪除失敗的 VM,叢集大小將調整為可用的 VM 並設定為 RUNNING 狀態。如果建立的 VM 數量少於 min_num_workers,則叢集將置於 ERROR 狀態。失敗的 VM 不會被刪除。

  • storage_bucket (str | None) – 要使用的儲存貯體,設定為 None 讓 dataproc 為您產生自訂的貯體

  • init_actions_uris (list[str] | None) – 包含 dataproc 初始化腳本的 GCS URI 清單

  • init_action_timeout (str) – init_actions_uris 中可執行腳本完成的時限

  • metadata (dict | None) – 要新增至所有執行個體的鍵值 Google Compute Engine 中繼資料項目字典

  • image_version (str | None) – Dataproc 叢集中軟體的版本

  • custom_image (str | None) – 自訂 Dataproc 映像檔,如需更多資訊,請參閱 https://cloud.google.com/dataproc/docs/guides/dataproc-images

  • custom_image_project_id (str | None) – 自訂 Dataproc 映像檔的專案 ID,如需更多資訊,請參閱 https://cloud.google.com/dataproc/docs/guides/dataproc-images

  • custom_image_family (str | None) – 自訂 Dataproc 映像檔的系列,可以使用 –family 旗標在建立自訂映像檔時提供系列名稱,如需更多資訊,請參閱 https://cloud.google.com/dataproc/docs/guides/dataproc-images

  • autoscaling_policy (str | None) – 叢集使用的自動擴充政策。只有包含專案 ID 和位置 (區域) 的資源名稱才有效。範例: projects/[projectId]/locations/[dataproc_region]/autoscalingPolicies/[policy_id]

  • properties (dict | None) – 要在組態檔 (例如 spark-defaults.conf) 上設定的屬性字典,請參閱 https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig

  • optional_components (list[str] | None) – 選用叢集元件的清單,如需更多資訊,請參閱 https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig#Component

  • num_masters (int) – 要啟動的主要節點數量

  • master_machine_type (str) – 主要節點要使用的 Compute Engine 機器類型

  • master_disk_type (str) – 主要節點的開機磁碟類型 (預設為 pd-standard)。有效值: pd-ssd (Persistent Disk Solid State Drive,永久磁碟固態硬碟) 或 pd-standard (Persistent Disk Hard Disk Drive,永久磁碟硬碟)。

  • master_disk_size (int) – 主要節點的磁碟大小

  • master_accelerator_type (str | None) – 要連接到主要節點的加速器卡 (GPU) 類型,請參閱 https://cloud.google.com/dataproc/docs/reference/rest/v1/InstanceGroupConfig#acceleratorconfig

  • master_accelerator_count (int | None) – 要連接到主要節點的加速器卡 (GPU) 數量

  • worker_machine_type (str) – 工作節點要使用的 Compute Engine 機器類型

  • worker_disk_type (str) – 工作節點的開機磁碟類型 (預設為 pd-standard)。有效值: pd-ssd (Persistent Disk Solid State Drive,永久磁碟固態硬碟) 或 pd-standard (Persistent Disk Hard Disk Drive,永久磁碟硬碟)。

  • worker_disk_size (int) – 工作節點的磁碟大小

  • worker_accelerator_type (str | None) – 要連接到工作節點的加速器卡 (GPU) 類型,請參閱 https://cloud.google.com/dataproc/docs/reference/rest/v1/InstanceGroupConfig#acceleratorconfig

  • worker_accelerator_count (int | None) – 要連接到工作節點的加速器卡 (GPU) 數量

  • num_preemptible_workers (int) – 叢集中作為次要工作節點的執行個體群組中的 VM 執行個體數量,預設啟用 Preemptibility。請注意,在一個叢集中無法混合使用非搶佔式和搶佔式次要工作節點。

  • preemptibility (str) – 適用於每個次要工作節點的 Preemptibility 類型,請參閱 https://cloud.google.com/dataproc/docs/reference/rpc/ google.cloud.dataproc.v1#google.cloud.dataproc.v1.InstanceGroupConfig.Preemptibility

  • zone (str | None) – 叢集所在的區域。設定為 None 以自動選擇區域。(已套用範本)

  • network_uri (str | None) – 用於機器通訊的網路 URI,不能與 subnetwork_uri 一起指定

  • subnetwork_uri (str | None) – 用於機器通訊的子網路 URI,不能與 network_uri 一起指定

  • internal_ip_only (bool | None) – 如果為 true,則叢集中的所有執行個體都將只有內部 IP 位址。這只能針對啟用子網路的網路啟用

  • tags (list[str] | None) – 要新增至所有執行個體的 GCE 標籤

  • region – 建立 dataproc 叢集的指定區域。

  • gcp_conn_id – 用於連線至 Google Cloud 的連線 ID。

  • service_account (str | None) – dataproc 執行個體的服務帳戶。

  • service_account_scopes (list[str] | None) – 要包含的服務帳戶範圍 URI。

  • idle_delete_ttl (int | None) – 叢集在閒置狀態下保持運作的最長持續時間。超過此閾值將導致叢集自動刪除。持續時間以秒為單位。

  • auto_delete_time (datetime.datetime | None) – 叢集將自動刪除的時間。

  • auto_delete_ttl (int | None) – 叢集的生命週期,叢集將在此持續時間結束時自動刪除。持續時間以秒為單位。(如果設定 auto_delete_time,則將忽略此參數)

  • customer_managed_key (str | None) – 用於磁碟加密的客戶管理金鑰 projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME] # noqa

  • enable_component_gateway (bool | None) – 提供對叢集上預設和選定選用元件的 Web 介面的存取權。

  • driver_pool_size (int) – 節點群組中驅動程式節點的數量。

  • driver_pool_id (str | None) – 驅動程式集區的 ID。在叢集內必須是唯一的。使用此 ID 來識別未來操作中的驅動程式群組,例如調整節點群組大小。

  • secondary_worker_instance_flexibility_policy (InstanceFlexibilityPolicy | None) – Instance Flexibility Policy 允許混合 VM 形狀和佈建模型。

  • secondary_worker_accelerator_type (str | None) – 要連接到次要工作節點的加速器卡 (GPU) 類型,請參閱 https://cloud.google.com/dataproc/docs/reference/rest/v1/InstanceGroupConfig#acceleratorconfig

  • secondary_worker_accelerator_count (int | None) – 要連接到次要工作節點的加速器卡 (GPU) 數量

make()[原始碼]

作為更輕鬆移轉的輔助方法。

傳回

代表 Dataproc 叢集的字典。

class airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator(*, cluster_name, region, project_id=PROVIDE_PROJECT_ID, cluster_config=None, virtual_cluster_config=None, labels=None, request_id=None, delete_on_error=True, use_if_exists=True, num_retries_if_resource_is_not_ready=0, retry=DEFAULT, timeout=1 * 60 * 60, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=10, **kwargs)[原始碼]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 Google Cloud Dataproc 上建立新的叢集。

此運算子將等待直到建立成功或建立過程中發生錯誤。

如果叢集已存在且 use_if_exists 為 True,則運算子將: - 如果叢集狀態為 ERROR,則在指定的情況下刪除它並引發錯誤 - 如果叢集狀態為 CREATING,則等待它,然後檢查 ERROR 狀態 - 如果叢集狀態為 DELETING,則等待它,然後建立新的叢集

請參閱 https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters

以取得不同參數的詳細說明。連結中詳細說明的許多組態參數都可用作此運算子的參數。

另請參閱

如需如何使用此運算子的更多資訊,請參閱指南: 建立叢集

參數
  • project_id (str) – 要在其中建立叢集的 Google Cloud 專案 ID。(已套用範本)

  • cluster_name (str) – 要建立的叢集名稱

  • labels (dict | None) – 將指派給已建立叢集的標籤。請注意,在 cluster_config 參數中將標籤新增至 ClusterConfig 物件,並不會導致將標籤新增至叢集。叢集的標籤只能透過將值傳遞至 DataprocCreateCluster 運算子的參數來設定。

  • cluster_config (dict | google.cloud.dataproc_v1.Cluster | None) – 必要參數。要建立的叢集設定。如果提供的是 dict,則其格式必須與 protobuf 訊息 ClusterConfig 相同

  • virtual_cluster_config (dict | None) – 選用參數。虛擬叢集設定,用於建立不直接控制底層運算資源的 Dataproc 叢集,例如,建立 Dataproc-on-GKE 叢集 <https://cloud.google.com/dataproc/docs/concepts/jobs/dataproc-gke#create-a-dataproc-on-gke-cluster>

  • region (str) – 指定建立 dataproc 叢集的區域。

  • delete_on_error (bool) – 如果為 true,則在叢集以 ERROR 狀態建立時將會刪除。預設值為 true。

  • use_if_exists (bool) – 如果為 true,則使用現有叢集

  • num_retries_if_resource_is_not_ready (int) – 選用參數。當出現資源未就緒錯誤時,叢集建立請求的重試次數。

  • request_id (str | None) – 選用參數。用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 DeleteClusterRequest 請求,則第二個請求將被忽略,並傳回後端建立和儲存的第一個 google.longrunning.Operation

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault | google.api_core.retry.Retry) – 用於重試請求的重試物件。如果指定 None,則請求將不會重試。

  • timeout (float) – 等待請求完成的時間量 (以秒為單位)。請注意,如果指定 retry,則逾時適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • deferrable (bool) – 以可延遲模式執行運算子。

  • polling_interval_seconds (int) – 檢查執行狀態的呼叫之間等待的時間 (秒)。

template_fields: collections.abc.Sequence[str] = ('project_id', 'region', 'cluster_config', 'virtual_cluster_config', 'cluster_name', 'labels',...[source]
template_fields_renderers[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

execute_complete(context, event)[source]

充當觸發器觸發時的回呼 — 立即傳回。

依賴觸發器擲回例外狀況,否則會假設執行成功。

class airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator(*, cluster_name, project_id=PROVIDE_PROJECT_ID, region='global', num_workers=2, num_preemptible_workers=0, graceful_decommission_timeout=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 Google Cloud Dataproc 上擴展或縮減叢集。

運算子將等待直到叢集重新調整規模。

使用範例

t1 = DataprocClusterScaleOperator(
    task_id="dataproc_scale",
    project_id="my-project",
    cluster_name="cluster-1",
    num_workers=10,
    num_preemptible_workers=10,
    graceful_decommission_timeout="1h",
)

另請參閱

如需有關調整叢集規模的詳細資訊,請參閱參考資料: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters

參數
  • cluster_name (str) – 要調整規模的叢集名稱。(已套用範本)

  • project_id (str) – 叢集執行的 Google Cloud 專案 ID。(已套用範本)

  • region (str) – Dataproc 叢集的區域。(已套用範本)

  • num_workers (int) – 工作站的新數量

  • num_preemptible_workers (int) – 可搶佔式工作站的新數量

  • graceful_decommission_timeout (str | None) – YARN 正常終止的逾時時間。最大值為 1 天

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('cluster_name', 'project_id', 'region', 'impersonation_chain')[source]
execute(context)[source]

在 Google Cloud Dataproc 上擴展或縮減叢集。

class airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator(*, region, cluster_name, project_id=PROVIDE_PROJECT_ID, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=1 * 60 * 60, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=10, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

刪除專案中的叢集。

參數
  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域 (已套用範本)。

  • cluster_name (str) – 必要參數。叢集名稱 (已套用範本)。

  • project_id (str) – 選用參數。叢集所屬的 Google Cloud 專案 ID (已套用範本)。

  • cluster_uuid (str | None) – 選用參數。指定 cluster_uuid 表示如果具有指定 UUID 的叢集不存在,RPC 應該失敗。

  • request_id (str | None) – 選用參數。用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 DeleteClusterRequest 請求,則第二個請求將被忽略,並傳回後端建立和儲存的第一個 google.longrunning.Operation

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果指定 None,則請求將不會重試。

  • timeout (float) – 等待請求完成的時間量 (以秒為單位)。請注意,如果指定 retry,則逾時適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • deferrable (bool) – 以可延遲模式執行運算子。

  • polling_interval_seconds (int) – 檢查叢集狀態的呼叫之間等待的時間 (秒)。

template_fields: collections.abc.Sequence[str] = ('project_id', 'region', 'cluster_name', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

execute_complete(context, event=None)[source]

充當觸發器觸發時的回呼 — 立即傳回。

依賴觸發器擲回例外狀況,否則會假設執行成功。

class airflow.providers.google.cloud.operators.dataproc.DataprocStartClusterOperator(*, cluster_name, region, project_id=PROVIDE_PROJECT_ID, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=1 * 60 * 60, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基礎類別: _DataprocStartStopClusterBaseOperator

啟動專案中的叢集。

execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.dataproc.DataprocStopClusterOperator(*, cluster_name, region, project_id=PROVIDE_PROJECT_ID, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=1 * 60 * 60, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基礎類別: _DataprocStartStopClusterBaseOperator

停止專案中的叢集。

execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator(*, region, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', project_id=PROVIDE_PROJECT_ID, dataproc_properties=None, dataproc_jars=None, gcp_conn_id='google_cloud_default', labels=None, job_error_states=None, impersonation_chain=None, asynchronous=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=10, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 DataProc 上啟動任務的運算子基底類別。

參數
  • region (str) – 指定建立 dataproc 叢集的區域。

  • job_name (str) – DataProc 叢集中使用的工作名稱。此名稱預設為 task_id 附加執行資料,但可以套用範本。名稱將始終附加一個隨機數字,以避免名稱衝突。

  • cluster_name (str) – DataProc 叢集的名稱。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID,如果未指定,專案將從提供的 GCP 連線中推斷。

  • dataproc_properties (dict | None) – Hive 屬性的對應。非常適合放在預設引數中 (已套用範本)

  • dataproc_jars (list[str] | None) – 要新增至 Hive 伺服器和 Hadoop MapReduce (MR) 工作之 CLASSPATH 的 HCFS URI。可以包含 Hive SerDes 和 UDF。(已套用範本)

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • labels (dict | None) – 要與此工作建立關聯的標籤。標籤鍵必須包含 1 到 63 個字元,並且必須符合 RFC 1035。標籤值可以為空,但如果存在,則必須包含 1 到 63 個字元,並且必須符合 RFC 1035。一個工作最多可以關聯 32 個標籤。

  • job_error_states (set[str] | None) – 應視為錯誤狀態的工作狀態。此集合中的任何狀態都將導致引發錯誤並導致任務失敗。例如,如果 CANCELLED 狀態也應視為任務失敗,請傳入 {'ERROR', 'CANCELLED'}。目前可能的值僅為 'ERROR''CANCELLED',但未來可能會變更。預設為 {'ERROR'}

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • asynchronous (bool) – 提交工作至 Dataproc API 後傳回的旗標。這對於提交長時間執行的工作並使用 DataprocJobSensor 非同步等待它們非常有用

  • deferrable (bool) – 以可延遲模式執行運算子

  • polling_interval_seconds (int) – 輪詢工作完成之間的時間 (秒)。該值僅在以可延遲模式執行時才被考慮。必須大於 0。

變數

dataproc_job_id (str) – 提交至 Dataproc API 的實際 “jobId”。這對於識別或連結至 Google Cloud Console Dataproc UI 中的工作非常有用,因為提交至 Dataproc API 的實際 “jobId” 會附加一個 8 個字元的隨機字串。

job_type = ''[source]
create_job_template()[source]

使用預設值初始化 self.job_template

execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

execute_complete(context, event=None)[source]

充當觸發器觸發時的回呼 — 立即傳回。

依賴觸發器擲回例外狀況,否則會假設執行成功。

on_kill()[source]

充當在運算子被終止時呼叫的回呼;取消任何正在執行的工作。

class airflow.providers.google.cloud.operators.dataproc.DataprocCreateWorkflowTemplateOperator(*, template, region, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

建立新的工作流程範本。

參數
  • project_id (str) – 選用參數。叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域。

  • template (dict) – 要建立的 Dataproc 工作流程範本。如果提供 dict,則其格式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的 retry 物件。如果指定 None,則請求將不會重試。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

template_fields: collections.abc.Sequence[str] = ('region', 'template')[source]
template_fields_renderers[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator(*, template_id, region, project_id=PROVIDE_PROJECT_ID, version=None, request_id=None, parameters=None, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=10, cancel_on_kill=True, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 Google Cloud Dataproc 上實例化 WorkflowTemplate。

此運算子將等待 WorkflowTemplate 執行完成。

參數
  • template_id (str) – 範本的 ID。(可使用範本)

  • project_id (str) – 範本在其中執行的 Google Cloud 專案 ID

  • region (str) – 指定建立 dataproc 叢集的區域。

  • parameters (dict[str, str] | None) – Dataproc 範本的參數地圖,格式為鍵值對:map (key: string, value: string) 範例:{ “date_from”: “2019-08-01”, “date_to”: “2019-08-02”}。值不得超過 100 個字元。請參閱:https://cloud.google.com/dataproc/docs/concepts/workflows/workflow-parameters

  • request_id (str | None) – 選用。用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 SubmitJobRequest 請求,則第二個請求將被忽略,並傳回後端建立和儲存的第一個 Job。建議始終將此值設定為 UUID。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果指定 None,則請求將不會重試。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • deferrable (bool) – 以可延遲模式執行運算子。

  • polling_interval_seconds (int) – 檢查執行狀態的呼叫之間等待的時間 (秒)。

  • cancel_on_kill (bool) – 旗標,指示在呼叫 on_kill 時是否取消工作流程

template_fields: collections.abc.Sequence[str] = ('template_id', 'impersonation_chain', 'request_id', 'parameters')[source]
template_fields_renderers[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

execute_complete(context, event=None)[source]

作為觸發器啟動時的回呼。

這會立即傳回。它依賴觸發器擲回例外,否則它會假設執行成功。

on_kill()[source]

覆寫此方法以在任務執行個體被終止時清理子程序。

在運算子內使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下幽靈程序。

class airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator(*, template, region, project_id=PROVIDE_PROJECT_ID, request_id=None, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=10, cancel_on_kill=True, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 Google Cloud Dataproc 上實例化 Inline WorkflowTemplate。

此運算子將等待 WorkflowTemplate 執行完成。

另請參閱

如需如何使用此運算子的更多資訊,請查看指南:建立叢集

如需關於具現化內嵌 (inline) 的更多詳細資訊,請參閱參考資料:https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.workflowTemplates/instantiateInline

參數
  • template (dict) – 範本內容。(可使用範本)

  • project_id (str) – 範本在其中執行的 Google Cloud 專案 ID

  • region (str) – 指定建立 dataproc 叢集的區域。

  • parameters – Dataproc 範本的參數地圖,格式為鍵值對:map (key: string, value: string) 範例:{ “date_from”: “2019-08-01”, “date_to”: “2019-08-02”}。值不得超過 100 個字元。請參閱:https://cloud.google.com/dataproc/docs/concepts/workflows/workflow-parameters

  • request_id (str | None) – 選用。用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 SubmitJobRequest 請求,則第二個請求將被忽略,並傳回後端建立和儲存的第一個 Job。建議始終將此值設定為 UUID。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果指定 None,則請求將不會重試。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • deferrable (bool) – 以可延遲模式執行運算子。

  • polling_interval_seconds (int) – 檢查執行狀態的呼叫之間等待的時間 (秒)。

  • cancel_on_kill (bool) – 旗標,指示在呼叫 on_kill 時是否取消工作流程

template_fields: collections.abc.Sequence[str] = ('template', 'impersonation_chain')[source]
template_fields_renderers[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

execute_complete(context, event=None)[source]

作為觸發器啟動時的回呼。

這會立即傳回。它依賴觸發器擲回例外,否則它會假設執行成功。

on_kill()[source]

覆寫此方法以在任務執行個體被終止時清理子程序。

在運算子內使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下幽靈程序。

class airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator(*, job, region, project_id=PROVIDE_PROJECT_ID, request_id=None, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, asynchronous=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=10, cancel_on_kill=True, wait_timeout=None, openlineage_inject_parent_job_info=conf.getboolean('openlineage', 'spark_inject_parent_job_info', fallback=False), **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

將任務提交到叢集。

參數
  • project_id (str) – 選用。工作所屬的 Google Cloud 專案 ID。

  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域。

  • job (dict) – 必要。工作資源。如果提供 dict,則其格式必須與 protobuf 訊息 Job 相同。如需支援的工作類型及其組態的完整清單,請參閱此處 https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs

  • request_id (str | None) – 選用。用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 SubmitJobRequest 請求,則第二個請求將被忽略,並傳回後端建立和儲存的第一個 Job。建議始終將此值設定為 UUID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的 retry 物件。如果指定 None,則請求將不會重試。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) –

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • asynchronous (bool) – 在將工作提交到 Dataproc API 後傳回的旗標。這對於提交長時間執行的工作並使用 DataprocJobSensor 非同步等待它們非常有用

  • deferrable (bool) – 以可延遲模式執行運算子

  • polling_interval_seconds (int) – 輪詢工作完成之間的時間 (秒)。該值僅在以可延遲模式執行時才被考慮。必須大於 0。

  • cancel_on_kill (bool) – 旗標,指示在呼叫 on_kill 時是否取消 Hook 的工作

  • wait_timeout (int | None) – 等待工作準備就緒的秒數。僅在 asynchronous 為 False 時使用

template_fields: collections.abc.Sequence[str] = ('project_id', 'region', 'job', 'impersonation_chain', 'request_id')[source]
template_fields_renderers[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

execute_complete(context, event=None)[source]

作為觸發器啟動時的回呼。

這會立即傳回。它依賴觸發器擲回例外,否則它會假設執行成功。

on_kill()[source]

覆寫此方法以在任務執行個體被終止時清理子程序。

在運算子內使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下幽靈程序。

class airflow.providers.google.cloud.operators.dataproc.DataprocUpdateClusterOperator(*, cluster_name, cluster, update_mask, graceful_decommission_timeout, region, request_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=10, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

更新專案中的叢集。

參數
  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 選用參數。叢集所屬的 Google Cloud 專案 ID。

  • cluster_name (str) – 必要。叢集名稱。

  • cluster (dict | google.cloud.dataproc_v1.Cluster) –

    必要。對叢集的變更。

    如果提供 dict,則其格式必須與 protobuf 訊息 Cluster 相同

  • update_mask (dict | google.protobuf.field_mask_pb2.FieldMask) – 必要。指定要更新的欄位路徑,相對於 Cluster。例如,若要將叢集中的工作站數量變更為 5,則 update_mask 參數將指定為 config.worker_config.num_instances,而 PATCH 請求主體將指定新值。如果提供 dict,則其格式必須與 protobuf 訊息 FieldMask 相同

  • graceful_decommission_timeout (dict | google.protobuf.duration_pb2.Duration) – 選用。YARN 正常解除委任的逾時。正常解除委任允許從叢集中移除節點,而不會中斷進行中的工作。Timeout 指定在強制移除節點(並可能中斷工作)之前,等待進行中的工作完成的時間長度。預設逾時為 0(用於強制解除委任),而允許的最大逾時為 1 天。

  • request_id (str | None) – 選用。用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並傳回後端建立和儲存的第一個 google.long-running.Operation

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault | google.api_core.retry.Retry) – 用於重試請求的重試物件。如果指定 None,則請求將不會重試。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • deferrable (bool) – 以可延遲模式執行運算子。

  • polling_interval_seconds (int) – 檢查執行狀態的呼叫之間等待的時間 (秒)。

template_fields: collections.abc.Sequence[str] = ('cluster_name', 'cluster', 'region', 'request_id', 'project_id', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

execute_complete(context, event)[source]

充當觸發器觸發時的回呼 — 立即傳回。

依賴觸發器擲回例外狀況,否則會假設執行成功。

class airflow.providers.google.cloud.operators.dataproc.DataprocDiagnoseClusterOperator(*, region, cluster_name, project_id=PROVIDE_PROJECT_ID, tarball_gcs_dir=None, diagnosis_interval=None, jobs=None, yarn_application_ids=None, retry=DEFAULT, timeout=1 * 60 * 60, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=10, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

診斷專案中的叢集。

操作完成後,回應會包含診斷輸出報告的 Cloud Storage URI,其中包含收集的診斷摘要。

參數
  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域 (已套用範本)。

  • project_id (str) – 選用參數。叢集所屬的 Google Cloud 專案 ID (已套用範本)。

  • cluster_name (str) – 必要參數。叢集名稱 (已套用範本)。

  • tarball_gcs_dir (str | None) – 診斷 tarball 的輸出 Cloud Storage 目錄。如果未指定,將使用叢集暫存儲存貯體中的特定工作目錄。

  • diagnosis_interval (dict | google.type.interval_pb2.Interval | None) – 應在叢集上執行診斷的時間間隔。

  • jobs (collections.abc.MutableSequence[str] | None) – 指定要在其上執行診斷的工作清單。格式:projects/{project}/regions/{region}/jobs/{job}

  • yarn_application_ids (collections.abc.MutableSequence[str] | None) – 指定要在其上執行診斷的 yarn 應用程式清單。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果指定 None,則請求將不會重試。

  • timeout (float) – 等待請求完成的時間量 (以秒為單位)。請注意,如果指定 retry,則逾時適用於每次個別嘗試。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • deferrable (bool) – 以可延遲模式執行運算子。

  • polling_interval_seconds (int) – 檢查叢集狀態的呼叫之間等待的時間 (秒)。

template_fields: collections.abc.Sequence[str] = ('project_id', 'region', 'cluster_name', 'impersonation_chain', 'tarball_gcs_dir',...[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

execute_complete(context, event=None)[source]

作為觸發器啟動時的回呼。

這會立即傳回。它依賴觸發器擲回例外,否則它會假設執行成功。

class airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator(*, region, project_id=PROVIDE_PROJECT_ID, batch, batch_id=None, request_id=None, num_retries_if_resource_is_not_ready=0, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, result_retry=DEFAULT, asynchronous=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), polling_interval_seconds=5, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

建立批次工作負載。

參數
  • project_id (str) – 選擇性。叢集所屬的 Google Cloud 專案 ID。(已套用範本)

  • region (str) – 必要。處理請求的 Cloud Dataproc 區域。(已套用範本)

  • batch (dict | google.cloud.dataproc_v1.Batch) – 必要。要建立的批次。(已套用範本)

  • batch_id (str | None) – 必要。用於批次的 ID,這將成為批次資源名稱的最後一個元件。此值必須為 4-63 個字元。有效字元為 /[a-z][0-9]-/。(已套用範本)

  • request_id (str | None) – 選擇性。用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 CreateBatchRequest 請求,則第二個請求將被忽略,並傳回後端建立和儲存的第一個 google.longrunning.Operation

  • num_retries_if_resource_is_not_ready (int) – 選用參數。當出現資源未就緒錯誤時,叢集建立請求的重試次數。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的 retry 物件。如果指定 None,則請求將不會重試。

  • result_retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault | google.api_core.retry.Retry) – 用於重試請求的結果重試物件。用於透過指定確切的執行秒數來減少 DAG 中執行鏈式任務之間的延遲。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • asynchronous (bool) – 在建立批次到 Dataproc API 後傳回的旗標。這對於建立長時間執行的批次並使用 DataprocBatchSensor 非同步等待它們非常有用

  • deferrable (bool) – 以可延遲模式執行運算子。

  • polling_interval_seconds (int) – 檢查執行狀態的呼叫之間等待的時間 (秒)。

template_fields: collections.abc.Sequence[str] = ('project_id', 'batch', 'batch_id', 'region', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

hook()[source]
execute_complete(context, event=None)[source]

作為觸發器啟動時的回呼。

這會立即傳回。它依賴觸發器擲回例外,否則它會假設執行成功。

on_kill()[source]

覆寫此方法以在任務執行個體被終止時清理子程序。

在運算子內使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下幽靈程序。

handle_batch_status(context, state, batch_id, state_message=None)[source]
retry_batch_creation(previous_batch_id)[source]
class airflow.providers.google.cloud.operators.dataproc.DataprocDeleteBatchOperator(*, batch_id, region, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

刪除批次工作負載資源。

參數
  • batch_id (str) – 必要。用於批次的 ID,這將成為批次資源名稱的最後一個元件。此值必須為 4-63 個字元。有效字元為 /[a-z][0-9]-/。

  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 選擇性。叢集所屬的 Google Cloud 專案 ID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的 retry 物件。如果指定 None,則請求將不會重試。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('batch_id', 'region', 'project_id', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.dataproc.DataprocGetBatchOperator(*, batch_id, region, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

取得批次工作負載資源表示法。

參數
  • batch_id (str) – 必要。用於批次的 ID,這將成為批次資源名稱的最後一個元件。此值必須為 4-63 個字元。有效字元為 /[a-z][0-9]-/。

  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 選擇性。叢集所屬的 Google Cloud 專案 ID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的 retry 物件。如果指定 None,則請求將不會重試。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('batch_id', 'region', 'project_id', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.dataproc.DataprocListBatchesOperator(*, region, project_id=PROVIDE_PROJECT_ID, page_size=None, page_token=None, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, filter=None, order_by=None, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

列出批次工作負載。

參數
  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 選擇性。叢集所屬的 Google Cloud 專案 ID。

  • page_size (int | None) – 選擇性。每個回應中傳回的批次數量上限。服務可能會傳回小於此值。預設頁面大小為 20;最大頁面大小為 1000。

  • page_token (str | None) – 選擇性。從先前的 ListBatches 呼叫收到的分頁符記。提供此符記以擷取後續頁面。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 選擇性,用於重試請求的重試物件。如果指定 None,則不會重試請求。

  • timeout (float | None) – 選擇性,請求完成的等待時間量(以秒為單位)。請注意,如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 選擇性,提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 選擇性,用於連線至 Google Cloud Platform 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • filter (str | None) – 結果篩選器,如 ListBatchesRequest 中所指定

  • order_by (str | None) – 如何排序結果,如 ListBatchesRequest 中所指定

template_fields: collections.abc.Sequence[str] = ('region', 'project_id', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.dataproc.DataprocCancelOperationOperator(*, operation_name, region, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT, timeout=None, metadata=(), gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基底: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

取消批次工作負載資源。

參數
  • operation_name (str) – 必要。要取消的操作資源名稱。

  • region (str) – 必要參數。要處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 選擇性。叢集所屬的 Google Cloud 專案 ID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的 retry 物件。如果指定 None,則請求將不會重試。

  • timeout (float | None) – 等待請求完成的秒數。請注意,如果指定 retry,則 timeout 適用於每次個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,清單中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('operation_name', 'region', 'project_id', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多上下文資訊。

這個條目是否有幫助?