airflow.providers.google.cloud.hooks.dataproc

此模組包含 Google Cloud Dataproc hook。

模組內容

類別

DataProcJobBuilder

用於建構 Dataproc 任務的輔助類別。

DataprocHook

Google Cloud Dataproc API。

DataprocAsyncHook

與 Google Cloud Dataproc API 的非同步互動。

exception airflow.providers.google.cloud.hooks.dataproc.DataprocResourceIsNotReadyError[source]

基底類別: airflow.exceptions.AirflowException

當資源尚未準備好建立 Dataproc 叢集時引發。

class airflow.providers.google.cloud.hooks.dataproc.DataProcJobBuilder(project_id, task_id, cluster_name, job_type, properties=None)[source]

用於建構 Dataproc 任務的輔助類別。

add_labels(labels=None)[source]

為 Dataproc 任務設定標籤。

參數

labels (dict | None) – 任務查詢的標籤。

add_variables(variables=None)[source]

為 Dataproc 任務設定變數。

參數

variables (dict | None) – 任務查詢的變數。

add_args(args=None)[source]

為 Dataproc 任務設定參數。

參數

args (list[str] | None) – 任務查詢的參數。

add_query(query)[source]

為 Dataproc 任務新增查詢。

參數

query (str | list[str]) – 任務的查詢。

add_query_uri(query_uri)[source]

為 Dataproc 任務設定查詢 URI。

參數

query_uri (str) – 任務查詢的 URI。

add_jar_file_uris(jars=None)[source]

為 Dataproc 任務設定 jar URI。

參數

jars (list[str] | None) – jar URI 列表

add_archive_uris(archives=None)[source]

為 Dataproc 任務設定 archives URI。

參數

archives (list[str] | None) – archives URI 列表

add_file_uris(files=None)[source]

為 Dataproc 任務設定檔案 URI。

參數

files (list[str] | None) – 檔案 URI 列表

add_python_file_uris(pyfiles=None)[source]

為 Dataproc 任務設定 python 檔案 URI。

參數

pyfiles (list[str] | None) – python 檔案 URI 列表

set_main(main_jar=None, main_class=None)[source]

設定 Dataproc 主類別。

參數
  • main_jar (str | None) – 主要檔案的 URI。

  • main_class (str | None) – 主要類別的名稱。

引發

ValueError

set_python_main(main)[source]

設定 Dataproc 主要 python 檔案 URI。

參數

main (str) – python 主要檔案的 URI。

set_job_name(name)[source]

設定 Dataproc 任務名稱。

任務名稱會經過清理,將點替換為底線。

參數

name (str) – 任務名稱。

build()[source]

傳回 Dataproc 任務。

傳回

Dataproc 任務

回傳類型

dict

class airflow.providers.google.cloud.hooks.dataproc.DataprocHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基底類別: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

Google Cloud Dataproc API。

hook 中使用 project_id 的所有方法都必須使用關鍵字引數而非位置引數來呼叫。

get_cluster_client(region=None)[source]

建立 ClusterControllerClient。

get_template_client(region=None)[source]

建立 WorkflowTemplateServiceClient。

get_job_client(region=None)[source]

建立 JobControllerClient。

get_batch_client(region=None)[source]

建立 BatchControllerClient。

get_operations_client(region)[source]

建立 OperationsClient。

dataproc_options_to_args(options)[source]

從引數字典傳回格式化的叢集參數。

參數

options (dict) – 包含選項的字典

傳回

引數列表

回傳類型

list[str]

wait_for_operation(operation, timeout=None, result_retry=DEFAULT)[source]

等待長時間執行的操作完成。

create_cluster(region, project_id, cluster_name, cluster_config=None, virtual_cluster_config=None, labels=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在指定的專案中建立叢集。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 要建立的叢集名稱。

  • labels (dict[str, str] | None) – 將指派給已建立叢集的標籤。

  • cluster_config (dict | google.cloud.dataproc_v1.Cluster | None) – 要建立的叢集組態。如果提供 dict,則其格式必須與 protobuf 訊息 ClusterConfig 相同。

  • virtual_cluster_config (dict | None) – 虛擬叢集組態,用於建立不直接控制底層運算資源的 Dataproc 叢集,例如,使用 VirtualClusterConfig 建立 Dataproc-on-GKE 叢集時。

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 CreateClusterRequest 請求,則第二個請求將被忽略,並傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

delete_cluster(region, cluster_name, project_id, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

刪除專案中的叢集。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 要刪除的叢集名稱。

  • cluster_uuid (str | None) – 如果指定,則當具有 UUID 的叢集不存在時,RPC 應失敗。

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 DeleteClusterRequest 請求,則第二個請求將被忽略,並傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

diagnose_cluster(region, cluster_name, project_id, tarball_gcs_dir=None, diagnosis_interval=None, jobs=None, yarn_application_ids=None, retry=DEFAULT, timeout=None, metadata=())[source]

取得叢集診斷資訊。

作業完成後,回應會包含診斷輸出報告的 Cloud Storage URI,其中包含收集到的診斷摘要。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • tarball_gcs_dir (str | None) – 診斷 tarball 的輸出 Cloud Storage 目錄。如果未指定,將使用叢集暫存儲存貯體中的特定工作目錄。

  • diagnosis_interval (dict | google.type.interval_pb2.Interval | None) – 應在叢集上執行診斷的時間間隔。

  • jobs (collections.abc.MutableSequence[str] | None) – 指定要在其上執行診斷的任務列表。格式:projects/{project}/regions/{region}/jobs/{job}

  • yarn_application_ids (collections.abc.MutableSequence[str] | None) – 指定要在其上執行診斷的 yarn 應用程式列表。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

get_cluster(region, cluster_name, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

取得專案中叢集的資源表示法。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

list_clusters(region, filter_, project_id, page_size=None, retry=DEFAULT, timeout=None, metadata=())[source]

列出專案中所有 regions/{region}/clusters。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • filter – 用於約束叢集。區分大小寫。

  • page_size (int | None) – 底層 API 回應中包含的資源最大數量。如果頁面串流是按資源執行,則此參數不會影響傳回值。如果頁面串流是按頁面執行,則這會決定頁面中資源的最大數量。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

update_cluster(cluster_name, cluster, update_mask, project_id, region, graceful_decommission_timeout=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

更新專案中的叢集。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • cluster (dict | google.cloud.dataproc_v1.Cluster) – 叢集的變更。如果提供 dict,則其格式必須與 protobuf 訊息 Cluster 相同。

  • update_mask (dict | google.protobuf.field_mask_pb2.FieldMask) –

    指定要更新欄位的路徑,相對於 Cluster。例如,若要將叢集中的工作站數量變更為 5,則應將其指定為 config.worker_config.num_instances,且 PATCH 請求主體會指定新值

    {"config": {"workerConfig": {"numInstances": "5"}}}
    

    同樣地,若要將叢集中的可預佔工作站數量變更為 5,則應將其指定為 config.secondary_worker_config.num_instances,且 PATCH 請求主體會是

    {"config": {"secondaryWorkerConfig": {"numInstances": "5"}}}
    

    如果提供 dict,則其格式必須與 protobuf 訊息 FieldMask 相同。

  • graceful_decommission_timeout (dict | google.protobuf.duration_pb2.Duration | None) –

    YARN 正常終止逾時。正常終止允許從叢集中移除節點,而不會中斷進行中的任務。逾時指定在強制移除節點 (並可能中斷任務) 之前,等待進行中任務完成的時間長度。預設逾時為 0 (用於強制終止),而允許的最大逾時為一天。

    僅在 Dataproc 映像檔版本 1.2 和更高版本上支援。

    如果提供 dict,則其格式必須與 protobuf 訊息 Duration 相同。

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

start_cluster(region, project_id, cluster_name, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在專案中啟動叢集。

參數
  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • cluster_name (str) – 叢集名稱。

  • cluster_uuid (str | None) – 叢集 UUID

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

傳回

google.api_core.operation.Operation 的實例

回傳類型

google.api_core.operation.Operation

stop_cluster(region, project_id, cluster_name, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在專案中啟動叢集。

參數
  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • cluster_name (str) – 叢集名稱。

  • cluster_uuid (str | None) – 叢集 UUID

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

傳回

google.api_core.operation.Operation 的實例

回傳類型

google.api_core.operation.Operation

create_workflow_template(template, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

建立新的工作流程範本。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要建立的 Dataproc 工作流程範本。如果提供 dict,則其格式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

instantiate_workflow_template(template_name, project_id, region, version=None, request_id=None, parameters=None, retry=DEFAULT, timeout=None, metadata=())[source]

實例化範本並開始執行。

參數
  • template_name (str) – 要實例化的範本名稱。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • version (int | None) – 要實例化的工作流程範本版本。如果指定,則僅當工作流程範本的目前版本具有提供的版本時,才會實例化工作流程。此選項不能用於實例化先前版本的工作流程範本。

  • request_id (str | None) – 一個標籤,用於防止多個具有相同標籤的並行工作流程實例執行。這可以降低由於重試而啟動並行實例的風險。

  • parameters (dict[str, str] | None) – 從參數名稱到應為這些參數使用的值的映射。值不得超過 100 個字元。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

instantiate_inline_workflow_template(template, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

實例化範本並開始執行。

參數
  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要實例化的工作流程範本。如果提供 dict,則其格式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • request_id (str | None) – 一個標籤,用於防止多個具有相同標籤的並行工作流程實例執行。這可以降低由於重試而啟動並行實例的風險。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

wait_for_job(job_id, project_id, region, wait_time=10, timeout=None)[source]

輪詢作業以檢查是否已完成。

參數
  • job_id (str) – Dataproc 作業 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • wait_time (int) – 檢查之間間隔的秒數。

  • timeout (int | None) – 等待作業準備就緒的秒數。

get_job(job_id, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

取得專案中作業的資源表示法。

參數
  • job_id (str) – Dataproc 作業 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

submit_job(job, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

將作業提交至叢集。

參數
  • job (dict | google.cloud.dataproc_v1.Job) – 作業資源。如果提供 dict,則其格式必須與 protobuf 訊息 Job 相同。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • request_id (str | None) – 一個標籤,用於防止多個具有相同標籤的並行工作流程實例執行。這可以降低由於重試而啟動並行實例的風險。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

cancel_job(job_id, project_id, region=None, retry=DEFAULT, timeout=None, metadata=())[source]

啟動作業取消請求。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str | None) – Cloud Dataproc 區域以處理請求。

  • job_id (str) – 作業 ID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

create_batch(region, project_id, batch, batch_id=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

建立批次工作負載。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • batch (dict | google.cloud.dataproc_v1.Batch) – 要建立的批次。

  • batch_id (str | None) – 要用於批次的 ID,它將成為批次資源名稱的最後一個組件。此值必須為 4-63 個字元。有效字元為 [a-z][0-9]-

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 CreateBatchRequest 請求,則第二個請求將被忽略,並且傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

delete_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

刪除批次工作負載資源。

參數
  • batch_id (str) – 批次 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

get_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

取得批次工作負載資源表示法。

參數
  • batch_id (str) – 批次 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

list_batches(region, project_id, page_size=None, page_token=None, retry=DEFAULT, timeout=None, metadata=(), filter=None, order_by=None)[source]

列出批次工作負載。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • page_size (int | None) – 每個回應中傳回的批次數量上限。服務可能會傳回小於此值的值。預設頁面大小為 20;最大頁面大小為 1000。

  • page_token (str | None) – 從先前的 ListBatches 呼叫收到的頁面符記。提供此符記以擷取後續頁面。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • filter (str | None) – ListBatchesRequest 中指定的結果篩選器

  • order_by (str | None) – ListBatchesRequest 中指定的結果排序方式

wait_for_batch(batch_id, region, project_id, wait_check_interval=10, retry=DEFAULT, timeout=None, metadata=())[source]

等待批次作業完成。

在提交批次作業後,運算子會等待作業完成。但是,當 Airflow 重新啟動或由於任何原因而終止任務 pid 時,此 Hook 非常有用。在這種情況下,將再次發生建立,捕獲引發的 AlreadyExists,並失敗以使用此功能等待完成。

參數
  • batch_id (str) – 批次 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • wait_check_interval (int) – 檢查作業完成之間暫停的時間量。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

check_error_for_resource_is_not_ready_msg(error_msg)[source]

檢查錯誤原因是否為資源尚未就緒。

class airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基底類別: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

與 Google Cloud Dataproc API 的非同步互動。

hook 中使用 project_id 的所有方法都必須使用關鍵字引數而非位置引數來呼叫。

get_cluster_client(region=None)[source]

建立 ClusterControllerAsyncClient。

get_template_client(region=None)[source]

建立 WorkflowTemplateServiceAsyncClient。

get_job_client(region=None)[source]

建立 JobControllerAsyncClient。

get_batch_client(region=None)[source]

建立 BatchControllerAsyncClient。

get_operations_client(region)[source]

建立 OperationsClient。

async create_cluster(region, project_id, cluster_name, cluster_config=None, virtual_cluster_config=None, labels=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在專案中建立叢集。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 要建立的叢集名稱。

  • labels (dict[str, str] | None) – 將指派給已建立叢集的標籤。

  • cluster_config (dict | google.cloud.dataproc_v1.Cluster | None) – 要建立的叢集組態。如果提供 dict,則其格式必須與 protobuf 訊息 ClusterConfig 相同。

  • virtual_cluster_config (dict | None) – 虛擬叢集組態,用於建立不直接控制底層運算資源的 Dataproc 叢集,例如,使用 VirtualClusterConfig 建立 Dataproc-on-GKE 叢集時。

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 CreateClusterRequest 請求,則第二個請求將被忽略,並傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async delete_cluster(region, cluster_name, project_id, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

刪除專案中的叢集。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 要刪除的叢集名稱。

  • cluster_uuid (str | None) – 如果指定,則當具有 UUID 的叢集不存在時,RPC 應失敗。

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 DeleteClusterRequest 請求,則第二個請求將被忽略,並傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async diagnose_cluster(region, cluster_name, project_id, tarball_gcs_dir=None, diagnosis_interval=None, jobs=None, yarn_application_ids=None, retry=DEFAULT, timeout=None, metadata=())[source]

取得叢集診斷資訊。

作業完成後,回應會包含診斷輸出報告的 Cloud Storage URI,其中包含收集到的診斷摘要。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • tarball_gcs_dir (str | None) – 診斷 tarball 的輸出 Cloud Storage 目錄。如果未指定,將使用叢集暫存儲存貯體中的特定工作目錄。

  • diagnosis_interval (dict | google.type.interval_pb2.Interval | None) – 應在叢集上執行診斷的時間間隔。

  • jobs (collections.abc.MutableSequence[str] | None) – 指定要在其上執行診斷的任務列表。格式:projects/{project}/regions/{region}/jobs/{job}

  • yarn_application_ids (collections.abc.MutableSequence[str] | None) – 指定要在其上執行診斷的 yarn 應用程式列表。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async get_cluster(region, cluster_name, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

取得專案中叢集的資源表示法。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async list_clusters(region, filter_, project_id, page_size=None, retry=DEFAULT, timeout=None, metadata=())[source]

列出專案中所有 regions/{region}/clusters。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • filter – 用於約束叢集。區分大小寫。

  • page_size (int | None) – 底層 API 回應中包含的資源最大數量。如果頁面串流是按資源執行,則此參數不會影響傳回值。如果頁面串流是按頁面執行,則這會決定頁面中資源的最大數量。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async update_cluster(cluster_name, cluster, update_mask, project_id, region, graceful_decommission_timeout=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

更新專案中的叢集。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • cluster (dict | google.cloud.dataproc_v1.Cluster) – 叢集的變更。如果提供 dict,則其格式必須與 protobuf 訊息 Cluster 相同。

  • update_mask (dict | google.protobuf.field_mask_pb2.FieldMask) –

    指定要更新欄位的路徑,相對於 Cluster。例如,若要將叢集中的工作站數量變更為 5,則應將其指定為 config.worker_config.num_instances,且 PATCH 請求主體會指定新值

    {"config": {"workerConfig": {"numInstances": "5"}}}
    

    同樣地,若要將叢集中的可預佔工作站數量變更為 5,則應將其指定為 config.secondary_worker_config.num_instances,且 PATCH 請求主體會是

    {"config": {"secondaryWorkerConfig": {"numInstances": "5"}}}
    

    如果提供 dict,則其格式必須與 protobuf 訊息 FieldMask 相同。

  • graceful_decommission_timeout (dict | google.protobuf.duration_pb2.Duration | None) –

    YARN 正常終止逾時。正常終止允許從叢集中移除節點,而不會中斷進行中的任務。逾時指定在強制移除節點 (並可能中斷任務) 之前,等待進行中任務完成的時間長度。預設逾時為 0 (用於強制終止),而允許的最大逾時為一天。

    僅在 Dataproc 映像檔版本 1.2 和更高版本上支援。

    如果提供 dict,則其格式必須與 protobuf 訊息 Duration 相同。

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async create_workflow_template(template, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

建立新的工作流程範本。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要建立的 Dataproc 工作流程範本。如果提供 dict,則其格式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async instantiate_workflow_template(template_name, project_id, region, version=None, request_id=None, parameters=None, retry=DEFAULT, timeout=None, metadata=())[source]

實例化範本並開始執行。

參數
  • template_name (str) – 要實例化的範本名稱。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • version (int | None) – 要實例化的工作流程範本版本。如果指定,則僅當工作流程範本的目前版本具有提供的版本時,才會實例化工作流程。此選項不能用於實例化先前版本的工作流程範本。

  • request_id (str | None) – 一個標籤,用於防止多個具有相同標籤的並行工作流程實例執行。這可以降低由於重試而啟動並行實例的風險。

  • parameters (dict[str, str] | None) – 從參數名稱到應為這些參數使用的值的映射。值不得超過 100 個字元。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async instantiate_inline_workflow_template(template, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

實例化範本並開始執行。

參數
  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要實例化的工作流程範本。如果提供 dict,則其格式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • request_id (str | None) – 一個標籤,用於防止多個具有相同標籤的並行工作流程實例執行。這可以降低由於重試而啟動並行實例的風險。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async get_operation(region, operation_name)[source]
async get_job(job_id, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

取得專案中作業的資源表示法。

參數
  • job_id (str) – Dataproc 作業 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async submit_job(job, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

將作業提交至叢集。

參數
  • job (dict | google.cloud.dataproc_v1.Job) – 作業資源。如果提供 dict,則其格式必須與 protobuf 訊息 Job 相同。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • request_id (str | None) – 一個標籤,用於防止多個具有相同標籤的並行工作流程實例執行。這可以降低由於重試而啟動並行實例的風險。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async cancel_job(job_id, project_id, region=None, retry=DEFAULT, timeout=None, metadata=())[source]

啟動作業取消請求。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str | None) – Cloud Dataproc 區域以處理請求。

  • job_id (str) – 作業 ID。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async create_batch(region, project_id, batch, batch_id=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

建立批次工作負載。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • batch (dict | google.cloud.dataproc_v1.Batch) – 要建立的批次。

  • batch_id (str | None) – 要用於批次的 ID,它將成為批次資源名稱的最後一個組件。此值必須為 4-63 個字元。有效字元為 [a-z][0-9]-

  • request_id (str | None) – 用於識別請求的唯一 ID。如果伺服器收到兩個具有相同 ID 的 CreateBatchRequest 請求,則第二個請求將被忽略,並且傳回為第一個請求建立並儲存在後端的作業。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async delete_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

刪除批次工作負載資源。

參數
  • batch_id (str) – 批次 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async get_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

取得批次工作負載資源表示法。

參數
  • batch_id (str) – 批次 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

async list_batches(region, project_id, page_size=None, page_token=None, retry=DEFAULT, timeout=None, metadata=(), filter=None, order_by=None)[source]

列出批次工作負載。

參數
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 處理請求的 Cloud Dataproc 區域。

  • page_size (int | None) – 每個回應中傳回的批次數量上限。服務可能會傳回小於此值的值。預設頁面大小為 20;最大頁面大小為 1000。

  • page_token (str | None) – 從先前的 ListBatches 呼叫收到的頁面符記。提供此符記以擷取後續頁面。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的秒數。如果指定 retry,則逾時適用於每個個別嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的其他中繼資料。

  • filter (str | None) – ListBatchesRequest 中指定的結果篩選器

  • order_by (str | None) – ListBatchesRequest 中指定的結果排序方式

這個條目是否有幫助?