SQL 運算子¶
這些運算子對 SQL 資料庫執行各種查詢,包括資料行和表格層級的資料品質檢查。
執行 SQL 查詢¶
使用 SQLExecuteQueryOperator
以針對不同的資料庫執行 SQL 查詢。運算子的參數為
sql
- 單一字串、字串列表或指向要執行的範本檔案的字串;autocommit
(選填) 若為 True,則每個命令都會自動提交 (預設值:False);parameters
(選填) 用於呈現 SQL 查詢的參數。handler
(選填) 將應用於游標的函式。如果為None
,則不會傳回結果 (預設值:fetch_all_handler)。split_statements
(選填) 是否將單個 SQL 字串分割成多個語句並分別執行 (預設值:False)。return_last
(選填) 取決於split_statements
,如果為True
,則此參數用於僅傳回最後一個語句或所有分割語句的結果 (預設值:True)。
以下範例示範如何實例化 SQLExecuteQueryOperator 任務。
execute_query = SQLExecuteQueryOperator(
task_id="execute_query",
sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
split_statements=True,
return_last=False,
)
檢查 SQL 表格資料行¶
使用 SQLColumnCheckOperator
以針對給定表格的資料行執行資料品質檢查。除了連線 ID 和表格之外,還必須提供描述資料行和要執行的測試之間關係的 column_mapping。範例資料行映射是一組三個巢狀字典,看起來像這樣
column_mapping = {
"col_name": {
"null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
"min": {
"greater_than": 5,
"leq_to": 10,
"tolerance": 0.2,
},
"max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
}
}
其中 col_name 是要執行檢查的資料行名稱,而其字典中的每個條目都是一個檢查。有效的檢查為
null_check:檢查資料行中 NULL 值的數量
distinct_check:檢查資料行中相異值的 COUNT
unique_check:根據列數檢查資料行中相異值的數量
min:檢查資料行中的最小值
max:檢查資料行中的最大值
檢查字典中的每個條目都是檢查成功的條件、容錯值或分割子句。成功的條件為
greater_than
geq_to
less_than
leq_to
equal_to
指定條件時,equal_to 與其他條件不相容。可以在同一個檢查中指定下限和上限條件。容錯值是以百分比表示的結果可能超出範圍但仍被視為成功的程度。
分割子句可以在運算子層級作為參數給定,在該層級分割所有檢查;可以在資料行映射中的資料行層級給定,在該層級分割該資料行的所有檢查;或者可以在資料行的檢查層級給定,在該層級僅分割該檢查。
如果未使用來自提供的連線的資料庫,也可以指定資料庫。
預設為 true 的 accept_none 引數會將查詢傳回的 None 值轉換為 0,允許空表格傳回有效的整數。
以下範例示範如何實例化 SQLColumnCheckOperator 任務。
column_check = SQLColumnCheckOperator(
task_id="column_check",
table=AIRFLOW_DB_METADATA_TABLE,
column_mapping={
"id": {
"null_check": {
"equal_to": 0,
"tolerance": 0,
},
"distinct_check": {
"equal_to": 1,
},
}
},
)
檢查 SQL 表格值¶
使用 SQLTableCheckOperator
以針對給定表格執行資料品質檢查。除了連線 ID 和表格之外,還必須提供描述表格和要執行的測試之間關係的 checks 字典。範例 checks 引數是一組兩個巢狀字典,看起來像這樣
checks = (
{
"row_count_check": {
"check_statement": "COUNT(*) = 1000",
},
"column_sum_check": {
"check_statement": "col_a + col_b < col_c",
"partition_clause": "col_a IS NOT NULL",
},
},
)
第一組鍵是檢查名稱,在運算子建置的範本化查詢中引用。檢查名稱下的字典鍵必須包含 check_statement,且值為解析為布林值的 SQL 語句 (這可以是解析為 airflow.operators.sql.parse_boolean 中的布林值的任何字串或整數)。要提供的另一個可能的鍵是 partition_clause,它是一個檢查層級語句,將使用 WHERE 子句分割表格中的資料以進行該檢查。此語句與參數 partition_clause 相容,後者會篩選所有檢查。
以下範例示範如何實例化 SQLTableCheckOperator 任務。
row_count_check = SQLTableCheckOperator(
task_id="row_count_check",
table=AIRFLOW_DB_METADATA_TABLE,
checks={
"row_count_check": {
"check_statement": "COUNT(*) = 1",
}
},
)
根據閾值檢查值¶
使用 SQLThresholdCheckOperator
以將特定的 SQL 查詢結果與定義的最小和最大閾值進行比較。閾值可以是數值或評估為數值的另一個 SQL 查詢。此運算子需要連線 ID 以及要執行的 SQL 查詢,並允許選填指定資料庫,如果應該覆寫來自 connection_id 的資料庫。參數為:- sql
- 要執行的 sql 查詢,作為範本化字串。- min_threshold
要檢查的最小閾值。可以是數值或範本化 sql 查詢。- max_threshold
要檢查的最大閾值。可以是數值或範本化 sql 查詢。- conn_id
(選填) 用於連線到資料庫的連線 ID。- database
(選填) 資料庫名稱,用於覆寫來自連線的資料庫。
以下範例示範如何實例化 SQLThresholdCheckOperator 任務。
threshhold_check = SQLThresholdCheckOperator(
task_id="threshhold_check",
conn_id="sales_db",
sql="SELECT count(distinct(customer_id)) FROM sales;",
min_threshold=1,
max_threshold=1000,
)
如果查詢傳回的值在閾值範圍內,則任務通過。否則,任務失敗。