TeradataOperator
The purpose of TeradataOperator is to define tasks involving interactions with Teradata.
To execute arbitrary SQL in Teradata, use TeradataOperator.
Common Database Operations with TeradataOperator
Creating a Teradata database table
An example usage of TeradataOperator is as follows:
create_table = TeradataOperator(
    task_id="create_table",
    sql=r"""
    CREATE TABLE Country (
        country_id INTEGER,
        name CHAR(25),
        continent CHAR(25)
    );
    """,
)
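You can also keep the SQL commands in an external file and execute them from there. The file name is resolved relative to the directory that contains the DAG's .py file (additional directories can be added through the DAG's template_searchpath argument), so the simplest setup is to place the file at the same level as the DAG file. This lets you maintain your SQL queries separately from the code.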
create_table_from_external_file = TeradataOperator(
    task_id="create_table_from_external_file",
    sql="create_table.sql",
    dag=dag,
)
Your dags/create_table.sql should look like this:
-- create Users table
CREATE TABLE Users, FALLBACK (
    username varchar(50),
    description varchar(256)
);
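Because "create_table.sql" ends in .sql, Airflow treats the string as a template file name rather than inline SQL and looks it up through Jinja's file loader, whose search path is the DAG file's folder plus any template_searchpath entries. The sketch below is a simplified, hypothetical model of that lookup; resolve_sql and TEMPLATE_EXTENSIONS are illustrative names, not Airflow APIs, and real template files are additionally rendered with Jinja after loading.

```python
import os
import tempfile

# Hypothetical, simplified model: a "sql" string ending in one of these
# extensions is treated as a file name instead of inline SQL.
TEMPLATE_EXTENSIONS = (".sql",)


def resolve_sql(sql: str, search_dirs: list[str]) -> str:
    """Return the SQL text, loading it from a file if `sql` looks like a file name."""
    if not sql.endswith(TEMPLATE_EXTENSIONS):
        return sql  # inline SQL is used as-is
    for directory in search_dirs:
        path = os.path.join(directory, sql)
        if os.path.isfile(path):
            with open(path, encoding="utf-8") as fh:
                return fh.read()
    raise FileNotFoundError(f"{sql!r} not found in {search_dirs}")


# Usage: place create_table.sql next to a (pretend) DAG folder and resolve it.
with tempfile.TemporaryDirectory() as dag_folder:
    with open(os.path.join(dag_folder, "create_table.sql"), "w", encoding="utf-8") as fh:
        fh.write("CREATE TABLE Users, FALLBACK (username varchar(50), description varchar(256));")
    loaded = resolve_sql("create_table.sql", [dag_folder])
    inline = resolve_sql("SELECT * FROM Country;", [dag_folder])

print(loaded)
```

Listing several directories mirrors the idea of template_searchpath: the first directory that contains the file wins.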
Inserting data into a Teradata database table
We can then create a TeradataOperator task that populates the Users table.
populate_table = TeradataOperator(
    task_id="populate_table",
    sql=r"""
    INSERT INTO Users (username, description)
        VALUES ( 'Danny', 'Musician');
    INSERT INTO Users (username, description)
        VALUES ( 'Simone', 'Chef');
    INSERT INTO Users (username, description)
        VALUES ( 'Lily', 'Florist');
    INSERT INTO Users (username, description)
        VALUES ( 'Tim', 'Pet shop owner');
    """,
)
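Writing one INSERT per row by hand gets tedious as the data grows. A minimal sketch, assuming the rows are available as plain Python tuples: the hypothetical helpers sql_literal and build_insert_script generate the same kind of multi-statement script that populate_table passes as sql, doubling single quotes so a value such as O'Brien stays a valid SQL literal.

```python
# Sample rows matching the populate_table example above.
ROWS = [
    ("Danny", "Musician"),
    ("Simone", "Chef"),
    ("Lily", "Florist"),
    ("Tim", "Pet shop owner"),
]


def sql_literal(value: str) -> str:
    """Quote a string as an SQL literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_insert_script(table: str, columns: tuple[str, ...], rows) -> str:
    """Hypothetical helper: one INSERT statement per row, joined by newlines."""
    column_list = ", ".join(columns)
    statements = [
        f"INSERT INTO {table} ({column_list}) VALUES ({', '.join(sql_literal(v) for v in row)});"
        for row in rows
    ]
    return "\n".join(statements)


script = build_insert_script("Users", ("username", "description"), ROWS)
print(script.splitlines()[0])
# INSERT INTO Users (username, description) VALUES ('Danny', 'Musician');
```

The resulting string could be passed as the sql argument of a TeradataOperator task. For values that come from outside the DAG, sending them as bind parameters is safer than building literals into the SQL text.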
Fetching records from your Teradata database table
Fetching records from your Teradata database table can be as simple as:
get_all_countries = TeradataOperator(
    task_id="get_all_countries",
    sql=r"SELECT * FROM Country;",
)
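Passing parameters into TeradataOperator
TeradataOperator accepts a params dictionary whose values are rendered into the SQL through Jinja templating at runtime, which makes it possible to inject values into your SQL requests dynamically. For driver-level bind parameters, the operator also accepts a separate parameters argument.
To find the countries on the Asian continent: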
get_countries_from_continent = TeradataOperator(
    task_id="get_countries_from_continent",
    sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
    params={"column": "continent", "value": "Asia"},
)
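The example above uses Airflow's params dictionary, whose values are substituted into the SQL text when the template is rendered; bind parameters, by contrast, are sent to the database separately from the SQL. The sketch below approximates the Jinja step with a regular expression (render_params is a hypothetical helper, not how Airflow actually renders templates) and uses the standard-library sqlite3 module with made-up sample rows purely as a local stand-in for a database that accepts "?" placeholders.

```python
import re
import sqlite3

# Simplified stand-in for Jinja rendering of "{{ params.<name> }}" placeholders.
PARAM_PATTERN = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render_params(sql: str, params: dict) -> str:
    """Replace each {{ params.name }} with str(params[name])."""
    return PARAM_PATTERN.sub(lambda m: str(params[m.group(1)]), sql)


template = "SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';"
rendered = render_params(template, {"column": "continent", "value": "Asia"})
print(rendered)  # SELECT * FROM Country WHERE continent='Asia';

# Bind parameters: the value travels separately from the SQL text.
# sqlite3 is only a local stand-in here; the rows are made-up sample data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Country (country_id INTEGER, name CHAR(25), continent CHAR(25))")
conn.executemany(
    "INSERT INTO Country VALUES (?, ?, ?)",
    [(1, "Japan", "Asia"), (2, "Peru", "South America"), (3, "India", "Asia")],
)
asian = conn.execute(
    "SELECT name FROM Country WHERE continent = ? ORDER BY country_id", ("Asia",)
).fetchall()
print(asian)  # [('Japan',), ('India',)]
conn.close()
```

Only values can be bound; identifiers such as a column name cannot, which is why a query that varies the column, like get_countries_from_continent, has to go through templating.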
Dropping a Teradata database table
We can then create a TeradataOperator task that drops the Users table.
drop_users_table = TeradataOperator(
    task_id="drop_users_table",
    sql=r"DROP TABLE Users;",
    dag=dag,
)
The complete Teradata Operator DAG
When we put everything together, our DAG should look like this:
import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
        CREATE TABLE Country (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
        """,
    )

    # Reads create_table.sql from the DAG folder (creates the Users table).
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )

    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )

    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )

    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )

    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"DROP TABLE Country;",
        dag=dag,
    )

    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )

    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
    )

    # schema= selects the database in which the statement runs.
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
        CREATE TABLE schema_table (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
        """,
        schema="airflow_temp",
    )

    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"DROP TABLE schema_table;",
        dag=dag,
        schema="airflow_temp",
    )

    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"DROP DATABASE airflow_temp;",
        dag=dag,
    )

    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )
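In Airflow, a >> b sets b downstream of a and returns b, so a parenthesized chain like the one at the end of the DAG runs its tasks strictly one after another. The toy Task class below is hypothetical and models only that behavior of the >> operator, nothing else about Airflow operators.

```python
from __future__ import annotations


class Task:
    """Toy stand-in for an Airflow task; only models the ">>" operator."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: list[Task] = []

    def __rshift__(self, other: Task) -> Task:
        # a >> b makes b run after a; returning b lets a >> b >> c chain left to right.
        self.downstream.append(other)
        return other


def linear_order(start: Task) -> list[str]:
    """Follow the single downstream link of each task (enough for a linear chain)."""
    order, current = [], start
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


ids = ["create_table", "create_table_from_external_file", "populate_table", "get_all_countries"]
tasks = [Task(i) for i in ids]
tasks[0] >> tasks[1] >> tasks[2] >> tasks[3]
print(linear_order(tasks[0]))
```

Because each task in the chain has exactly one downstream neighbor, following those links reproduces the declaration order.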
TeradataStoredProcedureOperator
The purpose of TeradataStoredProcedureOperator is to define tasks that execute Teradata stored procedures.
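Executing a stored procedure in a Teradata database
To execute a stored procedure in Teradata, use TeradataStoredProcedureOperator.
Assume a stored procedure exists in the database that looks like this: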
REPLACE PROCEDURE TEST_PROCEDURE (
    IN val_in INTEGER,
    INOUT val_in_out INTEGER,
    OUT val_out INTEGER,
    OUT value_str_out varchar(100)
)
BEGIN
    set val_out = val_in * 2;
    set val_in_out = val_in_out * 4;
    set value_str_out = 'string output';
END;
/
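This stored procedure takes a single integer argument, val_in, as input. It uses a single INOUT argument, val_in_out, which serves as both an input and an output. In addition, it returns an integer argument, val_out, and a string argument, value_str_out.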
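This stored procedure can be invoked through TeradataStoredProcedureOperator in several ways.
One approach is to pass the parameters positionally as a list, with the output parameters specified as Python data types: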
opr_sp_types = TeradataStoredProcedureOperator(
    task_id="opr_sp_types",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, int, str],
)
Alternatively, the parameters can be passed positionally as a list, with the output parameters designated as "?" placeholders:
opr_sp_place_holder = TeradataStoredProcedureOperator(
    task_id="opr_sp_place_holder",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, "?", "?"],
)
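Another approach is to pass the parameters as a dictionary. The values are still bound by position, in insertion order, so list the entries in the same order as the procedure's parameters: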
opr_sp_dict = TeradataStoredProcedureOperator(
    task_id="opr_sp_dict",
    procedure="TEST_PROCEDURE",
    parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
)
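All three call styles boil down to one CALL statement with a "?" for every parameter plus a list of bound values. The sketch below is a simplified, hypothetical model of that translation, assuming that a bare Python type such as int or str is replaced by an instance of that type so the driver can infer the OUT parameter's type, that "?" placeholders pass through unchanged, and that dictionary values are used in insertion order. build_call and to_bind_value are illustrative names, and the provider's hook may differ in detail.

```python
# Assumption: these bare types mark OUT parameters in the `parameters` value.
PY_TYPE_MARKERS = (bool, float, int, str)


def to_bind_value(value):
    """Instantiate type markers (int -> 0, str -> ''); pass everything else through."""
    if isinstance(value, type) and value in PY_TYPE_MARKERS:
        return value()
    return value  # inputs and "?" placeholders are unchanged


def build_call(procedure: str, parameters):
    """Hypothetical model: ODBC-style CALL text plus the values to bind."""
    values = list(parameters.values()) if isinstance(parameters, dict) else list(parameters)
    placeholders = ",".join("?" for _ in values)
    return f"{{CALL {procedure}({placeholders})}}", [to_bind_value(v) for v in values]


sql, binds = build_call("TEST_PROCEDURE", [3, 1, int, str])
print(sql, binds)  # {CALL TEST_PROCEDURE(?,?,?,?)} [3, 1, 0, '']
```

Under these assumptions the list and dictionary forms produce identical calls, which is why the dictionary's keys do not need to match the procedure's parameter names.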
Assume a stored procedure exists in the database that looks like this:
REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
BEGIN
    -- Assign current timestamp to the OUT parameter
    SET out_timestamp = CURRENT_TIMESTAMP;
END;
/
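This stored procedure produces a single timestamp argument, out_timestamp, and can be invoked through TeradataStoredProcedureOperator by passing the parameter positionally as a list, with a "?" placeholder for the output: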
opr_sp_timestamp = TeradataStoredProcedureOperator(
    task_id="opr_sp_timestamp",
    procedure="GetTimestampOutParameter",
    parameters=["?"],
)
Assume a stored procedure exists in the database that looks like this:
REPLACE PROCEDURE TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER)
BEGIN
    DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ;
    DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ;
    open cur1 ;
    open cur2 ;
    set val_out = val_in * 2;
END;
/
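This stored procedure takes a single integer argument, val_in, as input and produces a single integer argument, val_out. In addition, it returns two cursors that hold the output of its SELECT queries.
The complete DAG later on this page demonstrates this pattern with a closely related procedure, examplestoredproc, which adds an INOUT argument (p2) and declares DYNAMIC RESULT SETS 2 for its two cursors. It is created with TeradataOperator:
create_sp_param_dr = TeradataOperator(
    task_id="create_sp_param_dr",
    sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
    dynamic result sets 2
    begin
        declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
        declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
        open cur1 ;
        open cur2 ;
        set p2 = p1 + p2 ;
        set p3 = p1 * p2 ;
    end ;
    """,
)
It is then invoked with TeradataStoredProcedureOperator, passing the parameters positionally as a list:
opr_sp_param_dr = TeradataStoredProcedureOperator(
    task_id="opr_sp_param_dr",
    procedure="examplestoredproc",
    parameters=[3, 2, int],
)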
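Because the procedure body runs top to bottom, p3 is computed from the already-updated p2. For the call parameters=[3, 2, int] used in the complete DAG, a plain-Python model of examplestoredproc's arithmetic (hypothetical, and ignoring the two result sets read from dbc.dbcinfo) gives p2 = 5 and p3 = 15:

```python
def examplestoredproc(p1: int, p2: int) -> tuple[int, int]:
    """Model of the procedure body; returns (p2, p3) after both SET statements."""
    p2 = p1 + p2  # INOUT p2 is updated first
    p3 = p1 * p2  # OUT p3 uses the updated p2, not the caller's original value
    return p2, p3


print(examplestoredproc(3, 2))  # (5, 15)
```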
The complete TeradataStoredProcedureOperator DAG
When we put everything together, our DAG should look like this:
from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import (
    TeradataOperator,
    TeradataStoredProcedureOperator,
)

CONN_ID = "teradata_sp_call"
DAG_ID = "example_teradata_call_sp"

with DAG(
    dag_id=DAG_ID,
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
    schedule="@once",
    start_date=datetime(2023, 1, 1),
) as dag:
    create_sp_in_inout = TeradataOperator(
        task_id="create_sp_in_inout",
        sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (
            IN val_in INTEGER,
            INOUT val_in_out INTEGER,
            OUT val_out INTEGER,
            OUT value_str_out varchar(100)
        )
        BEGIN
            set val_out = val_in * 2;
            set val_in_out = val_in_out * 4;
            set value_str_out = 'string output';
        END;
        """,
    )

    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )

    opr_sp_place_holder = TeradataStoredProcedureOperator(
        task_id="opr_sp_place_holder",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, "?", "?"],
    )

    opr_sp_dict = TeradataStoredProcedureOperator(
        task_id="opr_sp_dict",
        procedure="TEST_PROCEDURE",
        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
    )

    create_sp_timestamp = TeradataOperator(
        task_id="create_sp_timestamp",
        sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
        BEGIN
            -- Assign current timestamp to the OUT parameter
            SET out_timestamp = CURRENT_TIMESTAMP;
        END;
        """,
    )

    opr_sp_timestamp = TeradataStoredProcedureOperator(
        task_id="opr_sp_timestamp",
        procedure="GetTimestampOutParameter",
        parameters=["?"],
    )

    create_sp_param_dr = TeradataOperator(
        task_id="create_sp_param_dr",
        sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
        dynamic result sets 2
        begin
            declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
            declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
            open cur1 ;
            open cur2 ;
            set p2 = p1 + p2 ;
            set p3 = p1 * p2 ;
        end ;
        """,
    )

    opr_sp_param_dr = TeradataStoredProcedureOperator(
        task_id="opr_sp_param_dr",
        procedure="examplestoredproc",
        parameters=[3, 2, int],
    )

    drop_sp = TeradataOperator(
        task_id="drop_sp",
        sql=r"drop procedure examplestoredproc;",
    )

    drop_sp_test = TeradataOperator(
        task_id="drop_sp_test",
        sql=r"drop procedure TEST_PROCEDURE;",
    )

    drop_sp_timestamp = TeradataOperator(
        task_id="drop_sp_timestamp",
        sql=r"drop procedure GetTimestampOutParameter;",
    )

    (
        create_sp_in_inout
        >> opr_sp_types
        >> opr_sp_dict
        >> opr_sp_place_holder
        >> create_sp_param_dr
        >> opr_sp_param_dr
        >> drop_sp
        >> drop_sp_test
        >> create_sp_timestamp
        >> opr_sp_timestamp
        >> drop_sp_timestamp
    )