DAG 檔案處理¶
DAG 檔案處理指的是將 DAGs 資料夾中包含的 Python 檔案轉換為 DAG 物件的過程,這些 DAG 物件包含要排程的任務。
DAG 檔案處理涉及兩個主要組件。 DagFileProcessorManager
是一個執行無限迴圈的程序,用於確定哪些檔案需要處理,而 DagFileProcessorProcess
是一個單獨的程序,用於將單個檔案轉換為一個或多個 DAG 物件。
DagFileProcessorManager
執行使用者程式碼。 因此,您可以決定在與排程器程序不同的主機中將其作為獨立程序執行。 如果您決定將其作為獨立程序執行,則需要設定此組態:AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True
並執行 airflow dag-processor
CLI 命令,否則,啟動排程器程序 (airflow scheduler
) 也會啟動 DagFileProcessorManager
。

DagFileProcessorManager
具有以下步驟
檢查新檔案:如果自上次 DAG 刷新以來經過的時間 > dag_dir_list_interval,則更新檔案路徑列表
排除最近處理的檔案:排除處理時間比 min_file_process_interval 更近且未修改的檔案
佇列檔案路徑:將發現的檔案新增至檔案路徑佇列
處理檔案:為每個檔案啟動一個新的
DagFileProcessorProcess
,最多為 parsing_processes 個收集結果:從任何已完成的 DAG 處理器收集結果
記錄統計資訊:印出統計資訊並發出
dag_processing.total_parse_time
DagFileProcessorProcess
具有以下步驟
處理檔案:整個過程必須在 dag_file_processor_timeout 內完成
DAG 檔案以 Python 模組形式載入:必須在 dagbag_import_timeout 內完成
處理模組:在 Python 模組中尋找 DAG 物件
返回 DagBag:向
DagFileProcessorManager
提供已發現 DAG 物件的列表