DAG 序列化

為了使 Airflow Webserver 成為無狀態的,Airflow >= 1.10.7 支援 DAG 序列化和 DB 持久性。從 Airflow 2.0.0 開始,排程器也使用序列化 DAG 以保持一致性並做出排程決策。

../_images/dag_serialization.png

在沒有 DAG 序列化與 DB 持久性的情況下,Webserver 和排程器都需要存取 DAG 檔案。排程器和 Webserver 都會解析 DAG 檔案。

透過DAG 序列化,我們的目標是將 Webserver 從 DAG 解析中解耦,這將使 Webserver 非常輕量。

如上圖所示,當使用此功能時,排程器中的 DagFileProcessorProcess 會解析 DAG 檔案,將它們序列化為 JSON 格式,並將它們以 SerializedDagModel 模型的形式儲存在 Metadata DB 中。

現在 Webserver 不必再次解析 DAG 檔案,而是讀取 JSON 格式的序列化 DAG,將它們反序列化並建立 DagBag,並使用它在 UI 中顯示。排程器也不需要實際的 DAG 來做出排程決策,而是從 Airflow 2.0.0 開始使用序列化 DAG,其中包含排程 DAG 所需的所有資訊(這是作為 排程器 HA 的一部分完成的)。

作為 DAG 序列化一部分實作的關鍵功能之一是,當 WebServer 啟動時,我們只從序列化 Dag 表格中按需載入每個 DAG,而不是載入整個 DagBag。這有助於縮短 Webserver 啟動時間並減少記憶體使用量。當您有大量 DAG 時,這種減少非常顯著。

您可以啟用將原始碼儲存在資料庫中,使 Webserver 完全獨立於 DAG 檔案。如果您的檔案嵌入在 Docker 映像檔中,或者您可以透過其他方式將它們提供給 Webserver,則這不是必要的。資料儲存在 DagCode 模型中。

最後一個要素是渲染範本欄位。當啟用序列化時,範本不會渲染到請求,而是在任務在 worker 上執行之前儲存欄位內容的副本。資料儲存在 RenderedTaskInstanceFields 模型中。為了限制資料庫的過度增長,僅保留最近的條目,較舊的條目將被清除。

注意

DAG 序列化是 Airflow 2.0+ 的嚴格要求,無法關閉。

Dag 序列化設定

airflow.cfg 中新增以下設定

[core]

# You can also update the following default configurations based on your needs
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
compress_serialized_dags = False
  • min_serialized_dag_update_interval: 此標誌設定 DB 中序列化 DAG 應更新的最小間隔(以秒為單位)。這有助於降低資料庫寫入速率。

  • min_serialized_dag_fetch_interval: 此選項控制當序列化 DAG 已載入到 Webserver 的 DagBag 中時,從 DB 重新提取序列化 DAG 的頻率。將此設定為較高值將減少 DB 的負載,但代價是可能顯示 DAG 的過時快取版本。

  • max_num_rendered_ti_fields_per_task: 此選項控制每個任務要儲存在資料庫中的渲染任務實例欄位(範本欄位)的最大數量。

  • compress_serialized_dags: 此選項控制是否將序列化 DAG 壓縮到資料庫。當您的叢集中有非常大的 DAG 時,這很有用。當 True 時,這將停用 DAG 依賴關係視圖。

如果您從 <1.10.7 更新 Airflow,請不要忘記執行 airflow db migrate

限制

  • 當使用使用者定義的篩選器和巨集時,Webserver 中的渲染視圖可能會針對尚未執行的 TI 顯示不正確的結果,因為它可能正在使用 Webserver 無法存取的外部模組。在這種情況下,請使用 airflow tasks render CLI 命令來偵錯或測試範本欄位的渲染。一旦任務執行開始,渲染的範本欄位將儲存在 DB 的單獨表格中,之後 Webserver(渲染視圖標籤)中將顯示正確的值。

注意

您需要 Airflow >= 1.10.10 才能實現完全無狀態的 Webserver。Airflow 1.10.7 至 1.10.9 在某些情況下需要存取 DAG 檔案。更多資訊:https://airflow.dev.org.tw/docs/1.10.9/dag-serialization.html#limitations

使用不同的 JSON 函式庫

若要使用不同的 JSON 函式庫而不是標準的 json 函式庫,例如 ujson,您需要在本機 Airflow 設定 (airflow_local_settings.py) 檔案中定義一個 json 變數,如下所示

import ujson

json = ujson

請參閱 設定本機設定 以了解如何設定本機設定的詳細資訊。

此條目是否有幫助?