airflow.providers.common.sql.operators.sql
¶
模組內容¶
類別¶
這是通用 SQL 運算子的基礎類別,用於取得 DB Hook。 |
|
在特定資料庫中執行 SQL 程式碼。 |
|
在 column_checks 字典中執行一個或多個範本化檢查。 |
|
執行 checks 字典中提供的一個或多個檢查。 |
|
對資料庫執行檢查。 |
|
使用 SQL 程式碼執行簡單的值檢查。 |
|
檢查以 SQL 表達式給出的指標是否在先前 days_back 的容許範圍內。 |
|
使用 SQL 程式碼對最小值閾值和最大值閾值執行值檢查。 |
|
允許 DAG 根據 SQL 查詢的結果「分支」或遵循指定的路徑。 |
函式¶
|
- class airflow.providers.common.sql.operators.sql.BaseSQLOperator(*, conn_id=None, database=None, hook_params=None, retry_on_failure=True, **kwargs)[原始碼]¶
基底類別:
airflow.models.BaseOperator
這是通用 SQL 運算子的基礎類別,用於取得 DB Hook。
提供的方法為 .get_db_hook()。預設行為會嘗試根據連線類型檢索 DB hook。您可以透過覆寫 .get_db_hook() 方法來自訂行為。
- 參數
conn_id (str | None) – 參考特定資料庫
- template_fields: collections.abc.Sequence[str] = ('conn_id', 'database', 'hook_params')[原始碼]¶
- classmethod get_hook(conn_id, hook_params=None)[原始碼]¶
傳回此連線 ID 的預設 Hook。
- 參數
conn_id (str) – 連線 ID
hook_params (dict | None) – hook 參數
- 傳回
此連線的預設 hook
- 傳回類型
- class airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator(*, sql, autocommit=False, parameters=None, handler=fetch_all_handler, output_processor=None, conn_id=None, database=None, split_statements=None, return_last=True, show_return_value_in_logs=False, **kwargs)[原始碼]¶
基底類別:
BaseSQLOperator
在特定資料庫中執行 SQL 程式碼。
在實作特定的運算子時,您也可以在 hook 中實作 _process_output 方法,以執行由您的 DB Hook 傳回的值的額外處理。例如,您可以將從語句游標檢索的描述與傳回值合併,或將運算子的輸出儲存到檔案。
- 參數
sql (str | list[str]) – 要執行的 SQL 程式碼或指向範本檔案的字串(已範本化)。檔案必須具有 '.sql' 副檔名。
autocommit (bool) – (選用)如果為 True,則會自動提交每個命令(預設值:False)。
parameters (collections.abc.Mapping | collections.abc.Iterable | None) – (選用)用於呈現 SQL 查詢的參數。
handler (Callable[[Any], list[tuple] | None]) – (選用)將套用至游標的函式(預設值:fetch_all_handler)。
output_processor (Callable[[list[Any], list[collections.abc.Sequence[collections.abc.Sequence] | None]], list[Any] | tuple[list[collections.abc.Sequence[collections.abc.Sequence] | None], list]] | None) – (選用)將套用至結果的函式(預設值:default_output_processor)。
split_statements (bool | None) – (選用)是否將單個 SQL 字串分割成語句。預設情況下,會延遲到已配置 hook 的
run
方法中的預設值。conn_id (str | None) – 用於連線至資料庫的連線 ID
database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫
return_last (bool) – (選用)僅傳回最後一個語句的結果(預設值:True)。
show_return_value_in_logs (bool) – (選用)如果為 true,運算子輸出將列印到任務日誌。謹慎使用。不建議將大型資料集轉儲到日誌。(預設值:False)。
參見
有關如何使用此運算子的更多資訊,請參閱指南: 執行 SQL 查詢
- template_fields: collections.abc.Sequence[str] = ('sql', 'parameters')[原始碼]¶
- template_ext: collections.abc.Sequence[str] = ('.sql', '.json')[原始碼]¶
- class airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator(*, table, column_mapping, partition_clause=None, conn_id=None, database=None, accept_none=True, **kwargs)[原始碼]¶
基底類別:
BaseSQLOperator
在 column_checks 字典中執行一個或多個範本化檢查。
檢查是根據 column_mapping 指定的每個欄位執行。
每個檢查可以採用以下一個或多個選項
equal_to
:要等於的確切值,不能與其他比較選項一起使用greater_than
:結果應嚴格大於的值less_than
:結果應嚴格小於的值geq_to
:結果應大於或等於的值leq_to
:結果應小於或等於的值tolerance
:結果可能與預期值相差的百分比partition_clause
:傳遞到 WHERE 語句以分割資料的額外子句
- 參數
table (str) – 要在其上執行檢查的表格
column_mapping (dict[str, dict[str, Any]]) –
欄位及其相關檢查的字典,例如
{ "col_name": { "null_check": { "equal_to": 0, "partition_clause": "foreign_key IS NOT NULL", }, "min": { "greater_than": 5, "leq_to": 10, "tolerance": 0.2, }, "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01}, } }
partition_clause (str | None) –
部分 SQL 語句,新增至運算子建立的查詢中的 WHERE 子句,以建立要執行檢查的 partition_clauses,例如
"date = '1970-01-01'"
conn_id (str | None) – 用於連線至資料庫的連線 ID
database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫
accept_none (bool) – 是否接受查詢傳回的 None 值。如果為 true,則將 None 轉換為 0。
參見
有關如何使用此運算子的更多資訊,請參閱指南: 檢查 SQL 表格欄位
- template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql')[原始碼]¶
- class airflow.providers.common.sql.operators.sql.SQLTableCheckOperator(*, table, checks, partition_clause=None, conn_id=None, database=None, **kwargs)[原始碼]¶
基底類別:
BaseSQLOperator
執行 checks 字典中提供的一個或多個檢查。
檢查應撰寫為傳回布林值結果。
- 參數
table (str) – 要在其上執行檢查的表格
checks (dict[str, dict[str, Any]]) –
檢查的字典,其中檢查名稱後跟著一個字典,該字典至少包含一個檢查語句,以及可選的分區子句,例如
{ "row_count_check": {"check_statement": "COUNT(*) = 1000"}, "column_sum_check": {"check_statement": "col_a + col_b < col_c"}, "third_check": {"check_statement": "MIN(col) = 1", "partition_clause": "col IS NOT NULL"}, }
partition_clause (str | None) –
部分 SQL 語句,新增至運算子建立的查詢中的 WHERE 子句,以建立要執行檢查的 partition_clauses,例如
"date = '1970-01-01'"
conn_id (str | None) – 用於連線至資料庫的連線 ID
database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫
參見
有關如何使用此運算子的更多資訊,請參閱指南: 檢查 SQL 表格值
- template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql')[原始碼]¶
- class airflow.providers.common.sql.operators.sql.SQLCheckOperator(*, sql, conn_id=None, database=None, parameters=None, **kwargs)[原始碼]¶
基底類別:
BaseSQLOperator
對資料庫執行檢查。
SQLCheckOperator
預期 SQL 查詢將傳回單一列。使用 pythonbool
轉換評估該第一列上的每個值。如果任何值傳回False
,則檢查失敗並出現錯誤。如果傳回 Python 字典,並且 Python 字典中的任何值為False
,則檢查失敗並出現錯誤。請注意,Python 布林值轉換將以下內容評估為
False
False
0
空字串 (
""
)空列表 (
[]
)空字典或集合 (
{}
)值為
False
的字典 ({'DUPLICATE_ID_CHECK': False}
)
給定類似
SELECT COUNT(*) FROM foo
的查詢,只有在計數== 0
時才會失敗。您可以製作更複雜的查詢,例如,檢查表格的列數是否與上游來源表格的列數相同,或者今天的分割區計數是否大於昨天的分割區計數,或者一組指標是否小於 7 天平均值的 3 個標準差。此運算子可以用作您管線中的資料品質檢查,並且根據您將其放置在 DAG 中的位置,您可以選擇停止關鍵路徑,防止發布可疑資料,或在側邊接收電子郵件警報,而不會停止 DAG 的進度。
- 參數
sql (str) – 要執行的 SQL。(已範本化)
conn_id (str | None) – 用於連線至資料庫的連線 ID。
database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (選用)用於呈現 SQL 查詢的參數。
- template_fields: collections.abc.Sequence[str] = ('sql',)[原始碼]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLValueCheckOperator(*, sql, pass_value, tolerance=None, conn_id=None, database=None, **kwargs)[source]¶
基底類別:
BaseSQLOperator
使用 SQL 程式碼執行簡單的值檢查。
- 參數
sql (str) – 要執行的 SQL。(已範本化)
conn_id (str | None) – 用於連線至資料庫的連線 ID。
database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫
- template_fields: collections.abc.Sequence[str] = ('sql', 'pass_value')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, ratio_formula='max_over_min', ignore_zero=True, conn_id=None, database=None, **kwargs)[source]¶
基底類別:
BaseSQLOperator
檢查以 SQL 表達式給出的指標是否在先前 days_back 的容許範圍內。
- 參數
table (str) – 資料表名稱
conn_id (str | None) – 用於連線至資料庫的連線 ID。
database (str | None) – 資料庫名稱,將覆寫連線中定義的資料庫
days_back (SupportsAbs[int]) – ds 與我們要對照檢查的 ds 之間的天數。預設為 7 天
date_filter_column (str | None) – 用於篩選日期的欄位名稱。預設為 ‘ds’
ratio_formula (str | None) –
用於計算兩個指標之間比率的公式。假設 cur 是今天的指標,而 ref 是今天 - days_back 的指標。預設值:‘max_over_min’
max_over_min
: 計算 max(cur, ref) / min(cur, ref)relative_diff
: 計算 abs(cur-ref) / ref
ignore_zero (bool) – 是否應忽略零指標
- template_fields: collections.abc.Sequence[str] = ('sql1', 'sql2')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLThresholdCheckOperator(*, sql, min_threshold, max_threshold, conn_id=None, database=None, **kwargs)[source]¶
基底類別:
BaseSQLOperator
使用 SQL 程式碼對最小值閾值和最大值閾值執行值檢查。
閾值可以是數值或產生數值的 SQL 陳述式形式。
- 參數
sql (str) – 要執行的 SQL。(已範本化)
conn_id (str | None) – 用於連線至資料庫的連線 ID。
database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫
min_threshold (Any) – 要執行的數值或最小閾值 SQL(已套用範本)
max_threshold (Any) – 要執行的數值或最大閾值 SQL(已套用範本)
參見
如需關於如何使用此運算子的更多資訊,請參閱指南: 根據閾值檢查值
- template_fields: collections.abc.Sequence[str] = ('sql', 'min_threshold', 'max_threshold')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.BranchSQLOperator(*, sql, follow_task_ids_if_true, follow_task_ids_if_false, conn_id='default_conn_id', database=None, parameters=None, **kwargs)[source]¶
繼承自:
BaseSQLOperator
,airflow.models.SkipMixin
允許 DAG 根據 SQL 查詢的結果「分支」或遵循指定的路徑。
- 參數
sql (str) – 要執行的 SQL 程式碼,應傳回 true 或 false(已套用範本)。範本參考會以 ‘.sql’ 結尾的字串辨識。預期的 SQL 查詢應傳回布林值 (True/False)、整數 (0 = False,否則 = 1) 或字串 (true/y/yes/1/on/false/n/no/0/off)。
follow_task_ids_if_true (list[str]) – 如果查詢傳回 true,要遵循的任務 ID 或任務 ID
follow_task_ids_if_false (list[str]) – 如果查詢傳回 false,要遵循的任務 ID 或任務 ID
conn_id (str) – 用於連線到資料庫的連線 ID。
database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (選用)用於呈現 SQL 查詢的參數。
- template_fields: collections.abc.Sequence[str] = ('sql',)[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶