DAG 檔案處理

DAG 檔案處理指的是將 DAGs 資料夾中包含的 Python 檔案轉換為 DAG 物件的過程,這些 DAG 物件包含要排程的任務。

DAG 檔案處理涉及兩個主要組件。 DagFileProcessorManager 是一個執行無限迴圈的程序,用於確定哪些檔案需要處理,而 DagFileProcessorProcess 是一個單獨的程序,用於將單個檔案轉換為一個或多個 DAG 物件。

DagFileProcessorManager 執行使用者程式碼。 因此,您可以決定在與排程器程序不同的主機中將其作為獨立程序執行。 如果您決定將其作為獨立程序執行,則需要設定此組態:AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True 並執行 airflow dag-processor CLI 命令,否則,啟動排程器程序 (airflow scheduler) 也會啟動 DagFileProcessorManager

../_images/dag_file_processing_diagram.png

DagFileProcessorManager 具有以下步驟

  1. 檢查新檔案:如果自上次 DAG 刷新以來經過的時間 > dag_dir_list_interval,則更新檔案路徑列表

  2. 排除最近處理的檔案:排除處理時間比 min_file_process_interval 更近且未修改的檔案

  3. 佇列檔案路徑:將發現的檔案新增至檔案路徑佇列

  4. 處理檔案:為每個檔案啟動一個新的 DagFileProcessorProcess,最多為 parsing_processes

  5. 收集結果:從任何已完成的 DAG 處理器收集結果

  6. 記錄統計資訊:印出統計資訊並發出 dag_processing.total_parse_time

DagFileProcessorProcess 具有以下步驟

  1. 處理檔案:整個過程必須在 dag_file_processor_timeout 內完成

  2. DAG 檔案以 Python 模組形式載入:必須在 dagbag_import_timeout 內完成

  3. 處理模組:在 Python 模組中尋找 DAG 物件

  4. 返回 DagBag:向 DagFileProcessorManager 提供已發現 DAG 物件的列表

這個條目有幫助嗎?