外掛程式

Airflow 內建一個簡單的外掛程式管理器,只需將檔案放入您的 $AIRFLOW_HOME/plugins 資料夾中,即可將外部功能整合到其核心。

plugins 資料夾中的 Python 模組會被匯入,而 巨集 和 Web 視圖 會整合到 Airflow 的主要集合中,並可供使用。

若要排除外掛程式的問題,您可以使用 airflow plugins 命令。此命令會傾印有關已載入外掛程式的資訊。

在 2.0 版本中變更:不再支援透過 airflow.{operators,sensors,hooks}.<plugin_name> 在外掛程式中加入匯入運算子、感測器、Hook,這些擴充功能應僅作為常規 Python 模組匯入。如需更多資訊,請參閱:模組管理建立自訂運算子

用途?

Airflow 提供了一個用於處理資料的通用工具箱。不同的組織有不同的堆疊和不同的需求。使用 Airflow 外掛程式可以讓公司自訂其 Airflow 安裝,以反映其生態系統。

外掛程式可以用作編寫、分享和啟用新功能集的簡單方法。

也需要一組更複雜的應用程式來與不同類型的資料和元資料互動。

範例

  • 一組用於解析 Hive 日誌並公開 Hive 元資料(CPU / IO / 階段/ 傾斜 /…)的工具

  • 一個異常偵測框架,允許人們收集指標、設定閾值和警報

  • 一個稽核工具,幫助了解誰存取了什麼

  • 一個組態驅動的 SLA 監控工具,允許您設定監控的表格以及它們應該在何時到達,警示人員,並公開中斷的可視化圖表

為何要基於 Airflow 建構?

Airflow 有許多組件可以在建構應用程式時重複使用

  • 一個 Web 伺服器,您可以用來呈現您的視圖

  • 一個元資料資料庫,用於儲存您的模型

  • 存取您的資料庫,以及了解如何連線到它們

  • 一組您的應用程式可以將工作負載推送到的 Worker

  • Airflow 已部署,您可以直接依附在其部署物流上

  • 基本圖表功能、底層函式庫和抽象概念

外掛程式何時(重新)載入?

外掛程式預設為延遲載入,一旦載入,它們永遠不會重新載入(除了 UI 外掛程式在 Web 伺服器中自動載入)。若要在每個 Airflow 程式啟動時載入它們,請在 airflow.cfg 中設定 [core] lazy_load_plugins = False

這表示如果您對外掛程式進行任何變更,並且您希望 Web 伺服器或排程器使用該新程式碼,則需要重新啟動這些程序。但是,在排程器啟動後,它才會反映在新的執行任務中。

預設情況下,任務執行使用 Forking。這避免了與建立新的 Python 直譯器和重新解析所有 Airflow 程式碼和啟動常式相關的效能降低。這種方法提供了顯著的好處,尤其對於較短的任務。這確實表示,如果您在任務中使用外掛程式,並且希望它們更新,您將需要重新啟動 Worker(如果使用 CeleryExecutor)或排程器(Local 或 Sequential Executor)。另一個選項是您可以接受啟動時的速度降低,將 core.execute_tasks_new_python_interpreter 組態設定設為 True,這將導致為任務啟動一個全新的 Python 直譯器。

(另一方面,僅由 DAG 檔案匯入的模組不會遇到此問題,因為 DAG 檔案不會在任何長時間運行的 Airflow 程序中載入/解析。)

介面

若要建立外掛程式,您需要衍生 airflow.plugins_manager.AirflowPlugin 類別,並參考您想要插入 Airflow 的物件。以下是您需要衍生的類別的外觀

class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
    flask_blueprints = []
    # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
    appbuilder_views = []
    # A list of dictionaries containing kwargs for FlaskAppBuilder add_link. See example below
    appbuilder_menu_items = []

    # A callback to perform actions when airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    #   to protect against extra parameters injected into the on_load(...)
    #   function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []

    # A list of timetable classes to register so they can be used in DAGs.
    timetables = []

    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []

您可以透過繼承來衍生它(請參考以下範例)。在範例中,所有選項都已定義為類別屬性,但如果您需要執行額外的初始化,也可以將它們定義為屬性。請注意,此類別內的 name 必須指定。

請確保在變更外掛程式後重新啟動 Web 伺服器和排程器,以便它們生效。

範例

以下程式碼定義了一個外掛程式,該外掛程式在 Airflow 中注入一組說明性物件定義。

# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access

from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up in Connections screen in a future version
class PluginHook(BaseHook):
    pass


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass


# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
    "name": "Test View",
    "category": "Test Plugin",
    "view": v_appbuilder_view,
}

v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}

# Creating flask appbuilder Menu Items
appbuilder_mitem = {
    "name": "Google",
    "href": "https://www.google.com",
    "category": "Search",
}
appbuilder_mitem_toplevel = {
    "name": "Apache",
    "href": "https://www.apache.org/",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    hooks = [PluginHook]
    macros = [plugin_macro]
    flask_blueprints = [bp]
    appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
    appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]

從 CSRF 保護中排除視圖

我們強烈建議您應該使用 CSRF 保護所有視圖。但是,如果需要,您可以使用裝飾器排除某些視圖。

from airflow.www.app import csrf


@csrf.exempt
def my_handler():
    # ...
    return "ok"

作為 Python 套件的外掛程式

可以透過 setuptools entrypoint 機制載入外掛程式。若要執行此操作,請使用套件中的 entrypoint 連結您的外掛程式。如果套件已安裝,Airflow 將自動從 entrypoint 清單中載入已註冊的外掛程式。

注意

entrypoint 名稱(例如,my_plugin)和外掛程式類別的名稱都不會影響外掛程式本身的模組和類別名稱。

# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint

# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"
    flask_blueprints = [bp]

然後在 pyproject.toml 內

[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"

自動重新載入 Web 伺服器

若要啟用 Web 伺服器的自動重新載入,當偵測到外掛程式目錄中的變更時,您應該將 [webserver] 區段中的 reload_on_plugin_change 選項設定為 True

注意

有關設定組態的更多資訊,請參閱 設定組態選項

注意

請參閱 模組管理,以了解有關 Python 和 Airflow 如何管理模組的詳細資訊。

疑難排解

您可以使用 Flask CLI 來排除問題。若要執行此操作,您需要將變數 FLASK_APP 設定為 airflow.www.app:create_app

例如,若要列印所有路由,請執行

FLASK_APP=airflow.www.app:create_app flask routes

此條目是否有幫助?