Google Cloud BigQuery Data Transfer Service 運算子

BigQuery Data Transfer Service 自動將資料從 SaaS 應用程式移動到 Google BigQuery,並以排程、託管的方式進行。您的分析團隊可以為資料倉儲奠定基礎,而無需編寫任何程式碼。BigQuery Data Transfer Service 最初支援 Google 應用程式來源,例如 Google Ads、Campaign Manager、Google Ad Manager 和 YouTube。透過 BigQuery Data Transfer Service,使用者還可以存取資料連接器,讓您輕鬆地將資料從 Teradata 和 Amazon S3 傳輸到 BigQuery。

先決條件任務

要使用這些運算子,您必須執行以下幾項操作

建立傳輸設定

要建立 DTS 傳輸設定,您可以使用 BigQueryCreateDataTransferOperator

在 Airflow 的情況下,客戶需要建立一個停用自動排程的傳輸設定,然後使用專門的 Airflow 運算子觸發傳輸執行,該運算子將呼叫 StartManualTransferRuns API,例如 BigQueryDataTransferServiceStartTransferRunsOperatorBigQueryCreateDataTransferOperator 檢查傳遞的設定中是否包含自動排程選項。如果存在,則不執行任何操作,否則其值會設定為 True

tests/system/google/cloud/bigquery/example_bigquery_dts.py


# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
TRANSFER_CONFIG = {
    "destination_dataset_id": DATASET_NAME,
    "display_name": "test data transfer",
    "data_source_id": "google_cloud_storage",
    "schedule_options": {"disable_auto_scheduling": True},
    "params": {
        "field_delimiter": ",",
        "max_bad_records": "0",
        "skip_leading_rows": "1",
        "data_path_template": BUCKET_URI,
        "destination_table_name_template": DTS_BQ_TABLE,
        "file_format": "CSV",
    },
}

您可以使用或不使用專案 ID 建立運算子。如果缺少專案 ID,它將從使用的 Google Cloud 連線中檢索。運算子的基本用法

tests/system/google/cloud/bigquery/example_bigquery_dts.py

gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
    transfer_config=TRANSFER_CONFIG,
    project_id=PROJECT_ID,
    task_id="gcp_bigquery_create_transfer",
)

transfer_config_id = cast(str, XComArg(gcp_bigquery_create_transfer, key="transfer_config_id"))

您可以將 Jinja 模板transfer_configproject_idauthorization_codegcp_conn_idimpersonation_chain 參數一起使用,讓您可以動態地決定值。結果會儲存到 XCom,這允許其他運算子使用它。此外,新設定的 ID 可在 XCom 中以 transfer_config_id 鍵存取。

刪除傳輸設定

要刪除 DTS 傳輸設定,您可以使用 BigQueryDeleteDataTransferConfigOperator

運算子的基本用法

tests/system/google/cloud/bigquery/example_bigquery_dts.py

gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
    transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)

您可以將 Jinja 模板transfer_configproject_idauthorization_codegcp_conn_idimpersonation_chain 參數一起使用,讓您可以動態地決定值。

手動啟動傳輸執行

啟動手動傳輸執行,使其立即執行,且 schedule_time 等於目前時間。BigQueryDataTransferServiceStartTransferRunsOperator

運算子的基本用法

tests/system/google/cloud/bigquery/example_bigquery_dts.py

gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
    task_id="gcp_bigquery_start_transfer",
    project_id=PROJECT_ID,
    transfer_config_id=transfer_config_id,
    requested_run_time={"seconds": int(time.time() + 60)},
)

您可以將 Jinja 模板transfer_config_idproject_idrequested_time_rangerequested_run_timegcp_conn_idimpersonation_chain 參數一起使用,讓您可以動態地決定值。

要檢查操作是否成功,您可以使用 BigQueryDataTransferServiceTransferRunSensor

tests/system/google/cloud/bigquery/example_bigquery_dts.py

gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
    task_id="gcp_run_sensor",
    transfer_config_id=transfer_config_id,
    run_id=cast(str, XComArg(gcp_bigquery_start_transfer, key="run_id")),
    expected_statuses={"SUCCEEDED"},
)

您可以將 Jinja 模板run_idtransfer_config_idexpected_statusesproject_idimpersonation_chain 參數一起使用,讓您可以動態地決定值。

參考

如需更多資訊,請參閱

此條目是否有幫助?