@task.snowpark
¶
使用 @task.snowpark
在 Snowpark Python 程式碼中,於 Snowflake 資料庫中執行。
警告
Snowpark 尚不支援 Python 3.12。
目前,此裝飾器不支援 Snowpark pandas API,因為 Airflow 中使用了衝突的 pandas 版本。請考慮將 Snowpark pandas API 與其他 Snowpark 裝飾器或運算子一起使用。
先決條件任務¶
要使用此裝飾器,您必須執行以下幾項操作
透過 pip 安裝供應商套件。
pip install 'apache-airflow-providers-snowflake'詳細資訊請參閱 安裝。
使用運算子¶
使用 snowflake_conn_id
參數指定使用的連線。如果未指定,將使用 snowflake_default
。
@task.snowpark
的範例用法如下
@task.snowpark
def setup_data(session: Session):
# The Snowpark session object is injected as an argument
data = [
(1, 0, 5, "Product 1", "prod-1", 1, 10),
(2, 1, 5, "Product 1A", "prod-1-A", 1, 20),
(3, 1, 5, "Product 1B", "prod-1-B", 1, 30),
(4, 0, 10, "Product 2", "prod-2", 2, 40),
(5, 4, 10, "Product 2A", "prod-2-A", 2, 50),
(6, 4, 10, "Product 2B", "prod-2-B", 2, 60),
(7, 0, 20, "Product 3", "prod-3", 3, 70),
(8, 7, 20, "Product 3A", "prod-3-A", 3, 80),
(9, 7, 20, "Product 3B", "prod-3-B", 3, 90),
(10, 0, 50, "Product 4", "prod-4", 4, 100),
(11, 10, 50, "Product 4A", "prod-4-A", 4, 100),
(12, 10, 50, "Product 4B", "prod-4-B", 4, 100),
]
columns = ["id", "parent_id", "category_id", "name", "serial_number", "key", "3rd"]
df = session.create_dataframe(data, schema=columns)
table_name = "sample_product_data"
df.write.save_as_table(table_name, mode="overwrite")
return table_name
table_name = setup_data() # type: ignore[call-arg, misc]
@task.snowpark
def check_num_rows(table_name: str):
# Alternatively, retrieve the Snowpark session object using `get_active_session`
from snowflake.snowpark.context import get_active_session
session = get_active_session()
df = session.table(table_name)
assert df.count() == 12
check_num_rows(table_name)
如範例所示,有兩種方法可以在您的 Python 函數中使用 Snowpark session 物件
將 Snowpark session 物件作為名為
session
的關鍵字引數傳遞給函數。Snowpark session 將自動注入到函數中,讓您可以像平常一樣使用它。使用 Snowpark 中的 get_active_session 函數,在函數內部檢索 Snowpark session 物件。
注意
可以傳遞到裝飾器的參數,其優先順序高於 Airflow 連線元資料中已給定的參數(例如 schema
、role
、database
等)。