Google Cloud BigQuery 傳輸運算子至 Postgres¶
Google Cloud BigQuery 是 Google Cloud 的無伺服器資料倉儲服務。 PostgreSQL 是一個開源關聯式資料庫管理系統。此運算子可用於將資料從 BigQuery 表格複製到 PostgreSQL。
先決條件任務¶
要使用這些運算子,您必須完成幾件事
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 函式庫。
pip install 'apache-airflow[google]'安裝的詳細資訊請參閱安裝。
運算子¶
將資料從 BigQuery 表格複製到 Postgres 表格是透過 BigQueryToPostgresOperator
運算子執行的。
使用 Jinja 模板 與 target_table_name
, impersonation_chain
, dataset_id
, table_id
來動態定義值。
您可以使用參數 selected_fields
來限制要複製的欄位(預設為所有欄位),以及參數 replace
來覆寫目標表格而不是附加到其後。如果使用 replace
參數,則由於底層 INSERT 命令中 PostgreSQL 的 ON CONFLICT 子句的限制,因此需要指定 selected_fields
和 replace_index
參數。
更多資訊,請參考以上連結。
傳輸資料¶
以下運算子將資料從 BigQuery 表格複製到 PostgreSQL。
bigquery_to_postgres = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=False,
)
此運算子也可以使用來自 BigQuery 表格的匹配資料來取代 PostgreSQL 表格中的資料。
bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres_upsert",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=True,
selected_fields=["emp_name", "salary"],
replace_index=["emp_name", "salary"],
)