Google DataFusion 運算子

Cloud Data Fusion 是一個完全託管的雲端原生資料整合服務,可協助使用者有效率地建置和管理 ETL/ELT 資料管線。透過圖形化介面和廣泛的開源程式庫,其中包含預先設定的連接器和轉換,Cloud Data Fusion 將組織的重心從程式碼和整合轉移到洞察和行動。

先決條件任務

若要使用這些運算子,您必須執行幾件事

重新啟動 DataFusion 執行個體

若要重新啟動 Data Fusion 執行個體,請使用: CloudDataFusionRestartInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)

您可以使用 Jinja 模板instance_name, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

刪除 DataFusion 執行個體

若要刪除 Data Fusion 執行個體,請使用: CloudDataFusionDeleteInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    task_id="delete_instance",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以使用 Jinja 模板instance_name, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

建立 DataFusion 執行個體

若要建立 Data Fusion 執行個體,請使用: CloudDataFusionCreateInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)

您可以使用 Jinja 模板instance_name, instance, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

更新 DataFusion 執行個體

若要更新 Data Fusion 執行個體,請使用: CloudDataFusionUpdateInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)

您可以使用 Jinja 模板instance_name, instance, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

取得 DataFusion 執行個體

若要擷取 Data Fusion 執行個體,請使用: CloudDataFusionGetInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)

您可以使用 Jinja 模板instance_name, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

建立 DataFusion 管線

若要建立 Data Fusion 管線,請使用: CloudDataFusionCreatePipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)

您可以使用 Jinja 模板instance_name, pipeline_name, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

啟動 DataFusion 管線

若要使用同步模式啟動 Data Fusion 管線: CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    pipeline_timeout=1000,
    task_id="start_pipeline",
)

若要使用非同步模式啟動 Data Fusion 管線: CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)

可以使用可延遲模式以非同步方式啟動 Data Fusion 管線。雖然非同步參數可以讓您等待 DataFusion 管線使用同步 sleep() 方法達到終止狀態,但可延遲模式會使用非同步呼叫檢查狀態。無法同時使用非同步和可延遲參數。請查看使用可延遲模式的範例: CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

start_pipeline_def = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline_def",
    deferrable=True,
)

您可以使用 Jinja 模板instance_name, pipeline_name, runtime_args, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

停止 DataFusion 管線

若要停止 Data Fusion 管線,請使用: CloudDataFusionStopPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)

您可以使用 Jinja 模板instance_name, pipeline_name, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

刪除 DataFusion 管線

若要刪除 Data Fusion 管線,請使用: CloudDataFusionDeletePipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以使用 Jinja 模板instance_name, version_id, pipeline_name, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

列出 DataFusion 管線

若要列出 Data Fusion 管線,請使用: CloudDataFusionListPipelinesOperator

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)

您可以使用 Jinja 模板instance_name, artifact_name, artifact_version, impersonation_chain 參數,這可讓您動態地決定值。結果會儲存至 XCom,以便讓其他運算子使用。

感測器

當啟動管線以非同步方式觸發時,可以使用感測器執行檢查並驗證管線是否處於正確狀態。

CloudDataFusionPipelineStateSensor.

tests/system/google/cloud/datafusion/example_datafusion.py[原始碼]

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)

CloudDataFusionPipelineStateSensor.

此條目是否有幫助?