Amazon EMR

Amazon EMR (先前稱為 Amazon Elastic MapReduce) 是一個受管叢集平台,可簡化在 AWS 上執行大數據框架 (例如 Apache Hadoop 和 Apache Spark) 以處理和分析大量資料的流程。使用這些框架和相關的開放原始碼專案,您可以為了分析目的和商業智慧工作負載處理資料。Amazon EMR 也讓您可以將大量資料轉換並移入和移出其他 AWS 資料儲存和資料庫,例如 Amazon Simple Storage Service (Amazon S3) 和 Amazon DynamoDB。

先決條件任務

若要使用這些運算子,您必須執行一些事項

運算子

注意

為了成功執行範例,您需要為 Amazon EMR 建立 IAM 服務角色 (EMR_EC2_DefaultRoleEMR_DefaultRole)。您可以使用 AWS CLI 建立這些角色:aws emr create-default-roles

建立 EMR 工作流程

您可以使用 EmrCreateJobFlowOperator 建立新的 EMR 工作流程。叢集將在步驟完成後自動終止。

預設行為是在叢集啟動後立即將 DAG 任務節點標記為成功 (wait_policy=None)。可以透過使用不同的 wait_policy 修改此行為。可用選項如下

  • WaitPolicy.WAIT_FOR_COMPLETION - DAG 任務節點等待叢集執行中

  • WaitPolicy.WAIT_FOR_STEPS_COMPLETION - DAG 任務節點等待叢集終止

此運算子可以透過傳遞 deferrable=True 作為參數,在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,並提高 Airflow 叢集中資源的有效利用率。然而,此模式將需要 Airflow triggerer 在您的部署中可用。

工作流程組態

若要在 EMR 上建立工作流程,您需要指定 EMR 叢集的組態

tests/system/amazon/aws/example_emr.py

SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

JOB_FLOW_OVERRIDES: dict[str, Any] = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-7.1.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
        ],
        # If the EMR steps complete too quickly the cluster will be torn down before the other system test
        # tasks have a chance to run (such as the modify cluster step, the addition of more EMR steps, etc).
        # Set KeepJobFlowAliveWhenNoSteps to False to avoid the cluster from being torn down prematurely.
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "Steps": SPARK_STEPS,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

在這裡,我們建立一個 EMR 單節點叢集 PiCalc。它只有一個步驟 calculate_pi,它使用 Spark 計算 Pi 的值。組態 'KeepJobFlowAliveWhenNoSteps': False 告知叢集在步驟完成後關閉。或者,可以使用沒有 Steps 值的組態,並且可以在稍後使用 EmrAddStepsOperator 新增步驟。請參閱以下詳細資訊。

注意

使用 EMR API 啟動的 EMR 叢集 (如這個叢集) 預設對所有使用者不可見,因此您可能在 EMR 管理主控台中看不到該叢集 - 您可以透過在 JOB_FLOW_OVERRIDES 字典的末尾新增 'VisibleToAllUsers': True 來變更此設定。

如需更多組態資訊,請參閱 Boto3 EMR client

建立工作流程

在以下程式碼中,我們正在使用如上所述的組態建立新的工作流程。

tests/system/amazon/aws/example_emr.py

create_job_flow = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

將步驟新增至 EMR 工作流程

若要將步驟新增至現有的 EMR 工作流程,您可以使用 EmrAddStepsOperator。此運算子可以透過傳遞 deferrable=True 作為參數,在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,並提高 Airflow 叢集中資源的有效利用率。然而,此模式將需要 Airflow triggerer 在您的部署中可用。

tests/system/amazon/aws/example_emr.py

add_steps = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id=create_job_flow.output,
    steps=SPARK_STEPS,
    execution_role_arn=execution_role_arn,
)

終止 EMR 工作流程

若要終止 EMR 工作流程,您可以使用 EmrTerminateJobFlowOperator。此運算子可以透過傳遞 deferrable=True 作為參數,在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,並提高 Airflow 叢集中資源的有效利用率。然而,此模式將需要 Airflow triggerer 在您的部署中可用。

tests/system/amazon/aws/example_emr.py

remove_cluster = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id=create_job_flow.output,
)

修改 Amazon EMR 容器

若要修改現有的 EMR 容器,您可以使用 EmrContainerSensor

tests/system/amazon/aws/example_emr.py

modify_cluster = EmrModifyClusterOperator(
    task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)

啟動 EMR 筆記本執行

您可以使用 EmrStartNotebookExecutionOperator,在連接到執行中叢集的現有筆記本上啟動筆記本執行。

tests/system/amazon/aws/example_emr_notebook_execution.py

start_execution = EmrStartNotebookExecutionOperator(
    task_id="start_execution",
    editor_id=editor_id,
    cluster_id=cluster_id,
    relative_path="EMR-System-Test.ipynb",
    service_role="EMR_Notebooks_DefaultRole",
)

停止 EMR 筆記本執行

您可以使用 EmrStopNotebookExecutionOperator 停止執行中的筆記本執行。

tests/system/amazon/aws/example_emr_notebook_execution.py

stop_execution = EmrStopNotebookExecutionOperator(
    task_id="stop_execution",
    notebook_execution_id=notebook_execution_id_1,
)

感測器

等待 EMR 筆記本執行狀態

若要監控 EMR 筆記本執行的狀態,您可以使用 EmrNotebookExecutionSensor

tests/system/amazon/aws/example_emr_notebook_execution.py

wait_for_execution_start = EmrNotebookExecutionSensor(
    task_id="wait_for_execution_start",
    notebook_execution_id=notebook_execution_id_1,
    target_states={"RUNNING"},
    poke_interval=5,
)

等待 Amazon EMR 工作流程狀態

若要監控 EMR 工作流程的狀態,您可以使用 EmrJobFlowSensor

tests/system/amazon/aws/example_emr.py

check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)

等待 Amazon EMR 步驟狀態

若要監控 EMR 工作步驟的狀態,您可以使用 EmrStepSensor

tests/system/amazon/aws/example_emr.py

wait_for_step = EmrStepSensor(
    task_id="wait_for_step",
    job_flow_id=create_job_flow.output,
    step_id=get_step_id(add_steps.output),
)

節流

Amazon EMR 的服務配額相對較低,詳細資訊請參閱 此處。因此,當使用此頁面中列出的任何運算子和感測器時,您可能會遇到節流問題。為了規避此限制,請考慮自訂 AWS 連線組態以修改預設的 Boto3 重試策略。請參閱 AWS 連線組態文件

此條目是否有幫助?