PySpark 裝飾器¶
以 @task.pyspark
裝飾器包裝的 Python 可呼叫物件,若可用,會注入 SparkSession 和 SparkContext 物件。
參數¶
以下參數可以傳遞給裝飾器
- conn_id: str
用於連線到 Spark 叢集的連線 ID。如果未指定,Spark Master 將設定為
local[*]
。- config_kwargs: dict
用於初始化 SparkConf 物件的 kwargs。這會覆寫連線中設定的 Spark 組態選項。
範例¶
以下範例展示如何使用 @task.pyspark
裝飾器。請注意,spark
和 sc
物件會被注入到函數中。
@task.pyspark(conn_id="spark-local")
def spark_task(spark: SparkSession, sc: SparkContext) -> pd.DataFrame:
df = spark.createDataFrame(
[
(1, "John Doe", 21),
(2, "Jane Doe", 22),
(3, "Joe Bloggs", 23),
],
["id", "name", "age"],
)
df.show()
return df.toPandas()
Spark Connect¶
在 Apache Spark 3.4 中,Spark Connect 引入了解耦的客戶端-伺服器架構,允許使用 DataFrame API 遠端連線到 Spark 叢集。在 Airflow 中,使用 Spark Connect 是使用 PySpark 裝飾器的首選方式,因為它不需要在與 Airflow 相同的主機上執行 Spark 驅動程式。若要使用 Spark Connect,請在您的主機 URL 前面加上 sc://
。例如,sc://spark-cluster:15002
。
驗證¶
Spark Connect 沒有內建驗證。但是,gRPC HTTP/2 介面允許使用驗證,透過驗證 Proxy 與 Spark Connect 伺服器通訊。若要使用驗證,請務必建立 Spark Connect
連線並設定正確的憑證。