SQL 運算子

這些運算子對 SQL 資料庫執行各種查詢,包括資料行和表格層級的資料品質檢查。

執行 SQL 查詢

使用 SQLExecuteQueryOperator 以針對不同的資料庫執行 SQL 查詢。運算子的參數為

  • sql - 單一字串、字串列表或指向要執行的範本檔案的字串;

  • autocommit (選填) 若為 True,則每個命令都會自動提交 (預設值:False);

  • parameters (選填) 用於呈現 SQL 查詢的參數。

  • handler (選填) 將應用於游標的函式。如果為 None,則不會傳回結果 (預設值:fetch_all_handler)。

  • split_statements (選填) 是否將單個 SQL 字串分割成多個語句並分別執行 (預設值:False)。

  • return_last (選填) 取決於 split_statements,如果為 True,則此參數用於僅傳回最後一個語句或所有分割語句的結果 (預設值:True)。

以下範例示範如何實例化 SQLExecuteQueryOperator 任務。

tests/system/common/sql/example_sql_execute_query.py[原始碼]

execute_query = SQLExecuteQueryOperator(
    task_id="execute_query",
    sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
    split_statements=True,
    return_last=False,
)

檢查 SQL 表格資料行

使用 SQLColumnCheckOperator 以針對給定表格的資料行執行資料品質檢查。除了連線 ID 和表格之外,還必須提供描述資料行和要執行的測試之間關係的 column_mapping。範例資料行映射是一組三個巢狀字典,看起來像這樣

column_mapping = {
    "col_name": {
        "null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
        "min": {
            "greater_than": 5,
            "leq_to": 10,
            "tolerance": 0.2,
        },
        "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
    }
}

其中 col_name 是要執行檢查的資料行名稱,而其字典中的每個條目都是一個檢查。有效的檢查為

  • null_check:檢查資料行中 NULL 值的數量

  • distinct_check:檢查資料行中相異值的 COUNT

  • unique_check:根據列數檢查資料行中相異值的數量

  • min:檢查資料行中的最小值

  • max:檢查資料行中的最大值

檢查字典中的每個條目都是檢查成功的條件、容錯值或分割子句。成功的條件為

  • greater_than

  • geq_to

  • less_than

  • leq_to

  • equal_to

指定條件時,equal_to 與其他條件不相容。可以在同一個檢查中指定下限和上限條件。容錯值是以百分比表示的結果可能超出範圍但仍被視為成功的程度。

分割子句可以在運算子層級作為參數給定,在該層級分割所有檢查;可以在資料行映射中的資料行層級給定,在該層級分割該資料行的所有檢查;或者可以在資料行的檢查層級給定,在該層級僅分割該檢查。

如果未使用來自提供的連線的資料庫,也可以指定資料庫。

預設為 true 的 accept_none 引數會將查詢傳回的 None 值轉換為 0,允許空表格傳回有效的整數。

以下範例示範如何實例化 SQLColumnCheckOperator 任務。

tests/system/common/sql/example_sql_column_table_check.py[原始碼]

column_check = SQLColumnCheckOperator(
    task_id="column_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    column_mapping={
        "id": {
            "null_check": {
                "equal_to": 0,
                "tolerance": 0,
            },
            "distinct_check": {
                "equal_to": 1,
            },
        }
    },
)

檢查 SQL 表格值

使用 SQLTableCheckOperator 以針對給定表格執行資料品質檢查。除了連線 ID 和表格之外,還必須提供描述表格和要執行的測試之間關係的 checks 字典。範例 checks 引數是一組兩個巢狀字典,看起來像這樣

checks = (
    {
        "row_count_check": {
            "check_statement": "COUNT(*) = 1000",
        },
        "column_sum_check": {
            "check_statement": "col_a + col_b < col_c",
            "partition_clause": "col_a IS NOT NULL",
        },
    },
)

第一組鍵是檢查名稱,在運算子建置的範本化查詢中引用。檢查名稱下的字典鍵必須包含 check_statement,且值為解析為布林值的 SQL 語句 (這可以是解析為 airflow.operators.sql.parse_boolean 中的布林值的任何字串或整數)。要提供的另一個可能的鍵是 partition_clause,它是一個檢查層級語句,將使用 WHERE 子句分割表格中的資料以進行該檢查。此語句與參數 partition_clause 相容,後者會篩選所有檢查。

以下範例示範如何實例化 SQLTableCheckOperator 任務。

tests/system/common/sql/example_sql_column_table_check.py[原始碼]

row_count_check = SQLTableCheckOperator(
    task_id="row_count_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) = 1",
        }
    },
)

根據閾值檢查值

使用 SQLThresholdCheckOperator 以將特定的 SQL 查詢結果與定義的最小和最大閾值進行比較。閾值可以是數值或評估為數值的另一個 SQL 查詢。此運算子需要連線 ID 以及要執行的 SQL 查詢,並允許選填指定資料庫,如果應該覆寫來自 connection_id 的資料庫。參數為:- sql - 要執行的 sql 查詢,作為範本化字串。- min_threshold 要檢查的最小閾值。可以是數值或範本化 sql 查詢。- max_threshold 要檢查的最大閾值。可以是數值或範本化 sql 查詢。- conn_id (選填) 用於連線到資料庫的連線 ID。- database (選填) 資料庫名稱,用於覆寫來自連線的資料庫。

以下範例示範如何實例化 SQLThresholdCheckOperator 任務。

tests/system/common/sql/example_sql_threshold_check.py[原始碼]

threshhold_check = SQLThresholdCheckOperator(
    task_id="threshhold_check",
    conn_id="sales_db",
    sql="SELECT count(distinct(customer_id)) FROM sales;",
    min_threshold=1,
    max_threshold=1000,
)

如果查詢傳回的值在閾值範圍內,則任務通過。否則,任務失敗。

這個條目有幫助嗎?