Connect to Trino using SQLExecuteQueryOperator

Use the SQLExecuteQueryOperator to execute SQL commands in a Trino query engine.

Warning

TrinoOperator is deprecated in favor of SQLExecuteQueryOperator. If you currently use TrinoOperator, migrate as soon as possible.
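For most tasks the migration is a rename of the connection argument plus a changed import. The sketch below assumes that TrinoOperator accepted its connection ID as `trino_conn_id` (defaulting to `trino_default`) and that SQLExecuteQueryOperator expects it as `conn_id`. The helper `migrate_trino_kwargs` is hypothetical and exists only to illustrate the mapping.

```python
def migrate_trino_kwargs(kwargs: dict) -> dict:
    """Return a copy of TrinoOperator keyword arguments, renamed for SQLExecuteQueryOperator.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    migrated = dict(kwargs)  # copy so the caller's dict is left untouched
    if "trino_conn_id" in migrated:
        # Assumption: the old argument maps one-to-one onto conn_id.
        migrated["conn_id"] = migrated.pop("trino_conn_id")
    else:
        # Assumption: TrinoOperator defaulted to the "trino_default" connection,
        # so the default is made explicit after migration.
        migrated.setdefault("conn_id", "trino_default")
    return migrated


old_kwargs = {
    "task_id": "trino_insert",
    "trino_conn_id": "my_trino",  # placeholder connection ID
    "sql": "SELECT 1",
    "handler": list,
}
new_kwargs = migrate_trino_kwargs(old_kwargs)
# SQLExecuteQueryOperator(**new_kwargs) would then take the place of
# TrinoOperator(**old_kwargs) inside the DAG definition.
```

The remaining arguments (`task_id`, `sql`, `handler`, and so on) are passed through unchanged in this sketch.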

Using the Operator

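Use the trino_conn_id argument to connect to your Trino instance. With SQLExecuteQueryOperator, the same connection ID is passed as conn_id.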
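One way to make a connection ID resolvable is an `AIRFLOW_CONN_<CONN_ID>` environment variable holding a connection URI. The following is a minimal sketch of how such a value could be assembled. The host, port, and user are placeholders, and `trino_connection_uri` is a hypothetical helper, not an Airflow function.

```python
import os
from urllib.parse import quote


def trino_connection_uri(host: str, port: int, user: str) -> str:
    """Build a connection URI with the trino:// scheme (hypothetical helper)."""
    # Percent-encode the user name so special characters stay valid in a URI.
    return f"trino://{quote(user, safe='')}@{host}:{port}"


CONN_ID = "trino_default"  # assumed connection ID

# Airflow looks up environment variables named AIRFLOW_CONN_<CONN_ID in upper case>.
env_name = f"AIRFLOW_CONN_{CONN_ID.upper()}"
os.environ[env_name] = trino_connection_uri("trino.example.internal", 8080, "airflow")
```

Setting `os.environ` here only affects the current Python process. In a real deployment the variable would normally be exported in the environment of the scheduler and workers instead.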

Below is an example of using SQLExecuteQueryOperator to connect to Trino:

tests/system/trino/example_trino.py


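from datetime import datetime

from airflow import models
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Placeholder values for illustration; the excerpt does not show how these names are defined.
SCHEMA = "hive.cities"
TABLE = "city"
TABLE1 = "city1"
TABLE2 = "city2"

with models.DAG(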
    dag_id="example_trino",
    schedule="@once",  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    trino_create_schema = SQLExecuteQueryOperator(
        task_id="trino_create_schema",
        sql=f"CREATE SCHEMA IF NOT EXISTS {SCHEMA} WITH (location = 's3://irisbkt/cities/');",
        handler=list,
    )
    trino_create_table = SQLExecuteQueryOperator(
        task_id="trino_create_table",
        sql=f"""CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE}(
        cityid bigint,
        cityname varchar
        )""",
        handler=list,
    )
    trino_insert = SQLExecuteQueryOperator(
        task_id="trino_insert",
        sql=f"""INSERT INTO {SCHEMA}.{TABLE} VALUES (1, 'San Francisco');""",
        handler=list,
    )
    trino_multiple_queries = SQLExecuteQueryOperator(
        task_id="trino_multiple_queries",
        sql=f"""CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE1}(cityid bigint,cityname varchar);
        INSERT INTO {SCHEMA}.{TABLE1} VALUES (2, 'San Jose');
        CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE2}(cityid bigint,cityname varchar);
        INSERT INTO {SCHEMA}.{TABLE2} VALUES (3, 'San Diego');""",
        handler=list,
    )
    trino_templated_query = SQLExecuteQueryOperator(
        task_id="trino_templated_query",
        sql="SELECT * FROM {{ params.SCHEMA }}.{{ params.TABLE }}",
        handler=list,
        params={"SCHEMA": SCHEMA, "TABLE": TABLE1},
    )
    trino_parameterized_query = SQLExecuteQueryOperator(
        task_id="trino_parameterized_query",
        sql=f"select * from {SCHEMA}.{TABLE2} where cityname = ?",
        parameters=("San Diego",),
        handler=list,
    )

    (
        trino_create_schema
        >> trino_create_table
        >> trino_insert
        >> trino_multiple_queries
        >> trino_templated_query
        >> trino_parameterized_query
    )
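
The last two tasks in the example substitute values in different ways: `trino_templated_query` renders names into the SQL text before execution, while `trino_parameterized_query` binds a value to a `?` placeholder at execution time. The sketch below illustrates the same distinction with the standard-library `sqlite3` module, which also uses `?` placeholders. It is an analogy run against an in-memory SQLite database, not against Trino, and the table name is a placeholder.

```python
import sqlite3

TABLE = "city2"  # identifier: must be substituted into the SQL text itself

conn = sqlite3.connect(":memory:")
conn.execute(f"CREATE TABLE {TABLE} (cityid INTEGER, cityname TEXT)")

# Values are bound to "?" placeholders rather than formatted into the string.
conn.execute(f"INSERT INTO {TABLE} VALUES (?, ?)", (2, "San Jose"))
conn.execute(f"INSERT INTO {TABLE} VALUES (?, ?)", (3, "San Diego"))

# Same shape as the parameterized task: the table name comes from text
# substitution, the filter value comes from a bound parameter.
cursor = conn.execute(f"SELECT * FROM {TABLE} WHERE cityname = ?", ("San Diego",))

# Calling list() on the cursor collects all rows, which is roughly what
# passing handler=list asks the operator to do with its cursor.
rows = list(cursor)
print(rows)
conn.close()
```

Binding values as parameters avoids quoting problems, whereas schema and table names cannot be bound and have to be rendered into the statement.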

Note

This Operator can be used to run any syntactically correct Trino query, and multiple queries can be passed either as a list or as a string.
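The two forms can be sketched as follows. The schema, table, task IDs, and connection ID are placeholders, and `make_tasks` is a hypothetical helper. It imports Airflow lazily so that the SQL-building part of the sketch runs without Airflow installed.

```python
SCHEMA = "hive.cities"  # placeholder schema name

# Form 1: a list with one statement per element.
statements = [
    f"CREATE TABLE IF NOT EXISTS {SCHEMA}.city1(cityid bigint, cityname varchar)",
    f"INSERT INTO {SCHEMA}.city1 VALUES (2, 'San Jose')",
]

# Form 2: a single string with the statements separated by semicolons.
as_string = ";\n".join(statements) + ";"


def make_tasks():
    """Create one task per form; intended to be called inside a DAG context."""
    # Imported here so the module-level code above needs no Airflow install.
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    list_task = SQLExecuteQueryOperator(
        task_id="trino_queries_as_list",
        conn_id="trino_default",  # assumed connection ID
        sql=statements,
        handler=list,
    )
    string_task = SQLExecuteQueryOperator(
        task_id="trino_queries_as_string",
        conn_id="trino_default",  # assumed connection ID
        sql=as_string,
        handler=list,
    )
    return list_task, string_task
```

How the string form is split into individual statements is left to the underlying hook and is not shown here.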
