設定資料庫後端

Airflow 的建置目的在於使用 SqlAlchemy 與其metadata互動。

以下文件說明了資料庫引擎組態、與 Airflow 搭配使用時的必要組態變更,以及連線到這些資料庫的 Airflow 組態變更。

選擇資料庫後端

如果您想要真正試用 Airflow,您應該考慮設定 PostgreSQLMySQL 的資料庫後端。 預設情況下,Airflow 使用 SQLite,僅適用於開發目的。

Airflow 支援以下資料庫引擎版本,因此請確認您擁有的版本。 舊版本可能不支援所有 SQL 陳述式。

  • PostgreSQL: 12、13、14、15、16

  • MySQL: 8.0、Innovation

  • SQLite: 3.15.0+

如果您計畫執行多個排程器,則必須符合其他需求。 如需詳細資訊,請參閱 排程器 HA 資料庫需求

警告

儘管 MariaDB 和 MySQL 之間有很大的相似之處,但我們不支援 MariaDB 作為 Airflow 的後端。 MariaDB 和 MySQL 之間存在已知問題(例如索引處理),我們也不會在 Maria DB 上測試我們的移轉腳本或應用程式執行。 我們知道有人將 MariaDB 用於 Airflow,這為他們帶來了很多操作上的麻煩,因此我們強烈建議不要嘗試使用 MariaDB 作為後端,使用者也不能期望社群對其提供任何支援,因為嘗試將 MariaDB 用於 Airflow 的使用者數量非常少。

資料庫 URI

Airflow 使用 SQLAlchemy 連線到資料庫,這需要您設定資料庫 URL。 您可以在 [database] 區段中的 sql_alchemy_conn 選項中執行此操作。 使用 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 環境變數設定此選項也很常見。

注意

如需設定組態的詳細資訊,請參閱 設定組態選項

如果您想要檢查目前的值,可以使用 airflow config get-value database sql_alchemy_conn 命令,如下例所示。

$ airflow config get-value database sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db

確切的格式說明在 SQLAlchemy 文件中說明,請參閱 資料庫 Urls。 我們也會在下面向您展示一些範例。

設定 SQLite 資料庫

SQLite 資料庫可用於執行 Airflow 以進行開發目的,因為它不需要任何資料庫伺服器(資料庫儲存在本機檔案中)。 使用 SQLite 資料庫有很多限制(例如,它僅適用於循序執行器),絕不應在生產環境中使用。

執行 Airflow 2.0+ 需要最低版本的 sqlite3 - 最低版本為 3.15.0。 有些舊系統預設安裝了較早版本的 sqlite,對於這些系統,您需要手動升級 SQLite 才能使用高於 3.15.0 的版本。 請注意,這不是 python library 版本,而是需要升級的 SQLite 系統層級應用程式。 安裝 SQLite 的方式有很多種,您可以在 SQLite 官方網站以及特定於您的作業系統發行版本的文件中找到一些相關資訊。

疑難排解

有時即使您將 SQLite 升級到較高版本,並且您的本機 python 報告了較高版本,Airflow 使用的 python 解譯器可能仍然使用 LD_LIBRARY_PATH 中可用的較舊版本,該路徑是為用於啟動 Airflow 的 python 解譯器設定的。

您可以執行此檢查來確定解譯器使用的版本

root@b8a8e73caa2c:/opt/airflow# python
Python 3.8.10 (default, Mar 15 2022, 12:22:08)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.27.2'
>>>

但請注意,為您的 Airflow 部署設定環境變數可能會變更首先找到的 SQLite 程式庫,因此您可能需要確保「夠高」版本的 SQLite 是系統中安裝的唯一版本。

sqlite 資料庫的 URI 範例

sqlite:////home/airflow/airflow.db

在 AmazonLinux AMI 或容器映像上升級 SQLite

AmazonLinux SQLite 只能使用原始碼儲存庫升級到 v3.7。 Airflow 需要 v3.15 或更高版本。 使用以下指示設定具有最新 SQLite3 的基礎映像 (或 AMI)

先決條件:您需要 wgettargzipgccmakeexpect 才能使升級程序正常運作。

yum -y install wget tar gzip gcc make expect

https://sqlite.dev.org.tw/ 下載原始碼,在本機編譯並安裝。

wget https://www.sqlite.org/src/tarball/sqlite.tar.gz
tar xzf sqlite.tar.gz
cd sqlite/
export CFLAGS="-DSQLITE_ENABLE_FTS3 \
    -DSQLITE_ENABLE_FTS3_PARENTHESIS \
    -DSQLITE_ENABLE_FTS4 \
    -DSQLITE_ENABLE_FTS5 \
    -DSQLITE_ENABLE_JSON1 \
    -DSQLITE_ENABLE_LOAD_EXTENSION \
    -DSQLITE_ENABLE_RTREE \
    -DSQLITE_ENABLE_STAT4 \
    -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT \
    -DSQLITE_SOUNDEX \
    -DSQLITE_TEMP_STORE=3 \
    -DSQLITE_USE_URI \
    -O2 \
    -fPIC"
export PREFIX="/usr/local"
LIBS="-lm" ./configure --disable-tcl --enable-shared --enable-tempstore=always --prefix="$PREFIX"
make
make install

安裝後將 /usr/local/lib 新增至程式庫路徑

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

設定 PostgreSQL 資料庫

您需要建立一個資料庫和一個資料庫使用者,Airflow 將使用該資料庫使用者來存取此資料庫。 在以下範例中,將建立資料庫 airflow_db 和使用者名稱為 airflow_user、密碼為 airflow_pass 的使用者

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
-- PostgreSQL 15 requires additional privileges:
GRANT ALL ON SCHEMA public TO airflow_user;

注意

資料庫必須使用 UTF-8 字元集

您可能需要更新 Postgres pg_hba.conf 以將 airflow 使用者新增至資料庫存取控制清單;並重新載入資料庫組態以載入您的變更。 請參閱 Postgres 文件中的 pg_hba.conf 檔案以瞭解更多資訊。

警告

當您使用 SQLAlchemy 1.4.0+ 時,您需要在 sql_alchemy_conn 中使用 postgresql:// 作為資料庫。 在舊版本的 SQLAlchemy 中,可以使用 postgres://,但在 SQLAlchemy 1.4.0+ 中使用它會導致

>       raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )
E       sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres

如果您無法立即變更 URL 的前綴,Airflow 會繼續與 SQLAlchemy 1.3 搭配使用,您可以降級 SQLAlchemy,但我們建議更新前綴。

詳細資訊請參閱 SQLAlchemy 變更記錄

我們建議使用 psycopg2 驅動程式,並在您的 SqlAlchemy 連線字串中指定它。

postgresql+psycopg2://<user>:<password>@<host>/<db>

另請注意,由於 SqlAlchemy 沒有公開在資料庫 URI 中指定特定結構描述的方式,因此您需要確保結構描述 public 在您的 Postgres 使用者的 search_path 中。

如果您為 Airflow 建立新的 Postgres 帳戶

  • 新 Postgres 使用者的預設 search_path 為:"$user", public,無需變更。

如果您使用具有自訂 search_path 的目前 Postgres 使用者,則可以使用以下命令變更 search_path

ALTER USER airflow_user SET search_path = public;

如需有關設定 PostgreSQL 連線的詳細資訊,請參閱 SQLAlchemy 文件中的 PostgreSQL 方言

注意

眾所周知,Airflow - 尤其是在高效能設定中 - 會開啟許多與 metadata 資料庫的連線。 這可能會導致 Postgres 資源使用問題,因為在 Postgres 中,每個連線都會建立一個新的程序,並且當開啟大量連線時,它會使 Postgres 非常耗用資源。 因此,我們建議將 PGBouncer 用作所有 Postgres 生產環境安裝的資料庫 Proxy。 PGBouncer 可以處理來自多個元件的連線集區,而且如果您有遠端資料庫且連線可能不穩定,它也會使您的資料庫連線更能夠應對暫時性的網路問題。 PGBouncer 部署的範例實作可以在 Apache Airflow 的 Helm Chart 中找到,您可以在其中透過切換布林值旗標來啟用預先設定的 PGBouncer 執行個體。 您可以查看我們在那裡採用的方法,並將其用作靈感,當您準備自己的部署時,即使您不使用官方 Helm Chart 也是如此。

另請參閱 Helm Chart 生產環境指南

注意

對於受管理的 Postgres(例如 Azure Postgresql、CloudSQL、Amazon RDS),您應該在連線參數中使用 keepalives_idle,並將其設定為小於閒置時間,因為這些服務會在閒置一段時間(通常為 300 秒)後關閉閒置連線,這會導致錯誤 The error: psycopg2.operationalerror: SSL SYSCALL error: EOF detectedkeepalive 設定可以透過 組態參考[database] 區段的 sql_alchemy_connect_args 組態參數變更。 您可以在您的 local_settings.py 中設定 args,而 sql_alchemy_connect_args 應該是儲存組態參數的字典的完整匯入路徑。 您可以閱讀 Postgres Keepalives。 已觀察到可以解決問題的 keepalives 的範例設定可能是

keepalive_kwargs = {
    "keepalives": 1,
    "keepalives_idle": 30,
    "keepalives_interval": 5,
    "keepalives_count": 5,
}

然後,如果它放在 airflow_local_settings.py 中,則組態匯入路徑將為

sql_alchemy_connect_args = airflow_local_settings.keepalive_kwargs

如需有關如何設定本機設定的詳細資訊,請參閱 設定本機設定

設定 MySQL 資料庫

您需要建立一個資料庫和一個資料庫使用者,Airflow 將使用該資料庫使用者來存取此資料庫。 在以下範例中,將建立資料庫 airflow_db 和使用者名稱為 airflow_user、密碼為 airflow_pass 的使用者

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

注意

資料庫必須使用 UTF-8 字元集。 您必須注意的一個小警告是,較新版本的 MySQL 中的 utf8 實際上是 utf8mb4,這會導致 Airflow 索引變得太大(請參閱 https://github.com/apache/airflow/pull/17603#issuecomment-901121618)。 因此,從 Airflow 2.2 開始,所有 MySQL 資料庫的 sql_engine_collation_for_ids 都會自動設定為 utf8mb3_bin(除非您覆寫它)。 這可能會導致 Airflow 資料庫中 ID 欄位的對照 ID 混合,但這沒有負面影響,因為 Airflow 中的所有相關 ID 都僅使用 ASCII 字元。

我們依賴 MySQL 更嚴格的 ANSI SQL 設定,以便擁有合理的預設值。 請確保在您的 my.cnf 檔案中 [mysqld] 區段下指定 explicit_defaults_for_timestamp=1 選項。 您也可以使用傳遞給 mysqld 可執行檔的 --explicit-defaults-for-timestamp 參數來啟用這些選項

我們建議使用 mysqlclient 驅動程式,並在您的 SqlAlchemy 連線字串中指定它。

mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

重要

MySQL 後端的整合僅在使用 Apache Airflow 的持續整合 (CI) 流程期間使用 mysqlclient 驅動程式進行驗證。

如果您想要使用其他驅動程式,請造訪 SQLAlchemy 文件中的 MySQL 方言,以瞭解有關 SqlAlchemy 連線的下載和設定的更多資訊。

此外,您也應該特別注意 MySQL 的編碼。 雖然 utf8mb4 字元集在 MySQL 中越來越受歡迎(實際上,utf8mb4 成為 MySQL8.0 中的預設字元集),但在 Airflow 2+ 中使用 utf8mb4 編碼需要額外設定(如需更多詳細資訊,請參閱 #7570。)。 如果您使用 utf8mb4 作為字元集,您也應該設定 sql_engine_collation_for_ids=utf8mb3_bin

注意

在嚴格模式下,MySQL 不允許 0000-00-00 作為有效日期。 然後,在某些情況下,您可能會收到類似 "Invalid default value for 'end_date'" 的錯誤(某些 Airflow 資料表使用 0000-00-00 00:00:00 作為時間戳記欄位預設值)。 為了避免此錯誤,您可以停用 MySQL 伺服器上的 NO_ZERO_DATE 模式。 請閱讀 https://stackoverflow.com/questions/9192027/invalid-default-value-for-create-date-timestamp-field,以瞭解如何停用它。 如需更多資訊,請參閱 SQL 模式 - NO_ZERO_DATE

MsSQL 資料庫

警告

經過討論投票程序,Airflow 的 PMC 成員和提交者已達成決議,不再將 MsSQL 維護為受支援的資料庫後端。

從 Airflow 2.9.0 開始,已移除對 Airflow 資料庫後端的 MsSQL 支援。 這不會影響現有的提供者套件(operators 和 hooks),DAG 仍然可以存取和處理來自 MsSQL 的資料。 但是,進一步使用可能會擲回錯誤,導致 Airflow 的核心功能無法使用。

從 MsSQL Server 移轉

由於 Airflow 2.9.0 已終止對 MSSQL 的支援,移轉腳本可以協助 Airflow 版本 2.7.x 或 2.8.x 從 SQL-Server 移轉。 移轉腳本可在 Github 上的 airflow-mssql-migration 儲存庫中取得。

請注意,移轉腳本不提供支援和保固。

其他組態選項

還有更多組態選項可用於設定 SQLAlchemy 行為。 如需詳細資訊,請參閱 參考文件,以取得 [database] 區段中的 sqlalchemy_* 選項。

例如,您可以指定資料庫結構描述,Airflow 將在其下建立所需的資料表。 如果您想要 Airflow 將其資料表安裝在 PostgreSQL 資料庫的 airflow 結構描述中,請指定以下環境變數

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow"

請注意 SQL_ALCHEMY_CONN 資料庫 URL 結尾的 search_path

初始化資料庫

在組態資料庫並在 Airflow 組態中連線到它之後,您應該建立資料庫結構描述。

airflow db migrate

Airflow 中的資料庫監控與維護

Airflow 廣泛使用關聯式 metadata 資料庫進行工作排程和執行。 監控和正確組態此資料庫對於最佳 Airflow 效能至關重要。

主要考量

  1. 效能影響:長時間或過多的查詢可能會嚴重影響 Airflow 的功能。 這些可能是由於工作流程的特定性、缺乏最佳化或程式碼錯誤所引起。

  2. 資料庫統計資訊:資料庫引擎不正確的最佳化決策(通常是由於過時的資料統計資訊)可能會降低效能。

責任

Airflow 環境中資料庫監控和維護的責任因您使用的是自我管理資料庫和 Airflow 執行個體,還是選擇受管理服務而異。

自我管理環境:

在資料庫和 Airflow 都是自我管理的設定中,部署管理員負責設定、組態和維護資料庫。 這包括監控其效能、管理備份、定期清理並確保其與 Airflow 的最佳運作。

受管理服務:

  • 受管理資料庫服務:當使用受管理 DB 服務時,許多維護工作(例如備份、修補和基本監控)都由提供者處理。 但是,部署管理員仍然需要監督 Airflow 的組態,並最佳化特定於其工作流程的效能設定、管理定期清理並監控其 DB 以確保與 Airflow 的最佳運作。

  • 受管理 Airflow 服務:使用受管理 Airflow 服務時,這些服務提供者負責 Airflow 及其資料庫的組態和維護。 但是,部署管理員需要與服務組態協作,以確保大小調整和工作流程需求與受管理服務的大小調整和組態相符。

監控面向

定期監控應包括

  • CPU、I/O 和記憶體使用率。

  • 查詢頻率和數量。

  • 識別和記錄緩慢或長時間執行的查詢。

  • 偵測效率低下的查詢執行計畫。

  • 分析磁碟交換與記憶體使用率和快取交換頻率。

工具與策略

  • Airflow 沒有提供用於資料庫監控的直接工具。

  • 使用伺服器端監控和記錄來取得指標。

  • 根據定義的閾值啟用長時間執行查詢的追蹤。

  • 定期執行內部管理工作(例如 ANALYZE SQL 命令)以進行維護。

資料庫清理工具

  • Airflow DB Clean 命令:利用 airflow db clean 命令來協助管理和清理您的資料庫。

  • airflow.utils.db_cleanup 中的 Python 方法:此模組提供用於資料庫清理和維護的其他 Python 方法,為特定需求提供更精細的控制和自訂。

建議

  • 主動監控:在生產環境中實作監控和記錄,而不會顯著影響效能。

  • 資料庫特定指南:請參閱所選資料庫的文件,以取得特定監控設定指示。

  • 受管理資料庫服務:檢查您的資料庫提供者是否提供自動維護工作。

SQLAlchemy 日誌記錄

如需詳細的查詢分析,請啟用 SQLAlchemy 用戶端記錄(SQLAlchemy 引擎組態中的 echo=True)。

  • 此方法更具侵入性,並且可能會影響 Airflow 的用戶端效能。

  • 它會產生大量記錄,尤其是在忙碌的 Airflow 環境中。

  • 適用於非生產環境,例如預備系統。

您可以按照 SQLAlchemy 日誌記錄文件中的說明,使用 echo=True 作為 sqlalchemy 引擎組態來執行此操作。

使用 sql_alchemy_engine_args 組態參數將 echo arg 設定為 True。

注意

  • 啟用廣泛的記錄時,請注意對 Airflow 效能和系統資源的影響。

  • 在生產環境中,相較於用戶端記錄,最好使用伺服器端監控,以最大程度地減少效能干擾。

下一步?

預設情況下,Airflow 使用 SequentialExecutor,這不提供平行處理。 您應該考慮組態不同的 執行器 以獲得更好的效能。

此條目是否有幫助?