管理連線

另請參閱

如需 hooks 和連線的概觀,請參閱連線 & Hooks

Airflow 的 Connection 物件用於儲存連線至外部服務所需的憑證和其他資訊。

連線可以用以下方式定義

將連線儲存在環境變數中

Airflow 連線可以在環境變數中定義。

命名慣例為 AIRFLOW_CONN_{CONN_ID},全部大寫(請注意 CONN 周圍的單底線)。因此,如果您的連線 ID 為 my_prod_db,則變數名稱應為 AIRFLOW_CONN_MY_PROD_DB

值可以是 JSON 或 Airflow 的 URI 格式。

JSON 格式範例

版本 2.3.0 新增。

如果使用 JSON 序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='{
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {
        "param1": "val1",
        "param2": "val2"
    }
}'

產生 JSON 連線表示法

版本 2.8.0 新增。

為了簡化連線 JSON 的產生,Connection 類別具有方便的屬性 as_json()。它可以像這樣使用

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'

此外,也可以使用相同的方法將連線從 URI 格式轉換為 JSON 格式

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="awesome_conn",
...     description="Example Connection",
...     uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'

URI 格式範例

如果使用 Airflow URI 序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

請參閱連線 URI 格式以取得有關如何產生有效 URI 的更多詳細資訊。

注意

在環境變數中定義的連線不會顯示在 Airflow UI 中,也不會在使用 airflow connections list 時顯示。

將連線儲存在 Secrets Backend 中

您可以將 Airflow 連線儲存在外部 secrets backend 中,例如 HashiCorp Vault、AWS SSM Parameter Store 和其他此類服務。如需更多詳細資訊,請參閱Secrets Backend

將連線儲存在資料庫中

另請參閱

連線也可以選擇儲存在環境變數外部 secrets backend中,例如 HashiCorp Vault、AWS SSM Parameter Store 等。

當將連線儲存在資料庫中時,您可以使用 Web UI 或 Airflow CLI 來管理它們。

使用 UI 建立連線

開啟 UI 的 管理員->連線 區段。按一下 建立 連結以建立新的連線。

../_images/connection_create.png
  1. 連線 ID 欄位中填寫所需的連線 ID。建議您使用小寫字元並用底線分隔單字。

  2. 使用 連線類型 欄位選擇連線類型。

  3. 填寫剩餘欄位。請參閱處理 extra 中的任意 dict,以瞭解屬於不同連線類型的欄位說明。

  4. 按一下 儲存 按鈕以建立連線。

使用 UI 編輯連線

開啟 UI 的 管理員->連線 區段。按一下您想要在連線清單中編輯的連線旁邊的鉛筆圖示。

../_images/connection_edit.png

修改連線屬性,然後按一下 儲存 按鈕以儲存您的變更。

從 CLI 建立連線

您可以從 CLI 將連線新增至資料庫。

您可以使用 JSON 格式新增連線(從版本 2.3.0 開始)

airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'

或者,您可以使用 Airflow 的連線 URI 格式(請參閱產生連線 URI)。

airflow connections add 'my_prod_db' \
    --conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1&param2=val2&...'

最後,您也可以個別指定每個參數

airflow connections add 'my_prod_db' \
    --conn-type 'my-conn-type' \
    --conn-login 'login' \
    --conn-password 'password' \
    --conn-host 'host' \
    --conn-port 'port' \
    --conn-schema 'schema' \
    ...

將連線匯出到檔案

您可以將儲存在資料庫中的連線匯出到檔案(例如,將連線從一個環境遷移到另一個環境)。請參閱匯出連線以瞭解用法。

資料庫中連線的安全性

對於儲存在 Airflow 元數據資料庫中的連線,Airflow 使用 Fernet 來加密密碼和其他潛在的敏感資料。它保證在沒有加密密碼的情況下,Connection Passwords 無法在沒有金鑰的情況下被操縱或讀取。如需有關設定 Fernet 的資訊,請查看Fernet

測試連線

基於安全考量,預設情況下,Airflow UI、API 和 CLI 中的測試連線功能已停用。

如需使用者功能的更多資訊,請參閱文件:https://airflow.dev.org.tw/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。強烈建議您在確保只有高度信任的 UI/API 使用者具有「編輯連線」權限之前,不要啟用此功能。

此功能的可用性可以透過 Airflow 組態 (airflow.cfg) 的 core 區段中的 test_connection 標誌來控制。它也可以透過環境變數 AIRFLOW__CORE__TEST_CONNECTION 來控制。

以下值適用於此組態參數

  • 停用:停用測試連線功能,並停用 UI 中的「測試連線」按鈕。這也是 Airflow 組態中設定的預設值。

  • 啟用:啟用測試連線功能,並啟用 UI 中的「測試連線」按鈕。

  • 隱藏:停用測試連線功能,並隱藏 UI 中的「測試連線」按鈕。

啟用「測試連線」後,可以從 UI 中的建立編輯連線頁面使用,透過呼叫連線 REST API,或執行 airflow connections test CLI 命令

警告

當使用 Airflow UI 或 REST API 時,此功能不適用於位於外部 secrets backend 中的連線。

為了測試連線,Airflow 從相關聯的 hook 類別呼叫 test_connection 方法並報告結果。可能會發生連線類型沒有任何相關聯的 hook,或者 hook 沒有 test_connection 方法實作的情況,在任何一種情況下,都會顯示錯誤訊息,或者功能將被停用(如果您在 UI 中進行測試)。

注意

當在 Airflow UI 中進行測試時,測試從 Webserver 執行,因此此功能受限於為您的 Webserver 設定的網路輸出規則。

注意

如果 Webserver 和 Worker 機器(如果透過 Airflow UI 進行測試)或機器/pods(如果透過 Airflow CLI 進行測試)安裝了不同的 libs 或 provider 套件,則測試結果可能會有所不同。

自訂連線類型

Airflow 允許定義自訂連線類型 – 包括修改連線的新增/編輯表單。自訂連線類型在社群維護的 providers 中定義,但您也可以新增自訂 provider 以新增自訂連線類型。請參閱 Provider 套件,以瞭解如何新增自訂 providers 的說明。

自訂連線類型透過 providers 提供的 Hooks 定義。Hooks 可以實作在協定類別 DiscoverableHook 中定義的方法。請注意,您的自訂 Hook 不應從此類別衍生,此類別是一個範例,用於記錄關於您的 Hook 可能定義的類別欄位和方法的期望。另一個很好的範例是 JdbcHook

透過在您的 hooks 中實作這些方法,並透過 provider 元數據中的 connection-types 陣列(以及已棄用的 hook-class-names)公開它們,您可以透過以下方式自訂 Airflow

  • 新增自訂連線類型

  • 從連線類型新增自動 Hook 建立

  • 新增自訂表單 widget,以在您的連線 URL 中顯示和編輯自訂「額外」參數

  • 隱藏未用於您的連線的欄位

  • 新增顯示欄位應如何格式化的範例的佔位符

您可以在 Provider 套件中閱讀有關如何新增自訂 provider 套件的更多詳細資訊

自訂連線欄位

可以在 Airflow Webserver 中的連線新增/編輯檢視中新增自訂表單欄位。自訂欄位以 JSON 格式儲存在 Connection.extra 欄位中。若要新增自訂欄位,請實作方法 get_connection_form_widgets()。此方法應傳回字典。金鑰應為欄位的字串名稱,因為它應儲存在 extra dict 中。值應為 wtforms.fields.core.Field 的繼承者。

這是一個範例

@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
    """Returns connection widgets to add to connection form"""
    from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import StringField

    return {
        "workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
        "project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
    }

注意

自訂欄位不再需要 extra__<conn type>__ 字首

在 Airflow 2.3 之前,如果您想要在 UI 中使用自訂欄位,則必須以 extra__<conn type>__ 作為字首,這就是其值儲存在 extra dict 中的方式。從 2.3 開始,您不再需要這樣做。

方法 get_ui_field_behaviour() 可讓您自訂兩者的行為。例如,您可以隱藏或重新標記欄位(例如,如果它未使用或重新用途),並且您可以新增佔位符文字。

一個範例

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
    """Returns custom field behaviour"""
    return {
        "hidden_fields": ["port", "host", "login", "schema"],
        "relabeling": {},
        "placeholders": {
            "password": "Asana personal access token",
            "workspace": "My workspace gid",
            "project": "My project gid",
        },
    }

注意

如果您想要為 extra 欄位新增表單佔位符,而該欄位的名稱與標準連線屬性(即 login、password、host、scheme、port、extra)衝突,則您必須以 extra__<conn type>__ 作為字首。例如 extra__myservice__password

查看 providers 以取得您可以執行的操作範例,例如 JdbcHookAsanaHook 都使用了此功能。

注意

已棄用的 hook-class-names

在 Airflow 2.2.0 之前,providers 中的連線已透過 provider 元數據中的 hook-class-names 陣列公開。但是,當在 workers 中使用個別 hooks 時,這已被證明是效率低下的,並且 hook-class-names 陣列現在已由 connection-types 陣列取代。在 provider 支援低於 2.2.0 的 Airflow 之前,connection-typeshook-class-names 都應存在。CI 建置期間的自動檢查將驗證這兩個陣列的一致性。

URI 格式

注意

從 2.3.0 版本開始,您可以使用 JSON 序列化連線。請參閱範例

由於歷史原因,Airflow 具有特殊的 URI 格式,可用於將 Connection 物件序列化為字串值。

一般來說,Airflow 的 URI 格式如下

my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2

上述 URI 將產生一個 Connection 物件,其等效於以下內容

Connection(
    conn_id="",
    conn_type="my_conn_type",
    description=None,
    login="my-login",
    password="my-password",
    host="my-host",
    port=5432,
    schema="my-schema",
    extra=json.dumps(dict(param1="val1", param2="val2")),
)

產生連線 URI

為了簡化連線 URI 的產生,Connection 類別具有方便的方法 get_uri()。它可以像這樣使用

>>> import json
>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'

此外,如果您已建立連線,則可以使用 airflow connections get 命令。

$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db

處理 extra 中的任意 dict

某些 JSON 結構無法在不遺失的情況下進行 urlencode。對於此類 JSON,get_uri 將整個字串儲存在 url 查詢參數 __extra__ 下。

例如

>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
...     conn_type="scheme",
...     host="host/location",
...     schema="schema",
...     login="user",
...     password="password",
...     port=1234,
...     extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'

我們可以驗證它是否傳回相同的字典

>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True

但對於僅儲存鍵值對的最常見情況,使用純 url 編碼。

您可以驗證 URI 是否已正確解析,如下所示

>>> from airflow.models.connection import Connection

>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password

處理連線參數中的特殊字元

注意

產生連線時,請使用 Connection.get_uri 便利方法,如產生連線 URI章節中所述。本節僅供參考。

手動建置 URI 時,某些字元需要特殊處理。

例如,如果您的密碼中有 /,則會失敗

>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1&param2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'

為了修正此問題,您可以使用 quote_plus() 進行編碼

>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.password)
my-pa/ssword

此條目是否有幫助?