Webserver 驗證

預設情況下,Airflow 要求使用者在登入前指定密碼。您可以使用以下 CLI 命令來建立帳戶

# create an admin user
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

若要停用驗證並允許使用者以匿名身分識別,則需要在 $AIRFLOW_HOME/webserver_config.py 中設定以下項目,以指定匿名使用者預設將擁有的角色

AUTH_ROLE_PUBLIC = 'Admin'

請務必查看 API 以了解如何保護 API 的安全。

注意

Airflow 使用 Python 的組態解析器。此組態解析器會內插 '%' 符號。請務必將組態檔(但不是環境變數)中的任何 % 符號逸出為 %%,否則 Airflow 可能會在組態解析器例外狀況時將這些密碼洩漏到日誌中。

密碼

最簡單的驗證機制之一是要求使用者在登入前指定密碼。

請使用命令列介面 airflow users create 來建立帳戶,或在 UI 中執行此操作。

其他方法

自 Airflow 2.0 起,預設 UI 為 Flask App Builder RBAC。系統會自動產生 webserver_config.py 組態檔,可用於組態 Airflow 以支援 OAuth、OpenID、LDAP、REMOTE_USER 等驗證方法。應注意,由於 Flask AppBuilder 和 Authlib 的限制,僅支援部分 OAuth2 提供者。此列表包括 githubgithublocaltwitterlinkedingoogleazureopenshiftoktakeycloakkeycloak_before_17

Web 驗證章節中描述的預設驗證選項與 $AIRFLOW_HOME/webserver_config.py 中的以下項目相關。

AUTH_TYPE = AUTH_DB

可以使用 WSGI 中介軟體來管理非常特定的驗證形式(例如 SPNEGO)並利用 REMOTE_USER 方法

from typing import Any, Callable

from flask import current_app
from flask_appbuilder.const import AUTH_REMOTE_USER


class CustomMiddleware:
    def __init__(self, wsgi_app: Callable) -> None:
        self.wsgi_app = wsgi_app

    def __call__(self, environ: dict, start_response: Callable) -> Any:
        # Custom authenticating logic here
        # ...
        environ["REMOTE_USER"] = "username"
        return self.wsgi_app(environ, start_response)


current_app.wsgi_app = CustomMiddleware(current_app.wsgi_app)

AUTH_TYPE = AUTH_REMOTE_USER

另一種建立使用者的途徑是在 UI 登入頁面中,允許使用者透過「註冊」按鈕自行註冊。可以編輯 $AIRFLOW_HOME/webserver_config.py 中的以下項目以使其成為可能

AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Desired Role For The Self Registered User"
RECAPTCHA_PRIVATE_KEY = 'private_key'
RECAPTCHA_PUBLIC_KEY = 'public_key'

MAIL_SERVER = 'smtp.gmail.com'
MAIL_USE_TLS = True
MAIL_USERNAME = 'yourappemail@gmail.com'
MAIL_PASSWORD = 'passwordformail'
MAIL_DEFAULT_SENDER = 'sender@gmail.com'

需要透過 pip 安裝 Flask-Mail 套件,以允許使用者自行註冊,因為這是 Flask-AppBuilder 框架提供的功能。

若要支援透過第三方提供者進行驗證,則需要使用所需的選項(如 OAuth、OpenID、LDAP)更新 AUTH_TYPE 項目,並且需要移除所選選項的參考行之註解,並在 $AIRFLOW_HOME/webserver_config.py 中進行組態。

如需更多詳細資訊,請參閱 FAB 文件的安全性章節

使用基於團隊的授權和 GitHub OAuth 的範例

若要使用基於團隊的授權和 GitHub OAuth,需要執行幾個步驟。

  • 透過 webserver_config.py 中的 FAB 組態來組態 OAuth

  • 建立自訂安全性管理員類別,並在 webserver_config.py 中將其提供給 FAB

  • 將安全性管理員類別傳回的角色對應到 FAB 可以理解的角色。

以下是在您的 webserver_config.py 中可能擁有的範例

from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
from flask_appbuilder.security.manager import AUTH_OAUTH
import os

AUTH_TYPE = AUTH_OAUTH
AUTH_ROLES_SYNC_AT_LOGIN = True  # Checks roles on every login
AUTH_USER_REGISTRATION = True  # allow users who are not already in the FAB DB to register

AUTH_ROLES_MAPPING = {
    "Viewer": ["Viewer"],
    "Admin": ["Admin"],
}
# If you wish, you can add multiple OAuth providers.
OAUTH_PROVIDERS = [
    {
        "name": "github",
        "icon": "fa-github",
        "token_key": "access_token",
        "remote_app": {
            "client_id": os.getenv("OAUTH_APP_ID"),
            "client_secret": os.getenv("OAUTH_APP_SECRET"),
            "api_base_url": "https://api.github.com",
            "client_kwargs": {"scope": "read:user, read:org"},
            "access_token_url": "https://github.com/login/oauth/access_token",
            "authorize_url": "https://github.com/login/oauth/authorize",
            "request_token_url": None,
        },
    },
]


class CustomSecurityManager(FabAirflowSecurityManagerOverride):
    pass


# Make sure to replace this with your own implementation of AirflowSecurityManager class
SECURITY_MANAGER_CLASS = CustomSecurityManager

以下是定義自訂安全性管理員的範例。此類別必須在 Python 的路徑中可用,並且可以根據需要定義在 webserver_config.py 本身中。

from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
import logging
from typing import Any, List, Union
import os

log = logging.getLogger(__name__)
log.setLevel(os.getenv("AIRFLOW__LOGGING__FAB_LOGGING_LEVEL", "INFO"))

FAB_ADMIN_ROLE = "Admin"
FAB_VIEWER_ROLE = "Viewer"
FAB_PUBLIC_ROLE = "Public"  # The "Public" role is given no permissions
TEAM_ID_A_FROM_GITHUB = 123  # Replace these with real team IDs for your org
TEAM_ID_B_FROM_GITHUB = 456  # Replace these with real team IDs for your org


def team_parser(team_payload: dict[str, Any]) -> list[int]:
    # Parse the team payload from GitHub however you want here.
    return [team["id"] for team in team_payload]


def map_roles(team_list: list[int]) -> list[str]:
    # Associate the team IDs with Roles here.
    # The expected output is a list of roles that FAB will use to Authorize the user.

    team_role_map = {
        TEAM_ID_A_FROM_GITHUB: FAB_ADMIN_ROLE,
        TEAM_ID_B_FROM_GITHUB: FAB_VIEWER_ROLE,
    }
    return list(set(team_role_map.get(team, FAB_PUBLIC_ROLE) for team in team_list))


class GithubTeamAuthorizer(FabAirflowSecurityManagerOverride):
    # In this example, the oauth provider == 'github'.
    # If you ever want to support other providers, see how it is done here:
    # https://github.com/dpgaspar/Flask-AppBuilder/blob/master/flask_appbuilder/security/manager.py#L550
    def get_oauth_user_info(self, provider: str, resp: Any) -> dict[str, Union[str, list[str]]]:
        # Creates the user info payload from Github.
        # The user previously allowed your app to act on their behalf,
        #   so now we can query the user and teams endpoints for their data.
        # Username and team membership are added to the payload and returned to FAB.

        remote_app = self.appbuilder.sm.oauth_remotes[provider]
        me = remote_app.get("user")
        user_data = me.json()
        team_data = remote_app.get("user/teams")
        teams = team_parser(team_data.json())
        roles = map_roles(teams)
        log.debug(f"User info from Github: {user_data}\nTeam info from Github: {teams}")
        return {"username": "github_" + user_data.get("login"), "role_keys": roles}

使用基於團隊的授權和 KeyCloak 的範例

以下是在您的 webserver_config.py 中可能擁有的範例

import os
import jwt
import requests
import logging
from base64 import b64decode
from cryptography.hazmat.primitives import serialization
from flask_appbuilder.security.manager import AUTH_DB, AUTH_OAUTH
from airflow import configuration as conf
from airflow.www.security import AirflowSecurityManager

log = logging.getLogger(__name__)

AUTH_TYPE = AUTH_OAUTH
AUTH_USER_REGISTRATION = True
AUTH_ROLES_SYNC_AT_LOGIN = True
AUTH_USER_REGISTRATION_ROLE = "Viewer"
OIDC_ISSUER = "https://sso.keycloak.me/realms/airflow"

# Make sure you create these role on Keycloak
AUTH_ROLES_MAPPING = {
    "Viewer": ["Viewer"],
    "Admin": ["Admin"],
    "User": ["User"],
    "Public": ["Public"],
    "Op": ["Op"],
}

OAUTH_PROVIDERS = [
    {
        "name": "keycloak",
        "icon": "fa-key",
        "token_key": "access_token",
        "remote_app": {
            "client_id": "airflow",
            "client_secret": "xxx",
            "server_metadata_url": "https://sso.keycloak.me/realms/airflow/.well-known/openid-configuration",
            "api_base_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect",
            "client_kwargs": {"scope": "email profile"},
            "access_token_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect/token",
            "authorize_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect/auth",
            "request_token_url": None,
        },
    }
]

# Fetch public key
req = requests.get(OIDC_ISSUER)
key_der_base64 = req.json()["public_key"]
key_der = b64decode(key_der_base64.encode())
public_key = serialization.load_der_public_key(key_der)


class CustomSecurityManager(AirflowSecurityManager):
    def get_oauth_user_info(self, provider, response):
        if provider == "keycloak":
            token = response["access_token"]
            me = jwt.decode(token, public_key, algorithms=["HS256", "RS256"])

            # Extract roles from resource access
            realm_access = me.get("realm_access", {})
            groups = realm_access.get("roles", [])

            log.info("groups: {0}".format(groups))

            if not groups:
                groups = ["Viewer"]

            userinfo = {
                "username": me.get("preferred_username"),
                "email": me.get("email"),
                "first_name": me.get("given_name"),
                "last_name": me.get("family_name"),
                "role_keys": groups,
            }

            log.info("user info: {0}".format(userinfo))

            return userinfo
        else:
            return {}


# Make sure to replace this with your own implementation of AirflowSecurityManager class
SECURITY_MANAGER_CLASS = CustomSecurityManager

此條目是否有幫助?