Amazon EMR Serverless 運算子¶
Amazon EMR Serverless 是 Amazon EMR 中的無伺服器選項,讓資料分析師和工程師能夠輕鬆執行開放原始碼大數據分析框架,而無需設定、管理和擴展叢集或伺服器。您將獲得 Amazon EMR 的所有功能和優勢,而無需專家來規劃和管理叢集。
先決條件任務¶
要使用這些運算子,您必須執行一些操作
使用 AWS Console 或 AWS CLI 建立必要的資源。
透過 pip 安裝 API 函式庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 Airflow® 安裝
設定連線.
運算子¶
建立 EMR Serverless 應用程式¶
您可以使用 EmrServerlessCreateApplicationOperator
來建立新的 EMR Serverless 應用程式。此運算子可以透過傳遞 deferrable=True
作為參數,在可延遲模式下執行。這需要安裝 aiobotocore 模組。
tests/system/amazon/aws/example_emr_serverless.py
emr_serverless_app = EmrServerlessCreateApplicationOperator(
task_id="create_emr_serverless_task",
release_label="emr-6.6.0",
job_type="SPARK",
config={"name": "new_application"},
)
啟動 EMR Serverless 任務¶
您可以使用 EmrServerlessStartJobOperator
來啟動 EMR Serverless 任務。此運算子可以透過傳遞 deferrable=True
作為參數,在可延遲模式下執行。這需要安裝 aiobotocore 模組。
tests/system/amazon/aws/example_emr_serverless.py
start_job = EmrServerlessStartJobOperator(
task_id="start_emr_serverless_job",
application_id=emr_serverless_app_id,
execution_role_arn=role_arn,
job_driver=SPARK_JOB_DRIVER,
configuration_overrides=SPARK_CONFIGURATION_OVERRIDES,
)
開啟應用程式 UI¶
運算子也可以設定為產生應用程式 UI 和 Spark stdout 日誌的一次性連結,方法是傳遞 enable_application_ui_links=True
作為參數。任務開始執行後,這些連結將在相關任務的「詳細資訊」區段中提供。如果 enable_application_ui_links=False
,則連結將會存在但呈現灰色。
您需要確保您具有以下 IAM 權限才能產生儀表板連結。
"emr-serverless:GetDashboardForJobRun"
如果 Amazon S3 或 Amazon CloudWatch 日誌已為 EMR Serverless 啟用,則指向各自控制台的連結也將在任務日誌和任務「詳細資訊」中提供。
停止 EMR Serverless 應用程式¶
您可以使用 EmrServerlessStopApplicationOperator
來停止 EMR Serverless 應用程式。此運算子可以透過傳遞 deferrable=True
作為參數,在可延遲模式下執行。這需要安裝 aiobotocore 模組。
tests/system/amazon/aws/example_emr_serverless.py
stop_app = EmrServerlessStopApplicationOperator(
task_id="stop_application",
application_id=emr_serverless_app_id,
force_stop=True,
)
刪除 EMR Serverless 應用程式¶
您可以使用 EmrServerlessDeleteApplicationOperator
來刪除 EMR Serverless 應用程式。此運算子可以透過傳遞 deferrable=True
作為參數,在可延遲模式下執行。這需要安裝 aiobotocore 模組。
tests/system/amazon/aws/example_emr_serverless.py
delete_app = EmrServerlessDeleteApplicationOperator(
task_id="delete_application",
application_id=emr_serverless_app_id,
)
感測器¶
等待 EMR Serverless 任務狀態¶
要監控 EMR Serverless 任務的狀態,您可以使用 EmrServerlessJobSensor
。
tests/system/amazon/aws/example_emr_serverless.py
wait_for_job = EmrServerlessJobSensor(
task_id="wait_for_job",
application_id=emr_serverless_app_id,
job_run_id=start_job.output,
# the default is to wait for job completion, here we just wait for the job to be running.
target_states={*EmrServerlessHook.JOB_SUCCESS_STATES, "RUNNING"},
)
等待 EMR Serverless 應用程式狀態¶
要監控 EMR Serverless 應用程式的狀態,您可以使用 EmrServerlessApplicationSensor
。
tests/system/amazon/aws/example_emr_serverless.py
wait_for_app_creation = EmrServerlessApplicationSensor(
task_id="wait_for_app_creation",
application_id=emr_serverless_app_id,
)