airflow.providers.google.cloud.hooks.bigquery

BigQuery Hook 和 BigQuery 的非常基本的 PEP 249 實作。

模組內容

類別

BigQueryHook

與 BigQuery 互動。

BigQueryConnection

BigQuery 連線。

BigQueryBaseCursor

BigQuery cursor。

BigQueryCursor

一個非常基本的 BigQuery PEP 249 cursor 實作。

BigQueryAsyncHook

使用 gcloud-aio 函式庫來檢索 Job 詳細資訊。

BigQueryTableAsyncHook

BigQuery Table 的非同步 hook。

函數

split_tablename(table_input, default_project_id[, ...])

屬性

log

BigQueryJob

airflow.providers.google.cloud.hooks.bigquery.log[source]
airflow.providers.google.cloud.hooks.bigquery.BigQueryJob[source]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryHook(use_legacy_sql=True, location=None, priority='INTERACTIVE', api_resource_configs=None, impersonation_scopes=None, labels=None, **kwargs)[source]

繼承自: airflow.providers.google.common.hooks.base_google.GoogleBaseHook, airflow.providers.common.sql.hooks.sql.DbApiHook

與 BigQuery 互動。

此 hook 使用 Google Cloud 連線。

參數
  • gcp_conn_id – 用於 GCP 憑證的 Airflow 連線。

  • use_legacy_sql (bool) – 這指定是否使用舊版 SQL 方言。

  • location (str | None) – BigQuery 資源的位置。

  • priority (str) – 指定查詢的優先順序。可能的值包括 INTERACTIVE 和 BATCH。預設值為 INTERACTIVE。

  • api_resource_configs (dict | None) – 這包含應用於 Google BigQuery jobs 的參數設定。

  • impersonation_chain – 這是可選的服務帳戶,用於使用短期憑證模擬身分。

  • impersonation_scopes (str | collections.abc.Sequence[str] | None) – 模擬帳戶的可選 scopes 列表。將覆蓋連線中的 scopes。

  • labels (dict | None) – BigQuery 資源標籤。

property scopes: collections.abc.Sequence[str][source]

傳回 OAuth 2.0 scopes。

傳回

傳回在 impersonation_scopes、連線設定或預設 scope 中定義的 scope

傳回類型

collections.abc.Sequence[str]

conn_name_attr = 'gcp_conn_id'[source]
default_conn_name = 'google_cloud_bigquery_default'[source]
conn_type = 'gcpbigquery'[source]
hook_name = 'Google Bigquery'[source]
classmethod get_connection_form_widgets()[source]

傳回要新增到連線表單的連線 widgets。

classmethod get_ui_field_behaviour()[source]

傳回自訂欄位行為。

get_conn()[source]

取得 BigQuery PEP 249 連線物件。

get_client(project_id=PROVIDE_PROJECT_ID, location=None)[source]

取得已驗證的 BigQuery Client。

參數
  • project_id (str) – Client 代表操作的專案的專案 ID。

  • location (str | None) – jobs / datasets / tables 的預設位置。

get_uri()[source]

DbApiHook 覆寫以用於 get_sqlalchemy_engine()

get_sqlalchemy_engine(engine_kwargs=None)[source]

建立 SQLAlchemy engine 物件。

參數

engine_kwargs (dict | None) – 用於 create_engine() 的 Kwargs。

get_records(sql, parameters=None)[source]

執行 sql 並傳回一組記錄。

參數
  • sql – 要執行的 sql 語句 (str) 或要執行的 sql 語句列表

  • parameters – 用於呈現 SQL 查詢的參數。

abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[source]

插入列。

目前不支援插入。理論上,您可以使用 BigQuery 的串流 API 將列插入到表格中,但尚未實作。

get_pandas_df(sql, parameters=None, dialect=None, **kwargs)[source]

取得 BigQuery 結果的 Pandas DataFrame。

DbApiHook 方法必須被覆寫,因為 Pandas 除了 SQLite 之外,不支援 PEP 249 連線。

參數
  • sql (str) – 要執行的 BigQuery SQL。

  • parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 用於呈現 SQL 查詢的參數 (未使用,保留以覆寫父類別方法)

  • dialect (str | None) – BigQuery SQL 的方言 – 舊版 SQL 或標準 SQL,如果未指定,則預設使用 self.use_legacy_sql

  • kwargs – (可選) 傳遞到 pandas_gbq.read_gbq 方法中

table_exists(dataset_id, table_id, project_id)[source]

檢查 Google BigQuery 中是否存在表格。

參數
  • project_id (str) – 要在其中查找表格的 Google Cloud 專案。提供給 hook 的連線必須提供對指定專案的存取權。

  • dataset_id (str) – 要在其中查找表格的 dataset 名稱。

  • table_id (str) – 要檢查是否存在的表格名稱。

table_partition_exists(dataset_id, table_id, partition_id, project_id)[source]

檢查 Google BigQuery 中是否存在分割區。

參數
  • project_id (str) – 要在其中查找表格的 Google Cloud 專案。提供給 hook 的連線必須提供對指定專案的存取權。

  • dataset_id (str) – 要在其中查找表格的 dataset 名稱。

  • table_id (str) – 要檢查是否存在的表格名稱。

  • partition_id (str) – 要檢查是否存在的分割區名稱。

create_empty_table(project_id=PROVIDE_PROJECT_ID, dataset_id=None, table_id=None, table_resource=None, schema_fields=None, time_partitioning=None, cluster_fields=None, labels=None, view=None, materialized_view=None, encryption_configuration=None, retry=DEFAULT_RETRY, location=None, exists_ok=True)[source]

在 dataset 中建立新的空表格。

若要建立由 SQL 查詢定義的視圖,請將字典解析為 view 參數。

參數
傳回

建立的表格

傳回類型

google.cloud.bigquery.table.Table

create_empty_dataset(dataset_id=None, project_id=PROVIDE_PROJECT_ID, location=None, dataset_reference=None, exists_ok=True)[source]

建立新的空 dataset。

參數
  • project_id (str) – 我們要在其中建立空 dataset 的專案名稱。如果 dataset_reference 中有 projectId,則不需要提供。

  • dataset_id (str | None) – dataset 的 id。如果 dataset_reference 中有 datasetId,則不需要提供。

  • location (str | None) – (可選) dataset 應駐留的地理位置。沒有預設值,但如果沒有提供任何內容,dataset 將在美國建立。

  • dataset_reference (dict[str, Any] | None) – 可以與請求正文一起提供的 Dataset 參考。更多資訊: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • exists_ok (bool) – 如果為 True,則在建立 dataset 時忽略「已存在」錯誤。

get_dataset_tables(dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, retry=DEFAULT_RETRY)[source]

取得給定 dataset 的表格列表。

更多資訊,請參閱: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list

參數
  • dataset_id (str) – 請求的 dataset 的 dataset ID。

  • project_id (str) – (可選) 請求的 dataset 的專案。如果為 None,將使用 self.project_id。

  • max_results (int | None) – (可選) 要傳回的最大表格數。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC。

傳回

與 dataset 關聯的表格列表。

傳回類型

list[dict[str, Any]]

delete_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, retry=DEFAULT_RETRY)[source]

刪除您專案中的 BigQuery dataset。

參數
  • project_id (str) – 我們擁有 dataset 的專案名稱。

  • dataset_id (str) – 要刪除的 dataset。

  • delete_contents (bool) – 如果為 True,則刪除 dataset 中的所有表格。如果為 False 且 dataset 包含表格,則請求將失敗。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC。

update_table(table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]

變更表格的某些欄位。

使用 fields 指定要更新的欄位。至少必須提供一個欄位。如果 fields 中列出了某個欄位,且該欄位在 table 中為 None,則該欄位值將被刪除。

如果 table.etag 不是 None,則只有當伺服器上的表格具有相同的 ETag 時,更新才會成功。因此,使用 get_table 讀取表格、變更其欄位,然後將其傳遞給 update_table,將確保只有在自上次讀取以來表格未發生修改時,才會儲存變更。

參數
  • project_id (str) – 要在其中建立表格的專案。

  • dataset_id (str | None) – 要在其中建立表格的 dataset。

  • table_id (str | None) – 要建立的表格名稱。

  • table_resource (dict[str, Any]) – 表格資源,如文件所述: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 表格必須包含 tableReference 或必須提供 project_iddataset_idtable_id

  • fields (list[str] | None) – 要變更的 table 欄位,以表格屬性(例如 “friendly_name”)拼寫。

insert_all(project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False)[source]

將資料串流到 BigQuery,一次一筆記錄,無需載入工作。

參數
  • project_id (str) – 我們在其中建立表格的專案名稱

  • dataset_id (str) – 我們在其中建立表格的資料集名稱

  • table_id (str) – 表格名稱

  • rows (list) –

    要插入的列

    rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
    

  • ignore_unknown_values (bool) – [選用] 接受包含與結構描述不符的值的列。未知的數值將被忽略。預設值為 false,這會將未知的數值視為錯誤。

  • skip_invalid_rows (bool) – [選用] 插入請求中的所有有效列,即使存在無效列也一樣。預設值為 false,這會導致在存在任何無效列時,整個請求都會失敗。

  • fail_on_error (bool) – [選用] 如果發生任何錯誤,則強制任務失敗。預設值為 false,表示即使發生任何插入錯誤,任務也不應失敗。

update_dataset(fields, dataset_resource, dataset_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY)[source]

變更資料集的某些欄位。

使用 fields 指定要更新的欄位。至少必須提供一個欄位。如果 fields 中列出了某個欄位,且該欄位在 dataset 中為 None,則會將其刪除。

如果 dataset.etag 不是 None,則只有當伺服器上的資料集具有相同的 ETag 時,更新才會成功。因此,使用 get_dataset 讀取資料集、變更其欄位,然後將其傳遞給 update_dataset,將確保只有在自上次讀取以來資料集未發生修改時,才會儲存變更。

參數
get_datasets_list(project_id=PROVIDE_PROJECT_ID, include_all=False, filter_=None, max_results=None, page_token=None, retry=DEFAULT_RETRY, return_iterator=False)[source]

取得目前專案中的所有 BigQuery 資料集。

如需更多資訊,請參閱: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

參數
  • project_id (str) – 您嘗試取得所有資料集的 Google Cloud 專案

  • include_all (bool) – 如果結果包含隱藏的資料集,則為 True。預設為 False。

  • filter – 用於依標籤篩選結果的運算式。如需語法,請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter

  • filter – str

  • max_results (int | None) – 要傳回的資料集最大數量。

  • max_results – int

  • page_token (str | None) – 代表資料集游標的符記。如果未傳遞,API 將傳回第一頁資料集。此符記標記要傳回的迭代器開始位置,並且可以在 HTTPIteratornext_page_token 存取 page_token 的值。

  • page_token – str

  • retry (google.api_core.retry.Retry) – 如何重試 RPC。

  • return_iterator (bool) – 不是傳回 list[Row],而是傳回 HTTPIterator,可用於取得 next_page_token 屬性。

get_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID)[source]

擷取 dataset_id 參照的資料集。

參數
  • dataset_id (str) – BigQuery 資料集 ID

  • project_id (str) – Google Cloud 專案 ID

傳回

dataset_resource

傳回類型

google.cloud.bigquery.dataset.Dataset

另請參閱

如需更多資訊,請參閱資料集資源內容: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

run_grant_dataset_view_access(source_dataset, view_dataset, view_table, view_project=None, project_id=PROVIDE_PROJECT_ID)[source]

授予資料集的授權檢視存取權給檢視表格。

如果已授予此檢視對資料集的存取權,則不執行任何動作。此方法不是原子性的。執行此方法可能會覆蓋同時發生的更新。

參數
  • source_dataset (str) – 來源資料集

  • view_dataset (str) – 檢視所在的資料集

  • view_table (str) – 檢視的表格

  • project_id (str) – 來源資料集的專案。如果為 None,將使用 self.project_id。

  • view_project (str | None) – 檢視所在的專案。如果為 None,將使用 self.project_id。

傳回

來源資料集的資料集資源。

傳回類型

dict[str, Any]

run_table_upsert(dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID)[source]

如果表格存在則更新,否則建立新表格。

由於 BigQuery 原生不允許表格 upsert,因此這不是原子操作。

參數
delete_table(table_id, not_found_ok=True, project_id=PROVIDE_PROJECT_ID)[source]

從資料集中刪除現有表格。

如果表格不存在,則傳回錯誤,除非 not_found_ok 設定為 True。

參數
  • table_id (str) – 一個虛線 (<project>.|<project>:)<dataset>.<table>,指示要刪除的表格。

  • not_found_ok (bool) – 如果為 True,即使要求的表格不存在,也傳回成功。

  • project_id (str) – 用於執行請求的專案

list_rows(dataset_id, table_id, max_results=None, selected_fields=None, page_token=None, start_index=None, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY, return_iterator=False)[source]

列出表格中的列。

請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list

參數
  • dataset_id (str) – 要求的表格的資料集 ID。

  • table_id (str) – 要求的表格的表格 ID。

  • max_results (int | None) – 要傳回的最大結果數。

  • selected_fields (list[str] | str | None) – 要傳回的欄位清單(逗號分隔)。如果未指定,則傳回所有欄位。

  • page_token (str | None) – 頁面符記,從先前的呼叫傳回,用於識別結果集。

  • start_index (int | None) – 要讀取的起始列的從零開始的索引。

  • project_id (str) – Client 代表操作的專案的專案 ID。

  • location (str | None) – 工作的預設位置。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC。

  • return_iterator (bool) – 不是傳回 list[Row],而是傳回 RowIterator,可用於取得 next_page_token 屬性。

傳回

列的清單

傳回類型

list[google.cloud.bigquery.table.Row] | google.cloud.bigquery.table.RowIterator

get_schema(dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]

取得給定資料集和表格的結構描述。

參數
  • dataset_id (str) – 要求的表格的資料集 ID

  • table_id (str) – 要求的表格的表格 ID

  • project_id (str) – 要求的表格的選用專案 ID。如果未提供,將使用連接器設定的專案。

傳回

表格結構描述

傳回類型

dict

update_table_schema(schema_fields_updates, include_policy_tags, dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]

更新給定資料集和表格之結構描述中的欄位。

請注意,結構描述中的某些欄位是不可變更的;嘗試變更它們會導致例外狀況。

如果包含新的欄位,則會插入該欄位,這需要設定所有必要欄位。

參數
  • include_policy_tags (bool) – 如果設定為 True,原則標籤將包含在更新請求中,即使未變更,也需要特殊權限,請參閱 https://cloud.google.com/bigquery/docs/column-level-security#roles

  • dataset_id (str) – 要更新的請求表格的資料集 ID

  • table_id (str) – 要更新的表格的表格 ID

  • schema_fields_updates (list[dict[str, Any]]) –

    部分結構描述資源。請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema

    schema_fields_updates = [
        {"name": "emp_name", "description": "Some New Description"},
        {"name": "salary", "description": "Some New Description"},
        {
            "name": "departments",
            "fields": [
                {"name": "name", "description": "Some New Description"},
                {"name": "type", "description": "Some New Description"},
            ],
        },
    ]
    

  • project_id (str) – 我們要在其中更新表格的專案名稱。

poll_job_complete(job_id, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY)[source]

檢查工作是否已完成。

參數
  • job_id (str) – 工作的 ID。

  • project_id (str) – 執行工作的 Google Cloud 專案

  • location (str | None) – 工作執行的位置

  • retry (google.api_core.retry.Retry) – 如何重試 RPC。

cancel_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]

取消工作並等待取消完成。

參數
  • job_id (str) – 工作的 ID。

  • project_id (str) – 執行工作的 Google Cloud 專案

  • location (str | None) – 工作執行的位置

get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]

擷取 BigQuery 工作。

參數
  • job_id (str) – 工作的 ID。ID 必須僅包含字母 (a-z、A-Z)、數字 (0-9)、底線 (_) 或破折號 (-)。最大長度為 1,024 個字元。

  • project_id (str) – 執行工作的 Google Cloud 專案。

  • location (str | None) – 工作執行的位置。

insert_job(configuration, job_id=None, project_id=PROVIDE_PROJECT_ID, location=None, nowait=False, retry=DEFAULT_RETRY, timeout=None)[source]

執行 BigQuery 工作並等待其完成。

參數
  • configuration (dict) – configuration 參數直接對應至工作物件中的 BigQuery configuration 欄位。請參閱 https://cloud.google.com/bigquery/docs/reference/v2/jobs 以取得詳細資訊。

  • job_id (str | None) – 工作的 ID。ID 必須僅包含字母 (a-z、A-Z)、數字 (0-9)、底線 (_) 或破折號 (-)。最大長度為 1,024 個字元。如果未提供,則會產生 uuid。

  • project_id (str) – 執行工作的 Google Cloud 專案。

  • location (str | None) – 工作執行的位置。

  • nowait (bool) – 是否插入工作而不等待結果。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC。

  • timeout (float | None) – 等待底層 HTTP 傳輸的秒數,然後再使用 retry

傳回

工作 ID。

傳回類型

BigQueryJob

generate_job_id(job_id, dag_id, task_id, logical_date, configuration, force_rerun=False)[source]
split_tablename(table_input, default_project_id, var_name=None)[source]
get_query_results(job_id, location, max_results=None, selected_fields=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY)[source]

取得指定 job_id 的查詢結果。

參數
  • job_id (str) – 工作的 ID。ID 必須僅包含字母 (a-z、A-Z)、數字 (0-9)、底線 (_) 或破折號 (-)。最大長度為 1,024 個字元。

  • location (str) – 用於此操作的位置。

  • selected_fields (list[str] | str | None) – 要傳回的欄位清單(逗號分隔)。如果未指定,則傳回所有欄位。

  • max_results (int | None) – 要從表格擷取的最大記錄(列)數。

  • project_id (str) – 工作執行的 Google Cloud 專案。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC。

  • job_retry (google.api_core.retry.Retry) – 如何重試失敗的工作。

傳回

列出在選定欄位(如果有的話)篩選的資料列。

引發

AirflowException

傳回類型

list[dict[str, Any]]

class airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection(*args, **kwargs)[原始碼]

BigQuery 連線。

BigQuery 沒有持續連線的概念。因此,這些物件是游標的小型無狀態工廠,游標會完成所有實際工作。

close()[原始碼]

不執行任何動作。BigQueryConnection 不需要此操作。

commit()[原始碼]

不執行任何動作。BigQueryConnection 不支援交易。

cursor()[原始碼]

使用連線傳回新的 Cursor 物件。

abstract rollback()[原始碼]

不執行任何動作。BigQueryConnection 不支援交易。

class airflow.providers.google.cloud.hooks.bigquery.BigQueryBaseCursor(service, project_id, hook, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5, labels=None)[原始碼]

繼承自: airflow.utils.log.logging_mixin.LoggingMixin

BigQuery cursor。

BigQuery 基礎游標包含對 BigQuery 執行查詢的輔助方法。在不需要 PEP 249 游標的情況下,運算子可以直接使用這些方法。

class airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor(service, project_id, hook, use_legacy_sql=True, location=None, num_retries=5)[原始碼]

繼承自: BigQueryBaseCursor

一個非常基本的 BigQuery PEP 249 cursor 實作。

PyHive PEP 249 實作被用作參考

https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py

property description: list[原始碼]

傳回游標描述。

property rowcount: int[原始碼]

預設傳回 -1 以表示不支援此功能。

arraysize[原始碼]
close()[原始碼]

預設不執行任何動作。

execute(operation, parameters=None)[原始碼]

執行 BigQuery 查詢,並更新 BigQueryCursor 描述。

參數
  • operation (str) – 要執行的查詢。

  • parameters (dict | None) – 要替換到查詢中的參數。

executemany(operation, seq_of_parameters)[原始碼]

使用不同參數多次執行 BigQuery 查詢。

參數
  • operation (str) – 要執行的查詢。

  • seq_of_parameters (list) – 要替換到查詢中的字典參數清單。

flush_results()[原始碼]

清除與游標屬性相關的結果。

fetchone()[原始碼]

擷取查詢結果集的下一列。

next()[原始碼]

從緩衝區傳回下一列。

用於 fetchone 的輔助方法。

如果緩衝區是空的,則嘗試分頁瀏覽結果集以取得下一頁,並將其載入緩衝區。

fetchmany(size=None)[原始碼]

擷取查詢結果的下一組資料列。

這會傳回序列的序列(例如,元組清單)。當沒有更多資料列可用時,會傳回空序列。

每次呼叫要擷取的資料列數由參數指定。如果未指定,游標的 arraysize 會決定要擷取的資料列數。

此方法嘗試擷取 size 參數指示的盡可能多的資料列。如果由於指定的資料列數不可用而無法實現,則可能會傳回較少的資料列。

如果先前呼叫 execute() 未產生任何結果集,或尚未發出任何呼叫,則會引發 Error(或子類別)例外。

fetchall()[原始碼]

擷取查詢結果的所有(剩餘)資料列。

會傳回序列的序列(例如,元組清單)。

get_arraysize()[原始碼]

取得一次要擷取的資料列數。

另請參閱

fetchmany()

set_arraysize(arraysize)[原始碼]

設定一次要擷取的資料列數。

另請參閱

fetchmany()

setinputsizes(sizes)[原始碼]

預設不執行任何動作。

setoutputsize(size, column=None)[原始碼]

預設不執行任何動作。

airflow.providers.google.cloud.hooks.bigquery.split_tablename(table_input, default_project_id, var_name=None)[原始碼]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[原始碼]

繼承自: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

使用 gcloud-aio 函式庫來檢索 Job 詳細資訊。

sync_hook_class[原始碼]
async get_job_instance(project_id, job_id, session)[原始碼]

依據工作 ID 和專案 ID 取得指定的工作資源。

async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[原始碼]
async get_job_output(job_id, project_id=PROVIDE_PROJECT_ID)[原始碼]

非同步取得指定工作 ID 的 BigQuery 工作輸出。

async create_job_for_partition_get(dataset_id, table_id=None, project_id=PROVIDE_PROJECT_ID)[原始碼]

建立新工作並使用 gcloud-aio 取得 job_id。

async cancel_job(job_id, project_id, location)[原始碼]

取消 BigQuery 工作。

參數
  • job_id (str) – 要取消的工作 ID。

  • project_id (str | None) – 執行工作的 Google Cloud 專案。

  • location (str | None) – 執行工作的位置。

get_records(query_results, as_dict=False, selected_fields=None)[原始碼]

將來自 BigQuery 的回應轉換為記錄。

參數
  • query_results (dict[str, Any]) – 來自 SQL 查詢的結果

  • as_dict (bool) – 如果為 True,則傳回字典清單形式的結果,否則傳回清單清單形式的結果。

  • selected_fields (str | list[str] | None) –

value_check(sql, pass_value, records, tolerance=None)[原始碼]

將單一查詢結果列和容差與 pass_value 比對。

引發

AirflowException – 如果比對失敗

interval_check(row1, row2, metrics_thresholds, ignore_zero, ratio_formula)[原始碼]

檢查度量(SQL 運算式)的值是否在特定容差範圍內。

參數
  • row1 (str | None) – 第一個 SQL 查詢的查詢執行工作的第一個結果列

  • row2 (str | None) – 第二個 SQL 查詢的查詢執行工作的第一個結果列

  • metrics_thresholds (dict[str, Any]) – 度量索引的比率字典,例如 'COUNT(*)':1.5 會要求當天與 days_back 之前的日期間的差異在 50% 或更少。

  • ignore_zero (bool) – 是否應忽略零度量

  • ratio_formula (str) – 要使用哪個公式來計算兩個度量之間的比率。假設 cur 是今天的度量,而 ref 是 today - days_back 的度量。 max_over_min:計算 max(cur, ref) / min(cur, ref) relative_diff:計算 abs(cur-ref) / ref

class airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[原始碼]

繼承自: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

BigQuery Table 的非同步 hook。

sync_hook_class[原始碼]
async get_table_client(dataset, table_id, project_id, session)[原始碼]

取得 Google BigQuery Table 物件。

參數
  • dataset (str) – 要在其中尋找表格儲存貯體的資料集名稱。

  • table_id (str) – 要檢查是否存在的表格名稱。

  • project_id (str) – 要在其中查找表格的 Google Cloud 專案。提供給 hook 的連線必須提供對指定專案的存取權。

  • session (aiohttp.ClientSession) – aiohttp ClientSession

此條目是否有幫助?