TeradataOperator

The purpose of TeradataOperator is to define tasks that interact with Teradata.

To execute arbitrary SQL in Teradata, use TeradataOperator.

Common database operations with TeradataOperator

Creating a Teradata database table

An example of using TeradataOperator:

tests/system/teradata/example_teradata.py

create_table = TeradataOperator(
    task_id="create_table",
    sql=r"""
    CREATE TABLE Country (
        country_id INTEGER,
        name CHAR(25),
        continent CHAR(25)
    );
    """,
)

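You can also keep the SQL commands in an external file. The file must be placed in the same folder as the DAG file (or in a folder listed in the DAG's template_searchpath), because Airflow looks up .sql file names relative to those locations. This lets you maintain the SQL queries separately from the code.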

tests/system/teradata/example_teradata.py

    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )

Your dags/create_table.sql should look like this:

-- create Users table
CREATE TABLE Users, FALLBACK (
    username    varchar(50),
    description varchar(256)
);
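Airflow treats a sql value ending in .sql as the name of a template file rather than as SQL text, and searches the DAG file's folder first, then any folders in template_searchpath. The sketch below models only that lookup with pathlib; resolve_sql and its arguments are hypothetical helpers, not Airflow APIs, and the real lookup goes through Jinja, which also renders the file as a template.

```python
import tempfile
from pathlib import Path
from typing import List, Optional


def resolve_sql(sql: str, dag_file: str, search_path: Optional[List[str]] = None) -> str:
    """Hypothetical helper: return SQL text, reading it from a file when `sql`
    ends in .sql, the way a templated .sql file name is looked up."""
    if not sql.endswith(".sql"):
        return sql  # inline SQL is used as-is
    # Search the DAG file's own folder first, then any extra folders
    # (modeled on template_searchpath).
    folders = [Path(dag_file).parent] + [Path(p) for p in (search_path or [])]
    for folder in folders:
        candidate = folder / sql
        if candidate.is_file():
            return candidate.read_text()
    raise FileNotFoundError(f"{sql} not found in any of {[str(f) for f in folders]}")


with tempfile.TemporaryDirectory() as dags_dir:
    # Lay out dags/example_teradata.py next to dags/create_table.sql.
    dag_file = Path(dags_dir) / "example_teradata.py"
    dag_file.write_text("# DAG definition would live here\n")
    (Path(dags_dir) / "create_table.sql").write_text(
        "CREATE TABLE Users, FALLBACK (username varchar(50), description varchar(256));\n"
    )
    loaded = resolve_sql("create_table.sql", str(dag_file))
    inline = resolve_sql("SELECT * FROM Country;", str(dag_file))

print(loaded.strip())
```

If create_table.sql were moved to a different folder without adding that folder to the search path, the lookup would fail, which is why the file has to sit next to the DAG file.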

Inserting data into a Teradata database table

We can then create a TeradataOperator task that populates the Users table.

tests/system/teradata/example_teradata.py

    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
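The populate_table task sends four INSERT statements in a single sql string. To check locally what the Users table should contain afterward, the sketch below replays the same CREATE and INSERT statements against an in-memory SQLite database. SQLite is only a stand-in here (the Teradata-specific FALLBACK option is dropped), so the sketch says nothing about how the Teradata driver executes a multi-statement request.

```python
import sqlite3

# Same columns as dags/create_table.sql, minus the Teradata-only FALLBACK option.
CREATE_USERS = "CREATE TABLE Users (username varchar(50), description varchar(256));"

# The statements used by the populate_table task.
POPULATE_USERS = """
INSERT INTO Users (username, description) VALUES ( 'Danny', 'Musician');
INSERT INTO Users (username, description) VALUES ( 'Simone', 'Chef');
INSERT INTO Users (username, description) VALUES ( 'Lily', 'Florist');
INSERT INTO Users (username, description) VALUES ( 'Tim', 'Pet shop owner');
"""

conn = sqlite3.connect(":memory:")
conn.execute(CREATE_USERS)
conn.executescript(POPULATE_USERS)  # runs all four INSERTs in order
rows = conn.execute("SELECT username, description FROM Users ORDER BY username").fetchall()
conn.close()

print(rows)
# [('Danny', 'Musician'), ('Lily', 'Florist'), ('Simone', 'Chef'), ('Tim', 'Pet shop owner')]
```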

Fetching records from your Teradata database table

Fetching records from your Teradata database table can be as simple as:

tests/system/teradata/example_teradata.py

    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )
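Note that the example DAG never inserts rows into Country, so get_all_countries should come back empty. The SQLite stand-in below reproduces that situation: it creates Country with the same columns, runs the same SELECT, and reads the column names from cursor.description, a standard DB-API (PEP 249) attribute. SQLite is used only so the sketch runs locally; it is not how TeradataOperator connects.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Same definition as the create_table task.
conn.execute("CREATE TABLE Country (country_id INTEGER, name CHAR(25), continent CHAR(25));")

cur = conn.execute("SELECT * FROM Country;")
columns = [col[0] for col in cur.description]  # DB-API column metadata
rows = cur.fetchall()  # nothing was inserted, so this is an empty list
conn.close()

print(columns, rows)  # ['country_id', 'name', 'continent'] []
```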

Passing parameters to TeradataOperator

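TeradataOperator accepts a params dictionary whose values are rendered into the SQL through Jinja templating before the statement runs, so you can inject values into your SQL request dynamically at runtime. In the SQL, each value is referenced as {{ params.<key> }}.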

Finding the countries on the Asian continent

tests/system/teradata/example_teradata.py

    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
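Because params values are pasted into the SQL text before it is sent, the query Teradata receives is plain SQL, and templated text is not escaped, so only template values you control. The sketch below imitates the rendering step with a small regex (a stand-in for Jinja that handles only the {{ params.key }} form) and contrasts it with driver-side bind parameters using the ? marker, run against SQLite with hypothetical sample rows. Column names cannot be bound this way, which is why the example templates {{ params.column }}. With TeradataOperator, the bind form would presumably be written as sql="SELECT * FROM Country WHERE continent = ?;" with parameters=["Asia"], assuming the Teradata driver's qmark marker style; that pairing is an assumption and is not part of the example DAG.

```python
import re
import sqlite3

TEMPLATE = "SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';"
PARAMS = {"column": "continent", "value": "Asia"}


def render_params(template: str, params: dict) -> str:
    """Stand-in for Jinja: replace each {{ params.key }} with str(params[key])."""
    return re.sub(r"\{\{\s*params\.(\w+)\s*\}\}", lambda m: str(params[m.group(1)]), template)


rendered = render_params(TEMPLATE, PARAMS)
print(rendered)  # SELECT * FROM Country WHERE continent='Asia';

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Country (country_id INTEGER, name CHAR(25), continent CHAR(25));")
# Hypothetical sample rows, only so the queries have something to return.
conn.executemany(
    "INSERT INTO Country VALUES (?, ?, ?)",
    [(1, "Japan", "Asia"), (2, "Kenya", "Africa"), (3, "India", "Asia")],
)

templated_rows = conn.execute(rendered).fetchall()
# Bind-parameter version: the value travels separately from the SQL text.
bound_rows = conn.execute("SELECT * FROM Country WHERE continent = ?;", ("Asia",)).fetchall()
conn.close()
```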

Dropping a Teradata database table

We can then create a TeradataOperator task that drops the Users table.

tests/system/teradata/example_teradata.py

    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )

The complete Teradata Operator DAG

When we put everything together, our DAG should look like this:

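tests/system/teradata/example_teradata.py

import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
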
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
        CREATE TABLE Country (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
        """,
    )
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )
    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )
    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"DROP TABLE Country;",
        dag=dag,
    )
    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )
    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
    )
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
        CREATE TABLE schema_table (
           country_id INTEGER,
           name CHAR(25),
           continent CHAR(25)
        );
        """,
        schema="airflow_temp",
    )
    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"DROP TABLE schema_table;",
        dag=dag,
        schema="airflow_temp",
    )
    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"DROP DATABASE airflow_temp;",
        dag=dag,
    )
    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )
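Airflow's >> sets the right-hand task downstream of the left-hand one and returns the right-hand task, and Python evaluates a >> b >> c left to right as (a >> b) >> c; together these turn the parenthesized chain above into a strictly linear sequence. The order matters here: populate_table must run only after create_table_from_external_file has created Users. The Task class below is a toy model written for illustration, not Airflow's BaseOperator.

```python
from typing import List, Optional


class Task:
    """Toy stand-in for an Airflow task that models only the >> operator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List["Task"] = []

    def __rshift__(self, other: "Task") -> "Task":
        self.downstream.append(other)  # `other` may start only after `self`
        return other  # returning the right operand is what lets chains continue


def linear_order(first: Task) -> List[str]:
    """Follow a linear chain of tasks from its first element."""
    order: List[str] = []
    task: Optional[Task] = first
    while task is not None:
        order.append(task.task_id)
        task = task.downstream[0] if task.downstream else None
    return order


create_table = Task("create_table")
create_table_from_external_file = Task("create_table_from_external_file")
populate_table = Task("populate_table")

# Evaluated as (create_table >> create_table_from_external_file) >> populate_table
last = create_table >> create_table_from_external_file >> populate_table
print(linear_order(create_table))
# ['create_table', 'create_table_from_external_file', 'populate_table']
```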

TeradataStoredProcedureOperator

The purpose of TeradataStoredProcedureOperator is to define tasks that execute Teradata stored procedures.

Executing a stored procedure in a Teradata database

To execute a stored procedure in Teradata, use TeradataStoredProcedureOperator.

Assume a stored procedure like the following exists in the database:

REPLACE PROCEDURE TEST_PROCEDURE (
    IN val_in INTEGER,
    INOUT val_in_out INTEGER,
    OUT val_out INTEGER,
    OUT value_str_out varchar(100)
)
    BEGIN
        set val_out = val_in * 2;
        set val_in_out = val_in_out * 4;
        set value_str_out = 'string output';
    END;
/

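This stored procedure takes an integer argument, val_in, as input. It uses a single INOUT argument, val_in_out, which serves as both input and output. In addition, it returns an integer argument, val_out, and a string argument, value_str_out.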
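For the inputs used in the examples that follow (val_in = 3, val_in_out = 1), the procedure body works out to val_out = 3 * 2 = 6 and val_in_out = 1 * 4 = 4. The function below is only a Python model of that arithmetic for checking expected values; it does not call Teradata, and ProcedureOutputs and model_test_procedure are illustrative names.

```python
from typing import NamedTuple


class ProcedureOutputs(NamedTuple):
    val_in_out: int
    val_out: int
    value_str_out: str


def model_test_procedure(val_in: int, val_in_out: int) -> ProcedureOutputs:
    """Mirror TEST_PROCEDURE's SET statements in Python (no database call)."""
    val_out = val_in * 2          # set val_out = val_in * 2;
    val_in_out = val_in_out * 4   # set val_in_out = val_in_out * 4;
    return ProcedureOutputs(val_in_out, val_out, "string output")


print(model_test_procedure(3, 1))
# ProcedureOutputs(val_in_out=4, val_out=6, value_str_out='string output')
```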

This stored procedure can be called through TeradataStoredProcedureOperator in several ways.

One approach passes the parameters positionally as a list, with the output parameters specified as Python data types:

tests/system/teradata/example_teradata_call_sp.py

    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )

Alternatively, the parameters can be passed positionally as a list, with the output parameters specified as "?" placeholders:

tests/system/teradata/example_teradata_call_sp.py

    opr_sp_place_holder = TeradataStoredProcedureOperator(
        task_id="opr_sp_place_holder",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, "?", "?"],
    )

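Another approach passes the parameters positionally as a dictionary. Only the order of the values matters: they are bound in the dictionary's insertion order, so the keys act as labels and need not match the procedure's parameter names (note the key str_out for value_str_out):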

tests/system/teradata/example_teradata_call_sp.py

    opr_sp_dict = TeradataStoredProcedureOperator(
        task_id="opr_sp_dict",
        procedure="TEST_PROCEDURE",
        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
    )
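All three forms are expected to produce the same CALL. The sketch below reflects our reading of the provider's TeradataHook.callproc and should be treated as an assumption, not a specification: one ? marker is emitted per parameter, a bare Python type such as int or str is replaced by an instance of that type (int() is 0, str() is ""), "?" placeholders pass through unchanged, and a dictionary contributes only its values, in insertion order. map_param, build_call, and OUTPUT_TYPES are hypothetical names.

```python
from typing import Any, Dict, List, Sequence, Tuple, Union

# Python types accepted as output-parameter markers in this sketch.
OUTPUT_TYPES = {bool, float, int, str}


def map_param(value: Any) -> Any:
    """Replace a bare Python type with an instance of it; leave other values as-is."""
    if isinstance(value, type) and value in OUTPUT_TYPES:
        return value()  # e.g. int -> 0, str -> ""
    return value


def build_call(
    procedure: str, parameters: Union[Sequence[Any], Dict[str, Any]]
) -> Tuple[str, List[Any]]:
    """Hypothetical helper: build the CALL text and the list of values to bind."""
    values = list(parameters.values()) if isinstance(parameters, dict) else list(parameters)
    markers = ",".join("?" for _ in values)
    return f"{{CALL {procedure}({markers})}}", [map_param(v) for v in values]


as_types = build_call("TEST_PROCEDURE", [3, 1, int, str])
as_placeholders = build_call("TEST_PROCEDURE", [3, 1, "?", "?"])
as_dict = build_call(
    "TEST_PROCEDURE", {"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str}
)
print(as_types)         # ('{CALL TEST_PROCEDURE(?,?,?,?)}', [3, 1, 0, ''])
print(as_placeholders)  # ('{CALL TEST_PROCEDURE(?,?,?,?)}', [3, 1, '?', '?'])
```

Under these assumptions, the list-of-types and dictionary forms bind identical values, which is why the dictionary keys can differ from the procedure's parameter names.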

Assume a stored procedure like the following exists in the database:

REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
    BEGIN
        -- Assign current timestamp to the OUT parameter
        SET out_timestamp = CURRENT_TIMESTAMP;
    END;
/

This stored procedure produces a single timestamp argument, out_timestamp. It can be called through TeradataStoredProcedureOperator by passing the parameters positionally as a list:

tests/system/teradata/example_teradata_call_sp.py

    opr_sp_timestamp = TeradataStoredProcedureOperator(
        task_id="opr_sp_timestamp",
        procedure="GetTimestampOutParameter",
        parameters=["?"],
    )

Assume a stored procedure like the following exists in the database:

REPLACE PROCEDURE
TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER)
DYNAMIC RESULT SETS 2
  BEGIN
    DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ;
    DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ;
    open cur1 ;
    open cur2 ;
    set val_out = val_in * 2;
  END;
/

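This stored procedure takes a single integer argument, val_in, as input and produces a single integer argument, val_out. In addition, it returns two result sets (cursors) holding the output of its SELECT queries. The example DAG uses a variant of this procedure, examplestoredproc, which adds an INOUT parameter (in p1, inout p2, out p3). It is created with TeradataOperator as shown below, and the complete DAG then calls it with TeradataStoredProcedureOperator, passing the parameters positionally as a list (task opr_sp_param_dr, parameters=[3, 2, int]):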

tests/system/teradata/example_teradata_call_sp.py

    create_sp_param_dr = TeradataOperator(
        task_id="create_sp_param_dr",
        sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
                dynamic result sets 2
                begin
                    declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                    declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                    open cur1 ;
                    open cur2 ;
                    set p2 = p1 + p2 ;
                    set p3 = p1 * p2 ;
                end ;
            """,
    )
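The complete DAG calls examplestoredproc with parameters=[3, 2, int], that is p1 = 3, p2 = 2, and an int marker for the OUT parameter p3. Because p3 is computed after p2 has been updated, the expected outputs are p2 = 3 + 2 = 5 and p3 = 3 * 5 = 15, not 3 * 2. The function below only models that arithmetic in Python (model_examplestoredproc is an illustrative name); the two result sets read from DBC.DBCInfo are not modeled.

```python
from typing import Tuple


def model_examplestoredproc(p1: int, p2: int) -> Tuple[int, int]:
    """Mirror examplestoredproc's SET statements in Python (no database involved)."""
    p2 = p1 + p2  # set p2 = p1 + p2 ;
    p3 = p1 * p2  # set p3 = p1 * p2 ;  uses the already-updated p2
    return p2, p3


print(model_examplestoredproc(3, 2))  # (5, 15)
```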

The complete TeradataStoredProcedureOperator DAG

When we put everything together, our DAG should look like this:

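tests/system/teradata/example_teradata_call_sp.py

from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import (
    TeradataOperator,
    TeradataStoredProcedureOperator,
)
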
CONN_ID = "teradata_sp_call"
DAG_ID = "example_teradata_call_sp"

with DAG(
    dag_id=DAG_ID,
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
    schedule="@once",
    start_date=datetime(2023, 1, 1),
) as dag:
    create_sp_in_inout = TeradataOperator(
        task_id="create_sp_in_inout",
        sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (
                    IN val_in INTEGER,
                    INOUT val_in_out INTEGER,
                    OUT val_out INTEGER,
                    OUT value_str_out varchar(100)
                )
                BEGIN
                    set val_out = val_in * 2;
                    set val_in_out = val_in_out * 4;
                    set value_str_out = 'string output';
                END;
            """,
    )
    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )
    opr_sp_place_holder = TeradataStoredProcedureOperator(
        task_id="opr_sp_place_holder",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, "?", "?"],
    )
    opr_sp_dict = TeradataStoredProcedureOperator(
        task_id="opr_sp_dict",
        procedure="TEST_PROCEDURE",
        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
    )
    create_sp_timestamp = TeradataOperator(
        task_id="create_sp_timestamp",
        sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
                    BEGIN
                        -- Assign current timestamp to the OUT parameter
                        SET out_timestamp = CURRENT_TIMESTAMP;
                    END;
                 """,
    )
    opr_sp_timestamp = TeradataStoredProcedureOperator(
        task_id="opr_sp_timestamp",
        procedure="GetTimestampOutParameter",
        parameters=["?"],
    )
    create_sp_param_dr = TeradataOperator(
        task_id="create_sp_param_dr",
        sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
                dynamic result sets 2
                begin
                    declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                    declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                    open cur1 ;
                    open cur2 ;
                    set p2 = p1 + p2 ;
                    set p3 = p1 * p2 ;
                end ;
            """,
    )
    opr_sp_param_dr = TeradataStoredProcedureOperator(
        task_id="opr_sp_param_dr",
        procedure="examplestoredproc",
        parameters=[3, 2, int],
    )
    drop_sp = TeradataOperator(
        task_id="drop_sp",
        sql=r"drop procedure examplestoredproc;",
    )
    drop_sp_test = TeradataOperator(
        task_id="drop_sp_test",
        sql=r"drop procedure TEST_PROCEDURE;",
    )
    drop_sp_timestamp = TeradataOperator(
        task_id="drop_sp_timestamp",
        sql=r"drop procedure GetTimestampOutParameter;",
    )
    (
        create_sp_in_inout
        >> opr_sp_types
        >> opr_sp_dict
        >> opr_sp_place_holder
        >> create_sp_param_dr
        >> opr_sp_param_dr
        >> drop_sp
        >> drop_sp_test
        >> create_sp_timestamp
        >> opr_sp_timestamp
        >> drop_sp_timestamp
    )
