Presto to Google Cloud Storage Transfer Operator (Presto 至 Google Cloud Storage 傳輸運算子)

Presto 是一個開放原始碼的分散式 SQL 查詢引擎,用於對各種大小的資料來源執行互動式分析查詢,範圍從 GB 級到 PB 級。Presto 允許查詢資料所在的任何位置,包括 Hive、Cassandra、關聯式資料庫,甚至是專有的資料儲存區。單個 Presto 查詢可以合併來自多個來源的資料,從而在整個組織中進行分析。

Google Cloud Storage 允許在全球範圍內隨時隨地儲存和檢索任何數量的資料。您可以使用它來儲存備份和封存資料,以及作為 BigQuery 的資料來源

資料傳輸

Presto 和 Google Storage 之間的檔案傳輸是透過 PrestoToGCSOperator 運算子執行。

此運算子有 3 個必要參數

  • sql - 要執行的 SQL。

  • bucket - 要上傳到的儲存桶。

  • filename - 上傳到 Google Cloud Storage 時用作物件名稱的檔案名稱。檔案名稱中應指定 {},以便在檔案因大小而分割時,運算子可以注入檔案編號。

所有參數都在參考文件中描述 - PrestoToGCSOperator

一個範例運算子呼叫可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[原始碼]

presto_to_gcs_basic = PrestoToGCSOperator(
    task_id="presto_to_gcs_basic",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
)

資料格式的選擇

此運算子支援兩種輸出格式

  • json - JSON Lines (預設)

  • csv

您可以使用 export_format 參數指定這些選項。

如果您想要建立 CSV 檔案,您的運算子呼叫可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[原始碼]

presto_to_gcs_csv = PrestoToGCSOperator(
    task_id="presto_to_gcs_csv",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
    schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    export_format="csv",
)

產生 BigQuery 結構描述

如果您設定 schema_filename 參數,則會從資料庫中傾印出一個包含表格的 BigQuery 結構描述欄位的 .json 檔案,並上傳到儲存桶。

如果您想要建立結構描述檔案,則範例運算子呼叫可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[原始碼]

presto_to_gcs_multiple_types = PrestoToGCSOperator(
    task_id="presto_to_gcs_multiple_types",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    gzip=False,
)

有關 BigQuery 結構描述的更多資訊,請參閱 BigQuery 文件中的 指定結構描述

將結果分割成多個檔案

此運算子支援將大型結果分割成多個檔案的功能。approx_max_file_size_bytes 參數允許開發人員指定分割檔案的大小。預設情況下,檔案不超過 1,900,000,000 位元組 (1900 MB)

請查看 Google Cloud Storage 中的配額與限制,以查看單個物件允許的最大檔案大小。

如果您想要建立 10 MB 的檔案,您的程式碼可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[原始碼]

read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
    task_id="read_data_from_gcs_many_chunks",
    configuration={
        "query": {
            "query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
            f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
            "useLegacySql": False,
        }
    },
)

使用 BigQuery 查詢資料

Google Cloud Storage 中可用的資料可以被 BigQuery 使用。您可以將資料載入到 BigQuery,或在查詢中直接引用 GCS 資料。有關將資料載入到 BigQuery 的資訊,請參閱 BigQuery 文件中的 Cloud Storage 資料載入簡介。有關查詢 GCS 資料的資訊,請參閱 BigQuery 文件中的 查詢 Cloud Storage 資料

Airflow 也有許多運算子允許您建立 BigQuery 的使用。例如,如果您想要建立一個外部表格,讓您可以建立直接從 GCS 讀取資料的查詢,則可以使用 BigQueryCreateExternalTableOperator。使用此運算子的方式如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[原始碼]

create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
    task_id="create_external_table_multiple_types",
    bucket=GCS_BUCKET,
    source_objects=[f"{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
    table_resource={
        "tableReference": {
            "projectId": GCP_PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
        },
        "schema": {
            "fields": [
                {"name": "name", "type": "STRING"},
                {"name": "post_abbr", "type": "STRING"},
            ]
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": 1},
        },
    },
    schema_object=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
)

有關 Airflow 和 BigQuery 整合的更多資訊,請參閱 Python API 參考文件 - bigquery

參考文獻

如需更多資訊,請參閱

此條目是否有幫助?