Google Dataprep 運算子¶
Dataprep 是一種智慧雲端資料服務,可視覺化探索、清理和準備資料,以進行分析和機器學習。此服務可用於探索和轉換來自不同和/或大型資料集的原始資料,成為乾淨且結構化的資料,以供進一步分析和處理。Dataprep Job 是一個內部物件,編碼執行 Cloud Dataprep 工作群組一部分所需的資訊。如需更多關於此服務的資訊,請造訪Google Dataprep API 文件
開始之前¶
在 Airflow 中使用 Dataprep 之前,您需要使用 TOKEN 驗證您的帳戶。若要將 Dataprep 與 Airflow 連線,您需要 Dataprep 權杖。請依照 Dataprep 指示進行操作。
TOKEN 應以 JSON 格式新增至 Airflow 中的連線。您可以查看管理連線
DataprepRunJobGroupOperator 將執行指定的工作。運算子需要配方 ID。若要識別配方 ID,請使用runJobGroup 的 API 文件。例如,如果 URL 是 /flows/10?recipe=7,則配方 ID 為 7。配方無法透過此運算子建立。它只能透過 UI 建立,UI 可在此處取得。某些參數可以被 DAG 的 body 請求覆蓋。範例 DAG 中顯示了如何操作。
請參閱以下範例:設定這些欄位的值:.. code-block
Connection Id: "your_conn_id"
Extra: {"token": "TOKEN", "base_url": "https://api.clouddataprep.com"}
先決條件任務¶
若要使用這些運算子,您必須執行一些操作
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 函式庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
執行工作群組¶
運算子任務是建立一個工作群組,以已驗證的使用者身分啟動指定的工作。這執行與在應用程式中點擊「執行工作」按鈕相同的動作。
若要取得關於 Cloud Dataprep 工作中工作的資訊,請使用:DataprepRunJobGroupOperator
範例用法
run_job_group_task = DataprepRunJobGroupOperator(
task_id="run_job_group",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
body_request={
"wrangledDataset": {"id": DATASET_WRANGLED_ID},
"overrides": WRITE_SETTINGS,
},
)
取得工作群組的工作¶
運算子任務是取得關於 Cloud Dataprep 工作中批次工作的資訊。
若要取得關於 Cloud Dataprep 工作中工作的資訊,請使用:DataprepGetJobsForJobGroupOperator
範例用法
get_jobs_for_job_group_task = DataprepGetJobsForJobGroupOperator(
task_id="get_jobs_for_job_group",
dataprep_conn_id=CONNECTION_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)
取得工作群組¶
運算子任務是取得指定的工作群組。工作群組是從流程中特定節點執行的工作。
若要取得關於 Cloud Dataprep 工作中工作的資訊,請使用:DataprepGetJobGroupOperator
範例用法
get_job_group_task = DataprepGetJobGroupOperator(
task_id="get_job_group",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
embed="",
include_deleted=False,
)
複製流程¶
運算子任務是複製流程。
若要取得關於 Cloud Dataprep 工作中工作的資訊,請使用:DataprepCopyFlowOperator
範例用法
copy_task = DataprepCopyFlowOperator(
task_id="copy_flow",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
flow_id=FLOW_ID,
name=f"copy_{DATASET_NAME}",
)
執行流程¶
運算子任務是執行流程。流程是整理邏輯的容器,其中包含匯入的資料集、配方、輸出物件和參考。
若要取得關於 Cloud Dataprep 工作中工作的資訊,請使用:DataprepRunFlowOperator
範例用法
run_flow_task = DataprepRunFlowOperator(
task_id="run_flow",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
flow_id=FLOW_COPY_ID,
body_request={},
)
刪除流程¶
運算子任務是刪除流程。流程是整理邏輯的容器,其中包含匯入的資料集、配方、輸出物件和參考。
若要取得關於 Cloud Dataprep 工作中工作的資訊,請使用:DataprepDeleteFlowOperator
範例用法
delete_flow_task = DataprepDeleteFlowOperator(
task_id="delete_flow",
dataprep_conn_id=CONNECTION_ID,
flow_id="{{ task_instance.xcom_pull('copy_flow')['id'] }}",
)
檢查工作群組是否完成¶
感測器任務是告知系統何時啟動的工作群組已完成,無論成功與否。工作群組是從流程中特定節點執行的工作。
若要取得關於 Cloud Dataprep 工作中工作的資訊,請使用:DataprepJobGroupIsFinishedSensor
範例用法
check_flow_status_sensor = DataprepJobGroupIsFinishedSensor(
task_id="check_flow_status",
dataprep_conn_id=CONNECTION_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)