Azure Data Factory Operators

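Azure Data Factory is Azure's cloud ETL service for scale-out, serverless data integration and data transformation. It offers a code-free UI for intuitive authoring and single-pane-of-glass monitoring and management.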

AzureDataFactoryRunPipelineOperator

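Use the AzureDataFactoryRunPipelineOperator to execute a pipeline within a data factory. By default, the operator periodically checks the status of the executed pipeline and finishes once the run reaches a "Succeeded" status. To wait asynchronously instead, typically with the AzureDataFactoryPipelineRunStatusSensor, disable this behavior by setting wait_for_termination to False.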

Below is an example of using this operator to execute an Azure Data Factory pipeline.

tests/system/microsoft/azure/example_adf_run_pipeline.py

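    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator

    # Runs "pipeline1" and, by default, waits until the run terminates.
    run_pipeline1 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline1",
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
    )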
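The default wait behavior described above can be pictured as a polling loop. The following is a minimal, standard-library sketch under stated assumptions, not the provider's actual implementation: `run_pipeline` and the `get_status` callback are hypothetical names, the status feed is simulated, and `check_interval` and `timeout` are loosely modeled on the operator arguments of the same names, with much shorter values for the demonstration.

```python
import time
from typing import Callable, Optional

# Statuses after which a Data Factory pipeline run no longer changes.
TERMINAL_STATES = {"Succeeded", "Failed", "Cancelled"}


def run_pipeline(
    get_status: Callable[[], str],
    wait_for_termination: bool = True,
    check_interval: float = 0.01,
    timeout: float = 1.0,
) -> Optional[str]:
    """Hypothetical helper: poll get_status() until the run terminates."""
    if not wait_for_termination:
        # Return right away; a separate sensor task is expected to do the waiting.
        return None
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status == "Succeeded":
            return status
        if status in TERMINAL_STATES:
            raise RuntimeError(f"Pipeline run ended with status {status}")
        time.sleep(check_interval)
    raise TimeoutError("Pipeline run did not terminate in time")


# Simulated status feed standing in for calls to the Data Factory service.
statuses = iter(["Queued", "InProgress", "InProgress", "Succeeded"])
result = run_pipeline(lambda: next(statuses))
```

With `wait_for_termination=False` the helper returns immediately, which mirrors the case where a sensor takes over the waiting.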

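Below is an example of using this operator to execute an Azure Data Factory pipeline with the deferrable flag, so that polling for the status of the pipeline run happens on the Airflow Triggerer.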

tests/system/microsoft/azure/example_adf_run_pipeline.py

    # deferrable=True moves the status polling from the worker to the Airflow Triggerer.
    run_pipeline3 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline3",
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
        deferrable=True,
    )
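To illustrate why polling on the Triggerer is cheap, here is a small asyncio sketch in which one event loop watches several simulated pipeline runs at once. It is only an analogy under stated assumptions: `wait_for_run`, the run IDs, and the poll counts are hypothetical, and no Azure or Airflow API is called.

```python
import asyncio
from typing import List


async def wait_for_run(run_id: str, polls_needed: int, interval: float = 0.01) -> str:
    """Hypothetical trigger body: poll until the simulated run succeeds."""
    for _ in range(polls_needed):
        # Awaiting yields control, so other pending runs can be polled meanwhile.
        await asyncio.sleep(interval)
    return f"{run_id}: Succeeded"


async def main() -> List[str]:
    # A single event loop waits on all three runs concurrently.
    return await asyncio.gather(
        wait_for_run("run-a", polls_needed=3),
        wait_for_run("run-b", polls_needed=1),
        wait_for_run("run-c", polls_needed=2),
    )


results = asyncio.run(main())
```

`asyncio.gather` returns the results in argument order, regardless of which simulated run finishes first.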

Here is another example of using this operator to execute a pipeline, this time combined with the AzureDataFactoryPipelineRunStatusSensor to perform an asynchronous wait.

tests/system/microsoft/azure/example_adf_run_pipeline.py

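    from typing import cast

    from airflow.models.xcom_arg import XComArg  # import path as used in Airflow 2.x
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor

    # Start the pipeline and return immediately instead of waiting for it to finish.
    run_pipeline2 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline2",
        pipeline_name="pipeline2",
        wait_for_termination=False,
    )

    # Wait for the run whose ID the operator published to XCom under the "run_id" key.
    pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
    )

    # Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
    pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor_defered",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

    # Same deferrable configuration, registered under a second task ID.
    pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_async_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )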
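The hand-off in the example above, where the operator publishes a run ID and the sensor later checks that run, can be sketched with plain Python. This is a simplified model under stated assumptions: the `xcom` dictionary stands in for Airflow's XCom storage, and `trigger_pipeline`, `poke`, the run ID format, and the status feed are all hypothetical.

```python
from typing import Dict, Iterator

# Stand-in for Airflow's XCom storage (assumption: a plain dict is enough here).
xcom: Dict[str, str] = {}


def trigger_pipeline(pipeline_name: str) -> None:
    """Hypothetical operator step: start the run, publish its ID, and return."""
    xcom["run_id"] = f"{pipeline_name}-run-001"  # made-up run ID


def poke(run_id: str, status_feed: Iterator[str]) -> bool:
    """Hypothetical sensor check: True once the run has succeeded."""
    status = next(status_feed)
    if status in {"Failed", "Cancelled"}:
        raise RuntimeError(f"Run {run_id} ended with status {status}")
    return status == "Succeeded"


trigger_pipeline("pipeline2")

# Simulated statuses that the service would report for the run.
feed = iter(["InProgress", "InProgress", "Succeeded"])
pokes = 0
while True:
    pokes += 1
    if poke(xcom["run_id"], feed):
        break
```

The operator side finishes as soon as the run ID is stored; all of the waiting happens in the repeated `poke` calls.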

You can also use deferrable mode in AzureDataFactoryPipelineRunStatusSensor if you want to free up worker slots while the sensor is running.

tests/system/microsoft/azure/example_adf_run_pipeline.py

    # Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
    pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor_defered",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

Poll for the status of a data factory pipeline run asynchronously

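Use the AzureDataFactoryPipelineRunStatusAsyncSensor (deferrable version) to periodically retrieve the status of a data factory pipeline run asynchronously. This sensor frees up its worker slot because polling for the run status happens on the Airflow triggerer, which makes more efficient use of resources within Airflow. The example configures the same behavior by setting deferrable=True on AzureDataFactoryPipelineRunStatusSensor.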

tests/system/microsoft/azure/example_adf_run_pipeline.py

    pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_async_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )
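The effect on worker slots can be made concrete with a small simulation. This is a rough model under stated assumptions, not how Airflow schedules tasks internally: a single `asyncio.Semaphore` plays the role of one worker slot, and `run_sensor`, `wait_for_pipeline`, and `peak_concurrent_waits` are hypothetical names. It counts how many sensors manage to wait at the same time when only one slot exists.

```python
import asyncio
from typing import Dict


async def wait_for_pipeline(state: Dict[str, int]) -> None:
    """Simulated wait for a pipeline run; tracks how many waits overlap."""
    state["waiting"] += 1
    state["peak"] = max(state["peak"], state["waiting"])
    await asyncio.sleep(0.01)
    state["waiting"] -= 1


async def run_sensor(slot: asyncio.Semaphore, state: Dict[str, int], deferrable: bool) -> None:
    async with slot:  # the task starts on a worker slot
        if not deferrable:
            # Non-deferrable: the slot stays occupied for the whole wait.
            await wait_for_pipeline(state)
            return
    # Deferrable: the slot was released above, so the wait happens off the worker.
    await wait_for_pipeline(state)
    async with slot:
        pass  # resume briefly on a worker to finish the task


async def peak_concurrent_waits(deferrable: bool, sensors: int = 3) -> int:
    slot = asyncio.Semaphore(1)  # a single worker slot
    state = {"waiting": 0, "peak": 0}
    await asyncio.gather(*(run_sensor(slot, state, deferrable) for _ in range(sensors)))
    return state["peak"]


blocking_peak = asyncio.run(peak_concurrent_waits(deferrable=False))
deferred_peak = asyncio.run(peak_concurrent_waits(deferrable=True))
```

In this model the non-deferrable sensors wait one at a time because each holds the only slot, while the deferrable sensors all wait concurrently.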

Reference

For further information, see the Microsoft documentation.
