參數

參數讓您能夠為任務提供運行時組態。您可以在 DAG 程式碼中設定預設參數,並在觸發 DAG 時提供額外參數,或覆寫參數值。Param 值會使用 JSON Schema 進行驗證。對於排程的 DAG 執行,會使用預設的 Param 值。

定義的參數也用於在手動觸發時呈現良好的 UI。當您手動觸發 DAG 時,您可以在 dagrun 開始之前修改其參數。如果使用者提供的值未通過驗證,Airflow 會顯示警告,而不是建立 dagrun。

DAG 層級參數

若要將參數新增至 DAG,請使用 params kwarg 初始化它。使用字典,將參數名稱對應到 Param 或指示參數預設值的物件。

 from airflow import DAG
 from airflow.decorators import task
 from airflow.models.param import Param

 with DAG(
     "the_dag",
     params={
         "x": Param(5, type="integer", minimum=3),
         "my_int_param": 6
     },
 ) as dag:

     @task.python
     def example_task(params: dict):
         # This will print the default value, 6:
         dag.log.info(dag.params['my_int_param'])

         # This will print the manually-provided value, 42:
         dag.log.info(params['my_int_param'])

         # This will print the default value, 5, since it wasn't provided manually:
         dag.log.info(params['x'])

     example_task()

 if __name__ == "__main__":
     dag.test(
         run_conf={"my_int_param": 42}
     )

注意

DAG 層級參數是傳遞給任務的預設值。這些不應與透過 UI 表單或 CLI 手動提供的值混淆,這些值僅存在於 DagRunTaskInstance 的上下文中。這種區別對於 TaskFlow DAG 至關重要,TaskFlow DAG 可能包含 with DAG(...) as dag: 區塊內的邏輯。在這種情況下,使用者可能會嘗試使用 dag 物件存取手動提供的參數值,但這只會包含預設值。為了確保存取手動提供的值,請在您的任務中使用範本變數,例如 paramsti

任務層級參數

您也可以將參數新增至個別任務。

def print_my_int_param(params):
  print(params.my_int_param)

PythonOperator(
    task_id="print_my_int_param",
    params={"my_int_param": 10},
    python_callable=print_my_int_param,
)

任務層級參數優先於 DAG 層級參數,而使用者提供的參數(在觸發 DAG 時)優先於任務層級參數。

在任務中引用參數

參數可以在 範本字串 中的 params 下引用。例如

 PythonOperator(
     task_id="from_template",
     op_args=[
         "{{ params.my_int_param + 10 }}",
     ],
     python_callable=(
         lambda my_int_param: print(my_int_param)
     ),
 )

即使參數可以使用各種型別,範本的預設行為是為您的任務提供字串。您可以透過在初始化 DAG 時設定 render_template_as_native_obj=True 來變更此行為。

 with DAG(
     "the_dag",
     params={"my_int_param": Param(5, type="integer", minimum=3)},
     render_template_as_native_obj=True
 ):

這樣,當 Param 的型別提供給您的任務時,將會受到尊重

# prints <class 'str'> by default
# prints <class 'int'> if render_template_as_native_obj=True
PythonOperator(
    task_id="template_type",
    op_args=[
        "{{ params.my_int_param }}",
    ],
    python_callable=(
        lambda my_int_param: print(type(my_int_param))
    ),
)

存取參數的另一種方式是透過任務的 context kwarg。

 def print_my_int_param(**context):
     print(context["params"]["my_int_param"])

 PythonOperator(
     task_id="print_my_int_param",
     python_callable=print_my_int_param,
     params={"my_int_param": 12345},
 )

JSON Schema 驗證

Param 使用 JSON Schema,因此您可以使用 https://json-schema.dev.org.tw/draft/2020-12/json-schema-validation.html 中提到的完整 JSON Schema 規格來定義 Param 物件。

with DAG(
    "my_dag",
    params={
        # an int with a default value
        "my_int_param": Param(10, type="integer", minimum=0, maximum=20),

        # a required param which can be of multiple types
        # a param must have a default value
        "multi_type_param": Param(5, type=["null", "number", "string"]),

        # an enum param, must be one of three values
        "enum_param": Param("foo", enum=["foo", "bar", 42]),

        # a param which uses json-schema formatting
        "email": Param(
            default="example@example.com",
            type="string",
            format="idn-email",
            minLength=5,
            maxLength=255,
        ),
    },
):

注意

如果為 DAG 定義了 schedule,則具有預設值的參數必須有效。這會在 DAG 剖析期間驗證。如果 schedule=None,則參數不會在 DAG 剖析期間驗證,而是在觸發 DAG 之前驗證。這在 DAG 作者不想提供預設值,但想要強制使用者在觸發時提供有效參數的情況下很有用。

注意

目前,基於安全考量,無法使用從自訂類別衍生的 Param 物件。我們正計畫為自訂 Param 類別建立註冊系統,就像我們為 Operator ExtraLinks 所做的那樣。

使用參數提供觸發 UI 表單

2.6.0 版本新增。

DAG 層級參數用於呈現使用者友善的觸發表單。當使用者點擊「觸發 DAG」按鈕時,會提供此表單。

觸發 UI 表單是根據預先定義的 DAG 參數呈現的。如果 DAG 沒有定義任何參數,則會略過觸發表單。表單元素可以使用 Param 類別定義,而屬性定義表單欄位的顯示方式。

觸發 UI 表單中支援以下功能

  • 來自頂層 DAG 參數的直接純量值(布林值、整數、字串、列表、字典)會自動裝箱到 Param 物件中。從原生 Python 資料型別自動偵測 type 屬性。因此,這些簡單型別會呈現為對應的欄位型別。參數名稱用作標籤,且不進行進一步驗證,所有值都視為選填。

  • 如果您使用 Param 類別作為參數值的定義,則可以新增以下屬性

    • Param 屬性 title 用於呈現輸入框的表單欄位標籤。如果未定義 title,則改為使用參數名稱/鍵。

    • Param 屬性 description 會在輸入欄位下方呈現為灰色幫助文字。如果您想要提供特殊格式或連結,則需要使用 Param 屬性 description_md。請參閱教學課程 DAG 參數 UI 範例 DAG 以取得範例。

    • Param 屬性 type 會影響欄位的呈現方式。支援以下型別

      參數型別

      表單元素型別

      其他支援的屬性

      範例

      string

      產生單行文字方塊或文字區域以編輯文字。

      • minLength:最小文字長度

      • maxLength:最大文字長度

      • format="date":產生日期選擇器
        與日曆彈出視窗
      • format="date-time":產生日期和
        時間選擇器與日曆彈出視窗
      • format="time":產生時間選擇器

      • format="multiline":產生多行文字區域

      • enum=["a", "b", "c"]:產生
        純量值的下拉式選取清單。
        根據 JSON 驗證,必須選取值
        或欄位必須明確標記為
        選填。另請參閱內部的詳細資訊
      • values_display={"a": "Alpha", "b": "Beta"}:
        對於透過
        enum 產生的選取下拉式選單,您可以新增屬性
        values_display,其中包含 dict 並將資料
        值對應到顯示標籤。
      • examples=["One", "Two", "Three"]:如果您
        想要呈現值的建議
        (不將使用者限制為固定的 enum
        如上所述),您可以使用 examples
        ,它是一個項目列表。
      另請參閱
      ,這些選項在後端 DAG 觸發之前檢查。

      Param("default", type="string", maxLength=10)

      Param(f"{datetime.date.today()}", type="string", format="date")

      number

      integer

      產生一個欄位,限制僅新增
      數值。HTML 瀏覽器
      通常也會在
      右側新增微調器以增加或減少
      值。integer 僅允許整數
      數字,number 也允許
      小數值。
      • minimum:最小值

      • maximum:最大值

      另請參閱
      ,這些選項在後端 DAG 觸發之前檢查。

      Param(42, type="integer", minimum=14, multipleOf=7)

      boolean

      產生一個切換按鈕,用作
      TrueFalse

      無。

      Param(True, type="boolean")

      array

      產生一個 HTML 多行文字欄位,
      編輯的每一行都將成為
      字串陣列作為值。
      • 如果您新增具有列表的屬性 examples
        ,則會產生多值選取選項
        ,而不是自由文字欄位。
      • values_display={"a": "Alpha", "b": "Beta"}:
        對於多值選取 examples,您可以新增
        屬性 values_display,其中包含 dict 和
        對應資料值以顯示標籤。
      • 如果您新增具有字典的屬性 items
        ,其中包含欄位 type
        ,其值不是「string」,則會產生 JSON 輸入
        欄位,以用於更多陣列型別和
        其他型別驗證,如

      Param(["a", "b", "c"], type="array")

      Param(["two", "three"], type="array", examples=["one", "two", "three", "four", "five"])

      Param(["one@example.com", "two@example.com"], type="array", items={"type": "string", "format": "idn-email"})

      object

      產生具有
      文字驗證的 JSON 輸入欄位。
      HTML 表單僅驗證
      JSON 輸入的語法。為了驗證
      特定結構的內容,請查看

      Param({"key": "value"}, type=["object", "null"])

      null

      指定不預期任何內容。
      單獨使用沒有太大意義
      ,但對於型別組合很有用
      ,例如 type=["null", "string"],因為
      type 屬性也接受
      型別列表。
      預設情況下,如果您指定型別,則
      欄位將會變成必填欄位,並帶有
      輸入 - 因為 JSON 驗證。
      如果您想要讓欄位值變成
      僅選填新增,您必須允許
      JSON schema 驗證允許 null
      值。

      Param(None, type=["null", "string"])

  • 如果表單欄位留空,則會將 None 值傳遞到參數字典。

  • 表單欄位會按照 DAG 中 params 定義的順序呈現。

  • 如果您想要將區段新增至表單,請將屬性 section 新增至每個欄位。文字將用作區段標籤。沒有 section 的欄位將會在預設區域中呈現。其他區段預設會摺疊。

  • 如果您想要讓參數不顯示,請使用 const 屬性。這些參數將會提交,但在表單中隱藏。const 值必須與預設值相符,才能通過 JSON Schema 驗證

  • 在表單底部,可以展開產生的 JSON 組態。如果您想要手動變更值,可以調整 JSON 組態。當表單欄位變更時,變更會被覆寫。

  • 若要在發布觸發表單的連結時預先填入表單中的值,您可以呼叫觸發 URL /dags/<dag_name>/trigger,並將查詢參數以 name=value 形式新增至 URL,例如 /dags/example_params_ui_tutorial/trigger?required_field=some%20text。若要預先定義 DAG 執行的 run id,請使用 URL 參數 run_id

  • 欄位可以是必填或選填。型別欄位預設為必填,以確保它們通過 JSON schema 驗證。若要將型別欄位設為選填,您必須允許「null」型別。

  • 沒有「section」的欄位將在預設區域中呈現。其他區段預設會摺疊。

注意

如果欄位為必填,則預設值也必須根據 schema 有效。如果 DAG 是使用 schedule=None 定義的,則參數值驗證會在觸發時進行。

如需範例,請查看提供的兩個範例 DAG:參數觸發範例 DAG參數 UI 範例 DAG

airflow/example_dags/example_params_trigger_ui.py[原始碼]

with DAG(
    dag_id=Path(__file__).stem,
    dag_display_name="Params Trigger UI",
    description=__doc__.partition(".")[0],
    doc_md=__doc__,
    schedule=None,
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    tags=["example", "params"],
    params={
        "names": Param(
            ["Linda", "Martha", "Thomas"],
            type="array",
            description="Define the list of names for which greetings should be generated in the logs."
            " Please have one name per line.",
            title="Names to greet",
        ),
        "english": Param(True, type="boolean", title="English"),
        "german": Param(True, type="boolean", title="German (Formal)"),
        "french": Param(True, type="boolean", title="French"),
    },
) as dag:

    @task(task_id="get_names", task_display_name="Get names")
    def get_names(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        if "names" not in params:
            print("Uuups, no names given, was no UI used to trigger?")
            return []
        return params["names"]

    @task.branch(task_id="select_languages", task_display_name="Select languages")
    def select_languages(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        selected_languages = []
        for lang in ["english", "german", "french"]:
            if params[lang]:
                selected_languages.append(f"generate_{lang}_greeting")
        return selected_languages

    @task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
    def generate_english_greeting(name: str) -> str:
        return f"Hello {name}!"

    @task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
    def generate_german_greeting(name: str) -> str:
        return f"Sehr geehrter Herr/Frau {name}."

    @task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
    def generate_french_greeting(name: str) -> str:
        return f"Bonjour {name}!"

    @task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
    def print_greetings(greetings1, greetings2, greetings3) -> None:
        for g in greetings1 or []:
            print(g)
        for g in greetings2 or []:
            print(g)
        for g in greetings3 or []:
            print(g)
        if not (greetings1 or greetings2 or greetings3):
            print("sad, nobody to greet :-(")

    lang_select = select_languages()
    names = get_names()
    english_greetings = generate_english_greeting.expand(name=names)
    german_greetings = generate_german_greeting.expand(name=names)
    french_greetings = generate_french_greeting.expand(name=names)
    lang_select >> [english_greetings, german_greetings, french_greetings]
    results_print = print_greetings(english_greetings, german_greetings, french_greetings)

airflow/example_dags/example_params_ui_tutorial.py[原始碼]

    params={
        # Let's start simple: Standard dict values are detected from type and offered as entry form fields.
        # Detected types are numbers, text, boolean, lists and dicts.
        # Note that such auto-detected parameters are treated as optional (not required to contain a value)
        "x": 3,
        "text": "Hello World!",
        "flag": False,
        "a_simple_list": ["one", "two", "three", "actually one value is made per line"],
        # But of course you might want to have it nicer! Let's add some description to parameters.
        # Note if you can add any Markdown formatting to the description, you need to use the description_md
        # attribute.
        "most_loved_number": Param(
            42,
            type="integer",
            title="Your favorite number",
            description_md="Everybody should have a **favorite** number. Not only _math teachers_. "
            "If you can not think of any at the moment please think of the 42 which is very famous because"
            "of the book [The Hitchhiker's Guide to the Galaxy]"
            "(https://en.wikipedia.org/wiki/Phrases_from_The_Hitchhiker%27s_Guide_to_the_Galaxy#"
            "The_Answer_to_the_Ultimate_Question_of_Life,_the_Universe,_and_Everything_is_42).",
        ),
        # If you want to have a selection list box then you can use the enum feature of JSON schema
        "pick_one": Param(
            "value 42",
            type="string",
            title="Select one Value",
            description="You can use JSON schema enum's to generate drop down selection boxes.",
            enum=[f"value {i}" for i in range(16, 64)],
        ),

airflow/example_dags/example_params_ui_tutorial.py[原始碼]

        "required_field": Param(
            # In this example we have no default value
            # Form will enforce a value supplied by users to be able to trigger
            type="string",
            title="Required text field",
            description="This field is required. You can not submit without having text in here.",
        ),
        "optional_field": Param(
            "optional text, you can trigger also w/o text",
            type=["null", "string"],
            title="Optional text field",
            description_md="This field is optional. As field content is JSON schema validated you must "
            "allow the `null` type.",
        ),

airflow/example_dags/example_params_ui_tutorial.py[原始碼]

    @task(task_display_name="Show used parameters")
    def show_params(**kwargs) -> None:
        params: ParamsDict = kwargs["params"]
        print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")

    show_params()
../_images/trigger-dag-tutorial-form.png

2.7.0 版本新增:即使未使用組態開關 webserver.show_trigger_form_if_no_params 定義任何參數,也可以強制顯示觸發表單。

2.8.0 版本變更:預設情況下,不允許自訂 HTML,以防止注入腳本或其他惡意 HTML 程式碼。如果您信任您的 DAG 作者,您可以透過將組態項目 webserver.allow_raw_html_descriptions 設定為 True 來變更參數描述的信任層級以允許原始 HTML。使用預設設定時,所有 HTML 都將顯示為純文字。這與先前使用屬性 description_html 啟用豐富格式的功能相關,該功能現在已由屬性 description_md 取代。使用屬性 custom_html_form 的自訂表單元素允許 DAG 作者指定原始 HTML 表單範本。自 2.8.0 版本起,這些自訂 HTML 表單元素已棄用。

停用運行時參數修改

在觸發 DAG 時更新參數的功能取決於旗標 core.dag_run_conf_overrides_params。將此組態設定為 False 將有效地將您的預設參數變成常數。

此條目是否有幫助?