Google Cloud Dataproc 運算子¶
Dataproc 是一項受管理的 Apache Spark 和 Apache Hadoop 服務,可讓您利用開放原始碼資料工具進行批次處理、查詢、串流和機器學習。Dataproc 自動化功能可協助您快速建立叢集、輕鬆管理叢集,並在不需要叢集時關閉叢集,以節省成本。
如需有關此服務的詳細資訊,請造訪Dataproc 產品文件 <產品文件
先決條件工作¶
若要使用這些運算子,您必須執行以下幾項操作
使用 Cloud Console 選取或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件所述。
啟用 API,如 Cloud Console 文件所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'如需詳細資訊,請參閱 安裝。
建立叢集¶
當您建立 Dataproc 叢集時,您可以選擇 Compute Engine 作為部署平台。在此組態中,Dataproc 會自動佈建必要的 Compute Engine VM 執行個體來執行叢集。VM 執行個體用於主節點、主要工作站節點和次要工作站節點(如果已指定)。這些 VM 執行個體由 Compute Engine 建立和管理,而 Dataproc 負責設定大數據處理任務所需的軟體和協調。透過提供節點的組態,您可以描述主要和次要節點的組態,以及 Compute Engine 執行個體叢集的狀態。設定次要工作站節點時,您可以指定工作站的數量及其類型。透過啟用「可搶佔式」選項以針對這些節點使用可搶佔式 VM(相當於 Spot 執行個體),您可以利用這些執行個體為您的 Dataproc 工作負載帶來的成本節省優勢。主節點通常託管叢集主節點和各種控制服務,不具備「可搶佔式」選項,因為主節點的穩定性和可用性至關重要。叢集建立後,組態設定(包括次要工作站節點的可搶佔性)將無法直接修改。
如需更多關於建立叢集時可傳遞的可用欄位資訊,請造訪 Dataproc 建立叢集 API。
叢集組態可能如下所示
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"secondary_worker_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 32,
},
"is_preemptible": True,
"preemptibility": "PREEMPTIBLE",
},
}
透過此組態,我們可以建立叢集: DataprocCreateClusterOperator
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
GKE 上的 Dataproc 會在 GKE 叢集上部署 Dataproc 虛擬叢集。與 Compute Engine 叢集上的 Dataproc 不同,GKE 虛擬叢集上的 Dataproc 不包含個別的主節點和工作站 VM。相反地,當您在 GKE 上建立 Dataproc 虛擬叢集時,GKE 上的 Dataproc 會在 GKE 叢集內建立節點池。GKE 上的 Dataproc 工作會以 Pod 的形式在這些節點池上執行。節點池和 Pod 在節點池上的排程由 GKE 管理。
建立 GKE Dataproc 叢集時,您可以指定底層運算資源是否使用可搶佔式 VM。GKE 支援使用可搶佔式 VM 作為節省成本的措施。透過啟用可搶佔式 VM,GKE 將使用可搶佔式 VM 佈建叢集節點。或者,您可以將節點建立為 Spot VM 執行個體,這是舊版可搶佔式 VM 的最新更新。這對於在 GKE 上執行 Dataproc 工作負載同時最佳化成本可能很有利。
若要在 Google Kubernetes Engine 中建立 Dataproc 叢集,您可以傳遞叢集組態
VIRTUAL_CLUSTER_CONFIG = {
"kubernetes_cluster_config": {
"gke_cluster_config": {
"gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
"node_pool_target": [
{
"node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
"roles": ["DEFAULT"],
"node_pool_config": {
"config": {
"preemptible": False,
"machine_type": "e2-standard-4",
}
},
}
],
},
"kubernetes_software_config": {"component_version": {"SPARK": "3"}},
},
"staging_bucket": "test-staging-bucket",
}
透過此組態,我們可以建立叢集: DataprocCreateClusterOperator
create_cluster_in_gke = DataprocCreateClusterOperator(
task_id="create_cluster_in_gke",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
您也可以使用選用元件 Presto 建立 Dataproc 叢集。若要執行此操作,請使用以下組態。請注意,預設映像可能不支援選取的選用元件。如果屬於這種情況,請指定正確的 image_version
,您可以在文件中找到。
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"software_config": {
"optional_components": [
"PRESTO",
],
"image_version": "2.0",
},
}
您也可以使用選用元件 Trino 建立 Dataproc 叢集。若要執行此操作,請使用以下組態。請注意,預設映像可能不支援選取的選用元件。如果屬於這種情況,請指定正確的 image_version
,您可以在文件中找到。
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"software_config": {
"optional_components": [
"TRINO",
],
"image_version": "2.1",
},
}
您可以針對此動作使用可延遲模式,以便非同步執行運算子
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
產生叢集配置¶
您也可以使用功能性 API 產生 CLUSTER_CONFIG,這可以使用 ClusterGenerator
的 make() 輕鬆完成。您可以產生並使用如下所示的配置
CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
project_id=PROJECT_ID,
zone=ZONE,
master_machine_type="n1-standard-4",
master_disk_size=32,
worker_machine_type="n1-standard-4",
worker_disk_size=32,
num_workers=2,
storage_bucket=BUCKET_NAME,
init_actions_uris=[GCS_INIT_FILE],
metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
num_preemptible_workers=1,
preemptibility="PREEMPTIBLE",
internal_ip_only=False,
).make()
診斷叢集¶
Dataproc 支援收集叢集診斷資訊,例如系統、Spark、Hadoop 和 Dataproc 日誌、叢集設定檔,這些資訊可用於疑難排解 Dataproc 叢集或工作。請務必注意,此資訊只能在叢集刪除之前收集。如需更多關於診斷叢集時可傳遞的可用欄位資訊,請造訪 Dataproc 診斷叢集 API。
若要診斷 Dataproc 叢集,請使用: DataprocDiagnoseClusterOperator
。
diagnose_cluster = DataprocDiagnoseClusterOperator(
task_id="diagnose_cluster",
region=REGION,
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
)
您也可以使用可延遲模式,以便非同步執行運算子
diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
task_id="diagnose_cluster_deferrable",
region=REGION,
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
deferrable=True,
)
更新叢集¶
您可以透過提供叢集配置和 updateMask 來擴增或縮減叢集。在 updateMask 引數中,您指定要更新的欄位路徑(相對於叢集)。如需關於 updateMask 和其他參數的詳細資訊,請參閱 Dataproc 更新叢集 API。
新的叢集配置和 updateMask 的範例
CLUSTER_UPDATE = {
"config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
"paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}
若要更新叢集,您可以使用: DataprocUpdateClusterOperator
scale_cluster = DataprocUpdateClusterOperator(
task_id="scale_cluster",
cluster_name=CLUSTER_NAME,
cluster=CLUSTER_UPDATE,
update_mask=UPDATE_MASK,
graceful_decommission_timeout=TIMEOUT,
project_id=PROJECT_ID,
region=REGION,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
您可以針對此動作使用可延遲模式,以便非同步執行運算子
update_cluster = DataprocUpdateClusterOperator(
task_id="update_cluster",
cluster_name=CLUSTER_NAME,
cluster=CLUSTER_UPDATE,
update_mask=UPDATE_MASK,
graceful_decommission_timeout=TIMEOUT,
project_id=PROJECT_ID,
region=REGION,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
啟動叢集¶
若要啟動叢集,您可以使用 DataprocStartClusterOperator
start_cluster = DataprocStartClusterOperator(
task_id="start_cluster",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
)
停止叢集¶
若要停止叢集,您可以使用 DataprocStopClusterOperator
stop_cluster = DataprocStopClusterOperator(
task_id="stop_cluster",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
)
刪除叢集¶
若要刪除叢集,您可以使用: DataprocDeleteClusterOperator
。
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
)
您可以針對此動作使用可延遲模式,以便非同步執行運算子
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
trigger_rule=TriggerRule.ALL_DONE,
deferrable=True,
)
將工作提交至叢集¶
Dataproc 支援提交不同大數據元件的工作。目前列表包括 Spark、PySpark、Hadoop、Trino、Pig、Flink 和 Hive。如需關於版本和映像的詳細資訊,請參閱 Cloud Dataproc 映像版本列表
若要將工作提交至叢集,您需要提供工作來源檔案。工作來源檔案可以在 GCS、叢集或本機檔案系統上。您可以指定 file:/// 路徑來參照叢集主節點上的本機檔案。
可以使用以下方式提交工作配置: DataprocSubmitJobOperator
。
pyspark_task = DataprocSubmitJobOperator(
task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)
要提交的工作配置範例¶
我們在下方為每個框架提供了一個範例。工作中可以提供的引數比範例中顯示的更多。如需完整的引數列表,請參閱 DataProc 工作引數
PySpark 工作的配置範例
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {"main_python_file_uri": GCS_JOB_FILE},
}
SparkSQl 工作的配置範例
SPARK_SQL_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Spark 工作的配置範例
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_job": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
在 可延遲模式 中執行的 Spark 工作配置範例
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_job": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
Hive 工作的配置範例
HIVE_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Hadoop 工作的配置範例
HADOOP_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hadoop_job": {
"main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
"args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
},
}
Pig 工作的配置範例
PIG_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
SparkR 工作的配置範例
SPARKR_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_r_job": {"main_r_file_uri": GCS_JOB_FILE},
}
Presto 工作的配置範例
PRESTO_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Trino 工作的配置範例
TRINO_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Flink 工作的配置範例
FLINK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"flink_job": {
"main_class": "org.apache.flink.examples.java.wordcount.WordCount",
"jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
},
}
使用工作流程範本¶
Dataproc 支援建立工作流程範本,稍後可以觸發這些範本。
可以使用以下方式建立工作流程範本: DataprocCreateWorkflowTemplateOperator
。
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
task_id="create_workflow_template",
template=WORKFLOW_TEMPLATE,
project_id=PROJECT_ID,
region=REGION,
)
工作流程建立後,使用者可以使用 DataprocInstantiateWorkflowTemplateOperator
觸發它
trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)
此外,對於所有這些動作,您都可以使用可延遲模式的運算子
trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow_async",
region=REGION,
project_id=PROJECT_ID,
template_id=WORKFLOW_NAME,
deferrable=True,
)
內嵌運算子是另一種選擇。它會建立工作流程、執行工作流程,然後在之後刪除它: DataprocInstantiateInlineWorkflowTemplateOperator
instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)
此外,對於所有這些動作,您都可以使用可延遲模式的運算子
instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
task_id="instantiate_inline_workflow_template_async",
template=WORKFLOW_TEMPLATE,
region=REGION,
deferrable=True,
)
建立批次¶
Dataproc 支援建立批次工作負載。
可以使用以下方式建立批次: DataprocCreateBatchOperator
。
create_batch = DataprocCreateBatchOperator(
task_id="create_batch",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
create_batch_2 = DataprocCreateBatchOperator(
task_id="create_batch_2",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_2,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
create_batch_3 = DataprocCreateBatchOperator(
task_id="create_batch_3",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_3,
asynchronous=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
若要建立具有永久歷史伺服器的批次,您應先使用特定參數建立 Dataproc 叢集。關於如何建立叢集的文件,您可以在這裡找到
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster_for_phs",
project_id=PROJECT_ID,
cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
叢集建立後,您應將其新增至批次配置。
create_batch = DataprocCreateBatchOperator(
task_id="create_batch_with_phs",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG_WITH_PHS,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
若要檢查操作是否成功,您可以使用 DataprocBatchSensor
。
batch_async_sensor = DataprocBatchSensor(
task_id="batch_async_sensor",
region=REGION,
project_id=PROJECT_ID,
batch_id=BATCH_ID_3,
poke_interval=10,
)
此外,對於所有這些動作,您都可以使用可延遲模式的運算子
create_batch = DataprocCreateBatchOperator(
task_id="create_batch",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
deferrable=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
取得批次¶
若要取得批次,您可以使用: DataprocGetBatchOperator
。
get_batch = DataprocGetBatchOperator(
task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
列出批次¶
若要取得現有批次的列表,您可以使用: DataprocListBatchesOperator
。
list_batches = DataprocListBatchesOperator(
task_id="list_batches",
project_id=PROJECT_ID,
region=REGION,
)
刪除批次¶
若要刪除批次,您可以使用: DataprocDeleteBatchOperator
。
delete_batch = DataprocDeleteBatchOperator(
task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)
取消批次操作¶
若要取消操作,您可以使用: DataprocCancelOperationOperator
。
cancel_operation = DataprocCancelOperationOperator(
task_id="cancel_operation",
project_id=PROJECT_ID,
region=REGION,
operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
)