Google Cloud BigQuery 運算子¶
BigQuery 是 Google 完全託管、PB 級規模、低成本的分析資料倉儲。它是一種無伺服器軟體即服務 (SaaS),不需要資料庫管理員。它讓使用者可以專注於分析資料,使用熟悉的 SQL 尋找有意義的見解。
Airflow 提供運算子來管理資料集和表格、執行查詢和驗證資料。
先決條件任務¶
若要使用這些運算子,您必須執行以下幾個步驟
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
管理資料集¶
建立資料集¶
若要在 BigQuery 資料庫中建立空的資料集,您可以使用 BigQueryCreateEmptyDatasetOperator
。
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
取得資料集詳細資訊¶
若要取得現有資料集的詳細資訊,您可以使用 BigQueryGetDatasetOperator
。
此運算子會傳回 Dataset Resource。
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
列出資料集中的表格¶
若要擷取指定資料集中表格的列表,請使用 BigQueryGetDatasetTablesOperator
。
get_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_dataset_tables", dataset_id=DATASET_NAME
)
更新表格¶
若要在 BigQuery 中更新表格,您可以使用 BigQueryUpdateTableOperator
。
update 方法會取代整個 Table 資源,而 patch 方法只會取代提交的 Table 資源中提供的欄位。
update_table = BigQueryUpdateTableOperator(
task_id="update_table",
dataset_id=DATASET_NAME,
table_id="test_table",
fields=["friendlyName", "description"],
table_resource={
"friendlyName": "Updated Table",
"description": "Updated Table",
},
)
更新資料集¶
若要在 BigQuery 中更新資料集,您可以使用 BigQueryUpdateDatasetOperator
。
update 方法會取代整個資料集資源,而 patch 方法只會取代提交的資料集資源中提供的欄位。
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset",
dataset_id=DATASET_NAME,
dataset_resource={"description": "Updated dataset"},
)
刪除資料集¶
若要從 BigQuery 資料庫中刪除現有資料集,您可以使用 BigQueryDeleteDatasetOperator
。
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
管理表格¶
建立原生表格¶
若要在指定的 BigQuery 資料集中建立新的空表格,您可以選擇性地使用綱要,您可以使用 BigQueryCreateEmptyTableOperator
。
用於 BigQuery 表格的綱要可以使用兩種方式指定。您可以直接傳入綱要欄位,也可以將運算子指向 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是包含綱要欄位的 JSON 檔案。
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
您可以使用此運算子在現有表格之上建立檢視表。
create_view = BigQueryCreateEmptyTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
view={
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"useLegacySql": False,
},
)
您也可以使用此運算子建立具體化檢視表,定期快取查詢結果,以提高效能和效率。
create_materialized_view = BigQueryCreateEmptyTableOperator(
task_id="create_materialized_view",
dataset_id=DATASET_NAME,
table_id="test_materialized_view",
materialized_view={
"query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"enableRefresh": True,
"refreshIntervalMs": 2000000,
},
)
建立外部表格¶
若要在資料集內使用 Google Cloud Storage 中的資料建立新的外部表格,您可以使用 BigQueryCreateExternalTableOperator
。
與 BigQueryCreateEmptyTableOperator
類似,您可以直接傳入綱要欄位。
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
destination_project_dataset_table=f"{DATASET_NAME}.external_table",
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
或者您可以將運算子指向 Google Cloud Storage 物件名稱,其中儲存了綱要。
update_table_schema_json = BigQueryCreateEmptyTableOperator(
task_id="update_table_schema_json",
dataset_id=DATASET_NAME,
table_id="test_table",
gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
)
若要使用 BigQuery 綱要自動偵測,請設定 autodetect
旗標,而不是提供明確的綱要資訊。
從表格擷取資料¶
若要從 BigQuery 表格擷取資料,您可以使用 BigQueryGetDataOperator
。或者,如果您將欄位傳遞至 selected_fields
,則可以擷取選定欄位的資料。
此運算子的結果可以根據 as_dict
參數的值以兩種不同的格式擷取:False
(預設) - Python 列表的列表,其中巢狀列表中的元素數量將等於擷取的列數。巢狀中的每個元素都將是一個巢狀列表,其中的元素將代表該列的欄位值。True
- Python 字典的列表,其中每個字典代表一行。在每個字典中,鍵是欄位名稱,值是這些欄位的對應值。
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET,
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
location=location,
)
以下範例示範如何在非同步 (可延遲) 模式下使用 BigQueryGetDataOperator
。請注意,可延遲任務需要 Triggerer 在您的 Airflow 部署上執行。
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME_1,
use_legacy_sql=False,
max_results=10,
selected_fields="value",
location=LOCATION,
deferrable=True,
)
更新插入表格¶
若要更新插入表格,您可以使用 BigQueryUpsertTableOperator
。
此運算子會更新現有表格,或在給定的資料集中建立新的空表格。
upsert_table = BigQueryUpsertTableOperator(
task_id="upsert_table",
dataset_id=DATASET_NAME,
table_resource={
"tableReference": {"tableId": "test_table_id"},
"expirationTime": (int(time.time()) + 300) * 1000,
},
)
更新表格綱要¶
若要更新表格的綱要,您可以使用 BigQueryUpdateTableSchemaOperator
。
此運算子會更新提供的綱要欄位值,同時保持其餘部分不變。這對於在現有表格綱要上設定新的欄位描述很有用。
update_table_schema = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields_updates=[
{"name": "emp_name", "description": "Name of employee"},
{"name": "salary", "description": "Monthly salary in USD"},
],
)
刪除表格¶
若要刪除現有表格,您可以使用 BigQueryDeleteTableOperator
。
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)
您也可以使用此運算子刪除檢視表。
delete_view = BigQueryDeleteTableOperator(
task_id="delete_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)
您也可以使用此運算子刪除具體化檢視表。
delete_materialized_view = BigQueryDeleteTableOperator(
task_id="delete_materialized_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)
執行 BigQuery 工作¶
假設您想要執行以下查詢。
INSERT_ROWS_QUERY = (
f"INSERT {DATASET}.{TABLE_1} VALUES "
f"(42, 'monty python', '{INSERT_DATE}'), "
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
若要在特定的 BigQuery 資料庫中執行 SQL 查詢,您可以使用 BigQueryInsertJobOperator
,並使用適當的查詢工作組態,該組態可以是 Jinja 範本。
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=location,
)
以下範例示範如何在非同步 (可延遲) 模式下使用 BigQueryInsertJobOperator
。請注意,可延遲任務需要 Triggerer 在您的 Airflow 部署上執行。
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
如需 BigQuery 工作類型的詳細資訊,請查看文件。
如果您想在組態中包含一些檔案,您可以使用 Jinja 範本語言的 include
子句,如下所示
select_query_job = BigQueryInsertJobOperator(
task_id="select_query_job",
configuration={
"query": {
"query": "{% include QUERY_SQL_PATH %}",
"useLegacySql": False,
}
},
location=location,
)
包含的檔案也可以使用 Jinja 範本,這在 .sql
檔案的情況下可能很有用。
此外,您可以使用 BigQueryInsertJobOperator
的 job_id
參數來提高冪等性。如果未傳遞此參數,則 uuid 將用作 job_id
。如果提供,則運算子將嘗試使用此 job_id`
提交新工作。如果已經有具有此 job_id
的工作,則它將重新附加到現有工作。
此外,對於所有這些操作,您都可以在可延遲模式下使用運算子
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
驗證資料¶
檢查查詢結果是否包含資料¶
若要對 BigQuery 執行檢查,您可以使用 BigQueryCheckOperator
此運算子預期一個 SQL 查詢,該查詢將傳回單列。使用 python bool
轉換評估該第一列上的每個值。如果任何值傳回 False
,則檢查失敗並發生錯誤。
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
use_legacy_sql=False,
location=location,
)
您也可以在此運算子中使用可延遲模式
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
比較查詢結果以傳遞值¶
若要使用 SQL 程式碼執行簡單的值檢查,您可以使用 BigQueryValueCheckOperator
這些運算子預期一個 SQL 查詢,該查詢將傳回單列。將該第一列上的每個值與 pass_value
進行比較,pass_value
可以是字串或數值。如果是數值,您也可以指定 tolerance
。
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
pass_value=4,
use_legacy_sql=False,
location=location,
)
您也可以在此運算子中使用可延遲模式
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
pass_value=2,
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
比較一段時間內的指標¶
若要檢查以 SQL 運算式給出的指標值是否在 days_back
之前的某個容許範圍內,您可以使用 BigQueryIntervalCheckOperator
或 BigQueryIntervalCheckAsyncOperator
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET}.{TABLE_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=location,
)
您也可以在此運算子中使用可延遲模式
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_NAME_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
使用預定義的測試檢查欄位¶
若要檢查欄位是否通過使用者可設定的測試,您可以使用 BigQueryColumnCheckOperator
column_check = BigQueryColumnCheckOperator(
task_id="column_check",
table=f"{DATASET}.{TABLE_1}",
column_mapping={"value": {"null_check": {"equal_to": 0}}},
)
檢查表格層級資料品質¶
若要檢查表格是否通過使用者定義的測試,您可以使用 BigQueryTableCheckOperator
table_check = BigQueryTableCheckOperator(
task_id="table_check",
table=f"{DATASET}.{TABLE_1}",
checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)
感測器¶
檢查表格是否存在¶
若要檢查表格是否存在,您可以定義感測器運算子。這允許延遲下游運算子的執行,直到表格存在為止。如果表格在日期上進行分片,您可以例如使用 {{ ds_nodash }}
巨集作為表格名稱後綴。
check_table_exists = BigQueryTableExistenceSensor(
task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)
如果您想在感測器執行時釋放工作器插槽,您也可以在此運算子中使用可延遲模式。
check_table_exists_def = BigQueryTableExistenceSensor(
task_id="check_table_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
deferrable=True,
)
check_table_exists_async = BigQueryTableExistenceSensor(
task_id="check_table_exists_async",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)
檢查表格分割區是否存在¶
若要檢查表格是否存在且具有分割區,您可以使用。BigQueryTablePartitionExistenceSensor
。
check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
)
對於 DAY 分割表格,partition_id 參數是 “%Y%m%d” 格式的字串
如果您想在感測器執行時釋放工作器插槽,您也可以在此運算子中使用可延遲模式。
check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
deferrable=True,
)
check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_async",
partition_id=PARTITION_NAME,
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)