Google Dataform Operators
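Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL workflows for data transformation in BigQuery.
Dataform lets you manage data transformation in the Extract, Load, and Transform (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.
For more information about the service, see the Dataform documentation.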
Configuration
Before using the Dataform operators, you need to initialize a repository and a workspace. For more information, see the Dataform documentation.
Create Repository
To create a repository for tracking code in the Dataform service, use DataformCreateRepositoryOperator. The following example shows its usage:
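from airflow.providers.google.cloud.operators.dataform import DataformCreateRepositoryOperator

# PROJECT_ID, REGION and REPOSITORY_ID are assumed to be defined earlier in the DAG file.
make_repository = DataformCreateRepositoryOperator(
    task_id="make-repository",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
)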
Create Workspace
To create a workspace for storing code in the Dataform service, use DataformCreateWorkspaceOperator. The following example shows its usage:
from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkspaceOperator

make_workspace = DataformCreateWorkspaceOperator(
    task_id="make-workspace",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)
Create Compilation Result
To create a compilation result, use DataformCreateCompilationResultOperator. Here is a simple configuration example:
from airflow.providers.google.cloud.operators.dataform import DataformCreateCompilationResultOperator

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": "main",
        # Full resource name of the workspace created earlier.
        "workspace": (
            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
            f"workspaces/{WORKSPACE_ID}"
        ),
    },
)
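The `workspace` entry in `compilation_result` is a full Dataform resource name, not a bare ID. A small helper can keep that string consistent across tasks. The functions below are hypothetical (they are not part of Airflow or the Dataform client library) and simply reproduce the `projects/.../locations/.../repositories/.../workspaces/...` layout used in the example above.

```python
def repository_resource_name(project_id: str, region: str, repository_id: str) -> str:
    """Build the full resource name of a Dataform repository (hypothetical helper)."""
    return f"projects/{project_id}/locations/{region}/repositories/{repository_id}"


def workspace_resource_name(
    project_id: str, region: str, repository_id: str, workspace_id: str
) -> str:
    """Build the full resource name of a workspace inside a repository (hypothetical helper)."""
    repository = repository_resource_name(project_id, region, repository_id)
    return f"{repository}/workspaces/{workspace_id}"


# Placeholder values for illustration only, not real resources.
name = workspace_resource_name("my-project", "us-central1", "my-repo", "my-workspace")
print(name)
```

With such a helper, the `workspace` value could be written as `workspace_resource_name(PROJECT_ID, REGION, REPOSITORY_ID, WORKSPACE_ID)`.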
Get Compilation Result
To get a compilation result, use DataformGetCompilationResultOperator:
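from airflow.providers.google.cloud.operators.dataform import DataformGetCompilationResultOperator

get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # Keep only the ID (last path segment) of the compilation result created upstream.
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
    ),
)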
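The templated `compilation_result_id` pulls the `name` of the upstream compilation result (a full resource name) from XCom and keeps only its last path segment. In plain Python the same transformation looks like this. The `resource_id` function name and the sample resource name are made up for illustration.

```python
def resource_id(resource_name: str) -> str:
    """Return the last path segment of a resource name, like .split('/')[-1] in the template."""
    return resource_name.split("/")[-1]


# Made-up compilation result name, shaped like the value pulled from XCom.
compilation_name = (
    "projects/my-project/locations/us-central1/repositories/my-repo/"
    "compilationResults/abc123"
)
print(resource_id(compilation_name))  # abc123
```

Note the contrast in this guide: `workflow_invocation` payloads pass the full name as `compilation_result`, while arguments ending in `_id` receive only the trailing segment.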
Create Workflow Invocation
To create a workflow invocation, use DataformCreateWorkflowInvocationOperator:
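from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkflowInvocationOperator

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # The full resource name of the compilation result is passed here, not just its ID.
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)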
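This operation can run in synchronous or asynchronous mode. With asynchronous=True the operator submits the invocation without waiting for it to finish; the DataformWorkflowInvocationStateSensor can then wait until the invocation reaches one of the expected_statuses: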
from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkflowInvocationOperator
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
from google.cloud.dataform_v1beta1 import WorkflowInvocation

create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation-async",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

# Wait until the asynchronously started invocation reports SUCCEEDED.
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is-workflow-invocation-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
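Conceptually, the state sensor keeps fetching the invocation's state until it lands in `expected_statuses`. The loop below is a simplified, hypothetical model of that idea, not Airflow's actual sensor implementation: plain strings stand in for `WorkflowInvocation.State` members, `fetch_state` stands in for an API call, and the optional failure set is an illustrative addition.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    fetch_state: Callable[[], str],
    expected: Iterable[str],
    failure: Iterable[str] = (),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> str:
    """Poll fetch_state() until it returns an expected state (simplified sensor model)."""
    expected_set, failure_set = set(expected), set(failure)
    for _ in range(max_pokes):
        state = fetch_state()
        if state in failure_set:
            raise RuntimeError(f"reached failure state {state}")
        if state in expected_set:
            return state
        time.sleep(poke_interval)  # a real sensor waits between pokes
    raise TimeoutError("no expected state reached within max_pokes")


# Simulated states returned by successive polls of a hypothetical invocation.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_state(lambda: next(states), expected={"SUCCEEDED"}, failure={"FAILED"}))  # SUCCEEDED
```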
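There is also a sensor, DataformWorkflowInvocationActionStateSensor, that checks the state of one specific action (selected by target_name) of an asynchronously triggered workflow invocation: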
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationActionStateSensor
from google.cloud.dataform_v1beta1 import WorkflowInvocationAction

is_workflow_invocation_action_done = DataformWorkflowInvocationActionStateSensor(
    task_id="is-workflow-invocation-action-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    target_name="first_view",
    expected_statuses={WorkflowInvocationAction.State.SUCCEEDED},
    # Any of these states makes the sensor fail instead of waiting further.
    failure_statuses={
        WorkflowInvocationAction.State.SKIPPED,
        WorkflowInvocationAction.State.DISABLED,
        WorkflowInvocationAction.State.CANCELLED,
        WorkflowInvocationAction.State.FAILED,
    },
)
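The action sensor has to single out one action of the invocation by `target_name` (here `first_view`) before comparing its state against `expected_statuses` and `failure_statuses`. The sketch below models that check in spirit only: the `Action` dataclass is a hypothetical stand-in for the real `WorkflowInvocationAction` message, and plain strings replace its state enum.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Set


@dataclass
class Action:
    """Simplified stand-in for a workflow invocation action (hypothetical)."""

    target_name: str
    state: str


def find_action_state(actions: Iterable[Action], target_name: str) -> Optional[str]:
    """Return the state of the action whose target matches target_name, or None."""
    for action in actions:
        if action.target_name == target_name:
            return action.state
    return None


def check_action(
    actions: Iterable[Action], target_name: str, expected: Set[str], failure: Set[str]
) -> bool:
    """True when the action is done, False to keep waiting; raise on failure or missing target."""
    state = find_action_state(actions, target_name)
    if state is None:
        raise LookupError(f"no action with target {target_name!r}")
    if state in failure:
        raise RuntimeError(f"action {target_name!r} ended in {state}")
    return state in expected


actions = [Action("first_view", "SUCCEEDED"), Action("second_view", "RUNNING")]
failure = {"SKIPPED", "DISABLED", "CANCELLED", "FAILED"}
print(check_action(actions, "first_view", {"SUCCEEDED"}, failure))   # True
print(check_action(actions, "second_view", {"SUCCEEDED"}, failure))  # False
```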
Get Workflow Invocation
To get a workflow invocation, use DataformGetWorkflowInvocationOperator:
from airflow.providers.google.cloud.operators.dataform import DataformGetWorkflowInvocationOperator

get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
Query Workflow Invocation Actions
To query the actions of a workflow invocation, use DataformQueryWorkflowInvocationActionsOperator:
from airflow.providers.google.cloud.operators.dataform import DataformQueryWorkflowInvocationActionsOperator

query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
    task_id="query-workflow-invocation-actions",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
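Once the actions have been queried, a downstream task might summarize them. The sketch below assumes each action is available as a dict with a `state` value and a nested `target.name`; that shape is an assumption, and whether states arrive as enum names or integers depends on how the result is serialized, so `targets_in_state` takes the state value to match as a parameter. The function names and sample data are hypothetical.

```python
from collections import Counter
from typing import Any, Iterable, List, Mapping


def count_states(actions: Iterable[Mapping[str, Any]]) -> Counter:
    """Count how many actions are in each state."""
    return Counter(action["state"] for action in actions)


def targets_in_state(actions: Iterable[Mapping[str, Any]], state: Any) -> List[str]:
    """Return the target names of actions whose state equals the given value."""
    return [action["target"]["name"] for action in actions if action["state"] == state]


# Hand-written sample in the assumed shape; real data would come from the operator's XCom.
sample = [
    {"target": {"name": "first_view"}, "state": "SUCCEEDED"},
    {"target": {"name": "second_view"}, "state": "FAILED"},
    {"target": {"name": "third_view"}, "state": "SUCCEEDED"},
]
print(count_states(sample))
print(targets_in_state(sample, "FAILED"))  # ['second_view']
```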
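Cancel Workflow Invocation
To cancel a workflow invocation, use DataformCancelWorkflowInvocationOperator. The example cancels an invocation started by a separate upstream task, create-workflow-invocation-for-cancel: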
from airflow.providers.google.cloud.operators.dataform import DataformCancelWorkflowInvocationOperator

cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
    ),
)
Delete Repository
To delete a repository, use DataformDeleteRepositoryOperator. The following example shows its usage:
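from airflow.providers.google.cloud.operators.dataform import DataformDeleteRepositoryOperator
from airflow.utils.trigger_rule import TriggerRule

delete_repository = DataformDeleteRepositoryOperator(
    task_id="delete-repository",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # ALL_DONE: run this cleanup even if upstream tasks failed.
    trigger_rule=TriggerRule.ALL_DONE,
)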
Delete Workspace
To delete a workspace, use DataformDeleteWorkspaceOperator. The following example shows its usage:
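from airflow.providers.google.cloud.operators.dataform import DataformDeleteWorkspaceOperator
from airflow.utils.trigger_rule import TriggerRule

delete_workspace = DataformDeleteWorkspaceOperator(
    task_id="delete-workspace",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    # ALL_DONE: run this cleanup even if upstream tasks failed.
    trigger_rule=TriggerRule.ALL_DONE,
)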
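Both delete examples set `trigger_rule=TriggerRule.ALL_DONE`, so the cleanup task runs once every upstream task has finished, whether it succeeded, failed, or was skipped, instead of Airflow's default rule, which requires all upstream tasks to succeed. The function below is a deliberately simplified model of just those two rules, using plain string states; it is not Airflow's scheduler logic and ignores the other trigger rules.

```python
from typing import List

# Upstream states treated as "finished" in this simplified model.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule: str, upstream_states: List[str]) -> bool:
    """Simplified model of whether a task with the given trigger rule may start."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError(f"rule not modeled here: {trigger_rule}")


upstream = ["success", "failed"]
print(should_run("all_success", upstream))  # False: one upstream task failed
print(should_run("all_done", upstream))     # True: cleanup still runs
```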
Remove File
To remove a file, use DataformRemoveFileOperator. The following example shows its usage:
from airflow.providers.google.cloud.operators.dataform import DataformRemoveFileOperator

remove_test_file = DataformRemoveFileOperator(
    task_id="remove-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
)
Remove Directory
To remove a directory, use DataformRemoveDirectoryOperator. The following example shows its usage:
from airflow.providers.google.cloud.operators.dataform import DataformRemoveDirectoryOperator

remove_test_directory = DataformRemoveDirectoryOperator(
    task_id="remove-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)
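Initialize Workspace
Creates the default project structure for the given workspace. The repository and the workspace must be created before this step. The helper returns the first and last tasks of the initialization flow, so they can be linked to the rest of the DAG. The following example shows its usage: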
from airflow.providers.google.cloud.utils.dataform import make_initialization_workspace_flow

first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    package_name=f"dataform_package_{ENV_ID}",
    without_installation=True,
    dataform_schema_name=DATAFORM_SCHEMA_NAME,
)
Write File to Workspace
To write a file with the given contents to the specified workspace, use DataformWriteFileOperator. The contents are passed as bytes:
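from airflow.providers.google.cloud.operators.dataform import DataformWriteFileOperator

# File contents must be bytes, hence the b"""...""" literal.
test_file_content = b"""
test test for test file
"""

write_test_file = DataformWriteFileOperator(
    task_id="make-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
    contents=test_file_content,
)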
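If the file body starts out as text, for example a SQLX definition built from a template, encoding it explicitly keeps the `contents` argument a `bytes` value. The helper and the SQLX snippet below are hypothetical illustrations, and UTF-8 is an assumed encoding choice.

```python
from textwrap import dedent


def render_view_sqlx(source_table: str) -> bytes:
    """Render a minimal SQLX view definition and return it as UTF-8 bytes (hypothetical helper)."""
    text = dedent(
        f"""\
        config {{ type: "view" }}

        SELECT * FROM {source_table}
        """
    )
    return text.encode("utf-8")


contents = render_view_sqlx("my_dataset.my_table")
print(type(contents).__name__)  # bytes
print(contents.decode("utf-8"))
```

The result could then be passed as `contents=` to DataformWriteFileOperator together with a `filepath` such as `definitions/my_view.sqlx` (the file name is illustrative).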
Make Directory in Workspace
To create a directory at the given path in the specified workspace, use DataformMakeDirectoryOperator:
from airflow.providers.google.cloud.operators.dataform import DataformMakeDirectoryOperator

make_test_directory = DataformMakeDirectoryOperator(
    task_id="make-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)
Install NPM Packages
To install npm packages for the specified workspace, use DataformInstallNpmPackagesOperator:
from airflow.providers.google.cloud.operators.dataform import DataformInstallNpmPackagesOperator

install_npm_packages = DataformInstallNpmPackagesOperator(
    task_id="install-npm-packages",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)
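The packages that get installed are the ones declared in the workspace's `package.json`. If you write that file yourself with DataformWriteFileOperator, a helper like the hypothetical one below can produce its bytes. The package name and the `@dataform/core` version string are placeholders chosen for illustration; pick the version your project actually needs.

```python
import json


def render_package_json(name: str, core_version: str) -> bytes:
    """Serialize a minimal package.json that declares @dataform/core (hypothetical helper)."""
    package = {
        "name": name,
        "dependencies": {"@dataform/core": core_version},
    }
    return json.dumps(package, indent=2).encode("utf-8")


# "2.4.2" is a placeholder version used only for this illustration.
contents = render_package_json("dataform_package_example", "2.4.2")
print(contents.decode("utf-8"))
```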