外掛程式¶
Airflow 內建一個簡單的外掛程式管理器,只需將檔案放入您的 $AIRFLOW_HOME/plugins
資料夾中,即可將外部功能整合到其核心。
plugins
資料夾中的 Python 模組會被匯入,而 巨集 和 Web 視圖 會整合到 Airflow 的主要集合中,並可供使用。
若要排除外掛程式的問題,您可以使用 airflow plugins
命令。此命令會傾印有關已載入外掛程式的資訊。
在 2.0 版本中變更:不再支援透過 airflow.{operators,sensors,hooks}.<plugin_name>
在外掛程式中加入匯入運算子、感測器、Hook,這些擴充功能應僅作為常規 Python 模組匯入。如需更多資訊,請參閱:模組管理 和 建立自訂運算子
用途?¶
Airflow 提供了一個用於處理資料的通用工具箱。不同的組織有不同的堆疊和不同的需求。使用 Airflow 外掛程式可以讓公司自訂其 Airflow 安裝,以反映其生態系統。
外掛程式可以用作編寫、分享和啟用新功能集的簡單方法。
也需要一組更複雜的應用程式來與不同類型的資料和元資料互動。
範例
一組用於解析 Hive 日誌並公開 Hive 元資料(CPU / IO / 階段/ 傾斜 /…)的工具
一個異常偵測框架,允許人們收集指標、設定閾值和警報
一個稽核工具,幫助了解誰存取了什麼
一個組態驅動的 SLA 監控工具,允許您設定監控的表格以及它們應該在何時到達,警示人員,並公開中斷的可視化圖表
為何要基於 Airflow 建構?¶
Airflow 有許多組件可以在建構應用程式時重複使用
一個 Web 伺服器,您可以用來呈現您的視圖
一個元資料資料庫,用於儲存您的模型
存取您的資料庫,以及了解如何連線到它們
一組您的應用程式可以將工作負載推送到的 Worker
Airflow 已部署,您可以直接依附在其部署物流上
基本圖表功能、底層函式庫和抽象概念
外掛程式何時(重新)載入?¶
外掛程式預設為延遲載入,一旦載入,它們永遠不會重新載入(除了 UI 外掛程式在 Web 伺服器中自動載入)。若要在每個 Airflow 程式啟動時載入它們,請在 airflow.cfg
中設定 [core] lazy_load_plugins = False
。
這表示如果您對外掛程式進行任何變更,並且您希望 Web 伺服器或排程器使用該新程式碼,則需要重新啟動這些程序。但是,在排程器啟動後,它才會反映在新的執行任務中。
預設情況下,任務執行使用 Forking。這避免了與建立新的 Python 直譯器和重新解析所有 Airflow 程式碼和啟動常式相關的效能降低。這種方法提供了顯著的好處,尤其對於較短的任務。這確實表示,如果您在任務中使用外掛程式,並且希望它們更新,您將需要重新啟動 Worker(如果使用 CeleryExecutor)或排程器(Local 或 Sequential Executor)。另一個選項是您可以接受啟動時的速度降低,將 core.execute_tasks_new_python_interpreter
組態設定設為 True,這將導致為任務啟動一個全新的 Python 直譯器。
(另一方面,僅由 DAG 檔案匯入的模組不會遇到此問題,因為 DAG 檔案不會在任何長時間運行的 Airflow 程序中載入/解析。)
介面¶
若要建立外掛程式,您需要衍生 airflow.plugins_manager.AirflowPlugin
類別,並參考您想要插入 Airflow 的物件。以下是您需要衍生的類別的外觀
class AirflowPlugin:
# The name of your plugin (str)
name = None
# A list of class(es) derived from BaseHook
hooks = []
# A list of references to inject into the macros namespace
macros = []
# A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
flask_blueprints = []
# A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
appbuilder_views = []
# A list of dictionaries containing kwargs for FlaskAppBuilder add_link. See example below
appbuilder_menu_items = []
# A callback to perform actions when airflow starts and the plugin is loaded.
# NOTE: Ensure your plugin has *args, and **kwargs in the method definition
# to protect against extra parameters injected into the on_load(...)
# function in future changes
def on_load(*args, **kwargs):
# ... perform Plugin boot actions
pass
# A list of global operator extra links that can redirect users to
# external systems. These extra links will be available on the
# task page in the form of buttons.
#
# Note: the global operator extra link can be overridden at each
# operator level.
global_operator_extra_links = []
# A list of operator extra links to override or add operator links
# to existing Airflow Operators.
# These extra links will be available on the task page in form of
# buttons.
operator_extra_links = []
# A list of timetable classes to register so they can be used in DAGs.
timetables = []
# A list of Listeners that plugin provides. Listeners can register to
# listen to particular events that happen in Airflow, like
# TaskInstance state changes. Listeners are python modules.
listeners = []
您可以透過繼承來衍生它(請參考以下範例)。在範例中,所有選項都已定義為類別屬性,但如果您需要執行額外的初始化,也可以將它們定義為屬性。請注意,此類別內的 name
必須指定。
請確保在變更外掛程式後重新啟動 Web 伺服器和排程器,以便它們生效。
範例¶
以下程式碼定義了一個外掛程式,該外掛程式在 Airflow 中注入一組說明性物件定義。
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access
from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
# Will show up in Connections screen in a future version
class PluginHook(BaseHook):
pass
# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
pass
# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
"test_plugin",
__name__,
template_folder="templates", # registers airflow/plugins/templates as a Jinja template folder
static_folder="static",
static_url_path="/static/test_plugin",
)
# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
default_view = "test"
@expose("/")
@has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
]
)
def test(self):
return self.render_template("test_plugin/test.html", content="Hello galaxy!")
# Creating a flask appbuilder BaseView
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
default_view = "test"
@expose("/")
@has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
]
)
def test(self):
return self.render_template("test_plugin/test.html", content="Hello galaxy!")
v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
"name": "Test View",
"category": "Test Plugin",
"view": v_appbuilder_view,
}
v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}
# Creating flask appbuilder Menu Items
appbuilder_mitem = {
"name": "Google",
"href": "https://www.google.com",
"category": "Search",
}
appbuilder_mitem_toplevel = {
"name": "Apache",
"href": "https://www.apache.org/",
}
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
hooks = [PluginHook]
macros = [plugin_macro]
flask_blueprints = [bp]
appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]
另請參閱
從 CSRF 保護中排除視圖¶
我們強烈建議您應該使用 CSRF 保護所有視圖。但是,如果需要,您可以使用裝飾器排除某些視圖。
from airflow.www.app import csrf
@csrf.exempt
def my_handler():
# ...
return "ok"
作為 Python 套件的外掛程式¶
可以透過 setuptools entrypoint 機制載入外掛程式。若要執行此操作,請使用套件中的 entrypoint 連結您的外掛程式。如果套件已安裝,Airflow 將自動從 entrypoint 清單中載入已註冊的外掛程式。
注意
entrypoint 名稱(例如,my_plugin
)和外掛程式類別的名稱都不會影響外掛程式本身的模組和類別名稱。
# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
"test_plugin",
__name__,
template_folder="templates", # registers airflow/plugins/templates as a Jinja template folder
static_folder="static",
static_url_path="/static/test_plugin",
)
class MyAirflowPlugin(AirflowPlugin):
name = "my_namespace"
flask_blueprints = [bp]
然後在 pyproject.toml 內
[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"