Google Cloud BigQuery Data Transfer Service 運算子¶
BigQuery Data Transfer Service 自動將資料從 SaaS 應用程式移動到 Google BigQuery,並以排程、託管的方式進行。您的分析團隊可以為資料倉儲奠定基礎,而無需編寫任何程式碼。BigQuery Data Transfer Service 最初支援 Google 應用程式來源,例如 Google Ads、Campaign Manager、Google Ad Manager 和 YouTube。透過 BigQuery Data Transfer Service,使用者還可以存取資料連接器,讓您輕鬆地將資料從 Teradata 和 Amazon S3 傳輸到 BigQuery。
先決條件任務¶
要使用這些運算子,您必須執行以下幾項操作
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件 中所述。
啟用 API,如 Cloud Console 文件 中所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
建立傳輸設定¶
要建立 DTS 傳輸設定,您可以使用 BigQueryCreateDataTransferOperator
。
在 Airflow 的情況下,客戶需要建立一個停用自動排程的傳輸設定,然後使用專門的 Airflow 運算子觸發傳輸執行,該運算子將呼叫 StartManualTransferRuns API,例如 BigQueryDataTransferServiceStartTransferRunsOperator
。BigQueryCreateDataTransferOperator
檢查傳遞的設定中是否包含自動排程選項。如果存在,則不執行任何操作,否則其值會設定為 True
。
tests/system/google/cloud/bigquery/example_bigquery_dts.py
# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
TRANSFER_CONFIG = {
"destination_dataset_id": DATASET_NAME,
"display_name": "test data transfer",
"data_source_id": "google_cloud_storage",
"schedule_options": {"disable_auto_scheduling": True},
"params": {
"field_delimiter": ",",
"max_bad_records": "0",
"skip_leading_rows": "1",
"data_path_template": BUCKET_URI,
"destination_table_name_template": DTS_BQ_TABLE,
"file_format": "CSV",
},
}
您可以使用或不使用專案 ID 建立運算子。如果缺少專案 ID,它將從使用的 Google Cloud 連線中檢索。運算子的基本用法
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT_ID,
task_id="gcp_bigquery_create_transfer",
)
transfer_config_id = cast(str, XComArg(gcp_bigquery_create_transfer, key="transfer_config_id"))
您可以將 Jinja 模板 與 transfer_config
、 project_id
、 authorization_code
、 gcp_conn_id
、 impersonation_chain
參數一起使用,讓您可以動態地決定值。結果會儲存到 XCom,這允許其他運算子使用它。此外,新設定的 ID 可在 XCom 中以 transfer_config_id
鍵存取。
刪除傳輸設定¶
要刪除 DTS 傳輸設定,您可以使用 BigQueryDeleteDataTransferConfigOperator
。
運算子的基本用法
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)
您可以將 Jinja 模板 與 transfer_config
、 project_id
、 authorization_code
、 gcp_conn_id
、 impersonation_chain
參數一起使用,讓您可以動態地決定值。
手動啟動傳輸執行¶
啟動手動傳輸執行,使其立即執行,且 schedule_time 等於目前時間。BigQueryDataTransferServiceStartTransferRunsOperator
。
運算子的基本用法
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="gcp_bigquery_start_transfer",
project_id=PROJECT_ID,
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
您可以將 Jinja 模板 與 transfer_config_id
、 project_id
、 requested_time_range
、 requested_run_time
、 gcp_conn_id
、 impersonation_chain
參數一起使用,讓您可以動態地決定值。
要檢查操作是否成功,您可以使用 BigQueryDataTransferServiceTransferRunSensor
。
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
task_id="gcp_run_sensor",
transfer_config_id=transfer_config_id,
run_id=cast(str, XComArg(gcp_bigquery_start_transfer, key="run_id")),
expected_statuses={"SUCCEEDED"},
)
您可以將 Jinja 模板 與 run_id
、 transfer_config_id
、 expected_statuses
、 project_id
、 impersonation_chain
參數一起使用,讓您可以動態地決定值。