Apache Spark 運算子¶
先決條件¶
要使用
SparkSubmitOperator
,您必須設定 Spark 連線。要使用
SparkJDBCOperator
,您必須同時設定 Spark 連線 和 JDBC 連線。SparkSqlOperator
從運算子參數取得所有配置。
SparkJDBCOperator¶
在 Apache Spark 伺服器上啟動應用程式,它使用 SparkSubmitOperator
來執行與基於 JDBC 的資料庫之間的資料傳輸。
有關參數定義,請參閱 SparkJDBCOperator
。
使用運算子¶
使用 cmd_type
參數,可以將資料從 Spark 傳輸到資料庫 (spark_to_jdbc
) 或從資料庫傳輸到 Spark (jdbc_to_spark
),這將使用 Spark 命令 saveAsTable
寫入表格。
jdbc_to_spark_job = SparkJDBCOperator(
cmd_type="jdbc_to_spark",
jdbc_table="foo",
spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
jdbc_driver="org.postgresql.Driver",
metastore_table="bar",
save_mode="overwrite",
save_format="JSON",
task_id="jdbc_to_spark_job",
)
spark_to_jdbc_job = SparkJDBCOperator(
cmd_type="spark_to_jdbc",
jdbc_table="foo",
spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
jdbc_driver="org.postgresql.Driver",
metastore_table="bar",
save_mode="append",
task_id="spark_to_jdbc_job",
)
參考¶
如需更多資訊,請參閱 Apache Spark DataFrameWriter 文件。
SparkSqlOperator¶
在 Apache Spark 伺服器上啟動應用程式,它要求 spark-sql
腳本位於 PATH 中。 運算子將在 Spark Hive Metastore 服務上執行 SQL 查詢,sql
參數可以被模板化,並且可以是 .sql
或 .hql
檔案。
有關參數定義,請參閱 SparkSqlOperator
。
使用運算子¶
spark_sql_job = SparkSqlOperator(
sql="SELECT COUNT(1) as cnt FROM temp_table", master="local", task_id="spark_sql_job"
)
參考¶
如需更多資訊,請參閱 執行 Spark SQL CLI。
SparkSubmitOperator¶
在 Apache Spark 伺服器上啟動應用程式,它使用 spark-submit
腳本,該腳本負責設定包含 Spark 及其相依性的類別路徑,並且可以支援 Spark 支援的不同叢集管理器和部署模式。
有關參數定義,請參閱 SparkSubmitOperator
。
使用運算子¶
submit_job = SparkSubmitOperator(
application="${SPARK_HOME}/examples/src/main/python/pi.py", task_id="submit_job"
)
參考¶
如需更多資訊,請參閱 Apache Spark 提交應用程式。