Amazon EMR¶
Amazon EMR (先前稱為 Amazon Elastic MapReduce) 是一個受管叢集平台,可簡化在 AWS 上執行大數據框架 (例如 Apache Hadoop 和 Apache Spark) 以處理和分析大量資料的流程。使用這些框架和相關的開放原始碼專案,您可以為了分析目的和商業智慧工作負載處理資料。Amazon EMR 也讓您可以將大量資料轉換並移入和移出其他 AWS 資料儲存和資料庫,例如 Amazon Simple Storage Service (Amazon S3) 和 Amazon DynamoDB。
先決條件任務¶
若要使用這些運算子,您必須執行一些事項
使用 AWS Console 或 AWS CLI 建立必要的資源。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 Airflow® 的安裝
設定連線.
運算子¶
注意
為了成功執行範例,您需要為 Amazon EMR 建立 IAM 服務角色 (EMR_EC2_DefaultRole
和 EMR_DefaultRole
)。您可以使用 AWS CLI 建立這些角色:aws emr create-default-roles
。
建立 EMR 工作流程¶
您可以使用 EmrCreateJobFlowOperator
建立新的 EMR 工作流程。叢集將在步驟完成後自動終止。
預設行為是在叢集啟動後立即將 DAG 任務節點標記為成功 (wait_policy=None
)。可以透過使用不同的 wait_policy
修改此行為。可用選項如下
WaitPolicy.WAIT_FOR_COMPLETION
- DAG 任務節點等待叢集執行中WaitPolicy.WAIT_FOR_STEPS_COMPLETION
- DAG 任務節點等待叢集終止
此運算子可以透過傳遞 deferrable=True
作為參數,在可延遲模式下執行。使用 deferrable
模式將釋放 worker 插槽,並提高 Airflow 叢集中資源的有效利用率。然而,此模式將需要 Airflow triggerer 在您的部署中可用。
工作流程組態¶
若要在 EMR 上建立工作流程,您需要指定 EMR 叢集的組態
tests/system/amazon/aws/example_emr.py
SPARK_STEPS = [
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
},
}
]
JOB_FLOW_OVERRIDES: dict[str, Any] = {
"Name": "PiCalc",
"ReleaseLabel": "emr-7.1.0",
"Applications": [{"Name": "Spark"}],
"Instances": {
"InstanceGroups": [
{
"Name": "Primary node",
"Market": "ON_DEMAND",
"InstanceRole": "MASTER",
"InstanceType": "m5.xlarge",
"InstanceCount": 1,
},
],
# If the EMR steps complete too quickly the cluster will be torn down before the other system test
# tasks have a chance to run (such as the modify cluster step, the addition of more EMR steps, etc).
# Set KeepJobFlowAliveWhenNoSteps to False to avoid the cluster from being torn down prematurely.
"KeepJobFlowAliveWhenNoSteps": True,
"TerminationProtected": False,
},
"Steps": SPARK_STEPS,
"JobFlowRole": "EMR_EC2_DefaultRole",
"ServiceRole": "EMR_DefaultRole",
}
在這裡,我們建立一個 EMR 單節點叢集 PiCalc。它只有一個步驟 calculate_pi,它使用 Spark 計算 Pi
的值。組態 'KeepJobFlowAliveWhenNoSteps': False
告知叢集在步驟完成後關閉。或者,可以使用沒有 Steps
值的組態,並且可以在稍後使用 EmrAddStepsOperator
新增步驟。請參閱以下詳細資訊。
注意
使用 EMR API 啟動的 EMR 叢集 (如這個叢集) 預設對所有使用者不可見,因此您可能在 EMR 管理主控台中看不到該叢集 - 您可以透過在 JOB_FLOW_OVERRIDES
字典的末尾新增 'VisibleToAllUsers': True
來變更此設定。
如需更多組態資訊,請參閱 Boto3 EMR client。
建立工作流程¶
在以下程式碼中,我們正在使用如上所述的組態建立新的工作流程。
tests/system/amazon/aws/example_emr.py
create_job_flow = EmrCreateJobFlowOperator(
task_id="create_job_flow",
job_flow_overrides=JOB_FLOW_OVERRIDES,
)
將步驟新增至 EMR 工作流程¶
若要將步驟新增至現有的 EMR 工作流程,您可以使用 EmrAddStepsOperator
。此運算子可以透過傳遞 deferrable=True
作為參數,在可延遲模式下執行。使用 deferrable
模式將釋放 worker 插槽,並提高 Airflow 叢集中資源的有效利用率。然而,此模式將需要 Airflow triggerer 在您的部署中可用。
tests/system/amazon/aws/example_emr.py
add_steps = EmrAddStepsOperator(
task_id="add_steps",
job_flow_id=create_job_flow.output,
steps=SPARK_STEPS,
execution_role_arn=execution_role_arn,
)
終止 EMR 工作流程¶
若要終止 EMR 工作流程,您可以使用 EmrTerminateJobFlowOperator
。此運算子可以透過傳遞 deferrable=True
作為參數,在可延遲模式下執行。使用 deferrable
模式將釋放 worker 插槽,並提高 Airflow 叢集中資源的有效利用率。然而,此模式將需要 Airflow triggerer 在您的部署中可用。
tests/system/amazon/aws/example_emr.py
remove_cluster = EmrTerminateJobFlowOperator(
task_id="remove_cluster",
job_flow_id=create_job_flow.output,
)
修改 Amazon EMR 容器¶
若要修改現有的 EMR 容器,您可以使用 EmrContainerSensor
。
tests/system/amazon/aws/example_emr.py
modify_cluster = EmrModifyClusterOperator(
task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)
啟動 EMR 筆記本執行¶
您可以使用 EmrStartNotebookExecutionOperator
,在連接到執行中叢集的現有筆記本上啟動筆記本執行。
tests/system/amazon/aws/example_emr_notebook_execution.py
start_execution = EmrStartNotebookExecutionOperator(
task_id="start_execution",
editor_id=editor_id,
cluster_id=cluster_id,
relative_path="EMR-System-Test.ipynb",
service_role="EMR_Notebooks_DefaultRole",
)
停止 EMR 筆記本執行¶
您可以使用 EmrStopNotebookExecutionOperator
停止執行中的筆記本執行。
tests/system/amazon/aws/example_emr_notebook_execution.py
stop_execution = EmrStopNotebookExecutionOperator(
task_id="stop_execution",
notebook_execution_id=notebook_execution_id_1,
)
感測器¶
等待 EMR 筆記本執行狀態¶
若要監控 EMR 筆記本執行的狀態,您可以使用 EmrNotebookExecutionSensor
。
tests/system/amazon/aws/example_emr_notebook_execution.py
wait_for_execution_start = EmrNotebookExecutionSensor(
task_id="wait_for_execution_start",
notebook_execution_id=notebook_execution_id_1,
target_states={"RUNNING"},
poke_interval=5,
)
等待 Amazon EMR 工作流程狀態¶
若要監控 EMR 工作流程的狀態,您可以使用 EmrJobFlowSensor
。
tests/system/amazon/aws/example_emr.py
check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)
等待 Amazon EMR 步驟狀態¶
若要監控 EMR 工作步驟的狀態,您可以使用 EmrStepSensor
。
tests/system/amazon/aws/example_emr.py
wait_for_step = EmrStepSensor(
task_id="wait_for_step",
job_flow_id=create_job_flow.output,
step_id=get_step_id(add_steps.output),
)
節流¶
Amazon EMR 的服務配額相對較低,詳細資訊請參閱 此處。因此,當使用此頁面中列出的任何運算子和感測器時,您可能會遇到節流問題。為了規避此限制,請考慮自訂 AWS 連線組態以修改預設的 Boto3 重試策略。請參閱 AWS 連線組態文件。