Google DataFusion 運算子¶
Cloud Data Fusion 是一個完全託管的雲端原生資料整合服務,可協助使用者有效率地建置和管理 ETL/ELT 資料管線。透過圖形化介面和廣泛的開源程式庫,其中包含預先設定的連接器和轉換,Cloud Data Fusion 將組織的重心從程式碼和整合轉移到洞察和行動。
先決條件任務¶
若要使用這些運算子,您必須執行幾件事
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
如 Google Cloud 文件中所述,為您的專案啟用計費功能。
如 Cloud Console 文件中所述,啟用 API。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'安裝的詳細資訊請參閱此處 Installation。
重新啟動 DataFusion 執行個體¶
若要重新啟動 Data Fusion 執行個體,請使用: CloudDataFusionRestartInstanceOperator
。
restart_instance = CloudDataFusionRestartInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)
您可以使用 Jinja 模板 與 instance_name
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
刪除 DataFusion 執行個體¶
若要刪除 Data Fusion 執行個體,請使用: CloudDataFusionDeleteInstanceOperator
。
delete_instance = CloudDataFusionDeleteInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
task_id="delete_instance",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以使用 Jinja 模板 與 instance_name
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
建立 DataFusion 執行個體¶
若要建立 Data Fusion 執行個體,請使用: CloudDataFusionCreateInstanceOperator
。
create_instance = CloudDataFusionCreateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
task_id="create_instance",
)
您可以使用 Jinja 模板 與 instance_name
, instance
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
更新 DataFusion 執行個體¶
若要更新 Data Fusion 執行個體,請使用: CloudDataFusionUpdateInstanceOperator
。
update_instance = CloudDataFusionUpdateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
update_mask="",
task_id="update_instance",
)
您可以使用 Jinja 模板 與 instance_name
, instance
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
取得 DataFusion 執行個體¶
若要擷取 Data Fusion 執行個體,請使用: CloudDataFusionGetInstanceOperator
。
get_instance = CloudDataFusionGetInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)
您可以使用 Jinja 模板 與 instance_name
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
建立 DataFusion 管線¶
若要建立 Data Fusion 管線,請使用: CloudDataFusionCreatePipelineOperator
。
create_pipeline = CloudDataFusionCreatePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
pipeline=PIPELINE,
instance_name=INSTANCE_NAME,
task_id="create_pipeline",
)
您可以使用 Jinja 模板 與 instance_name
, pipeline_name
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
啟動 DataFusion 管線¶
若要使用同步模式啟動 Data Fusion 管線: CloudDataFusionStartPipelineOperator
。
start_pipeline = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
pipeline_timeout=1000,
task_id="start_pipeline",
)
若要使用非同步模式啟動 Data Fusion 管線: CloudDataFusionStartPipelineOperator
。
start_pipeline_async = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
asynchronous=True,
task_id="start_pipeline_async",
)
可以使用可延遲模式以非同步方式啟動 Data Fusion 管線。雖然非同步參數可以讓您等待 DataFusion 管線使用同步 sleep() 方法達到終止狀態,但可延遲模式會使用非同步呼叫檢查狀態。無法同時使用非同步和可延遲參數。請查看使用可延遲模式的範例: CloudDataFusionStartPipelineOperator
。
start_pipeline_def = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="start_pipeline_def",
deferrable=True,
)
您可以使用 Jinja 模板 與 instance_name
, pipeline_name
, runtime_args
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
停止 DataFusion 管線¶
若要停止 Data Fusion 管線,請使用: CloudDataFusionStopPipelineOperator
。
stop_pipeline = CloudDataFusionStopPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="stop_pipeline",
)
您可以使用 Jinja 模板 與 instance_name
, pipeline_name
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
刪除 DataFusion 管線¶
若要刪除 Data Fusion 管線,請使用: CloudDataFusionDeletePipelineOperator
。
delete_pipeline = CloudDataFusionDeletePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以使用 Jinja 模板 與 instance_name
, version_id
, pipeline_name
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
列出 DataFusion 管線¶
若要列出 Data Fusion 管線,請使用: CloudDataFusionListPipelinesOperator
。
list_pipelines = CloudDataFusionListPipelinesOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)
您可以使用 Jinja 模板 與 instance_name
, artifact_name
, artifact_version
, impersonation_chain
參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。
感測器¶
當啟動管線以非同步方式觸發時,可以使用感測器執行檢查並驗證管線是否處於正確狀態。
CloudDataFusionPipelineStateSensor
.
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
task_id="pipeline_state_sensor",
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_async.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION,
)