升級 Airflow® 到較新版本

為何您需要升級

較新的 Airflow 版本可能包含資料庫遷移,因此您必須執行 airflow db migrate 以使用您要升級到的 Airflow 版本中的結構描述變更來遷移您的資料庫。別擔心,即使沒有遷移要執行,執行此命令也是安全的。

Airflow 版本 x 和 y 之間有哪些變更?

發布說明 列出了任何給定 Airflow 版本中包含的變更。

升級準備 — 備份資料庫

強烈建議在任何遷移之前備份您的 metadata 資料庫。如果您的資料庫沒有「熱備份」功能,您應該在關閉 Airflow 實例後執行備份,以確保您的資料庫備份是一致的。如果您沒有進行備份且遷移失敗,您可能會最終處於半遷移狀態,而從備份還原資料庫並重複遷移可能是唯一簡單的解決方案。例如,這可能是由於您的 CLI 和資料庫之間在遷移過程中網路連線中斷所導致,因此進行備份是避免此類問題的重要預防措施。

何時需要升級

如果您有基於 virtualenv 或 Docker 容器的自訂部署,您通常需要手動執行資料庫遷移作為升級過程的一部分。

在某些情況下,升級會自動發生 — 這取決於您的部署中,升級是否內建為安裝後動作。例如,當您使用 Apache Airflow 的 Helm Chart 並啟用升級後 hook 時,資料庫升級會在安裝新軟體後立即自動發生。同樣地,當您選擇透過其 UI 升級 Airflow 時,所有 Airflow-As-A-Service 解決方案都會為您自動執行升級。

如何升級

重新安裝 Apache Airflow®,指定所需的新版本。

若要升級引導的本機實例,您可以在重新執行安裝命令之前,將 AIRFLOW_VERSION 環境變數設定為預期的版本。依修補程式版本逐步升級:例如,如果從 2.8.2 版升級到 2.8.4 版,請先升級到 2.8.3 版。如需更詳細的指南,請參閱 快速開始

若要升級 PyPI 套件,請在您的環境中重新執行 pip install 命令,並使用所需的版本作為約束。如需更詳細的指南,請參閱 從 PyPI 安裝

為了手動遷移資料庫,您應該在您的環境中執行 airflow db migrate 命令。它可以從您的虛擬環境或容器中執行,這些容器讓您可以存取 Airflow CLI 使用命令列介面 和資料庫。

離線 SQL 遷移腳本

如果您想要離線執行升級腳本,您可以使用 -s--show-sql-only 旗標來取得將會執行的 SQL 陳述式。您也可以使用 --from-version 旗標指定起始 Airflow 版本,並使用 -n--to-version 旗標指定結束 Airflow 版本。從 Airflow 2.0.0 開始,Postgres 和 MySQL 支援此功能。

Airflow 2.7.0 或更高版本的範例用法

airflow db migrate -s --from-version "2.4.3" -n "2.7.3" airflow db migrate --show-sql-only --from-version "2.4.3" --to-version "2.7.3"

注意

airflow db upgrade 已由 airflow db migrate 取代,自 Airflow 2.7.0 版本起,前者已被棄用。

處理遷移問題

MySQL 資料庫中錯誤的編碼

如果您使用舊的 Airflow 1.10 作為最初手動建立或使用舊版 MySQL 建立的資料庫,根據您的資料庫的原始字元集,您在遷移到較新版本的 Airflow 時可能會遇到問題,並且您的遷移可能會因奇怪的錯誤(「索引鍵大小過大」、「遺失索引」等)而失敗。下一章將說明如何手動修正此問題。

為何您可能會收到錯誤?MySQL 8 資料庫建議的字元集/定序分別為 utf8mb4utf8mb4_bin。但是,這在不同版本的 MySQL 中一直在變化,您可能會有使用不同字元集自訂建立的資料庫。如果您的資料庫是使用舊版 Airflow 或 MySQL 建立的,則在建立資料庫時或在遷移期間,編碼可能已錯誤。

不幸的是,MySQL 限制了索引鍵大小,並且使用 utf8mb4,Airflow 索引鍵大小可能對於 MySQL 來說過大而無法處理。因此,在 Airflow 中,我們強制所有「ID」索引鍵使用 utf8 字元集(這相當於 MySQL 8 中的 utf8mb3)。這限制了索引的大小,以便 MySQL 可以處理它們。

以下是您可以在嘗試遷移之前遵循的步驟來修正它(但如果您知道自己在做什麼,您也可以選擇以自己的方式執行)。

熟悉 Airflow 的內部資料庫結構,您可以在 資料庫的 ERD 結構描述 中找到,以及您可以在 資料庫遷移參考 中找到的遷移列表。

  1. 備份您的資料庫,以便您可以在發生錯誤時還原它。

  2. 檢查您的哪些表格需要修正。查看這些表格

SHOW CREATE TABLE task_reschedule;
SHOW CREATE TABLE xcom;
SHOW CREATE TABLE task_fail;
SHOW CREATE TABLE rendered_task_instance_fields;
SHOW CREATE TABLE task_instance;

請務必複製輸出。您將在最後一步中需要它。您的 dag_idrun_idtask_idkey 欄位應明確設定為 utf8utf8mb3 字元集,類似於

``task_id`` varchar(250) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,  # correct

``task_id`` varchar(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL,  # correct

問題是如果您的欄位沒有編碼

``task_id`` varchar(250),  # wrong !!

或僅將定序設定為 utf8mb4

``task_id`` varchar(250) COLLATE utf8mb4_unicode_ci DEFAULT NULL,  # wrong !!

或將字元集和定序設定為 utf8mb4

``task_id`` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,  # wrong !!

您需要修正那些具有錯誤字元集/定序設定的欄位。

3. 移除您需要修改的表格的外鍵索引(您不需要移除所有外鍵索引 — 僅針對您需要修改的表格執行此操作)。您將需要在最後一步中重新建立它們(這就是為什麼您需要保留步驟 2 中的 SHOW CREATE TABLE 輸出)。

ALTER TABLE task_reschedule DROP FOREIGN KEY task_reschedule_ti_fkey;
ALTER TABLE xcom DROP FOREIGN KEY xcom_task_instance_fkey;
ALTER TABLE task_fail DROP FOREIGN KEY task_fail_ti_fkey;
ALTER TABLE rendered_task_instance_fields DROP FOREIGN KEY rtif_ti_fkey;

4. 修改您的 ID 欄位以具有正確的字元集/編碼。僅針對具有錯誤編碼的欄位執行此操作(以下是您可能需要使用的所有潛在命令)

ALTER TABLE task_instance MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_reschedule MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE rendered_task_instance_fields MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_fail MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE sla_miss MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_map MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE xcom MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY key VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
  1. 重新建立步驟 3 中移除的外鍵。

針對您移除的所有索引重複此操作。請注意,根據您擁有的 Airflow 版本,索引可能會略有不同(例如,map_index 是在 2.3.0 中新增的),但如果您保留步驟 2 中準備好的 SHOW CREATE TABLE 輸出,您將找到正確的 CONSTRAINT_NAMECONSTRAINT 來使用。

# Here you have to copy the statements from SHOW CREATE TABLE output
ALTER TABLE <TABLE> ADD CONSTRAINT `<CONSTRAINT_NAME>` <CONSTRAINT>

這應該使資料庫達到您可以執行遷移到新 Airflow 版本的狀態。

升級後警告

通常,您只需要成功執行 airflow db migrate 命令,這就夠了。但是,在某些情況下,遷移可能會在您的資料庫中找到一些舊的、過時的且可能錯誤的資料,並將其移至另一個表格。在這種情況下,您可能會在您的 Web 伺服器 UI 中收到關於找到資料的警告。

您可能會看到的典型訊息

Airflow 在 metadata 資料庫的 <原始表格> 表格中發現不相容的資料,並且在資料庫遷移以進行升級期間已將其移至 <新表格>。請檢查移動的資料,以決定是否需要保留它們,並手動移除 <新表格> 表格以消除此警告。

當您看到此類訊息時,表示您的某些資料已損壞,您應該檢查它以確定是否要保留或刪除某些資料。最有可能的是,資料已損壞並且是某些錯誤的殘留物,可以安全地刪除 — 因為這些資料在 Airflow 中無論如何都不可見且無用。但是,如果您出於特定稽核或歷史原因需要,您可以選擇將其儲存在某處。除非您有特定原因要保留資料,否則最有可能刪除它是您的最佳選擇。

有很多方法可以檢查和刪除資料 — 如果您可以使用自己的工具(通常是顯示資料庫物件的圖形工具)直接存取資料庫,則可以使用這些工具移除此類表格或重新命名或將其移動到另一個資料庫。如果您沒有此類工具,您可以使用 airflow db shell 命令 — 這會將您置於您資料庫的資料庫 shell 工具中,您將能夠檢查和刪除表格。

如何使用 Kubernetes 移除表格

  1. Exec 進入任何 Airflow Pod — Web 伺服器或排程器:kubectl exec -it <your-webserver-pod> python

  2. 在 python shell 中執行以下命令

from airflow.settings import Session

session = Session()
session.execute("DROP TABLE _airflow_moved__2_2__task_instance")
session.commit()

請將範例中的 <table> 替換為警告訊息中列印的實際表格名稱。

檢查表格

SELECT * FROM <table>;

刪除表格

DROP TABLE <table>;

遷移最佳實務

根據您的資料庫大小和實際遷移,遷移可能需要相當長的時間,因此,如果您有很長的歷史記錄和大型資料庫,建議先製作資料庫副本,並執行測試遷移以評估遷移需要多長時間。通常,「主要」升級可能需要更長的時間,因為新增功能有時需要重新建構資料庫。

此條目是否有幫助?