Google Cloud Dataproc 運算子

Dataproc 是一項受管理的 Apache Spark 和 Apache Hadoop 服務,可讓您利用開放原始碼資料工具進行批次處理、查詢、串流和機器學習。Dataproc 自動化功能可協助您快速建立叢集、輕鬆管理叢集,並在不需要叢集時關閉叢集,以節省成本。

如需有關此服務的詳細資訊,請造訪Dataproc 產品文件 <產品文件

先決條件工作

若要使用這些運算子,您必須執行以下幾項操作

建立叢集

當您建立 Dataproc 叢集時,您可以選擇 Compute Engine 作為部署平台。在此組態中,Dataproc 會自動佈建必要的 Compute Engine VM 執行個體來執行叢集。VM 執行個體用於主節點、主要工作站節點和次要工作站節點(如果已指定)。這些 VM 執行個體由 Compute Engine 建立和管理,而 Dataproc 負責設定大數據處理任務所需的軟體和協調。透過提供節點的組態,您可以描述主要和次要節點的組態,以及 Compute Engine 執行個體叢集的狀態。設定次要工作站節點時,您可以指定工作站的數量及其類型。透過啟用「可搶佔式」選項以針對這些節點使用可搶佔式 VM(相當於 Spot 執行個體),您可以利用這些執行個體為您的 Dataproc 工作負載帶來的成本節省優勢。主節點通常託管叢集主節點和各種控制服務,不具備「可搶佔式」選項,因為主節點的穩定性和可用性至關重要。叢集建立後,組態設定(包括次要工作站節點的可搶佔性)將無法直接修改。

如需更多關於建立叢集時可傳遞的可用欄位資訊,請造訪 Dataproc 建立叢集 API。

叢集組態可能如下所示

tests/system/google/cloud/dataproc/example_dataproc_hive.py[原始碼]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 32,
        },
        "is_preemptible": True,
        "preemptibility": "PREEMPTIBLE",
    },
}

透過此組態,我們可以建立叢集: DataprocCreateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_hive.py[原始碼]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

GKE 上的 Dataproc 會在 GKE 叢集上部署 Dataproc 虛擬叢集。與 Compute Engine 叢集上的 Dataproc 不同,GKE 虛擬叢集上的 Dataproc 不包含個別的主節點和工作站 VM。相反地,當您在 GKE 上建立 Dataproc 虛擬叢集時,GKE 上的 Dataproc 會在 GKE 叢集內建立節點池。GKE 上的 Dataproc 工作會以 Pod 的形式在這些節點池上執行。節點池和 Pod 在節點池上的排程由 GKE 管理。

建立 GKE Dataproc 叢集時,您可以指定底層運算資源是否使用可搶佔式 VM。GKE 支援使用可搶佔式 VM 作為節省成本的措施。透過啟用可搶佔式 VM,GKE 將使用可搶佔式 VM 佈建叢集節點。或者,您可以將節點建立為 Spot VM 執行個體,這是舊版可搶佔式 VM 的最新更新。這對於在 GKE 上執行 Dataproc 工作負載同時最佳化成本可能很有利。

若要在 Google Kubernetes Engine 中建立 Dataproc 叢集,您可以傳遞叢集組態

tests/system/google/cloud/dataproc/example_dataproc_gke.py[原始碼]


VIRTUAL_CLUSTER_CONFIG = {
    "kubernetes_cluster_config": {
        "gke_cluster_config": {
            "gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
            "node_pool_target": [
                {
                    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
                    "roles": ["DEFAULT"],
                    "node_pool_config": {
                        "config": {
                            "preemptible": False,
                            "machine_type": "e2-standard-4",
                        }
                    },
                }
            ],
        },
        "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
    },
    "staging_bucket": "test-staging-bucket",
}

透過此組態,我們可以建立叢集: DataprocCreateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_gke.py[原始碼]

create_cluster_in_gke = DataprocCreateClusterOperator(
    task_id="create_cluster_in_gke",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

您也可以使用選用元件 Presto 建立 Dataproc 叢集。若要執行此操作,請使用以下組態。請注意,預設映像可能不支援選取的選用元件。如果屬於這種情況,請指定正確的 image_version,您可以在文件中找到。

tests/system/google/cloud/dataproc/example_dataproc_presto.py[原始碼]

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "PRESTO",
        ],
        "image_version": "2.0",
    },
}

您也可以使用選用元件 Trino 建立 Dataproc 叢集。若要執行此操作,請使用以下組態。請注意,預設映像可能不支援選取的選用元件。如果屬於這種情況,請指定正確的 image_version,您可以在文件中找到。

tests/system/google/cloud/dataproc/example_dataproc_trino.py[原始碼]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "TRINO",
        ],
        "image_version": "2.1",
    },
}

您可以針對此動作使用可延遲模式,以便非同步執行運算子

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[原始碼]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

產生叢集配置

您也可以使用功能性 API 產生 CLUSTER_CONFIG,這可以使用 ClusterGeneratormake() 輕鬆完成。您可以產生並使用如下所示的配置

tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py[原始碼]

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    master_disk_size=32,
    worker_machine_type="n1-standard-4",
    worker_disk_size=32,
    num_workers=2,
    storage_bucket=BUCKET_NAME,
    init_actions_uris=[GCS_INIT_FILE],
    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
    num_preemptible_workers=1,
    preemptibility="PREEMPTIBLE",
    internal_ip_only=False,
).make()

診斷叢集

Dataproc 支援收集叢集診斷資訊,例如系統、Spark、Hadoop 和 Dataproc 日誌、叢集設定檔,這些資訊可用於疑難排解 Dataproc 叢集或工作。請務必注意,此資訊只能在叢集刪除之前收集。如需更多關於診斷叢集時可傳遞的可用欄位資訊,請造訪 Dataproc 診斷叢集 API。

若要診斷 Dataproc 叢集,請使用: DataprocDiagnoseClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[原始碼]

    diagnose_cluster = DataprocDiagnoseClusterOperator(
        task_id="diagnose_cluster",
        region=REGION,
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
    )

您也可以使用可延遲模式,以便非同步執行運算子

tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[原始碼]

    diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
        task_id="diagnose_cluster_deferrable",
        region=REGION,
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        deferrable=True,
    )

更新叢集

您可以透過提供叢集配置和 updateMask 來擴增或縮減叢集。在 updateMask 引數中,您指定要更新的欄位路徑(相對於叢集)。如需關於 updateMask 和其他參數的詳細資訊,請參閱 Dataproc 更新叢集 API。

新的叢集配置和 updateMask 的範例

tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py[原始碼]

CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}

若要更新叢集,您可以使用: DataprocUpdateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py[原始碼]

scale_cluster = DataprocUpdateClusterOperator(
    task_id="scale_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)

您可以針對此動作使用可延遲模式,以便非同步執行運算子

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[原始碼]

update_cluster = DataprocUpdateClusterOperator(
    task_id="update_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    deferrable=True,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)

啟動叢集

若要啟動叢集,您可以使用 DataprocStartClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py[原始碼]

start_cluster = DataprocStartClusterOperator(
    task_id="start_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

停止叢集

若要停止叢集,您可以使用 DataprocStopClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py[原始碼]

stop_cluster = DataprocStopClusterOperator(
    task_id="stop_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

刪除叢集

若要刪除叢集,您可以使用: DataprocDeleteClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_hive.py[原始碼]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
)

您可以針對此動作使用可延遲模式,以便非同步執行運算子

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[原始碼]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,
    deferrable=True,
)

將工作提交至叢集

Dataproc 支援提交不同大數據元件的工作。目前列表包括 Spark、PySpark、Hadoop、Trino、Pig、Flink 和 Hive。如需關於版本和映像的詳細資訊,請參閱 Cloud Dataproc 映像版本列表

若要將工作提交至叢集,您需要提供工作來源檔案。工作來源檔案可以在 GCS、叢集或本機檔案系統上。您可以指定 file:/// 路徑來參照叢集主節點上的本機檔案。

可以使用以下方式提交工作配置: DataprocSubmitJobOperator

tests/system/google/cloud/dataproc/example_dataproc_pyspark.py[原始碼]

pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)

要提交的工作配置範例

我們在下方為每個框架提供了一個範例。工作中可以提供的引數比範例中顯示的更多。如需完整的引數列表,請參閱 DataProc 工作引數

PySpark 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_pyspark.py[原始碼]

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": GCS_JOB_FILE},
}

SparkSQl 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py[原始碼]

SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Spark 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_spark.py[原始碼]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

可延遲模式 中執行的 Spark 工作配置範例

tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py[原始碼]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Hive 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_hive.py[原始碼]

HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Hadoop 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_hadoop.py[原始碼]

HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}

Pig 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_pig.py[原始碼]

PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}

SparkR 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_sparkr.py[原始碼]

SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": GCS_JOB_FILE},
}

Presto 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_presto.py[原始碼]

PRESTO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Trino 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_trino.py[原始碼]

TRINO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Flink 工作的配置範例

tests/system/google/cloud/dataproc/example_dataproc_flink.py[原始碼]

FLINK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "flink_job": {
        "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
        "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
    },
}

使用工作流程範本

Dataproc 支援建立工作流程範本,稍後可以觸發這些範本。

可以使用以下方式建立工作流程範本: DataprocCreateWorkflowTemplateOperator

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[原始碼]

create_workflow_template = DataprocCreateWorkflowTemplateOperator(
    task_id="create_workflow_template",
    template=WORKFLOW_TEMPLATE,
    project_id=PROJECT_ID,
    region=REGION,
)

工作流程建立後,使用者可以使用 DataprocInstantiateWorkflowTemplateOperator 觸發它

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[原始碼]

trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)

此外,對於所有這些動作,您都可以使用可延遲模式的運算子

tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[原始碼]

trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow_async",
    region=REGION,
    project_id=PROJECT_ID,
    template_id=WORKFLOW_NAME,
    deferrable=True,
)

內嵌運算子是另一種選擇。它會建立工作流程、執行工作流程,然後在之後刪除它: DataprocInstantiateInlineWorkflowTemplateOperator

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[原始碼]

instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)

此外,對於所有這些動作,您都可以使用可延遲模式的運算子

tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[原始碼]

instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template_async",
    template=WORKFLOW_TEMPLATE,
    region=REGION,
    deferrable=True,
)

建立批次

Dataproc 支援建立批次工作負載。

可以使用以下方式建立批次: DataprocCreateBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[原始碼]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

create_batch_3 = DataprocCreateBatchOperator(
    task_id="create_batch_3",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_3,
    asynchronous=True,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

若要建立具有永久歷史伺服器的批次,您應先使用特定參數建立 Dataproc 叢集。關於如何建立叢集的文件,您可以在這裡找到

tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py[原始碼]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster_for_phs",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

叢集建立後,您應將其新增至批次配置。

tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py[原始碼]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch_with_phs",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_WITH_PHS,
    batch_id=BATCH_ID,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

若要檢查操作是否成功,您可以使用 DataprocBatchSensor

tests/system/google/cloud/dataproc/example_dataproc_batch.py[原始碼]

batch_async_sensor = DataprocBatchSensor(
    task_id="batch_async_sensor",
    region=REGION,
    project_id=PROJECT_ID,
    batch_id=BATCH_ID_3,
    poke_interval=10,
)

此外,對於所有這些動作,您都可以使用可延遲模式的運算子

tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py[原始碼]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    deferrable=True,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

取得批次

若要取得批次,您可以使用: DataprocGetBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[原始碼]

get_batch = DataprocGetBatchOperator(
    task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)

列出批次

若要取得現有批次的列表,您可以使用: DataprocListBatchesOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[原始碼]

list_batches = DataprocListBatchesOperator(
    task_id="list_batches",
    project_id=PROJECT_ID,
    region=REGION,
)

刪除批次

若要刪除批次,您可以使用: DataprocDeleteBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[原始碼]

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
    task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
    task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
    task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)

取消批次操作

若要取消操作,您可以使用: DataprocCancelOperationOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py[原始碼]

cancel_operation = DataprocCancelOperationOperator(
    task_id="cancel_operation",
    project_id=PROJECT_ID,
    region=REGION,
    operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
)

參考資料

如需進一步資訊,請參閱

此條目是否有幫助?