參數¶
參數讓您能夠為任務提供運行時組態。您可以在 DAG 程式碼中設定預設參數,並在觸發 DAG 時提供額外參數,或覆寫參數值。Param
值會使用 JSON Schema 進行驗證。對於排程的 DAG 執行,會使用預設的 Param
值。
定義的參數也用於在手動觸發時呈現良好的 UI。當您手動觸發 DAG 時,您可以在 dagrun 開始之前修改其參數。如果使用者提供的值未通過驗證,Airflow 會顯示警告,而不是建立 dagrun。
DAG 層級參數¶
若要將參數新增至 DAG
,請使用 params
kwarg 初始化它。使用字典,將參數名稱對應到 Param
或指示參數預設值的物件。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
with DAG(
"the_dag",
params={
"x": Param(5, type="integer", minimum=3),
"my_int_param": 6
},
) as dag:
@task.python
def example_task(params: dict):
# This will print the default value, 6:
dag.log.info(dag.params['my_int_param'])
# This will print the manually-provided value, 42:
dag.log.info(params['my_int_param'])
# This will print the default value, 5, since it wasn't provided manually:
dag.log.info(params['x'])
example_task()
if __name__ == "__main__":
dag.test(
run_conf={"my_int_param": 42}
)
注意
DAG 層級參數是傳遞給任務的預設值。這些不應與透過 UI 表單或 CLI 手動提供的值混淆,這些值僅存在於 DagRun
和 TaskInstance
的上下文中。這種區別對於 TaskFlow DAG 至關重要,TaskFlow DAG 可能包含 with DAG(...) as dag:
區塊內的邏輯。在這種情況下,使用者可能會嘗試使用 dag
物件存取手動提供的參數值,但這只會包含預設值。為了確保存取手動提供的值,請在您的任務中使用範本變數,例如 params
或 ti
。
任務層級參數¶
您也可以將參數新增至個別任務。
def print_my_int_param(params):
print(params.my_int_param)
PythonOperator(
task_id="print_my_int_param",
params={"my_int_param": 10},
python_callable=print_my_int_param,
)
任務層級參數優先於 DAG 層級參數,而使用者提供的參數(在觸發 DAG 時)優先於任務層級參數。
在任務中引用參數¶
參數可以在 範本字串 中的 params
下引用。例如
PythonOperator(
task_id="from_template",
op_args=[
"{{ params.my_int_param + 10 }}",
],
python_callable=(
lambda my_int_param: print(my_int_param)
),
)
即使參數可以使用各種型別,範本的預設行為是為您的任務提供字串。您可以透過在初始化 DAG
時設定 render_template_as_native_obj=True
來變更此行為。
with DAG(
"the_dag",
params={"my_int_param": Param(5, type="integer", minimum=3)},
render_template_as_native_obj=True
):
這樣,當 Param
的型別提供給您的任務時,將會受到尊重
# prints <class 'str'> by default
# prints <class 'int'> if render_template_as_native_obj=True
PythonOperator(
task_id="template_type",
op_args=[
"{{ params.my_int_param }}",
],
python_callable=(
lambda my_int_param: print(type(my_int_param))
),
)
存取參數的另一種方式是透過任務的 context
kwarg。
def print_my_int_param(**context):
print(context["params"]["my_int_param"])
PythonOperator(
task_id="print_my_int_param",
python_callable=print_my_int_param,
params={"my_int_param": 12345},
)
JSON Schema 驗證¶
Param
使用 JSON Schema,因此您可以使用 https://json-schema.dev.org.tw/draft/2020-12/json-schema-validation.html 中提到的完整 JSON Schema 規格來定義 Param
物件。
with DAG(
"my_dag",
params={
# an int with a default value
"my_int_param": Param(10, type="integer", minimum=0, maximum=20),
# a required param which can be of multiple types
# a param must have a default value
"multi_type_param": Param(5, type=["null", "number", "string"]),
# an enum param, must be one of three values
"enum_param": Param("foo", enum=["foo", "bar", 42]),
# a param which uses json-schema formatting
"email": Param(
default="example@example.com",
type="string",
format="idn-email",
minLength=5,
maxLength=255,
),
},
):
注意
如果為 DAG 定義了 schedule
,則具有預設值的參數必須有效。這會在 DAG 剖析期間驗證。如果 schedule=None
,則參數不會在 DAG 剖析期間驗證,而是在觸發 DAG 之前驗證。這在 DAG 作者不想提供預設值,但想要強制使用者在觸發時提供有效參數的情況下很有用。
使用參數提供觸發 UI 表單¶
2.6.0 版本新增。
DAG
層級參數用於呈現使用者友善的觸發表單。當使用者點擊「觸發 DAG」按鈕時,會提供此表單。
觸發 UI 表單是根據預先定義的 DAG 參數呈現的。如果 DAG 沒有定義任何參數,則會略過觸發表單。表單元素可以使用 Param
類別定義,而屬性定義表單欄位的顯示方式。
觸發 UI 表單中支援以下功能
來自頂層 DAG 參數的直接純量值(布林值、整數、字串、列表、字典)會自動裝箱到
Param
物件中。從原生 Python 資料型別自動偵測type
屬性。因此,這些簡單型別會呈現為對應的欄位型別。參數名稱用作標籤,且不進行進一步驗證,所有值都視為選填。如果您使用
Param
類別作為參數值的定義,則可以新增以下屬性Param
屬性title
用於呈現輸入框的表單欄位標籤。如果未定義title
,則改為使用參數名稱/鍵。Param
屬性description
會在輸入欄位下方呈現為灰色幫助文字。如果您想要提供特殊格式或連結,則需要使用 Param 屬性description_md
。請參閱教學課程 DAG 參數 UI 範例 DAG 以取得範例。Param
屬性type
會影響欄位的呈現方式。支援以下型別參數型別
表單元素型別
其他支援的屬性
範例
string
產生單行文字方塊或文字區域以編輯文字。
minLength
:最小文字長度maxLength
:最大文字長度format="date"
:產生日期選擇器與日曆彈出視窗format="date-time"
:產生日期和時間選擇器與日曆彈出視窗format="time"
:產生時間選擇器format="multiline"
:產生多行文字區域values_display={"a": "Alpha", "b": "Beta"}
:對於透過enum
產生的選取下拉式選單,您可以新增屬性values_display
,其中包含 dict 並將資料值對應到顯示標籤。examples=["One", "Two", "Three"]
:如果您想要呈現值的建議(不將使用者限制為固定的enum
如上所述),您可以使用examples
,它是一個項目列表。
Param("default", type="string", maxLength=10)
Param(f"{datetime.date.today()}", type="string", format="date")
number
或integer
產生一個欄位,限制僅新增數值。HTML 瀏覽器通常也會在右側新增微調器以增加或減少值。integer
僅允許整數數字,number
也允許小數值。minimum
:最小值maximum
:最大值
Param(42, type="integer", minimum=14, multipleOf=7)
boolean
產生一個切換按鈕,用作True
或False
。無。
Param(True, type="boolean")
array
產生一個 HTML 多行文字欄位,編輯的每一行都將成為字串陣列作為值。- 如果您新增具有列表的屬性
examples
,則會產生多值選取選項,而不是自由文字欄位。 values_display={"a": "Alpha", "b": "Beta"}
:對於多值選取examples
,您可以新增屬性values_display
,其中包含 dict 和對應資料值以顯示標籤。- 如果您新增具有字典的屬性
items
,其中包含欄位type
,其值不是「string」,則會產生 JSON 輸入欄位,以用於更多陣列型別和其他型別驗證,如
Param(["a", "b", "c"], type="array")
Param(["two", "three"], type="array", examples=["one", "two", "three", "four", "five"])
Param(["one@example.com", "two@example.com"], type="array", items={"type": "string", "format": "idn-email"})
object
產生具有文字驗證的 JSON 輸入欄位。Param({"key": "value"}, type=["object", "null"])
null
指定不預期任何內容。單獨使用沒有太大意義,但對於型別組合很有用,例如type=["null", "string"]
,因為type 屬性也接受型別列表。預設情況下,如果您指定型別,則欄位將會變成必填欄位,並帶有輸入 - 因為 JSON 驗證。如果您想要讓欄位值變成僅選填新增,您必須允許JSON schema 驗證允許 null值。Param(None, type=["null", "string"])
如果表單欄位留空,則會將
None
值傳遞到參數字典。表單欄位會按照 DAG 中
params
定義的順序呈現。如果您想要將區段新增至表單,請將屬性
section
新增至每個欄位。文字將用作區段標籤。沒有section
的欄位將會在預設區域中呈現。其他區段預設會摺疊。如果您想要讓參數不顯示,請使用
const
屬性。這些參數將會提交,但在表單中隱藏。const
值必須與預設值相符,才能通過 JSON Schema 驗證。在表單底部,可以展開產生的 JSON 組態。如果您想要手動變更值,可以調整 JSON 組態。當表單欄位變更時,變更會被覆寫。
若要在發布觸發表單的連結時預先填入表單中的值,您可以呼叫觸發 URL
/dags/<dag_name>/trigger
,並將查詢參數以name=value
形式新增至 URL,例如/dags/example_params_ui_tutorial/trigger?required_field=some%20text
。若要預先定義 DAG 執行的 run id,請使用 URL 參數run_id
。欄位可以是必填或選填。型別欄位預設為必填,以確保它們通過 JSON schema 驗證。若要將型別欄位設為選填,您必須允許「null」型別。
沒有「section」的欄位將在預設區域中呈現。其他區段預設會摺疊。
注意
如果欄位為必填,則預設值也必須根據 schema 有效。如果 DAG 是使用 schedule=None
定義的,則參數值驗證會在觸發時進行。
如需範例,請查看提供的兩個範例 DAG:參數觸發範例 DAG 和 參數 UI 範例 DAG。
with DAG(
dag_id=Path(__file__).stem,
dag_display_name="Params Trigger UI",
description=__doc__.partition(".")[0],
doc_md=__doc__,
schedule=None,
start_date=datetime.datetime(2022, 3, 4),
catchup=False,
tags=["example", "params"],
params={
"names": Param(
["Linda", "Martha", "Thomas"],
type="array",
description="Define the list of names for which greetings should be generated in the logs."
" Please have one name per line.",
title="Names to greet",
),
"english": Param(True, type="boolean", title="English"),
"german": Param(True, type="boolean", title="German (Formal)"),
"french": Param(True, type="boolean", title="French"),
},
) as dag:
@task(task_id="get_names", task_display_name="Get names")
def get_names(**kwargs) -> list[str]:
params: ParamsDict = kwargs["params"]
if "names" not in params:
print("Uuups, no names given, was no UI used to trigger?")
return []
return params["names"]
@task.branch(task_id="select_languages", task_display_name="Select languages")
def select_languages(**kwargs) -> list[str]:
params: ParamsDict = kwargs["params"]
selected_languages = []
for lang in ["english", "german", "french"]:
if params[lang]:
selected_languages.append(f"generate_{lang}_greeting")
return selected_languages
@task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
def generate_english_greeting(name: str) -> str:
return f"Hello {name}!"
@task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
def generate_german_greeting(name: str) -> str:
return f"Sehr geehrter Herr/Frau {name}."
@task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
def generate_french_greeting(name: str) -> str:
return f"Bonjour {name}!"
@task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
def print_greetings(greetings1, greetings2, greetings3) -> None:
for g in greetings1 or []:
print(g)
for g in greetings2 or []:
print(g)
for g in greetings3 or []:
print(g)
if not (greetings1 or greetings2 or greetings3):
print("sad, nobody to greet :-(")
lang_select = select_languages()
names = get_names()
english_greetings = generate_english_greeting.expand(name=names)
german_greetings = generate_german_greeting.expand(name=names)
french_greetings = generate_french_greeting.expand(name=names)
lang_select >> [english_greetings, german_greetings, french_greetings]
results_print = print_greetings(english_greetings, german_greetings, french_greetings)
params={
# Let's start simple: Standard dict values are detected from type and offered as entry form fields.
# Detected types are numbers, text, boolean, lists and dicts.
# Note that such auto-detected parameters are treated as optional (not required to contain a value)
"x": 3,
"text": "Hello World!",
"flag": False,
"a_simple_list": ["one", "two", "three", "actually one value is made per line"],
# But of course you might want to have it nicer! Let's add some description to parameters.
# Note if you can add any Markdown formatting to the description, you need to use the description_md
# attribute.
"most_loved_number": Param(
42,
type="integer",
title="Your favorite number",
description_md="Everybody should have a **favorite** number. Not only _math teachers_. "
"If you can not think of any at the moment please think of the 42 which is very famous because"
"of the book [The Hitchhiker's Guide to the Galaxy]"
"(https://en.wikipedia.org/wiki/Phrases_from_The_Hitchhiker%27s_Guide_to_the_Galaxy#"
"The_Answer_to_the_Ultimate_Question_of_Life,_the_Universe,_and_Everything_is_42).",
),
# If you want to have a selection list box then you can use the enum feature of JSON schema
"pick_one": Param(
"value 42",
type="string",
title="Select one Value",
description="You can use JSON schema enum's to generate drop down selection boxes.",
enum=[f"value {i}" for i in range(16, 64)],
),
"required_field": Param(
# In this example we have no default value
# Form will enforce a value supplied by users to be able to trigger
type="string",
title="Required text field",
description="This field is required. You can not submit without having text in here.",
),
"optional_field": Param(
"optional text, you can trigger also w/o text",
type=["null", "string"],
title="Optional text field",
description_md="This field is optional. As field content is JSON schema validated you must "
"allow the `null` type.",
),
@task(task_display_name="Show used parameters")
def show_params(**kwargs) -> None:
params: ParamsDict = kwargs["params"]
print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")
show_params()

2.7.0 版本新增:即使未使用組態開關 webserver.show_trigger_form_if_no_params
定義任何參數,也可以強制顯示觸發表單。
2.8.0 版本變更:預設情況下,不允許自訂 HTML,以防止注入腳本或其他惡意 HTML 程式碼。如果您信任您的 DAG 作者,您可以透過將組態項目 webserver.allow_raw_html_descriptions
設定為 True
來變更參數描述的信任層級以允許原始 HTML。使用預設設定時,所有 HTML 都將顯示為純文字。這與先前使用屬性 description_html
啟用豐富格式的功能相關,該功能現在已由屬性 description_md
取代。使用屬性 custom_html_form
的自訂表單元素允許 DAG 作者指定原始 HTML 表單範本。自 2.8.0 版本起,這些自訂 HTML 表單元素已棄用。
停用運行時參數修改¶
在觸發 DAG 時更新參數的功能取決於旗標 core.dag_run_conf_overrides_params
。將此組態設定為 False
將有效地將您的預設參數變成常數。