Google Cloud BigQuery 運算子

BigQuery 是 Google 完全託管、PB 級規模、低成本的分析資料倉儲。它是一種無伺服器軟體即服務 (SaaS),不需要資料庫管理員。它讓使用者可以專注於分析資料,使用熟悉的 SQL 尋找有意義的見解。

Airflow 提供運算子來管理資料集和表格、執行查詢和驗證資料。

先決條件任務

若要使用這些運算子,您必須執行以下幾個步驟

管理資料集

建立資料集

若要在 BigQuery 資料庫中建立空的資料集,您可以使用 BigQueryCreateEmptyDatasetOperator

tests/system/google/cloud/bigquery/example_bigquery_dataset.py[原始碼]

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)

取得資料集詳細資訊

若要取得現有資料集的詳細資訊,您可以使用 BigQueryGetDatasetOperator

此運算子會傳回 Dataset Resource

tests/system/google/cloud/bigquery/example_bigquery_dataset.py[原始碼]

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

列出資料集中的表格

若要擷取指定資料集中表格的列表,請使用 BigQueryGetDatasetTablesOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

get_dataset_tables = BigQueryGetDatasetTablesOperator(
    task_id="get_dataset_tables", dataset_id=DATASET_NAME
)

更新表格

若要在 BigQuery 中更新表格,您可以使用 BigQueryUpdateTableOperator

update 方法會取代整個 Table 資源,而 patch 方法只會取代提交的 Table 資源中提供的欄位。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

update_table = BigQueryUpdateTableOperator(
    task_id="update_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    fields=["friendlyName", "description"],
    table_resource={
        "friendlyName": "Updated Table",
        "description": "Updated Table",
    },
)

更新資料集

若要在 BigQuery 中更新資料集,您可以使用 BigQueryUpdateDatasetOperator

update 方法會取代整個資料集資源,而 patch 方法只會取代提交的資料集資源中提供的欄位。

tests/system/google/cloud/bigquery/example_bigquery_dataset.py[原始碼]

update_dataset = BigQueryUpdateDatasetOperator(
    task_id="update_dataset",
    dataset_id=DATASET_NAME,
    dataset_resource={"description": "Updated dataset"},
)

刪除資料集

若要從 BigQuery 資料庫中刪除現有資料集,您可以使用 BigQueryDeleteDatasetOperator

tests/system/google/cloud/bigquery/example_bigquery_dataset.py[原始碼]

delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

管理表格

建立原生表格

若要在指定的 BigQuery 資料集中建立新的空表格,您可以選擇性地使用綱要,您可以使用 BigQueryCreateEmptyTableOperator

用於 BigQuery 表格的綱要可以使用兩種方式指定。您可以直接傳入綱要欄位,也可以將運算子指向 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是包含綱要欄位的 JSON 檔案。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

您可以使用此運算子在現有表格之上建立檢視表。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)

您也可以使用此運算子建立具體化檢視表,定期快取查詢結果,以提高效能和效率。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

create_materialized_view = BigQueryCreateEmptyTableOperator(
    task_id="create_materialized_view",
    dataset_id=DATASET_NAME,
    table_id="test_materialized_view",
    materialized_view={
        "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "enableRefresh": True,
        "refreshIntervalMs": 2000000,
    },
)

建立外部表格

若要在資料集內使用 Google Cloud Storage 中的資料建立新的外部表格,您可以使用 BigQueryCreateExternalTableOperator

BigQueryCreateEmptyTableOperator 類似,您可以直接傳入綱要欄位。

tests/system/google/cloud/bigquery/example_bigquery_operations.py[原始碼]

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    destination_project_dataset_table=f"{DATASET_NAME}.external_table",
    bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
    source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

或者您可以將運算子指向 Google Cloud Storage 物件名稱,其中儲存了綱要。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

update_table_schema_json = BigQueryCreateEmptyTableOperator(
    task_id="update_table_schema_json",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
)

若要使用 BigQuery 綱要自動偵測,請設定 autodetect 旗標,而不是提供明確的綱要資訊。

從表格擷取資料

若要從 BigQuery 表格擷取資料,您可以使用 BigQueryGetDataOperator。或者,如果您將欄位傳遞至 selected_fields,則可以擷取選定欄位的資料。

此運算子的結果可以根據 as_dict 參數的值以兩種不同的格式擷取:False (預設) - Python 列表的列表,其中巢狀列表中的元素數量將等於擷取的列數。巢狀中的每個元素都將是一個巢狀列表,其中的元素將代表該列的欄位值。True - Python 字典的列表,其中每個字典代表一行。在每個字典中,鍵是欄位名稱,值是這些欄位的對應值。

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=location,
)

以下範例示範如何在非同步 (可延遲) 模式下使用 BigQueryGetDataOperator。請注意,可延遲任務需要 Triggerer 在您的 Airflow 部署上執行。

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[原始碼]

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME_1,
    use_legacy_sql=False,
    max_results=10,
    selected_fields="value",
    location=LOCATION,
    deferrable=True,
)

更新插入表格

若要更新插入表格,您可以使用 BigQueryUpsertTableOperator

此運算子會更新現有表格,或在給定的資料集中建立新的空表格。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)

更新表格綱要

若要更新表格的綱要,您可以使用 BigQueryUpdateTableSchemaOperator

此運算子會更新提供的綱要欄位值,同時保持其餘部分不變。這對於在現有表格綱要上設定新的欄位描述很有用。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)

刪除表格

若要刪除現有表格,您可以使用 BigQueryDeleteTableOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

您也可以使用此運算子刪除檢視表。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

delete_view = BigQueryDeleteTableOperator(
    task_id="delete_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)

您也可以使用此運算子刪除具體化檢視表。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[原始碼]

delete_materialized_view = BigQueryDeleteTableOperator(
    task_id="delete_materialized_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)

執行 BigQuery 工作

假設您想要執行以下查詢。

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

    INSERT_ROWS_QUERY = (
        f"INSERT {DATASET}.{TABLE_1} VALUES "
        f"(42, 'monty python', '{INSERT_DATE}'), "
        f"(42, 'fishy fish', '{INSERT_DATE}');"
    )

若要在特定的 BigQuery 資料庫中執行 SQL 查詢,您可以使用 BigQueryInsertJobOperator,並使用適當的查詢工作組態,該組態可以是 Jinja 範本。

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=location,
)

以下範例示範如何在非同步 (可延遲) 模式下使用 BigQueryInsertJobOperator。請注意,可延遲任務需要 Triggerer 在您的 Airflow 部署上執行。

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[原始碼]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

如需 BigQuery 工作類型的詳細資訊,請查看文件

如果您想在組態中包含一些檔案,您可以使用 Jinja 範本語言的 include 子句,如下所示

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include QUERY_SQL_PATH %}",
            "useLegacySql": False,
        }
    },
    location=location,
)

包含的檔案也可以使用 Jinja 範本,這在 .sql 檔案的情況下可能很有用。

此外,您可以使用 BigQueryInsertJobOperatorjob_id 參數來提高冪等性。如果未傳遞此參數,則 uuid 將用作 job_id。如果提供,則運算子將嘗試使用此 job_id` 提交新工作。如果已經有具有此 job_id 的工作,則它將重新附加到現有工作。

此外,對於所有這些操作,您都可以在可延遲模式下使用運算子

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[原始碼]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

驗證資料

檢查查詢結果是否包含資料

若要對 BigQuery 執行檢查,您可以使用 BigQueryCheckOperator

此運算子預期一個 SQL 查詢,該查詢將傳回單列。使用 python bool 轉換評估該第一列上的每個值。如果任何值傳回 False,則檢查失敗並發生錯誤。

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    use_legacy_sql=False,
    location=location,
)

您也可以在此運算子中使用可延遲模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[原始碼]

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

比較查詢結果以傳遞值

若要使用 SQL 程式碼執行簡單的值檢查,您可以使用 BigQueryValueCheckOperator

這些運算子預期一個 SQL 查詢,該查詢將傳回單列。將該第一列上的每個值與 pass_value 進行比較,pass_value 可以是字串或數值。如果是數值,您也可以指定 tolerance

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
    location=location,
)

您也可以在此運算子中使用可延遲模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[原始碼]

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    pass_value=2,
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

比較一段時間內的指標

若要檢查以 SQL 運算式給出的指標值是否在 days_back 之前的某個容許範圍內,您可以使用 BigQueryIntervalCheckOperatorBigQueryIntervalCheckAsyncOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=location,
)

您也可以在此運算子中使用可延遲模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[原始碼]

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET_NAME}.{TABLE_NAME_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

使用預定義的測試檢查欄位

若要檢查欄位是否通過使用者可設定的測試,您可以使用 BigQueryColumnCheckOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

    column_check = BigQueryColumnCheckOperator(
        task_id="column_check",
        table=f"{DATASET}.{TABLE_1}",
        column_mapping={"value": {"null_check": {"equal_to": 0}}},
    )

檢查表格層級資料品質

若要檢查表格是否通過使用者定義的測試,您可以使用 BigQueryTableCheckOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py[原始碼]

    table_check = BigQueryTableCheckOperator(
        task_id="table_check",
        table=f"{DATASET}.{TABLE_1}",
        checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
    )

感測器

檢查表格是否存在

若要檢查表格是否存在,您可以定義感測器運算子。這允許延遲下游運算子的執行,直到表格存在為止。如果表格在日期上進行分片,您可以例如使用 {{ ds_nodash }} 巨集作為表格名稱後綴。

BigQueryTableExistenceSensor.

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[原始碼]

check_table_exists = BigQueryTableExistenceSensor(
    task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)

如果您想在感測器執行時釋放工作器插槽,您也可以在此運算子中使用可延遲模式。

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[原始碼]

check_table_exists_def = BigQueryTableExistenceSensor(
    task_id="check_table_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    deferrable=True,
)

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[原始碼]

check_table_exists_async = BigQueryTableExistenceSensor(
    task_id="check_table_exists_async",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

檢查表格分割區是否存在

若要檢查表格是否存在且具有分割區,您可以使用。BigQueryTablePartitionExistenceSensor

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[原始碼]

check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
)

對於 DAY 分割表格,partition_id 參數是 “%Y%m%d” 格式的字串

如果您想在感測器執行時釋放工作器插槽,您也可以在此運算子中使用可延遲模式。

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[原始碼]

check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
    deferrable=True,
)

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[原始碼]

check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_async",
    partition_id=PARTITION_NAME,
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

參考文獻

如需進一步資訊,請參閱

此條目是否有幫助?