SnowparkOperator¶
使用 SnowparkOperator
在 Snowpark Python 程式碼於 Snowflake 資料庫中執行。
警告
Snowpark 尚不支援 Python 3.12。
目前,此運算子不支援 Snowpark pandas API,因為 Airflow 中使用了衝突的 pandas 版本。請考慮將 Snowpark pandas API 與其他 Snowpark 裝飾器或運算子一起使用。
提示
建議使用 @task.snowpark 裝飾器來取代 SnowparkOperator
執行 Snowpark Python 程式碼。
先決條件任務¶
要使用此運算子,您必須完成以下幾件事
透過 pip 安裝 provider 套件。
pip install 'apache-airflow-providers-snowflake'詳細資訊請參閱 安裝。
使用運算子¶
使用 snowflake_conn_id
參數來指定使用的連線。如果未指定,將使用 snowflake_default
。
以下是如何使用 @task.snowpark
的範例
def setup_data(session: Session):
# The Snowpark session object is injected as an argument
data = [
(1, 0, 5, "Product 1", "prod-1", 1, 10),
(2, 1, 5, "Product 1A", "prod-1-A", 1, 20),
(3, 1, 5, "Product 1B", "prod-1-B", 1, 30),
(4, 0, 10, "Product 2", "prod-2", 2, 40),
(5, 4, 10, "Product 2A", "prod-2-A", 2, 50),
(6, 4, 10, "Product 2B", "prod-2-B", 2, 60),
(7, 0, 20, "Product 3", "prod-3", 3, 70),
(8, 7, 20, "Product 3A", "prod-3-A", 3, 80),
(9, 7, 20, "Product 3B", "prod-3-B", 3, 90),
(10, 0, 50, "Product 4", "prod-4", 4, 100),
(11, 10, 50, "Product 4A", "prod-4-A", 4, 100),
(12, 10, 50, "Product 4B", "prod-4-B", 4, 100),
]
columns = ["id", "parent_id", "category_id", "name", "serial_number", "key", "3rd"]
df = session.create_dataframe(data, schema=columns)
table_name = "sample_product_data"
df.write.save_as_table(table_name, mode="overwrite")
return table_name
setup_data_operator = SnowparkOperator(
task_id="setup_data",
python_callable=setup_data,
dag=dag,
)
def check_num_rows(table_name: str):
# Alternatively, retrieve the Snowpark session object using `get_active_session`
from snowflake.snowpark.context import get_active_session
session = get_active_session()
df = session.table(table_name)
assert df.count() == 12
check_num_rows_operator = SnowparkOperator(
task_id="check_num_rows",
python_callable=check_num_rows,
op_kwargs={"table_name": "{{ task_instance.xcom_pull(task_ids='setup_data') }}"},
dag=dag,
)
setup_data_operator >> check_num_rows_operator
如範例所示,有兩種方法可以在您的 Python 函數中使用 Snowpark session 物件
將 Snowpark session 物件作為名為
session
的關鍵字引數傳遞給函數。Snowpark session 將自動注入到函數中,讓您可以像平常一樣使用它。使用 Snowpark 的 get_active_session 函數,在函數內部檢索 Snowpark session 物件。
注意
可以傳遞給運算子的參數將優先於 Airflow 連線 metadata 中已給定的參數(例如 schema
、role
、database
等等)。