管理連線¶
另請參閱
如需 hooks 和連線的概觀,請參閱連線 & Hooks。
Airflow 的 Connection
物件用於儲存連線至外部服務所需的憑證和其他資訊。
連線可以用以下方式定義
在環境變數中
在外部Secrets Backend中
在 Airflow 元數據資料庫 中 (使用 CLI 或 Web UI)
將連線儲存在環境變數中¶
Airflow 連線可以在環境變數中定義。
命名慣例為 AIRFLOW_CONN_{CONN_ID}
,全部大寫(請注意 CONN
周圍的單底線)。因此,如果您的連線 ID 為 my_prod_db
,則變數名稱應為 AIRFLOW_CONN_MY_PROD_DB
。
值可以是 JSON 或 Airflow 的 URI 格式。
JSON 格式範例¶
版本 2.3.0 新增。
如果使用 JSON 序列化
export AIRFLOW_CONN_MY_PROD_DATABASE='{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
產生 JSON 連線表示法¶
版本 2.8.0 新增。
為了簡化連線 JSON 的產生,Connection
類別具有方便的屬性 as_json()
。它可以像這樣使用
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'
此外,也可以使用相同的方法將連線從 URI 格式轉換為 JSON 格式
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="awesome_conn",
... description="Example Connection",
... uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'
將連線儲存在 Secrets Backend 中¶
您可以將 Airflow 連線儲存在外部 secrets backend 中,例如 HashiCorp Vault、AWS SSM Parameter Store 和其他此類服務。如需更多詳細資訊,請參閱Secrets Backend。
將連線儲存在資料庫中¶
另請參閱
連線也可以選擇儲存在環境變數或外部 secrets backend中,例如 HashiCorp Vault、AWS SSM Parameter Store 等。
當將連線儲存在資料庫中時,您可以使用 Web UI 或 Airflow CLI 來管理它們。
使用 UI 建立連線¶
開啟 UI 的 管理員->連線
區段。按一下 建立
連結以建立新的連線。

在
連線 ID
欄位中填寫所需的連線 ID。建議您使用小寫字元並用底線分隔單字。使用
連線類型
欄位選擇連線類型。填寫剩餘欄位。請參閱處理 extra 中的任意 dict,以瞭解屬於不同連線類型的欄位說明。
按一下
儲存
按鈕以建立連線。
從 CLI 建立連線¶
您可以從 CLI 將連線新增至資料庫。
您可以使用 JSON 格式新增連線(從版本 2.3.0 開始)
airflow connections add 'my_prod_db' \
--conn-json '{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
或者,您可以使用 Airflow 的連線 URI 格式(請參閱產生連線 URI)。
airflow connections add 'my_prod_db' \
--conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1¶m2=val2&...'
最後,您也可以個別指定每個參數
airflow connections add 'my_prod_db' \
--conn-type 'my-conn-type' \
--conn-login 'login' \
--conn-password 'password' \
--conn-host 'host' \
--conn-port 'port' \
--conn-schema 'schema' \
...
資料庫中連線的安全性¶
對於儲存在 Airflow 元數據資料庫中的連線,Airflow 使用 Fernet 來加密密碼和其他潛在的敏感資料。它保證在沒有加密密碼的情況下,Connection Passwords 無法在沒有金鑰的情況下被操縱或讀取。如需有關設定 Fernet 的資訊,請查看Fernet。
測試連線¶
基於安全考量,預設情況下,Airflow UI、API 和 CLI 中的測試連線功能已停用。
如需使用者功能的更多資訊,請參閱文件:https://airflow.dev.org.tw/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。強烈建議您在確保只有高度信任的 UI/API 使用者具有「編輯連線」權限之前,不要啟用此功能。
此功能的可用性可以透過 Airflow 組態 (airflow.cfg) 的 core 區段中的 test_connection 標誌來控制。它也可以透過環境變數 AIRFLOW__CORE__TEST_CONNECTION
來控制。
以下值適用於此組態參數
停用:停用測試連線功能,並停用 UI 中的「測試連線」按鈕。這也是 Airflow 組態中設定的預設值。
啟用:啟用測試連線功能,並啟用 UI 中的「測試連線」按鈕。
隱藏:停用測試連線功能,並隱藏 UI 中的「測試連線」按鈕。
啟用「測試連線」後,可以從 UI 中的建立或編輯連線頁面使用,透過呼叫連線 REST API,或執行 airflow connections test
CLI 命令。
警告
當使用 Airflow UI 或 REST API 時,此功能不適用於位於外部 secrets backend 中的連線。
為了測試連線,Airflow 從相關聯的 hook 類別呼叫 test_connection
方法並報告結果。可能會發生連線類型沒有任何相關聯的 hook,或者 hook 沒有 test_connection
方法實作的情況,在任何一種情況下,都會顯示錯誤訊息,或者功能將被停用(如果您在 UI 中進行測試)。
注意
當在 Airflow UI 中進行測試時,測試從 Webserver 執行,因此此功能受限於為您的 Webserver 設定的網路輸出規則。
注意
如果 Webserver 和 Worker 機器(如果透過 Airflow UI 進行測試)或機器/pods(如果透過 Airflow CLI 進行測試)安裝了不同的 libs 或 provider 套件,則測試結果可能會有所不同。
自訂連線類型¶
Airflow 允許定義自訂連線類型 – 包括修改連線的新增/編輯表單。自訂連線類型在社群維護的 providers 中定義,但您也可以新增自訂 provider 以新增自訂連線類型。請參閱 Provider 套件,以瞭解如何新增自訂 providers 的說明。
自訂連線類型透過 providers 提供的 Hooks 定義。Hooks 可以實作在協定類別 DiscoverableHook
中定義的方法。請注意,您的自訂 Hook 不應從此類別衍生,此類別是一個範例,用於記錄關於您的 Hook 可能定義的類別欄位和方法的期望。另一個很好的範例是 JdbcHook
。
透過在您的 hooks 中實作這些方法,並透過 provider 元數據中的 connection-types
陣列(以及已棄用的 hook-class-names
)公開它們,您可以透過以下方式自訂 Airflow
新增自訂連線類型
從連線類型新增自動 Hook 建立
新增自訂表單 widget,以在您的連線 URL 中顯示和編輯自訂「額外」參數
隱藏未用於您的連線的欄位
新增顯示欄位應如何格式化的範例的佔位符
您可以在 Provider 套件中閱讀有關如何新增自訂 provider 套件的更多詳細資訊
自訂連線欄位¶
可以在 Airflow Webserver 中的連線新增/編輯檢視中新增自訂表單欄位。自訂欄位以 JSON 格式儲存在 Connection.extra
欄位中。若要新增自訂欄位,請實作方法 get_connection_form_widgets()
。此方法應傳回字典。金鑰應為欄位的字串名稱,因為它應儲存在 extra
dict 中。值應為 wtforms.fields.core.Field
的繼承者。
這是一個範例
@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField
return {
"workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
}
注意
自訂欄位不再需要 extra__<conn type>__
字首
在 Airflow 2.3 之前,如果您想要在 UI 中使用自訂欄位,則必須以 extra__<conn type>__
作為字首,這就是其值儲存在 extra
dict 中的方式。從 2.3 開始,您不再需要這樣做。
方法 get_ui_field_behaviour()
可讓您自訂兩者的行為。例如,您可以隱藏或重新標記欄位(例如,如果它未使用或重新用途),並且您可以新增佔位符文字。
一個範例
@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour"""
return {
"hidden_fields": ["port", "host", "login", "schema"],
"relabeling": {},
"placeholders": {
"password": "Asana personal access token",
"workspace": "My workspace gid",
"project": "My project gid",
},
}
注意
如果您想要為 extra
欄位新增表單佔位符,而該欄位的名稱與標準連線屬性(即 login、password、host、scheme、port、extra)衝突,則您必須以 extra__<conn type>__
作為字首。例如 extra__myservice__password
。
查看 providers 以取得您可以執行的操作範例,例如 JdbcHook
和 AsanaHook
都使用了此功能。
注意
已棄用的 hook-class-names
在 Airflow 2.2.0 之前,providers 中的連線已透過 provider 元數據中的 hook-class-names
陣列公開。但是,當在 workers 中使用個別 hooks 時,這已被證明是效率低下的,並且 hook-class-names
陣列現在已由 connection-types
陣列取代。在 provider 支援低於 2.2.0 的 Airflow 之前,connection-types
和 hook-class-names
都應存在。CI 建置期間的自動檢查將驗證這兩個陣列的一致性。
URI 格式¶
注意
從 2.3.0 版本開始,您可以使用 JSON 序列化連線。請參閱範例。
由於歷史原因,Airflow 具有特殊的 URI 格式,可用於將 Connection 物件序列化為字串值。
一般來說,Airflow 的 URI 格式如下
my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2
上述 URI 將產生一個 Connection
物件,其等效於以下內容
Connection(
conn_id="",
conn_type="my_conn_type",
description=None,
login="my-login",
password="my-password",
host="my-host",
port=5432,
schema="my-schema",
extra=json.dumps(dict(param1="val1", param2="val2")),
)
產生連線 URI¶
為了簡化連線 URI 的產生,Connection
類別具有方便的方法 get_uri()
。它可以像這樣使用
>>> import json
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'
此外,如果您已建立連線,則可以使用 airflow connections get
命令。
$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db
處理 extra 中的任意 dict¶
某些 JSON 結構無法在不遺失的情況下進行 urlencode。對於此類 JSON,get_uri
將整個字串儲存在 url 查詢參數 __extra__
下。
例如
>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
... conn_type="scheme",
... host="host/location",
... schema="schema",
... login="user",
... password="password",
... port=1234,
... extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'
我們可以驗證它是否傳回相同的字典
>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True
但對於僅儲存鍵值對的最常見情況,使用純 url 編碼。
您可以驗證 URI 是否已正確解析,如下所示
>>> from airflow.models.connection import Connection
>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password
處理連線參數中的特殊字元¶
注意
產生連線時,請使用 Connection.get_uri
便利方法,如產生連線 URI章節中所述。本節僅供參考。
手動建置 URI 時,某些字元需要特殊處理。
例如,如果您的密碼中有 /
,則會失敗
>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1¶m2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'
為了修正此問題,您可以使用 quote_plus()
進行編碼
>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.password)
my-pa/ssword