Amazon EMR on Amazon EKS

Amazon EMR on EKS 為 Amazon EMR 提供部署選項,讓您可以在 Amazon EKS 上執行開放原始碼的大數據框架。

先決條件任務

若要使用這些運算子,您必須執行幾項操作

運算子

建立 Amazon EMR EKS 虛擬叢集

EmrEksCreateClusterOperator 將建立 Amazon EMR on EKS 虛擬叢集。以下範例 DAG 示範如何建立 EMR on EKS 虛擬叢集。

若要在 Amazon EKS 上建立 Amazon EMR 叢集,您需要指定虛擬叢集名稱、您想要使用的 eks 叢集,以及 eks 命名空間。

如需更多詳細資訊,請參閱 EMR on EKS 開發指南

tests/system/amazon/aws/example_emr_eks.py

    create_emr_eks_cluster = EmrEksCreateClusterOperator(
        task_id="create_emr_eks_cluster",
        virtual_cluster_name=virtual_cluster_name,
        eks_cluster_name=eks_cluster_name,
        eks_namespace=eks_namespace,
    )

將工作提交至 Amazon EMR 虛擬叢集

注意

此範例假設您已設定 EMR on EKS 虛擬叢集。如需更多資訊,請參閱 EMR on EKS 入門指南

EmrContainerOperator 將新的工作提交至 Amazon EMR on Amazon EKS 虛擬叢集。以下範例工作計算數學常數 Pi。在生產工作環境中,您通常會參考 Amazon Simple Storage Service (S3) 上的 Spark 腳本。

若要為 Amazon EMR on Amazon EKS 建立工作,您需要指定虛擬叢集 ID、您想要使用的 Amazon EMR 版本、您的 IAM 執行角色,以及 Spark 提交參數。

您也可以選擇性地提供組態覆寫,例如 Spark、Hive 或 Log4j 屬性,以及將 Spark 日誌傳送至 Amazon S3 或 Amazon Cloudwatch 的監控組態。

在此範例中,我們示範如何新增 applicationConfiguration 以使用 AWS Glue 資料目錄,以及 monitoringConfiguration 以將日誌傳送至 Amazon CloudWatch 中的 /aws/emr-eks-spark 日誌群組。如需工作組態的更多詳細資訊,請參閱 EMR on EKS 指南

tests/system/amazon/aws/example_emr_eks.py

job_driver_arg = {
    "sparkSubmitJobDriver": {
        "entryPoint": f"s3://{s3_bucket_name}/{S3_FILE_NAME}",
        "sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G "
        "--conf spark.executor.cores=2 --conf spark.driver.cores=1",
    }
}

configuration_overrides_arg = {
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/emr-eks-jobs",
            "logStreamNamePrefix": "airflow",
        }
    },
}

我們將 virtual_cluster_idexecution_role_arn 值作為運算子參數傳遞,但您可以將它們儲存在連線中,或在 DAG 中提供。您的 AWS 區域應在 aws_default 連線中定義為 {"region_name": "us-east-1"},或定義在傳遞至運算子的自訂連線名稱中,並搭配 aws_conn_id 參數。運算子會傳回工作執行的工作 ID。

tests/system/amazon/aws/example_emr_eks.py

job_starter = EmrContainerOperator(
    task_id="start_job",
    virtual_cluster_id=str(create_emr_eks_cluster.output),
    execution_role_arn=job_role_arn,
    release_label="emr-7.0.0-latest",
    job_driver=job_driver_arg,
    configuration_overrides=configuration_overrides_arg,
    name="pi.py",
)

感測器

等待 Amazon EMR 虛擬叢集工作

若要等待 Amazon EMR 虛擬叢集工作的狀態達到終止狀態,您可以使用 EmrContainerSensor

tests/system/amazon/aws/example_emr_eks.py

job_waiter = EmrContainerSensor(
    task_id="job_waiter",
    virtual_cluster_id=str(create_emr_eks_cluster.output),
    job_id=str(job_starter.output),
)

此條目是否有幫助?