airflow.providers.apache.spark.hooks.spark_jdbc

模組內容

類別

SparkJDBCHook

擴展 SparkSubmitHook,用於使用 Apache Spark 執行與基於 JDBC 的資料庫之間的資料傳輸。

class airflow.providers.apache.spark.hooks.spark_jdbc.SparkJDBCHook(spark_app_name='airflow-spark-jdbc', spark_conn_id=default_conn_name, spark_conf=None, spark_py_files=None, spark_files=None, spark_jars=None, num_executors=None, executor_cores=None, executor_memory=None, driver_memory=None, verbose=False, principal=None, keytab=None, cmd_type='spark_to_jdbc', jdbc_table=None, jdbc_conn_id='jdbc-default', jdbc_driver=None, metastore_table=None, jdbc_truncate=False, save_mode=None, save_format=None, batch_size=None, fetch_size=None, num_partitions=None, partition_column=None, lower_bound=None, upper_bound=None, create_table_column_types=None, *args, use_krb5ccache=False, **kwargs)[原始碼]

基底類別: airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook

擴展 SparkSubmitHook,用於使用 Apache Spark 執行與基於 JDBC 的資料庫之間的資料傳輸。

參數
  • spark_app_name (str) – 任務名稱 (預設 airflow-spark-jdbc)

  • spark_conn_id (str) – 在 Airflow 管理介面中設定的 spark 連線 ID

  • spark_conf (dict[str, Any] | None) – 任何額外的 Spark 組態屬性

  • spark_py_files (str | None) – 使用的額外 Python 檔案 (.zip、.egg 或 .py)

  • spark_files (str | None) – 要上傳到執行任務的容器的其他檔案

  • spark_jars (str | None) – 要上傳並新增至 driver 和 executor classpath 的其他 jar 檔

  • num_executors (int | None) – 要執行的 executor 數量。應設定此值以管理與 JDBC 資料庫建立的連線數

  • executor_cores (int | None) – 每個 executor 的核心數

  • executor_memory (str | None) – 每個 executor 的記憶體 (例如 1000M、2G)

  • driver_memory (str | None) – 分配給 driver 的記憶體 (例如 1000M、2G)

  • verbose (bool) – 是否將 verbose 標誌傳遞給 spark-submit 以進行偵錯

  • keytab (str | None) – 包含 keytab 的檔案完整路徑

  • principal (str | None) – 用於 keytab 的 kerberos principal 名稱

  • cmd_type (str) – 資料應如何流動。2 個可能的值: spark_to_jdbc:spark 從 metastore 寫入資料到 jdbc jdbc_to_spark:spark 從 jdbc 寫入資料到 metastore

  • jdbc_table (str | None) – JDBC 表格的名稱

  • jdbc_conn_id (str) – 用於連線到 JDBC 資料庫的連線 ID

  • jdbc_driver (str | None) – 用於 JDBC 連線的 JDBC 驅動程式名稱。此驅動程式 (通常是 jar 檔) 應在 'jars' 參數中傳遞

  • metastore_table (str | None) – metastore 表格的名稱

  • jdbc_truncate (bool) – (僅限 spark_to_jdbc) Spark 是否應截斷或刪除並重新建立 JDBC 表格。僅當 'save_mode' 設定為 Overwrite 時,此設定才會生效。此外,如果綱要不同,Spark 無法截斷,並且將刪除並重新建立

  • save_mode (str | None) – 要使用的 Spark 儲存模式 (例如 overwrite、append 等)

  • save_format (str | None) – (僅限 jdbc_to_spark) 要使用的 Spark 儲存格式 (例如 parquet)

  • batch_size (int | None) – (僅限 spark_to_jdbc) 每次往返 JDBC 資料庫要插入的批次大小。預設為 1000

  • fetch_size (int | None) – (僅限 jdbc_to_spark) 每次往返 JDBC 資料庫要提取的批次大小。預設值取決於 JDBC 驅動程式

  • num_partitions (int | None) – Spark 可以同時使用的最大分割區數量,適用於 spark_to_jdbc 和 jdbc_to_spark 作業。這也會限制可以開啟的 JDBC 連線數量

  • partition_column (str | None) – (僅限 jdbc_to_spark) 用於分割 metastore 表格的數字欄位。如果指定,您還必須指定:num_partitions、lower_bound、upper_bound

  • lower_bound (str | None) – (僅限 jdbc_to_spark) 要提取的數字分割欄位範圍的下限。如果指定,您還必須指定:num_partitions、partition_column、upper_bound

  • upper_bound (str | None) – (僅限 jdbc_to_spark) 要提取的數字分割欄位範圍的上限。如果指定,您還必須指定:num_partitions、partition_column、lower_bound

  • create_table_column_types (str | None) – (僅限 spark_to_jdbc) 建立表格時,要使用的資料庫欄位資料類型,而不是預設類型。資料類型資訊應以與 CREATE TABLE 欄位語法相同的格式指定 (例如:“name CHAR(64), comments VARCHAR(1024)”)。指定的類型應為有效的 spark sql 資料類型。

  • use_krb5ccache (bool) – 如果為 True,則設定 spark 使用票證快取,而不是依賴 keytab 進行 Kerberos 登入

conn_name_attr = 'spark_conn_id'[原始碼]
default_conn_name = 'spark_default'[原始碼]
conn_type = 'spark_jdbc'[原始碼]
hook_name = 'Spark JDBC'[原始碼]
submit_jdbc_job()[原始碼]

提交 Spark JDBC 任務。

get_conn()[原始碼]

傳回 hook 的連線。

此條目是否有幫助?