Google Dataform Operators

Dataform is a service that data analysts use to develop, test, version-control, and schedule complex SQL workflows for data transformation in BigQuery.

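Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.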

For more information about this service, visit the Dataform documentation.

Configuration

Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, visit the Dataform documentation.

Create Repository

To create a repository that tracks your code in the Dataform service, use DataformCreateRepositoryOperator. The following example shows its usage:

tests/system/google/cloud/dataform/example_dataform.py

    make_repository = DataformCreateRepositoryOperator(
        task_id="make-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
    )

Create Workspace

To create a workspace that stores your code in the Dataform service, use DataformCreateWorkspaceOperator. The following example shows its usage:

tests/system/google/cloud/dataform/example_dataform.py

    make_workspace = DataformCreateWorkspaceOperator(
        task_id="make-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )

Create Compilation Result

To create a compilation result, use DataformCreateCompilationResultOperator. Here is a simple configuration example:

tests/system/google/cloud/dataform/example_dataform.py

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main",
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
        },
    )
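
The workspace field above takes the workspace's full resource name, not its bare ID. A minimal sketch of a helper that assembles that string and the compilation_result dict, assuming the same path layout as in the example; the names workspace_resource_name and build_compilation_result are hypothetical and not part of the provider:

```python
def workspace_resource_name(project_id: str, region: str, repository_id: str, workspace_id: str) -> str:
    """Build the full workspace name in the layout used by the example above."""
    return (
        f"projects/{project_id}/locations/{region}/repositories/{repository_id}/"
        f"workspaces/{workspace_id}"
    )


def build_compilation_result(
    project_id: str,
    region: str,
    repository_id: str,
    workspace_id: str,
    git_commitish: str = "main",
) -> dict:
    """Return a compilation_result dict shaped like the one passed to the operator."""
    return {
        "git_commitish": git_commitish,
        "workspace": workspace_resource_name(project_id, region, repository_id, workspace_id),
    }


if __name__ == "__main__":
    # Hypothetical IDs, for illustration only.
    print(build_compilation_result("my-project", "us-central1", "my-repo", "my-workspace"))
```

Keeping the path in one helper avoids repeating the f-string in every task that needs it.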

Get Compilation Result

To get a compilation result, you can use DataformGetCompilationResultOperator:

tests/system/google/cloud/dataform/example_dataform.py

    get_compilation_result = DataformGetCompilationResultOperator(
        task_id="get-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result_id=(
            "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
        ),
    )
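
The compilation_result_id template pulls the XCom value of the create task (a dict with a name key, as the template assumes) and keeps only the last path segment of the resource name. The same logic in plain Python, with a made-up resource name for illustration:

```python
def resource_id_from_name(name: str) -> str:
    """Return the last segment of a slash-separated resource name.

    Mirrors the Jinja expression ['name'].split('/')[-1] used in the example.
    """
    return name.split("/")[-1]


# Hypothetical XCom value returned by the create-compilation-result task.
xcom_value = {
    "name": "projects/my-project/locations/us-central1/repositories/my-repo/compilationResults/abc123"
}
compilation_result_id = resource_id_from_name(xcom_value["name"])
print(compilation_result_id)  # abc123
```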

Create Workflow Invocation

To create a workflow invocation, you can use DataformCreateWorkflowInvocationOperator:

tests/system/google/cloud/dataform/example_dataform.py

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
        },
    )
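
Note that workflow_invocation receives the full compilation result name, while the get operator above takes only the trailing ID. When you need the individual parts of such a name, a hedged sketch that splits a name of the form projects/P/locations/L/repositories/R/<collection>/<id> into components; the regular expression and the parse_resource_name function are assumptions for illustration, not provider APIs:

```python
import re

# Assumed layout, based on the resource names used in this guide.
_RESOURCE_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<region>[^/]+)"
    r"/repositories/(?P<repository>[^/]+)/(?P<collection>[^/]+)/(?P<resource_id>[^/]+)$"
)


def parse_resource_name(name: str) -> dict:
    """Split a repository-scoped resource name into its named components."""
    match = _RESOURCE_RE.match(name)
    if match is None:
        raise ValueError(f"Unexpected resource name: {name!r}")
    return match.groupdict()


# Made-up workflow invocation name, for illustration only.
parts = parse_resource_name(
    "projects/my-project/locations/us-central1/repositories/my-repo/workflowInvocations/inv-42"
)
print(parts["collection"], parts["resource_id"])  # workflowInvocations inv-42
```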

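You can run this operation in synchronous or asynchronous mode. For asynchronous runs, the DataformWorkflowInvocationStateSensor sensor waits until the invocation reaches one of the expected states:

tests/system/google/cloud/dataform/example_dataform.py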

    create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
        task_id="create-workflow-invocation-async",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        asynchronous=True,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
        },
    )

    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is-workflow-invocation-done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
        ),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
    )
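
With asynchronous=True the create task does not wait for the invocation to finish; the sensor then checks the invocation state until it matches expected_statuses. A simplified, self-contained model of that poke loop, assuming a local State enum as a stand-in for WorkflowInvocation.State and a get_state callable in place of the Dataform API; wait_for_state is hypothetical and is not the sensor's actual implementation:

```python
import enum
import time
from typing import Callable, Iterable


class State(enum.Enum):
    """Local stand-in for WorkflowInvocation.State (subset of values)."""

    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


def wait_for_state(
    get_state: Callable[[], State],
    expected_statuses: Iterable[State],
    failure_statuses: Iterable[State] = (),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> State:
    """Poll get_state until it returns an expected state.

    Raises RuntimeError on a failure state and TimeoutError after max_pokes polls.
    """
    expected = set(expected_statuses)
    failures = set(failure_statuses)
    for _ in range(max_pokes):
        state = get_state()
        if state in failures:
            raise RuntimeError(f"Invocation ended in failure state {state.name}")
        if state in expected:
            return state
        time.sleep(poke_interval)
    raise TimeoutError("Invocation did not reach an expected state in time")


# Simulated API responses: two polls while running, then success.
responses = iter([State.RUNNING, State.RUNNING, State.SUCCEEDED])
final_state = wait_for_state(lambda: next(responses), expected_statuses={State.SUCCEEDED})
print(final_state.name)  # SUCCEEDED
```

Failing fast on a failure state, instead of waiting for a timeout, is the point of passing a separate set of failure statuses.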

There is also a sensor, DataformWorkflowInvocationActionStateSensor, that checks the state of a specific action within an asynchronously triggered workflow invocation:

tests/system/google/cloud/dataform/example_dataform.py

    is_workflow_invocation_action_done = DataformWorkflowInvocationActionStateSensor(
        task_id="is-workflow-invocation-action-done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
        ),
        target_name="first_view",
        expected_statuses={WorkflowInvocationAction.State.SUCCEEDED},
        failure_statuses={
            WorkflowInvocationAction.State.SKIPPED,
            WorkflowInvocationAction.State.DISABLED,
            WorkflowInvocationAction.State.CANCELLED,
            WorkflowInvocationAction.State.FAILED,
        },
    )
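
The action sensor narrows the check to a single action, identified here by target_name. A hedged sketch of that selection and classification step, using a hypothetical Action dataclass in place of the real WorkflowInvocationAction message (the sketch assumes each action exposes a target name and a state) and a local ActionState enum as a stand-in:

```python
import enum
from dataclasses import dataclass
from typing import Iterable, Optional


class ActionState(enum.Enum):
    """Local stand-in for WorkflowInvocationAction.State."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SKIPPED = "SKIPPED"
    DISABLED = "DISABLED"
    SUCCEEDED = "SUCCEEDED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"


@dataclass
class Action:
    """Hypothetical, simplified view of one workflow invocation action."""

    target_name: str
    state: ActionState


def check_action(
    actions: Iterable[Action],
    target_name: str,
    expected_statuses: set,
    failure_statuses: set,
) -> bool:
    """Return True when the target's action reached an expected state.

    Returns False while it should keep waiting (including when the action is
    not listed) and raises RuntimeError on a failure state.
    """
    action: Optional[Action] = next((a for a in actions if a.target_name == target_name), None)
    if action is None:
        return False
    if action.state in failure_statuses:
        raise RuntimeError(f"Action {target_name!r} ended in {action.state.name}")
    return action.state in expected_statuses


actions = [
    Action("first_view", ActionState.SUCCEEDED),
    Action("second_view", ActionState.RUNNING),
]
failures = {ActionState.SKIPPED, ActionState.DISABLED, ActionState.CANCELLED, ActionState.FAILED}
print(check_action(actions, "first_view", {ActionState.SUCCEEDED}, failures))  # True
```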

Get Workflow Invocation

To get a workflow invocation, you can use DataformGetWorkflowInvocationOperator:

tests/system/google/cloud/dataform/example_dataform.py

    get_workflow_invocation = DataformGetWorkflowInvocationOperator(
        task_id="get-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
        ),
    )

Query Workflow Invocation Actions

To query the actions of a workflow invocation, you can use DataformQueryWorkflowInvocationActionsOperator:

tests/system/google/cloud/dataform/example_dataform.py

    query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
        task_id="query-workflow-invocation-actions",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
        ),
    )
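
Once the actions have been queried, a downstream task may want a quick overview of how the run went. Assuming each action is available as a dict with a target name and a state string (a hypothetical shape; the serialized form of the real results is not shown in this guide), a short per-state summary can be built with collections.Counter:

```python
from collections import Counter


def summarize_actions(actions: list) -> dict:
    """Count actions per state and list the targets that failed.

    Each action is assumed to be a dict like
    {"target": {"name": "..."}, "state": "SUCCEEDED"} (hypothetical shape).
    """
    counts = Counter(action["state"] for action in actions)
    failed = sorted(a["target"]["name"] for a in actions if a["state"] == "FAILED")
    return {"counts": dict(counts), "failed_targets": failed}


# Made-up query results, for illustration only.
sample_actions = [
    {"target": {"name": "first_view"}, "state": "SUCCEEDED"},
    {"target": {"name": "second_view"}, "state": "FAILED"},
    {"target": {"name": "third_table"}, "state": "SUCCEEDED"},
]
summary = summarize_actions(sample_actions)
print(summary)  # {'counts': {'SUCCEEDED': 2, 'FAILED': 1}, 'failed_targets': ['second_view']}
```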

Cancel Workflow Invocation

To cancel a workflow invocation, you can use DataformCancelWorkflowInvocationOperator:

tests/system/google/cloud/dataform/example_dataform.py

    cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
        task_id="cancel-workflow-invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
        ),
    )

Delete Repository

To delete a repository, use DataformDeleteRepositoryOperator. The following example shows its usage:

tests/system/google/cloud/dataform/example_dataform.py

    delete_repository = DataformDeleteRepositoryOperator(
        task_id="delete-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Delete Workspace

To delete a workspace, use DataformDeleteWorkspaceOperator. The following example shows its usage:

tests/system/google/cloud/dataform/example_dataform.py

    delete_workspace = DataformDeleteWorkspaceOperator(
        task_id="delete-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )
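
Both delete examples set trigger_rule=TriggerRule.ALL_DONE so that cleanup runs once upstream tasks have finished, even if some of them failed. A deliberately simplified model of the difference between Airflow's default all_success rule and all_done; should_run is a hypothetical helper, not Airflow code, and it ignores details such as how Airflow marks tasks skipped or upstream_failed:

```python
from typing import Iterable

# Upstream states this sketch treats as finished (terminal).
TERMINAL_STATES = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Simplified check of whether a task may start, given upstream states."""
    states = list(upstream_states)
    if trigger_rule == "all_success":
        return all(state == "success" for state in states)
    if trigger_rule == "all_done":
        return all(state in TERMINAL_STATES for state in states)
    raise ValueError(f"Rule not modelled in this sketch: {trigger_rule}")


upstream = ["success", "failed"]  # e.g. one upstream task failed
print(should_run("all_success", upstream))  # False: cleanup would not run
print(should_run("all_done", upstream))  # True: cleanup still runs
```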

Remove File

To remove a file, use DataformRemoveFileOperator. The following example shows its usage:

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_file = DataformRemoveFileOperator(
        task_id="remove-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
    )

Remove Directory

To remove a directory, use DataformRemoveDirectoryOperator. The following example shows its usage:

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_directory = DataformRemoveDirectoryOperator(
        task_id="remove-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )

Initialize Workspace

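The make_initialization_workspace_flow helper function creates the default project structure for the given workspace. The repository and workspace must be created first. The following example shows its usage:

tests/system/google/cloud/dataform/example_dataform.py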

    first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        package_name=f"dataform_package_{ENV_ID}",
        without_installation=True,
        dataform_schema_name=DATAFORM_SCHEMA_NAME,
    )
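
Because the workspace must exist before it is initialized, and both resources should be deleted only after the work that uses them, the tasks form a dependency chain. A sketch that checks such an ordering with graphlib.TopologicalSorter (Python 3.9+); the edges below, and the task name initialize-workspace, are illustrative assumptions rather than the exact graph of the example DAG:

```python
from graphlib import TopologicalSorter

# Each key depends on the tasks in its set (assumed, simplified chain).
dependencies = {
    "make-workspace": {"make-repository"},
    "initialize-workspace": {"make-workspace"},
    "create-compilation-result": {"initialize-workspace"},
    "delete-workspace": {"create-compilation-result"},
    "delete-repository": {"delete-workspace"},
}

# A linear chain has exactly one valid order.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

In the DAG itself, such a chain is declared with the >> operator, for example make_repository >> make_workspace.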

Write File to Workspace

To write a file with the given contents to the specified workspace, use DataformWriteFileOperator:

tests/system/google/cloud/dataform/example_dataform.py

    test_file_content = b"""
    test test for test file
    """
    write_test_file = DataformWriteFileOperator(
        task_id="make-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
        contents=test_file_content,
    )
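
The contents argument in the example is a bytes literal. If the file body starts out as a str (for instance, a SQLX definition assembled in Python), encoding it first is one way to produce bytes; the to_file_contents helper and the SQLX body below are hypothetical:

```python
def to_file_contents(text: str, encoding: str = "utf-8") -> bytes:
    """Encode text into bytes, matching the bytes value used for contents above."""
    return text.encode(encoding)


# Hypothetical SQLX definition, for illustration only.
first_view_sqlx = 'config { type: "view" }\nSELECT 1 AS test\n'
contents = to_file_contents(first_view_sqlx)
print(type(contents).__name__)  # bytes
```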

Make Directory in Workspace

To create a directory at the given path in the specified workspace, use DataformMakeDirectoryOperator:

tests/system/google/cloud/dataform/example_dataform.py

    make_test_directory = DataformMakeDirectoryOperator(
        task_id="make-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )
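
The make-directory, write-file, remove-file, and remove-directory operators together manage the workspace's file tree, and the examples in this guide use the same paths (directory test, file test/test.txt). A toy in-memory model of that lifecycle; the InMemoryWorkspace class is hypothetical and makes no Dataform calls, and its rule that only empty directories can be removed is an assumption of the sketch, not documented Dataform behavior:

```python
from pathlib import PurePosixPath


class InMemoryWorkspace:
    """Toy model of a workspace file tree (hypothetical, no Dataform calls)."""

    def __init__(self) -> None:
        self.directories: set = set()
        self.files: dict = {}

    def make_directory(self, directory_path: str) -> None:
        self.directories.add(PurePosixPath(directory_path))

    def write_file(self, filepath: str, contents: bytes) -> None:
        self.files[PurePosixPath(filepath)] = contents

    def remove_file(self, filepath: str) -> None:
        del self.files[PurePosixPath(filepath)]

    def remove_directory(self, directory_path: str) -> None:
        path = PurePosixPath(directory_path)
        # Assumption of this sketch: only empty directories can be removed.
        if any(path in f.parents for f in self.files):
            raise OSError(f"Directory not empty: {path}")
        self.directories.remove(path)


ws = InMemoryWorkspace()
ws.make_directory("test")
ws.write_file("test/test.txt", b"test test for test file\n")
ws.remove_file("test/test.txt")
ws.remove_directory("test")
print(ws.directories, ws.files)  # set() {}
```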

Install NPM Packages

To install npm packages for the specified workspace, use DataformInstallNpmPackagesOperator:

tests/system/google/cloud/dataform/example_dataform.py

    install_npm_packages = DataformInstallNpmPackagesOperator(
        task_id="install-npm-packages",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )
