Amazon EMR on Amazon EKS¶
Amazon EMR on EKS 為 Amazon EMR 提供部署選項,讓您可以在 Amazon EKS 上執行開放原始碼的大數據框架。
先決條件任務¶
若要使用這些運算子,您必須執行幾項操作
使用 AWS Console 或 AWS CLI 建立必要的資源。
透過 pip 安裝 API 函式庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 Airflow® 安裝
設定連線.
運算子¶
建立 Amazon EMR EKS 虛擬叢集¶
EmrEksCreateClusterOperator
將建立 Amazon EMR on EKS 虛擬叢集。以下範例 DAG 示範如何建立 EMR on EKS 虛擬叢集。
若要在 Amazon EKS 上建立 Amazon EMR 叢集,您需要指定虛擬叢集名稱、您想要使用的 eks 叢集,以及 eks 命名空間。
如需更多詳細資訊,請參閱 EMR on EKS 開發指南。
tests/system/amazon/aws/example_emr_eks.py
create_emr_eks_cluster = EmrEksCreateClusterOperator(
task_id="create_emr_eks_cluster",
virtual_cluster_name=virtual_cluster_name,
eks_cluster_name=eks_cluster_name,
eks_namespace=eks_namespace,
)
將工作提交至 Amazon EMR 虛擬叢集¶
注意
此範例假設您已設定 EMR on EKS 虛擬叢集。如需更多資訊,請參閱 EMR on EKS 入門指南。
EmrContainerOperator
將新的工作提交至 Amazon EMR on Amazon EKS 虛擬叢集。以下範例工作計算數學常數 Pi
。在生產工作環境中,您通常會參考 Amazon Simple Storage Service (S3) 上的 Spark 腳本。
若要為 Amazon EMR on Amazon EKS 建立工作,您需要指定虛擬叢集 ID、您想要使用的 Amazon EMR 版本、您的 IAM 執行角色,以及 Spark 提交參數。
您也可以選擇性地提供組態覆寫,例如 Spark、Hive 或 Log4j 屬性,以及將 Spark 日誌傳送至 Amazon S3 或 Amazon Cloudwatch 的監控組態。
在此範例中,我們示範如何新增 applicationConfiguration
以使用 AWS Glue 資料目錄,以及 monitoringConfiguration
以將日誌傳送至 Amazon CloudWatch 中的 /aws/emr-eks-spark
日誌群組。如需工作組態的更多詳細資訊,請參閱 EMR on EKS 指南。
tests/system/amazon/aws/example_emr_eks.py
job_driver_arg = {
"sparkSubmitJobDriver": {
"entryPoint": f"s3://{s3_bucket_name}/{S3_FILE_NAME}",
"sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G "
"--conf spark.executor.cores=2 --conf spark.driver.cores=1",
}
}
configuration_overrides_arg = {
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": "/emr-eks-jobs",
"logStreamNamePrefix": "airflow",
}
},
}
我們將 virtual_cluster_id
和 execution_role_arn
值作為運算子參數傳遞,但您可以將它們儲存在連線中,或在 DAG 中提供。您的 AWS 區域應在 aws_default
連線中定義為 {"region_name": "us-east-1"}
,或定義在傳遞至運算子的自訂連線名稱中,並搭配 aws_conn_id
參數。運算子會傳回工作執行的工作 ID。
tests/system/amazon/aws/example_emr_eks.py
job_starter = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=str(create_emr_eks_cluster.output),
execution_role_arn=job_role_arn,
release_label="emr-7.0.0-latest",
job_driver=job_driver_arg,
configuration_overrides=configuration_overrides_arg,
name="pi.py",
)
感測器¶
等待 Amazon EMR 虛擬叢集工作¶
若要等待 Amazon EMR 虛擬叢集工作的狀態達到終止狀態,您可以使用 EmrContainerSensor
tests/system/amazon/aws/example_emr_eks.py
job_waiter = EmrContainerSensor(
task_id="job_waiter",
virtual_cluster_id=str(create_emr_eks_cluster.output),
job_id=str(job_starter.output),
)