動態 DAG 生成¶
本文档描述了如何创建结构动态生成的 DAG,但 DAG 中任务的数量在 DAG 运行之间不会改变。如果您想实现一个 DAG,其中任务(或 Airflow 2.6 中的任务组)的数量可以根据先前任务的输出/结果而变化,请参阅 动态任务映射。
注意
生成任务和任务组的一致顺序
在所有动态生成 DAG 的情况下,您都应确保每次生成 DAG 时,任务和任务组都以一致的顺序生成,否则您最终可能会遇到每次刷新页面时,任务和任务组在网格视图中更改顺序的情况。例如,可以通过在数据库查询中使用稳定的排序机制或在 Python 中使用 sorted()
函数来实现这一点。
具有环境變數的動態 DAG¶
如果您想使用變數來設定您的程式碼,您應該始終在頂層程式碼中使用 環境變數,而不是 Airflow 變數。在頂層程式碼中使用 Airflow 變數會建立與 Airflow 元資料庫的連線以提取值,這可能會減慢解析速度並給資料庫帶來額外的負載。請參閱 關於 Airflow 變數的最佳實務,以使用 Jinja 範本在您的 DAG 中充分利用 Airflow 變數。
例如,您可以為您的生產和開發環境設定不同的 DEPLOYMENT
變數。DEPLOYMENT
變數可以在您的生產環境中設定為 PROD
,在您的開發環境中設定為 DEV
。然後,您可以根據環境變數的值,在生產和開發環境中以不同的方式建構您的 DAG。
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
task = Operator(param="prod-param")
elif deployment == "DEV":
task = Operator(param="dev-param")
使用嵌入式元資料產生 Python 程式碼¶
您可以外部產生包含元資料作為可匯入常數的 Python 程式碼。然後,您的 DAG 可以直接匯入此類常數,並用於建構物件並建立依賴關係。這使得從多個 DAG 匯入此類程式碼變得容易,而無需尋找、載入和解析儲存在常數中的元資料 - 這由 Python 直譯器在處理「匯入」語句時自動完成。乍聽之下這很奇怪,但產生此類程式碼並確保它是您可以從 DAG 匯入的有效 Python 程式碼非常容易。
例如,假設您動態產生(在您的 DAG 資料夾中)my_company_utils/common.py
檔案
# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]
然後,您可以像這樣在所有 DAG 中匯入和使用 ALL_TASKS
常數
from my_company_utils.common import ALL_TASKS
with DAG(
dag_id="my_dag",
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
):
for task in ALL_TASKS:
# create your operators and relations here
...
別忘了,在這種情況下,您需要在 my_company_utils
資料夾中新增空的 __init__.py
檔案,並且您應該將 my_company_utils/.*
行新增到 .airflowignore
檔案(如果使用 regexp 忽略語法),以便排程器在尋找 DAG 時忽略整個資料夾。
從結構化資料檔案以外部組態建立動態 DAG¶
如果您需要使用更複雜的元資料來準備您的 DAG 結構,並且您希望將資料保留在結構化的非 Python 格式中,則您應該將資料匯出到 DAG 資料夾中的檔案並將其推送到 DAG 資料夾,而不是嘗試透過 DAG 的頂層程式碼提取資料 - 原因在父 頂層 Python 程式碼 中說明。
元資料應與 DAG 一起以方便的檔案格式(JSON、YAML 格式是不錯的選擇)匯出並儲存在 DAG 資料夾中。理想情況下,元資料應與您從中載入它的 DAG 檔案模組在同一個套件/資料夾中發布,因為這樣您就可以輕鬆地在您的 DAG 中找到元資料檔案的位置。可以使用包含 DAG 的模組的 __file__
屬性找到要讀取的檔案位置
my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here
註冊動態 DAG¶
當您使用 @dag
裝飾器或 with DAG(..)
上下文管理器時,您可以動態生成 DAG,Airflow 將自動註冊它們。
from datetime import datetime
from airflow.decorators import dag, task
configs = {
"config1": {"message": "first DAG will receive this message"},
"config2": {"message": "second DAG will receive this message"},
}
for config_name, config in configs.items():
dag_id = f"dynamic_generated_dag_{config_name}"
@dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
def dynamic_generated_dag():
@task
def print_message(message):
print(message)
print_message(config["message"])
dynamic_generated_dag()
下面的程式碼將為每個組態生成一個 DAG:dynamic_generated_dag_config1
和 dynamic_generated_dag_config2
。它們中的每一個都可以使用相關組態單獨執行。
如果您不希望 DAG 自動註冊,您可以在您的 DAG 上設定 auto_register=False
來停用此行為。
版本 2.4 變更: 從 2.4 版開始,透過呼叫 @dag
裝飾函數建立的 DAG(或在 with DAG(...)
上下文管理器中使用的 DAG)會自動註冊,不再需要儲存在全域變數中。
最佳化執行期間的 DAG 解析延遲¶
版本 2.4 新增。
這是一個 實驗性功能。
有時,當您從單個 DAG 檔案生成大量動態 DAG 時,可能會在任務執行期間解析 DAG 檔案時導致不必要的延遲。影響是在任務開始之前的延遲。
為什麼會這樣?您可能沒有意識到,但在您的任務執行之前,Airflow 會解析 DAG 來源的 Python 檔案。
Airflow 排程器(或更確切地說是 DAG 檔案處理器)需要載入完整的 DAG 檔案才能處理所有元資料。但是,任務執行只需要單個 DAG 物件即可執行任務。了解這一點後,我們可以在執行任務時跳過不必要的 DAG 物件的生成,從而縮短解析時間。當生成的 DAG 數量很大時,此最佳化最有效。
您可以採取一種實驗性方法來最佳化此行為。請注意,並非總是可以使用它(例如,當後續 DAG 的生成取決於先前的 DAG 時)或當您的 DAG 生成存在一些副作用時。此外,下面的程式碼片段非常複雜,雖然我們已經對其進行了測試,並且在大多數情況下都有效,但可能在某些情況下,目前解析的 DAG 的檢測將失敗,並且它將恢復為建立所有 DAG 或失敗。請謹慎使用此解決方案並徹底測試它。
您可獲得的效能提升的一個很好的例子在 Airflow 的 Magic Loop 部落格文章中顯示,該文章描述了如何在任務執行期間將解析時間從 120 秒縮短到 200 毫秒。(該範例是在 Airflow 2.4 之前編寫的,因此它使用了 Airflow 未記載的行為。)
在 Airflow 2.4 中,您可以改用 get_parsing_context()
方法以記載且可預測的方式檢索當前上下文。
在迭代要為其生成 DAG 的事物集合時,您可以使用上下文來確定是否需要生成所有 DAG 物件(在 DAG 檔案處理器中解析時),還是僅生成單個 DAG 物件(在執行任務時)。
get_parsing_context()
傳回當前解析上下文。上下文是 AirflowParsingContext
的類型,並且在僅需要單個 DAG/任務的情況下,它包含設定的 dag_id
和 task_id
欄位。在需要「完整」解析的情況下(例如在 DAG 檔案處理器中),上下文的 dag_id
和 task_id
設定為 None
。
from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context
current_dag_id = get_parsing_context().dag_id
for thing in list_of_things:
dag_id = f"generated_dag_{thing}"
if current_dag_id is not None and current_dag_id != dag_id:
continue # skip generation of non-selected DAG
with DAG(dag_id=dag_id, ...):
...