airflow.providers.google.cloud.transfers.gcs_to_bigquery

此模組包含 Google Cloud Storage 至 BigQuery 的運算子。

模組內容

類別

GCSToBigQueryOperator

從 Google Cloud Storage 將檔案載入至 BigQuery。

屬性

ALLOWED_FORMATS

airflow.providers.google.cloud.transfers.gcs_to_bigquery.ALLOWED_FORMATS = ['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO', 'GOOGLE_SHEETS', 'DATASTORE_BACKUP', 'PARQUET'][原始碼]
class airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator(*, bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, schema_object_bucket=None, source_format='CSV', compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=None, write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, quote_character=None, ignore_unknown_values=False, allow_quoted_newlines=False, allow_jagged_rows=False, encoding='UTF-8', max_id_key=None, gcp_conn_id='google_cloud_default', schema_update_options=(), src_fmt_configs=None, external_table=False, time_partitioning=None, cluster_fields=None, autodetect=True, encryption_configuration=None, location=None, impersonation_chain=None, labels=None, description=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), result_retry=DEFAULT_RETRY, result_timeout=None, cancel_on_kill=True, job_id=None, force_rerun=True, reattach_states=None, project_id=PROVIDE_PROJECT_ID, force_delete=False, **kwargs)[原始碼]

基底類別: airflow.models.BaseOperator

從 Google Cloud Storage 將檔案載入至 BigQuery。

用於 BigQuery 表格的綱要可以使用兩種方式之一指定。您可以直接傳入綱要欄位,或者您可以將運算子指向 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是包含綱要欄位的 JSON 檔案。

另請參閱

如需關於如何使用此運算子的更多資訊,請參閱指南: 運算子

參數
  • bucket – 要從中載入的儲存桶。(已套用樣板)

  • source_objects – 要從中載入的 Google Cloud Storage URI 字串或清單。(已套用樣板) 如果 source_format 為 'DATASTORE_BACKUP',則清單必須僅包含單一 URI。

  • destination_project_dataset_table – 點分隔的 (<project>.|<project>:)<dataset>.<table> BigQuery 表格,用於載入資料。如果未包含 <project>,則專案將為連線 JSON 中定義的專案。(已套用樣板)

  • schema_fields – 如果設定,綱要欄位清單定義於此: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load。當 source_format 為 'DATASTORE_BACKUP' 時,不應設定。如果 'schema_object' 為 null 且 autodetect 為 False,則必須定義此參數。

  • schema_object – 如果設定,指向包含表格綱要的 .json 檔案的 GCS 物件路徑。(已套用樣板) 如果 'schema_fields' 為 null 且 autodetect 為 False,則必須定義此參數。

  • schema_object_bucket – [選用] 如果設定,則為儲存綱要物件樣板的 GCS 儲存桶。(已套用樣板) (預設值: bucket 的值)

  • source_format – 要匯出的檔案格式。

  • compression – [選用] 資料來源的壓縮類型。可能的值包括 GZIP 和 NONE。預設值為 NONE。此設定會被 Google Cloud Bigtable、Google Cloud Datastore 備份和 Avro 格式忽略。

  • create_disposition – 如果表格不存在,則為建立處置方式。

  • skip_leading_rows – BigQuery 在載入資料時將跳過的 CSV 檔案頂端列數。當 autodetect 開啟時,行為如下: skip_leading_rows 未指定 - Autodetect 嘗試偵測第一列中的標頭。如果未偵測到標頭,則將該列讀取為資料。否則,資料會從第二列開始讀取。 skip_leading_rows 為 0 - 指示 autodetect 沒有標頭,且資料應從第一列開始讀取。 skip_leading_rows = N > 0 - Autodetect 跳過 N-1 列,並嘗試在第 N 列中偵測標頭。如果未偵測到標頭,則只會跳過第 N 列。否則,第 N 列會用於擷取偵測到的綱要的欄位名稱。預設值設定為 None,以便 autodetect 選項可以偵測綱要欄位。

  • write_disposition – 如果表格已存在,則為寫入處置方式。

  • field_delimiter – 從 CSV 載入時要使用的分隔符號。

  • max_bad_records – BigQuery 在執行作業時可以忽略的最大錯誤記錄數。

  • quote_character – 用於在 CSV 檔案中引用資料區段的值。

  • ignore_unknown_values – [選用] 指示 BigQuery 是否應允許表格綱要中未表示的額外值。如果為 true,則會忽略額外值。如果為 false,則具有額外欄位的記錄會被視為錯誤記錄,並且如果錯誤記錄過多,則會在作業結果中傳回無效錯誤。

  • allow_quoted_newlines – 是否允許引用換行符號 (true) 或不允許 (false)。

  • allow_jagged_rows – 接受缺少尾隨選用欄位的列。遺失的值會被視為 null。如果為 false,則缺少尾隨欄位的記錄會被視為錯誤記錄,並且如果錯誤記錄過多,則會在作業結果中傳回無效錯誤。僅適用於 CSV,其他格式會忽略。

  • encoding – 資料的字元編碼。請參閱: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).csvOptions.encoding https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding

  • max_id_key – 如果設定,則為要載入的 BigQuery 表格中的欄位名稱。這將用於在載入發生後從 BigQuery 選取 MAX 值。結果將由 execute() 命令傳回,而 execute() 命令又會儲存在 XCom 中,供未來的運算子使用。這對於增量載入很有幫助 - 在未來的執行期間,您可以從最大 ID 繼續。

  • schema_update_options – 允許作為載入作業的副作用更新目的地表格的綱要。

  • src_fmt_configs – 設定特定於來源格式的選用欄位

  • external_table – 旗標,用於指定目的地表格是否應為 BigQuery 外部表格。預設值為 False。

  • time_partitioning – 根據 API 規格設定選用的時間分割欄位,即依欄位、類型和到期日分割。請注意,「欄位」無法與 dataset.table$partition 同時使用。

  • cluster_fields – 要求以此載入的結果依一或多個欄位排序儲存。BigQuery 支援分割和非分割表格的叢集。給定欄位的順序決定排序順序。不適用於外部表格。

  • autodetect – [選用] 指示我們是否應自動推斷 CSV 和 JSON 來源的選項和綱要。(預設值: True)。如果 'schema_fields' 和 'schema_object' 未定義,則必須將此參數設定為 True。如果表格在 Airflow 外部建立,建議設定為 True。如果 autodetect 為 None 且未提供綱要 (無論是透過 schema_fields 還是 schema_object),則假設表格已存在。

  • encryption_configuration

    [選用] 自訂加密設定 (例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key",
    }
    

  • location – [選用] 作業的地理位置。美國和歐盟以外的地區為必要項目。請參閱 https://cloud.google.com/bigquery/docs/locations#specifying_your_location 以取得詳細資訊

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選用服務帳戶,用於使用短期憑證模擬身分,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須授予緊接在前的身分「服務帳戶權杖建立者」IAM 角色,清單中的第一個帳戶將此角色授予原始帳戶 (已套用樣板)。

  • labels – [選用] BiqQuery 表格的標籤。

  • description – [選用] BigQuery 表格的描述。這僅在目的地表格為新建立時使用。如果表格已存在且提供的值與目前描述不同,則作業將會失敗。

  • deferrable (bool) – 在可延遲模式下執行運算子

  • force_delete (bool) – 如果目的地表格已存在,則強制刪除。

template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'schema_object_bucket',...[原始碼]
template_ext: collections.abc.Sequence[str] = ('.sql',)[原始碼]
ui_color = '#f0eee4'[原始碼]
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 樣板時使用的相同字典。

請參閱 get_template_context 以取得更多內容。

execute_complete(context, event)[原始碼]

立即傳回並依賴觸發器擲回成功事件。觸發器的回呼。

依賴觸發器擲回例外狀況,否則假設執行成功。

on_kill()[原始碼]

覆寫此方法以在任務實例被終止時清理子程序。

在運算子內使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下幽靈程序。

get_openlineage_facets_on_complete(task_instance)[原始碼]

實作 on_complete,因為我們將包含最終的 BQ 作業 ID。

此條目是否有幫助?