Apache Spark 運算子

先決條件

SparkJDBCOperator

在 Apache Spark 伺服器上啟動應用程式,它使用 SparkSubmitOperator 來執行與基於 JDBC 的資料庫之間的資料傳輸。

有關參數定義,請參閱 SparkJDBCOperator

使用運算子

使用 cmd_type 參數,可以將資料從 Spark 傳輸到資料庫 (spark_to_jdbc) 或從資料庫傳輸到 Spark (jdbc_to_spark),這將使用 Spark 命令 saveAsTable 寫入表格。

tests/system/apache/spark/example_spark_dag.py[原始碼]

jdbc_to_spark_job = SparkJDBCOperator(
    cmd_type="jdbc_to_spark",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="overwrite",
    save_format="JSON",
    task_id="jdbc_to_spark_job",
)

spark_to_jdbc_job = SparkJDBCOperator(
    cmd_type="spark_to_jdbc",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
    task_id="spark_to_jdbc_job",
)

參考

如需更多資訊,請參閱 Apache Spark DataFrameWriter 文件

SparkSqlOperator

在 Apache Spark 伺服器上啟動應用程式,它要求 spark-sql 腳本位於 PATH 中。 運算子將在 Spark Hive Metastore 服務上執行 SQL 查詢,sql 參數可以被模板化,並且可以是 .sql.hql 檔案。

有關參數定義,請參閱 SparkSqlOperator

使用運算子

tests/system/apache/spark/example_spark_dag.py[原始碼]

spark_sql_job = SparkSqlOperator(
    sql="SELECT COUNT(1) as cnt FROM temp_table", master="local", task_id="spark_sql_job"
)

參考

如需更多資訊,請參閱 執行 Spark SQL CLI

SparkSubmitOperator

在 Apache Spark 伺服器上啟動應用程式,它使用 spark-submit 腳本,該腳本負責設定包含 Spark 及其相依性的類別路徑,並且可以支援 Spark 支援的不同叢集管理器和部署模式。

有關參數定義,請參閱 SparkSubmitOperator

使用運算子

tests/system/apache/spark/example_spark_dag.py[原始碼]

submit_job = SparkSubmitOperator(
    application="${SPARK_HOME}/examples/src/main/python/pi.py", task_id="submit_job"
)

參考

如需更多資訊,請參閱 Apache Spark 提交應用程式

這個條目是否有幫助?