airflow.providers.common.sql.operators.sql

模組內容

類別

BaseSQLOperator

這是通用 SQL 運算子的基礎類別,用於取得 DB Hook。

SQLExecuteQueryOperator

在特定資料庫中執行 SQL 程式碼。

SQLColumnCheckOperator

在 column_checks 字典中執行一個或多個範本化檢查。

SQLTableCheckOperator

執行 checks 字典中提供的一個或多個檢查。

SQLCheckOperator

對資料庫執行檢查。

SQLValueCheckOperator

使用 SQL 程式碼執行簡單的值檢查。

SQLIntervalCheckOperator

檢查以 SQL 表達式給出的指標是否在先前 days_back 的容許範圍內。

SQLThresholdCheckOperator

使用 SQL 程式碼對最小值閾值和最大值閾值執行值檢查。

BranchSQLOperator

允許 DAG 根據 SQL 查詢的結果「分支」或遵循指定的路徑。

函式

default_output_processor(results, descriptions)

airflow.providers.common.sql.operators.sql.default_output_processor(results, descriptions)[原始碼]
class airflow.providers.common.sql.operators.sql.BaseSQLOperator(*, conn_id=None, database=None, hook_params=None, retry_on_failure=True, **kwargs)[原始碼]

基底類別: airflow.models.BaseOperator

這是通用 SQL 運算子的基礎類別,用於取得 DB Hook。

提供的方法為 .get_db_hook()。預設行為會嘗試根據連線類型檢索 DB hook。您可以透過覆寫 .get_db_hook() 方法來自訂行為。

參數

conn_id (str | None) – 參考特定資料庫

conn_id_field = 'conn_id'[原始碼]
template_fields: collections.abc.Sequence[str] = ('conn_id', 'database', 'hook_params')[原始碼]
classmethod get_hook(conn_id, hook_params=None)[原始碼]

傳回此連線 ID 的預設 Hook。

參數
  • conn_id (str) – 連線 ID

  • hook_params (dict | None) – hook 參數

傳回

此連線的預設 hook

傳回類型

airflow.hooks.base.BaseHook

get_db_hook()[原始碼]

取得連線的資料庫 hook。

傳回

資料庫 hook 物件。

傳回類型

airflow.providers.common.sql.hooks.sql.DbApiHook

class airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator(*, sql, autocommit=False, parameters=None, handler=fetch_all_handler, output_processor=None, conn_id=None, database=None, split_statements=None, return_last=True, show_return_value_in_logs=False, **kwargs)[原始碼]

基底類別: BaseSQLOperator

在特定資料庫中執行 SQL 程式碼。

在實作特定的運算子時,您也可以在 hook 中實作 _process_output 方法,以執行由您的 DB Hook 傳回的值的額外處理。例如,您可以將從語句游標檢索的描述與傳回值合併,或將運算子的輸出儲存到檔案。

參數
  • sql (str | list[str]) – 要執行的 SQL 程式碼或指向範本檔案的字串(已範本化)。檔案必須具有 '.sql' 副檔名。

  • autocommit (bool) – (選用)如果為 True,則會自動提交每個命令(預設值:False)。

  • parameters (collections.abc.Mapping | collections.abc.Iterable | None) – (選用)用於呈現 SQL 查詢的參數。

  • handler (Callable[[Any], list[tuple] | None]) – (選用)將套用至游標的函式(預設值:fetch_all_handler)。

  • output_processor (Callable[[list[Any], list[collections.abc.Sequence[collections.abc.Sequence] | None]], list[Any] | tuple[list[collections.abc.Sequence[collections.abc.Sequence] | None], list]] | None) – (選用)將套用至結果的函式(預設值:default_output_processor)。

  • split_statements (bool | None) – (選用)是否將單個 SQL 字串分割成語句。預設情況下,會延遲到已配置 hook 的 run 方法中的預設值。

  • conn_id (str | None) – 用於連線至資料庫的連線 ID

  • database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫

  • return_last (bool) – (選用)僅傳回最後一個語句的結果(預設值:True)。

  • show_return_value_in_logs (bool) – (選用)如果為 true,運算子輸出將列印到任務日誌。謹慎使用。不建議將大型資料集轉儲到日誌。(預設值:False)。

參見

有關如何使用此運算子的更多資訊,請參閱指南: 執行 SQL 查詢

template_fields: collections.abc.Sequence[str] = ('sql', 'parameters')[原始碼]
template_ext: collections.abc.Sequence[str] = ('.sql', '.json')[原始碼]
template_fields_renderers: ClassVar[dict][原始碼]
ui_color = '#cdaaed'[原始碼]
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

prepare_template()[原始碼]

剖析屬性參數的範本檔案。

get_openlineage_facets_on_start()[原始碼]
get_openlineage_facets_on_complete(task_instance)[原始碼]
class airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator(*, table, column_mapping, partition_clause=None, conn_id=None, database=None, accept_none=True, **kwargs)[原始碼]

基底類別: BaseSQLOperator

在 column_checks 字典中執行一個或多個範本化檢查。

檢查是根據 column_mapping 指定的每個欄位執行。

每個檢查可以採用以下一個或多個選項

  • equal_to:要等於的確切值,不能與其他比較選項一起使用

  • greater_than:結果應嚴格大於的值

  • less_than:結果應嚴格小於的值

  • geq_to:結果應大於或等於的值

  • leq_to:結果應小於或等於的值

  • tolerance:結果可能與預期值相差的百分比

  • partition_clause:傳遞到 WHERE 語句以分割資料的額外子句

參數
  • table (str) – 要在其上執行檢查的表格

  • column_mapping (dict[str, dict[str, Any]]) –

    欄位及其相關檢查的字典,例如

    {
        "col_name": {
            "null_check": {
                "equal_to": 0,
                "partition_clause": "foreign_key IS NOT NULL",
            },
            "min": {
                "greater_than": 5,
                "leq_to": 10,
                "tolerance": 0.2,
            },
            "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
        }
    }
    

  • partition_clause (str | None) –

    部分 SQL 語句,新增至運算子建立的查詢中的 WHERE 子句,以建立要執行檢查的 partition_clauses,例如

    "date = '1970-01-01'"
    

  • conn_id (str | None) – 用於連線至資料庫的連線 ID

  • database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫

  • accept_none (bool) – 是否接受查詢傳回的 None 值。如果為 true,則將 None 轉換為 0。

參見

有關如何使用此運算子的更多資訊,請參閱指南: 檢查 SQL 表格欄位

template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql')[原始碼]
template_fields_renderers: ClassVar[dict][原始碼]
sql_check_template = Multiline-String[原始碼]
顯示值
"""
        SELECT '{column}' AS col_name, '{check}' AS check_type, {column}_{check} AS check_result
        FROM (SELECT {check_statement} AS {column}_{check} FROM {table} {partition_clause}) AS sq
    """
column_checks[原始碼]
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

class airflow.providers.common.sql.operators.sql.SQLTableCheckOperator(*, table, checks, partition_clause=None, conn_id=None, database=None, **kwargs)[原始碼]

基底類別: BaseSQLOperator

執行 checks 字典中提供的一個或多個檢查。

檢查應撰寫為傳回布林值結果。

參數
  • table (str) – 要在其上執行檢查的表格

  • checks (dict[str, dict[str, Any]]) –

    檢查的字典,其中檢查名稱後跟著一個字典,該字典至少包含一個檢查語句,以及可選的分區子句,例如

    {
        "row_count_check": {"check_statement": "COUNT(*) = 1000"},
        "column_sum_check": {"check_statement": "col_a + col_b < col_c"},
        "third_check": {"check_statement": "MIN(col) = 1", "partition_clause": "col IS NOT NULL"},
    }
    

  • partition_clause (str | None) –

    部分 SQL 語句,新增至運算子建立的查詢中的 WHERE 子句,以建立要執行檢查的 partition_clauses,例如

    "date = '1970-01-01'"
    

  • conn_id (str | None) – 用於連線至資料庫的連線 ID

  • database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫

參見

有關如何使用此運算子的更多資訊,請參閱指南: 檢查 SQL 表格值

template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql')[原始碼]
template_fields_renderers: ClassVar[dict][原始碼]
sql_check_template = Multiline-String[原始碼]
顯示值
"""
    SELECT '{check_name}' AS check_name, MIN({check_name}) AS check_result
    FROM (SELECT CASE WHEN {check_statement} THEN 1 ELSE 0 END AS {check_name}
          FROM {table} {partition_clause}) AS sq
    """
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

class airflow.providers.common.sql.operators.sql.SQLCheckOperator(*, sql, conn_id=None, database=None, parameters=None, **kwargs)[原始碼]

基底類別: BaseSQLOperator

對資料庫執行檢查。

SQLCheckOperator 預期 SQL 查詢將傳回單一列。使用 python bool 轉換評估該第一列上的每個值。如果任何值傳回 False,則檢查失敗並出現錯誤。如果傳回 Python 字典,並且 Python 字典中的任何值為 False,則檢查失敗並出現錯誤。

請注意,Python 布林值轉換將以下內容評估為 False

  • False

  • 0

  • 空字串 ("")

  • 空列表 ([])

  • 空字典或集合 ({})

  • 值為 False 的字典 ({'DUPLICATE_ID_CHECK': False})

給定類似 SELECT COUNT(*) FROM foo 的查詢,只有在計數 == 0 時才會失敗。您可以製作更複雜的查詢,例如,檢查表格的列數是否與上游來源表格的列數相同,或者今天的分割區計數是否大於昨天的分割區計數,或者一組指標是否小於 7 天平均值的 3 個標準差。

此運算子可以用作您管線中的資料品質檢查,並且根據您將其放置在 DAG 中的位置,您可以選擇停止關鍵路徑,防止發布可疑資料,或在側邊接收電子郵件警報,而不會停止 DAG 的進度。

參數
  • sql (str) – 要執行的 SQL。(已範本化)

  • conn_id (str | None) – 用於連線至資料庫的連線 ID。

  • database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫

  • parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (選用)用於呈現 SQL 查詢的參數。

template_fields: collections.abc.Sequence[str] = ('sql',)[原始碼]
template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]
template_fields_renderers: ClassVar[dict][source]
ui_color = '#fff7e6'[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

class airflow.providers.common.sql.operators.sql.SQLValueCheckOperator(*, sql, pass_value, tolerance=None, conn_id=None, database=None, **kwargs)[source]

基底類別: BaseSQLOperator

使用 SQL 程式碼執行簡單的值檢查。

參數
  • sql (str) – 要執行的 SQL。(已範本化)

  • conn_id (str | None) – 用於連線至資料庫的連線 ID。

  • database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫

__mapper_args__[source]
template_fields: collections.abc.Sequence[str] = ('sql', 'pass_value')[source]
template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]
template_fields_renderers: ClassVar[dict][source]
ui_color = '#fff7e6'[source]
check_value(records)[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

class airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, ratio_formula='max_over_min', ignore_zero=True, conn_id=None, database=None, **kwargs)[source]

基底類別: BaseSQLOperator

檢查以 SQL 表達式給出的指標是否在先前 days_back 的容許範圍內。

參數
  • table (str) – 資料表名稱

  • conn_id (str | None) – 用於連線至資料庫的連線 ID。

  • database (str | None) – 資料庫名稱,將覆寫連線中定義的資料庫

  • days_back (SupportsAbs[int]) – ds 與我們要對照檢查的 ds 之間的天數。預設為 7 天

  • date_filter_column (str | None) – 用於篩選日期的欄位名稱。預設為 ‘ds’

  • ratio_formula (str | None) –

    用於計算兩個指標之間比率的公式。假設 cur 是今天的指標,而 ref 是今天 - days_back 的指標。預設值:‘max_over_min’

    • max_over_min: 計算 max(cur, ref) / min(cur, ref)

    • relative_diff: 計算 abs(cur-ref) / ref

  • ignore_zero (bool) – 是否應忽略零指標

  • metrics_thresholds (dict[str, int]) – 以指標索引的比率字典

__mapper_args__[source]
template_fields: collections.abc.Sequence[str] = ('sql1', 'sql2')[source]
template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]
template_fields_renderers: ClassVar[dict][source]
ui_color = '#fff7e6'[source]
ratio_formulas[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

class airflow.providers.common.sql.operators.sql.SQLThresholdCheckOperator(*, sql, min_threshold, max_threshold, conn_id=None, database=None, **kwargs)[source]

基底類別: BaseSQLOperator

使用 SQL 程式碼對最小值閾值和最大值閾值執行值檢查。

閾值可以是數值或產生數值的 SQL 陳述式形式。

參數
  • sql (str) – 要執行的 SQL。(已範本化)

  • conn_id (str | None) – 用於連線至資料庫的連線 ID。

  • database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫

  • min_threshold (Any) – 要執行的數值或最小閾值 SQL(已套用範本)

  • max_threshold (Any) – 要執行的數值或最大閾值 SQL(已套用範本)

參見

如需關於如何使用此運算子的更多資訊,請參閱指南: 根據閾值檢查值

template_fields: collections.abc.Sequence[str] = ('sql', 'min_threshold', 'max_threshold')[source]
template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]
template_fields_renderers: ClassVar[dict][source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

push(meta_data)[source]

將資料檢查資訊和中繼資料傳送到外部資料庫。

預設功能將記錄中繼資料。

class airflow.providers.common.sql.operators.sql.BranchSQLOperator(*, sql, follow_task_ids_if_true, follow_task_ids_if_false, conn_id='default_conn_id', database=None, parameters=None, **kwargs)[source]

繼承自: BaseSQLOperator, airflow.models.SkipMixin

允許 DAG 根據 SQL 查詢的結果「分支」或遵循指定的路徑。

參數
  • sql (str) – 要執行的 SQL 程式碼,應傳回 true 或 false(已套用範本)。範本參考會以 ‘.sql’ 結尾的字串辨識。預期的 SQL 查詢應傳回布林值 (True/False)、整數 (0 = False,否則 = 1) 或字串 (true/y/yes/1/on/false/n/no/0/off)。

  • follow_task_ids_if_true (list[str]) – 如果查詢傳回 true,要遵循的任務 ID 或任務 ID

  • follow_task_ids_if_false (list[str]) – 如果查詢傳回 false,要遵循的任務 ID 或任務 ID

  • conn_id (str) – 用於連線到資料庫的連線 ID。

  • database (str | None) – 資料庫名稱,覆寫連線中定義的資料庫

  • parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (選用)用於呈現 SQL 查詢的參數。

template_fields: collections.abc.Sequence[str] = ('sql',)[source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
template_fields_renderers: ClassVar[dict][source]
ui_color = '#a22034'[source]
ui_fgcolor = '#F7F7F7'[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

此條目是否有幫助?