airflow.providers.google.cloud.hooks.bigquery
¶
BigQuery Hook 和 BigQuery 的非常基本的 PEP 249 實作。
模組內容¶
類別¶
與 BigQuery 互動。 |
|
BigQuery 連線。 |
|
BigQuery cursor。 |
|
一個非常基本的 BigQuery PEP 249 cursor 實作。 |
|
使用 gcloud-aio 函式庫來檢索 Job 詳細資訊。 |
|
BigQuery Table 的非同步 hook。 |
函數¶
|
屬性¶
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryHook(use_legacy_sql=True, location=None, priority='INTERACTIVE', api_resource_configs=None, impersonation_scopes=None, labels=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
,airflow.providers.common.sql.hooks.sql.DbApiHook
與 BigQuery 互動。
此 hook 使用 Google Cloud 連線。
- 參數
gcp_conn_id – 用於 GCP 憑證的 Airflow 連線。
use_legacy_sql (bool) – 這指定是否使用舊版 SQL 方言。
location (str | None) – BigQuery 資源的位置。
priority (str) – 指定查詢的優先順序。可能的值包括 INTERACTIVE 和 BATCH。預設值為 INTERACTIVE。
api_resource_configs (dict | None) – 這包含應用於 Google BigQuery jobs 的參數設定。
impersonation_chain – 這是可選的服務帳戶,用於使用短期憑證模擬身分。
impersonation_scopes (str | collections.abc.Sequence[str] | None) – 模擬帳戶的可選 scopes 列表。將覆蓋連線中的 scopes。
labels (dict | None) – BigQuery 資源標籤。
- property scopes: collections.abc.Sequence[str][source]¶
傳回 OAuth 2.0 scopes。
- 傳回
傳回在 impersonation_scopes、連線設定或預設 scope 中定義的 scope
- 傳回類型
- get_sqlalchemy_engine(engine_kwargs=None)[source]¶
建立 SQLAlchemy engine 物件。
- 參數
engine_kwargs (dict | None) – 用於
create_engine()
的 Kwargs。
- get_records(sql, parameters=None)[source]¶
執行 sql 並傳回一組記錄。
- 參數
sql – 要執行的 sql 語句 (str) 或要執行的 sql 語句列表
parameters – 用於呈現 SQL 查詢的參數。
- abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[source]¶
插入列。
目前不支援插入。理論上,您可以使用 BigQuery 的串流 API 將列插入到表格中,但尚未實作。
- get_pandas_df(sql, parameters=None, dialect=None, **kwargs)[source]¶
取得 BigQuery 結果的 Pandas DataFrame。
DbApiHook 方法必須被覆寫,因為 Pandas 除了 SQLite 之外,不支援 PEP 249 連線。
另請參閱
https://github.com/pandas-dev/pandas/blob/055d008615272a1ceca9720dc365a2abd316f353/pandas/io/sql.py#L415 https://github.com/pandas-dev/pandas/issues/6900
- 參數
sql (str) – 要執行的 BigQuery SQL。
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 用於呈現 SQL 查詢的參數 (未使用,保留以覆寫父類別方法)
dialect (str | None) – BigQuery SQL 的方言 – 舊版 SQL 或標準 SQL,如果未指定,則預設使用 self.use_legacy_sql
kwargs – (可選) 傳遞到 pandas_gbq.read_gbq 方法中
- table_partition_exists(dataset_id, table_id, partition_id, project_id)[source]¶
檢查 Google BigQuery 中是否存在分割區。
- create_empty_table(project_id=PROVIDE_PROJECT_ID, dataset_id=None, table_id=None, table_resource=None, schema_fields=None, time_partitioning=None, cluster_fields=None, labels=None, view=None, materialized_view=None, encryption_configuration=None, retry=DEFAULT_RETRY, location=None, exists_ok=True)[source]¶
在 dataset 中建立新的空表格。
若要建立由 SQL 查詢定義的視圖,請將字典解析為 view 參數。
- 參數
project_id (str) – 要在其中建立表格的專案。
dataset_id (str | None) – 要在其中建立表格的 dataset。
table_id (str | None) – 要建立的表格名稱。
table_resource (dict[str, Any] | None) – 表格資源,如文件中所述: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,則忽略所有其他參數。
schema_fields (list | None) –
如果設定,則為架構欄位列表,如此處定義: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
labels (dict | None) – 包含表格標籤的字典,傳遞給 BigQuery
retry (google.api_core.retry.Retry) – 可選。如何重試 RPC。
time_partitioning (dict | None) –
設定可選的時間分割欄位,即根據 API 規範按欄位、類型和到期時間進行分割。
cluster_fields (list[str] | None) – [可選] 用於叢集化的欄位。BigQuery 支援分割和非分割表格的叢集化。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#clustering.fields
view (dict | None) –
[可選] 包含視圖定義的字典。如果設定,它將建立視圖而不是表格: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
view = { "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000", "useLegacySql": False, }
materialized_view (dict | None) – [可選] 具體化視圖定義。
encryption_configuration (dict | None) –
[可選] 自訂加密設定 (例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key", }
num_retries – 連線問題時的最大重試次數。
location (str | None) – (可選) 表格應駐留的地理位置。
exists_ok (bool) – 如果為
True
,則在建立表格時忽略「已存在」錯誤。
- 傳回
建立的表格
- 傳回類型
- create_empty_dataset(dataset_id=None, project_id=PROVIDE_PROJECT_ID, location=None, dataset_reference=None, exists_ok=True)[source]¶
建立新的空 dataset。
- 參數
project_id (str) – 我們要在其中建立空 dataset 的專案名稱。如果 dataset_reference 中有 projectId,則不需要提供。
dataset_id (str | None) – dataset 的 id。如果 dataset_reference 中有 datasetId,則不需要提供。
location (str | None) – (可選) dataset 應駐留的地理位置。沒有預設值,但如果沒有提供任何內容,dataset 將在美國建立。
dataset_reference (dict[str, Any] | None) – 可以與請求正文一起提供的 Dataset 參考。更多資訊: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
exists_ok (bool) – 如果為
True
,則在建立 dataset 時忽略「已存在」錯誤。
- get_dataset_tables(dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, retry=DEFAULT_RETRY)[source]¶
取得給定 dataset 的表格列表。
更多資訊,請參閱: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
- delete_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, retry=DEFAULT_RETRY)[source]¶
刪除您專案中的 BigQuery dataset。
- 參數
project_id (str) – 我們擁有 dataset 的專案名稱。
dataset_id (str) – 要刪除的 dataset。
delete_contents (bool) – 如果為 True,則刪除 dataset 中的所有表格。如果為 False 且 dataset 包含表格,則請求將失敗。
retry (google.api_core.retry.Retry) – 如何重試 RPC。
- update_table(table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]¶
變更表格的某些欄位。
使用
fields
指定要更新的欄位。至少必須提供一個欄位。如果fields
中列出了某個欄位,且該欄位在table
中為None
,則該欄位值將被刪除。如果
table.etag
不是None
,則只有當伺服器上的表格具有相同的 ETag 時,更新才會成功。因此,使用get_table
讀取表格、變更其欄位,然後將其傳遞給update_table
,將確保只有在自上次讀取以來表格未發生修改時,才會儲存變更。- 參數
project_id (str) – 要在其中建立表格的專案。
dataset_id (str | None) – 要在其中建立表格的 dataset。
table_id (str | None) – 要建立的表格名稱。
table_resource (dict[str, Any]) – 表格資源,如文件所述: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 表格必須包含
tableReference
或必須提供project_id
、dataset_id
和table_id
。fields (list[str] | None) – 要變更的
table
欄位,以表格屬性(例如 “friendly_name”)拼寫。
- insert_all(project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False)[source]¶
將資料串流到 BigQuery,一次一筆記錄,無需載入工作。
- 參數
project_id (str) – 我們在其中建立表格的專案名稱
dataset_id (str) – 我們在其中建立表格的資料集名稱
table_id (str) – 表格名稱
rows (list) –
要插入的列
rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
ignore_unknown_values (bool) – [選用] 接受包含與結構描述不符的值的列。未知的數值將被忽略。預設值為 false,這會將未知的數值視為錯誤。
skip_invalid_rows (bool) – [選用] 插入請求中的所有有效列,即使存在無效列也一樣。預設值為 false,這會導致在存在任何無效列時,整個請求都會失敗。
fail_on_error (bool) – [選用] 如果發生任何錯誤,則強制任務失敗。預設值為 false,表示即使發生任何插入錯誤,任務也不應失敗。
- update_dataset(fields, dataset_resource, dataset_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY)[source]¶
變更資料集的某些欄位。
使用
fields
指定要更新的欄位。至少必須提供一個欄位。如果fields
中列出了某個欄位,且該欄位在dataset
中為None
,則會將其刪除。如果
dataset.etag
不是None
,則只有當伺服器上的資料集具有相同的 ETag 時,更新才會成功。因此,使用get_dataset
讀取資料集、變更其欄位,然後將其傳遞給update_dataset
,將確保只有在自上次讀取以來資料集未發生修改時,才會儲存變更。- 參數
dataset_resource (dict[str, Any]) – 將在請求內文中提供的資料集資源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
dataset_id (str | None) – 資料集的 ID。
fields (collections.abc.Sequence[str]) – 要變更的
dataset
屬性(例如 “friendly_name”)。project_id (str) – Google Cloud 專案 ID
retry (google.api_core.retry.Retry) – 如何重試 RPC。
- get_datasets_list(project_id=PROVIDE_PROJECT_ID, include_all=False, filter_=None, max_results=None, page_token=None, retry=DEFAULT_RETRY, return_iterator=False)[source]¶
取得目前專案中的所有 BigQuery 資料集。
如需更多資訊,請參閱: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
- 參數
project_id (str) – 您嘗試取得所有資料集的 Google Cloud 專案
include_all (bool) – 如果結果包含隱藏的資料集,則為 True。預設為 False。
filter – 用於依標籤篩選結果的運算式。如需語法,請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter。
filter – str
max_results (int | None) – 要傳回的資料集最大數量。
max_results – int
page_token (str | None) – 代表資料集游標的符記。如果未傳遞,API 將傳回第一頁資料集。此符記標記要傳回的迭代器開始位置,並且可以在
HTTPIterator
的next_page_token
存取page_token
的值。page_token – str
retry (google.api_core.retry.Retry) – 如何重試 RPC。
return_iterator (bool) – 不是傳回 list[Row],而是傳回 HTTPIterator,可用於取得 next_page_token 屬性。
- get_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID)[source]¶
擷取 dataset_id 參照的資料集。
- 參數
dataset_id (str) – BigQuery 資料集 ID
project_id (str) – Google Cloud 專案 ID
- 傳回
dataset_resource
- 傳回類型
另請參閱
如需更多資訊,請參閱資料集資源內容: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
- run_grant_dataset_view_access(source_dataset, view_dataset, view_table, view_project=None, project_id=PROVIDE_PROJECT_ID)[source]¶
授予資料集的授權檢視存取權給檢視表格。
如果已授予此檢視對資料集的存取權,則不執行任何動作。此方法不是原子性的。執行此方法可能會覆蓋同時發生的更新。
- 參數
source_dataset (str) – 來源資料集
view_dataset (str) – 檢視所在的資料集
view_table (str) – 檢視的表格
project_id (str) – 來源資料集的專案。如果為 None,將使用 self.project_id。
view_project (str | None) – 檢視所在的專案。如果為 None,將使用 self.project_id。
- 傳回
來源資料集的資料集資源。
- 傳回類型
dict[str, Any]
- run_table_upsert(dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID)[source]¶
如果表格存在則更新,否則建立新表格。
由於 BigQuery 原生不允許表格 upsert,因此這不是原子操作。
- 參數
dataset_id (str) – 要將表格 upsert 到其中的資料集。
table_resource (dict[str, Any]) – 表格資源。請參閱 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
project_id (str) – 要將表格 upsert 到其中的專案。如果為 None,專案將為 self.project_id。
- delete_table(table_id, not_found_ok=True, project_id=PROVIDE_PROJECT_ID)[source]¶
從資料集中刪除現有表格。
如果表格不存在,則傳回錯誤,除非 not_found_ok 設定為 True。
- 參數
table_id (str) – 一個虛線
(<project>.|<project>:)<dataset>.<table>
,指示要刪除的表格。not_found_ok (bool) – 如果為 True,即使要求的表格不存在,也傳回成功。
project_id (str) – 用於執行請求的專案
- list_rows(dataset_id, table_id, max_results=None, selected_fields=None, page_token=None, start_index=None, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY, return_iterator=False)[source]¶
列出表格中的列。
請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
- 參數
dataset_id (str) – 要求的表格的資料集 ID。
table_id (str) – 要求的表格的表格 ID。
max_results (int | None) – 要傳回的最大結果數。
selected_fields (list[str] | str | None) – 要傳回的欄位清單(逗號分隔)。如果未指定,則傳回所有欄位。
page_token (str | None) – 頁面符記,從先前的呼叫傳回,用於識別結果集。
start_index (int | None) – 要讀取的起始列的從零開始的索引。
project_id (str) – Client 代表操作的專案的專案 ID。
location (str | None) – 工作的預設位置。
retry (google.api_core.retry.Retry) – 如何重試 RPC。
return_iterator (bool) – 不是傳回 list[Row],而是傳回 RowIterator,可用於取得 next_page_token 屬性。
- 傳回
列的清單
- 傳回類型
list[google.cloud.bigquery.table.Row] | google.cloud.bigquery.table.RowIterator
- get_schema(dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]¶
取得給定資料集和表格的結構描述。
- 參數
dataset_id (str) – 要求的表格的資料集 ID
table_id (str) – 要求的表格的表格 ID
project_id (str) – 要求的表格的選用專案 ID。如果未提供,將使用連接器設定的專案。
- 傳回
表格結構描述
- 傳回類型
- update_table_schema(schema_fields_updates, include_policy_tags, dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]¶
更新給定資料集和表格之結構描述中的欄位。
請注意,結構描述中的某些欄位是不可變更的;嘗試變更它們會導致例外狀況。
如果包含新的欄位,則會插入該欄位,這需要設定所有必要欄位。
- 參數
include_policy_tags (bool) – 如果設定為 True,原則標籤將包含在更新請求中,即使未變更,也需要特殊權限,請參閱 https://cloud.google.com/bigquery/docs/column-level-security#roles
dataset_id (str) – 要更新的請求表格的資料集 ID
table_id (str) – 要更新的表格的表格 ID
schema_fields_updates (list[dict[str, Any]]) –
部分結構描述資源。請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
schema_fields_updates = [ {"name": "emp_name", "description": "Some New Description"}, {"name": "salary", "description": "Some New Description"}, { "name": "departments", "fields": [ {"name": "name", "description": "Some New Description"}, {"name": "type", "description": "Some New Description"}, ], }, ]
project_id (str) – 我們要在其中更新表格的專案名稱。
- poll_job_complete(job_id, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY)[source]¶
檢查工作是否已完成。
- 參數
job_id (str) – 工作的 ID。
project_id (str) – 執行工作的 Google Cloud 專案
location (str | None) – 工作執行的位置
retry (google.api_core.retry.Retry) – 如何重試 RPC。
- cancel_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]¶
取消工作並等待取消完成。
- 參數
job_id (str) – 工作的 ID。
project_id (str) – 執行工作的 Google Cloud 專案
location (str | None) – 工作執行的位置
- get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]¶
擷取 BigQuery 工作。
- 參數
job_id (str) – 工作的 ID。ID 必須僅包含字母 (a-z、A-Z)、數字 (0-9)、底線 (_) 或破折號 (-)。最大長度為 1,024 個字元。
project_id (str) – 執行工作的 Google Cloud 專案。
location (str | None) – 工作執行的位置。
- insert_job(configuration, job_id=None, project_id=PROVIDE_PROJECT_ID, location=None, nowait=False, retry=DEFAULT_RETRY, timeout=None)[source]¶
執行 BigQuery 工作並等待其完成。
- 參數
configuration (dict) – configuration 參數直接對應至工作物件中的 BigQuery configuration 欄位。請參閱 https://cloud.google.com/bigquery/docs/reference/v2/jobs 以取得詳細資訊。
job_id (str | None) – 工作的 ID。ID 必須僅包含字母 (a-z、A-Z)、數字 (0-9)、底線 (_) 或破折號 (-)。最大長度為 1,024 個字元。如果未提供,則會產生 uuid。
project_id (str) – 執行工作的 Google Cloud 專案。
location (str | None) – 工作執行的位置。
nowait (bool) – 是否插入工作而不等待結果。
retry (google.api_core.retry.Retry) – 如何重試 RPC。
timeout (float | None) – 等待底層 HTTP 傳輸的秒數,然後再使用
retry
。
- 傳回
工作 ID。
- 傳回類型
BigQueryJob
- get_query_results(job_id, location, max_results=None, selected_fields=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY)[source]¶
取得指定 job_id 的查詢結果。
- 參數
job_id (str) – 工作的 ID。ID 必須僅包含字母 (a-z、A-Z)、數字 (0-9)、底線 (_) 或破折號 (-)。最大長度為 1,024 個字元。
location (str) – 用於此操作的位置。
selected_fields (list[str] | str | None) – 要傳回的欄位清單(逗號分隔)。如果未指定,則傳回所有欄位。
max_results (int | None) – 要從表格擷取的最大記錄(列)數。
project_id (str) – 工作執行的 Google Cloud 專案。
retry (google.api_core.retry.Retry) – 如何重試 RPC。
job_retry (google.api_core.retry.Retry) – 如何重試失敗的工作。
- 傳回
列出在選定欄位(如果有的話)篩選的資料列。
- 引發
AirflowException
- 傳回類型
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection(*args, **kwargs)[原始碼]¶
BigQuery 連線。
BigQuery 沒有持續連線的概念。因此,這些物件是游標的小型無狀態工廠,游標會完成所有實際工作。
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryBaseCursor(service, project_id, hook, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5, labels=None)[原始碼]¶
繼承自:
airflow.utils.log.logging_mixin.LoggingMixin
BigQuery cursor。
BigQuery 基礎游標包含對 BigQuery 執行查詢的輔助方法。在不需要 PEP 249 游標的情況下,運算子可以直接使用這些方法。
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor(service, project_id, hook, use_legacy_sql=True, location=None, num_retries=5)[原始碼]¶
繼承自:
BigQueryBaseCursor
一個非常基本的 BigQuery PEP 249 cursor 實作。
PyHive PEP 249 實作被用作參考
https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py
- airflow.providers.google.cloud.hooks.bigquery.split_tablename(table_input, default_project_id, var_name=None)[原始碼]¶
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[原始碼]¶
繼承自:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
使用 gcloud-aio 函式庫來檢索 Job 詳細資訊。
- async create_job_for_partition_get(dataset_id, table_id=None, project_id=PROVIDE_PROJECT_ID)[原始碼]¶
建立新工作並使用 gcloud-aio 取得 job_id。
- value_check(sql, pass_value, records, tolerance=None)[原始碼]¶
將單一查詢結果列和容差與 pass_value 比對。
- 引發
AirflowException – 如果比對失敗
- interval_check(row1, row2, metrics_thresholds, ignore_zero, ratio_formula)[原始碼]¶
檢查度量(SQL 運算式)的值是否在特定容差範圍內。
- 參數
row1 (str | None) – 第一個 SQL 查詢的查詢執行工作的第一個結果列
row2 (str | None) – 第二個 SQL 查詢的查詢執行工作的第一個結果列
metrics_thresholds (dict[str, Any]) – 度量索引的比率字典,例如 'COUNT(*)':1.5 會要求當天與 days_back 之前的日期間的差異在 50% 或更少。
ignore_zero (bool) – 是否應忽略零度量
ratio_formula (str) – 要使用哪個公式來計算兩個度量之間的比率。假設 cur 是今天的度量,而 ref 是 today - days_back 的度量。 max_over_min:計算 max(cur, ref) / min(cur, ref) relative_diff:計算 abs(cur-ref) / ref
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[原始碼]¶
繼承自:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
BigQuery Table 的非同步 hook。