模組管理

Airflow 允許您在 DAG 和 Airflow 組態中使用您自己的 Python 模組。以下文章將說明如何建立您自己的模組,以便 Airflow 可以正確載入它,以及在模組未正確載入時診斷問題。

通常,您會希望在您的 Airflow 部署中使用您自己的 python 程式碼,例如共用程式碼、函式庫,您可能想要使用共用的 python 程式碼產生 DAG,並擁有多個 DAG python 檔案。

您可以透過以下其中一種方式執行此操作

  • 將您的模組新增到 Airflow 自動新增到 PYTHONPATH 的其中一個資料夾

  • 將您存放程式碼的額外資料夾新增到 PYTHONPATH

  • 將您的程式碼打包到 Python 套件中,並與 Airflow 一起安裝。

下一章概述 Python 如何載入套件和模組,並深入探討上述三種可能性的細節。

Python 中套件/模組載入的運作方式

Python 嘗試從中載入模組的目錄列表由變數 sys.path 給定。Python 確實嘗試 智慧地判斷 此變數的內容,具體取決於作業系統、Python 的安裝方式以及使用的 Python 版本。

您可以透過執行互動式終端機(如下例所示)來檢查目前 Python 環境中此變數的內容

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/.pyenv/versions/3.8.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.8/site-packages']

sys.path 在程式啟動期間初始化。第一個優先權給予目前目錄,即 path[0] 是包含用於調用之目前腳本的目錄,如果它是互動式 shell,則為空字串。第二個優先權給予提供的 PYTHONPATH,然後是安裝相依的預設路徑,這些路徑由 site 模組管理。

sys.path 也可以在 Python 工作階段期間修改,只需使用 append(例如,sys.path.append("/path/to/custom/package"))。Python 將在新增路徑後立即開始在新路徑中搜尋套件。Airflow 利用此功能,如 將目錄新增至 PYTHONPATH 區段中所述。

在變數 sys.path 中,有一個目錄 site-packages,其中包含已安裝的外部套件,這表示您可以使用 pipanaconda 安裝套件,並且可以在 Airflow 中使用它們。在下一節中,您將學習如何建立您自己的簡單可安裝套件,以及如何指定要使用環境變數 PYTHONPATH 新增至 sys.path 的其他目錄。

另請務必在您的資料夾中新增 init 檔案

套件的典型結構

這是您可能在 dags 資料夾中擁有的範例結構

<DIRECTORY ON PYTHONPATH>
| .airflowignore  -- only needed in ``dags`` folder, see below
| -- my_company
              | __init__.py
              | common_package
              |              |  __init__.py
              |              | common_module.py
              |              | subpackage
              |                         | __init__.py
              |                         | subpackaged_util_module.py
              |
              | my_custom_dags
                              | __init__.py
                              | my_dag1.py
                              | my_dag2.py
                              | base_dag.py

在上述情況下,這些是您可以匯入 python 檔案的方式

from my_company.common_package.common_module import SomeClass
from my_company.common_package.subpackage.subpackaged_util_module import AnotherClass
from my_company.my_custom_dags.base_dag import BaseDag

您可以在資料夾的根目錄中看到 .airflowignore 檔案。這是一個您可以放在 dags 資料夾中的檔案,以告知 Airflow 在 Airflow 排程器尋找 DAG 時應忽略資料夾中的哪些檔案。它應包含應忽略路徑的正規表示式(預設)或 glob 運算式。您不需要在 PYTHONPATH 中的任何其他資料夾中擁有該檔案(而且您也只能將共用程式碼保存在其他資料夾中,而不是實際的 DAG)。

在上面的範例中,DAG 僅位於 my_custom_dags 資料夾中,common_package 在搜尋 DAG 時不應由排程器掃描,因此我們應忽略 common_package 資料夾。如果您在那裡保留一個基礎 DAG,base_dag.py,而 my_dag1.pymy_dag2.py 從該基礎 DAG 衍生而來,您也會想要忽略 base_dag.py。您的 .airflowignore 應如下所示

my_company/common_package/.*
my_company/my_custom_dags/base_dag\.py

如果 DAG_IGNORE_FILE_SYNTAX 設定為 glob,則等效的 .airflowignore 檔案將會是

my_company/common_package/
my_company/my_custom_dags/base_dag.py

Airflow 中內建的 PYTHONPATH 條目

Airflow 在動態執行時會將三個目錄新增至 sys.path

  • dags 資料夾:它在 [core] 區段中使用選項 dags_folder 進行組態。

  • config 資料夾:它預設透過設定 AIRFLOW_HOME 變數 ({AIRFLOW_HOME}/config) 進行組態。

  • plugins 資料夾:它在 [core] 區段中使用選項 plugins_folder 進行組態。

注意

Airflow 2 中的 DAGS 資料夾不應與 webserver 共用。雖然您可以這樣做,但與 Airflow 1.10 不同,Airflow 不期望 DAGS 資料夾存在於 webserver 中。實際上,與 webserver 共用 dags 資料夾有點安全風險,因為這表示編寫 DAG 的人員可以編寫 webserver 將能夠執行的程式碼(理想情況下,webserver 永遠不應執行可由編寫 DAG 的使用者修改的程式碼)。因此,如果您需要與 webserver 共用某些程式碼,強烈建議您透過 configplugins 資料夾或透過已安裝的 Airflow 套件(請參閱下文)來共用。這些資料夾通常由與 DAG 資料夾(通常是資料科學家)不同的使用者(管理員/DevOps)管理和存取,因此它們被認為是安全的,因為它們是 Airflow 安裝組態的一部分,並由管理安裝的人員控制。

程式碼命名的最佳實務

當您匯入程式碼時,應注意一些陷阱。

有時,您可能會看到從 Airflow 或您使用的其他函式庫程式碼中引發的 module 'X' has no attribute 'Y' 例外狀況。這通常是由於您在頂層的 PYTHONPATH 中有一個名為 'X' 的模組或套件,並且匯入了它而不是原始程式碼預期的模組。

您應始終為您的套件和模組使用唯一的名稱,並且有一些方法可以確保強制執行唯一性,如下所述。

使用唯一的頂層套件名稱

最重要的是,避免為您直接在 PYTHONPATH 頂層新增的任何項目使用通用名稱。例如,如果您將包含 __init__.pyairflow 資料夾新增至您的 DAGS_FOLDER,它將與 Airflow 套件衝突,並且您將無法從 Airflow 套件匯入任何內容。同樣地,不要直接在那裡新增 airflow.py 檔案。標準函式庫套件使用的通用名稱(例如 multiprocessinglogging 等)也不應用作頂層 - 無論是作為套件(即包含 __init__.py 的資料夾)還是作為模組(即 .py 檔案)。

這同樣適用於 configplugins 資料夾,它們也位於 PYTHONPATH,以及您手動新增到 PYTHONPATH 的任何項目(請參閱以下章節中的詳細資訊)。

建議您始終將 DAG/共用檔案放在對您的部署而言是唯一的子套件中(以下範例中的 my_company)。為將與系統中已存在的其他套件衝突的資料夾使用通用名稱太容易了。例如,如果您建立 airflow/operators 子資料夾,它將無法存取,因為 Airflow 已經有一個名為 airflow.operators 的套件,並且在匯入 from airflow.operators 時,它將在那裡尋找。

不要使用相對匯入

永遠不要使用 Python 3 中新增的相對匯入(以 . 開頭)。

my_dag1.py 中執行類似操作是很誘人的

from .base_dag import BaseDag  # NEVER DO THAT!!!!

您應使用完整路徑(從新增到 PYTHONPATH 的目錄開始)匯入此類共用 DAG

from my_company.my_custom_dags.base_dag import BaseDag  # This is cool

相對匯入違反直覺,並且根據您啟動 python 程式碼的方式,它們的行為可能會有所不同。在 Airflow 中,相同的 DAG 檔案可能會在不同的環境中(由排程器、工作人員或在測試期間)進行剖析,在這些情況下,相對匯入的行為可能會有所不同。當您在 Airflow DAG 中匯入任何內容時,請始終使用完整的 python 套件路徑,這將為您省去很多麻煩。您可以在 此 Stack Overflow 執行緒中閱讀有關相對匯入注意事項的更多資訊。

在套件資料夾中新增 __init__.py

當您建立資料夾時,應在您的資料夾中新增 __init__.py 檔案作為空檔案。雖然在 Python 3 中有一個隱式命名空間的概念,您不必將這些檔案新增到資料夾,但 Airflow 期望檔案已新增到您新增的所有套件。

檢查您的 PYTHONPATH 載入設定

您也可以使用 airflow info 命令查看確切的路徑,並將它們用於類似於使用環境變數 PYTHONPATH 指定的目錄。此命令指定的 sys.path 變數內容範例可能如下所示

Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/home/rootcss/venvs/airflow/lib/python3.8/site-packages:/home/rootcss/airflow/dags:/home/rootcss/airflow/config:/home/rootcss/airflow/plugins]

以下是 airflow info 命令的範例輸出

Apache Airflow: 2.0.0b3

System info
OS              | Linux
architecture    | x86_64
uname           | uname_result(system='Linux', node='85cd7ab7018e', release='4.19.76-linuxkit', version='#1 SMP Tue May 26 11:42:35 UTC 2020', machine='x86_64', processor='')
locale          | ('en_US', 'UTF-8')
python_version  | 3.8.6 (default, Nov 25 2020, 02:47:44)  [GCC 8.3.0]
python_location | /usr/local/bin/python

Tools info
git             | git version 2.20.1
ssh             | OpenSSH_7.9p1 Debian-10+deb10u2, OpenSSL 1.1.1d  10 Sep 2019
kubectl         | NOT AVAILABLE
gcloud          | NOT AVAILABLE
cloud_sql_proxy | NOT AVAILABLE
mysql           | mysql  Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)
sqlite3         | 3.27.2 2019-02-25 16:06:06 bd49a8271d650fa89e446b42e513b595a717b9212c91dd384aab871fc1d0alt1
psql            | psql (PostgreSQL) 11.9 (Debian 11.9-0+deb10u1)

Paths info
airflow_home    | /root/airflow
system_path     | /usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
python_path     | /usr/local/bin:/opt/airflow:/files/plugins:/usr/local/lib/python38.zip:/usr/local/lib/python3.8:/usr/
                | local/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/site-packages:/files/dags:/root/airflow/conf
                | ig:/root/airflow/plugins
airflow_on_path | True

Config info
executor             | LocalExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn     | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder          | /files/dags
plugins_folder       | /root/airflow/plugins
base_log_folder      | /root/airflow/logs

Providers info
apache-airflow-providers-amazon           | 1.0.0b2
apache-airflow-providers-apache-cassandra | 1.0.0b2
apache-airflow-providers-apache-druid     | 1.0.0b2
apache-airflow-providers-apache-hdfs      | 1.0.0b2
apache-airflow-providers-apache-hive      | 1.0.0b2

將目錄新增至 PYTHONPATH

您可以使用環境變數 PYTHONPATH 指定要新增至 sys.path 的其他目錄。透過使用以下命令提供專案根目錄的路徑來啟動 python shell

PYTHONPATH=/home/arch/projects/airflow_operators python

sys.path 變數將如下所示

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/projects/airflow_operators'
 '/home/arch/.pyenv/versions/3.8.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.8/site-packages']

我們可以看到我們提供的目錄現在已新增到路徑,讓我們現在嘗試匯入套件

>>> import airflow_operators
Hello from airflow_operators
>>>

我們也可以將 PYTHONPATH 變數與 airflow 命令一起使用。例如,如果我們執行以下 airflow 命令

PYTHONPATH=/home/arch/projects/airflow_operators airflow info

我們將看到 Python PATH 已使用我們提及的 PYTHONPATH 值更新,如下所示

Python PATH: [/home/arch/venv/bin:/home/arch/projects/airflow_operators:/usr/lib/python38.zip:/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/home/arch/venv/lib/python3.8/site-packages:/home/arch/airflow/dags:/home/arch/airflow/config:/home/arch/airflow/plugins]

在 Python 中建立套件

這是新增自訂程式碼最井然有序的方式。由於使用套件,您可以組織您的版本控制方法、控制安裝的共用程式碼版本,並以受控方式將程式碼部署到您的所有執行個體和容器 - 所有這些都由系統管理員/DevOps 而不是 DAG 編寫者完成。當您有一個單獨的團隊管理此共用程式碼時,通常適合使用此方法,但如果您了解您的 python 方法,您也可以在較小的部署中以此方式發布您的程式碼。您也可以將您的 外掛程式供應商套件 安裝為 python 套件,因此學習如何建置您的套件非常方便。

以下是如何建立您的套件

1. 在開始之前,選擇並安裝您將使用的建置/打包工具,理想情況下,它應符合 PEP-621 標準,以便能夠輕鬆切換到不同的工具。常見的選擇有 setuptools、poetry、hatch、flit。

  1. 決定何時建立您自己的套件。建立套件目錄 - 在我們的例子中,我們將其稱為 airflow_operators

mkdir airflow_operators
  1. 在套件內建立檔案 __init__.py 並新增以下程式碼

print("Hello from airflow_operators")

當我們匯入此套件時,它應列印以上訊息。

4. 建立 pyproject.toml 並使用您選擇的建置工具組態填寫它。請參閱 pyproject.toml 規格

  1. 使用您選擇的工具建置您的專案。例如,對於 hatch,它可以是

hatch build -t wheel

這將在您的 dist 資料夾中建立 .whl 檔案

  1. 使用 pip 安裝 .whl 檔案

pip install dist/airflow_operators-0.0.0-py3-none-any.whl
  1. 該套件現在可以使用了!

>>> import airflow_operators
Hello from airflow_operators
>>>

可以使用 pip 命令移除套件

pip uninstall airflow_operators

有關如何建立和發布 python 套件的更多詳細資訊,請參閱 打包 Python 專案

此條目是否有幫助?