建立自訂 Operator

Airflow 允許您建立新的 operators,以滿足您或您團隊的需求。這種可擴展性是使 Apache Airflow 強大的眾多功能之一。

您可以透過擴展 airflow.models.baseoperator.BaseOperator 來建立您想要的任何 operator

在衍生類別中,您需要覆寫兩種方法

  • 建構子 - 定義 operator 所需的參數。您只需要指定 operator 特有的引數。您可以在 DAG 檔案中指定 default_args。請參閱 預設引數 以取得更多詳細資訊。

  • Execute - 當執行器呼叫 operator 時要執行的程式碼。此方法包含 Airflow 環境背景資訊作為參數,可用於讀取組態值。

注意

在實作自訂 operators 時,請勿在 __init__ 方法中進行任何耗費資源的操作。operators 將在每個排程器週期中針對使用它們的每個任務實例化一次,並且進行資料庫呼叫可能會顯著減慢排程速度並浪費資源。

讓我們在一個新檔案 hello_operator.py 中實作一個範例 HelloOperator

from airflow.models.baseoperator import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

注意

為了使匯入能夠運作,您應該將檔案放置在 PYTHONPATH 環境變數中存在的目錄中。Airflow 預設會將 Airflow 首頁中的 dags/plugins/config/ 目錄新增至 PYTHONPATH。例如,在我們的範例中,檔案放置在 custom_operator/ 目錄中。請參閱 模組管理 以取得有關 Python 和 Airflow 如何管理模組的詳細資訊。

您現在可以按如下所示使用衍生的自訂 operator

from custom_operator.hello_operator import HelloOperator

with dag:
    hello_task = HelloOperator(task_id="sample-task", name="foo_bar")

您也可以繼續使用您的 plugins 資料夾來儲存您的自訂 operators。如果您在 plugins 資料夾中有檔案 hello_operator.py,您可以按如下所示匯入 operator

from hello_operator import HelloOperator

如果 operator 與外部服務(API、資料庫等)通訊,最好使用 Hooks 實作通訊層。這樣,實作的邏輯可以被其他使用者在不同的 operators 中重複使用。這種方法比為每個外部服務使用 CustomServiceBaseOperator 提供了更好的解耦和新增整合的利用率。

另一個考量是暫時狀態。如果操作需要記憶體內狀態(例如,應該在 on_kill 方法中用於取消請求的 job id),則狀態應保留在 operator 中,而不是在 hook 中。這樣,服務 hook 可以完全無狀態,並且操作的整個邏輯都在一個地方 - 在 operator 中。

Hooks

Hooks 作為介面,用於與 DAG 中的外部共用資源進行通訊。例如,DAG 中的多個任務可能需要存取 MySQL 資料庫。您可以從 hook 檢索連線並加以利用,而無需為每個任務建立連線。Hook 也有助於避免將連線驗證參數儲存在 DAG 中。請參閱 管理連線 以了解如何建立和管理連線,以及 Provider packages 以取得有關如何透過 providers 新增自訂連線類型的詳細資訊。

讓我們擴展先前的範例,以從 MySQL 提取名稱

class HelloDBOperator(BaseOperator):
    def __init__(self, name: str, mysql_conn_id: str, database: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.mysql_conn_id = mysql_conn_id
        self.database = database

    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database)
        sql = "select name from user"
        result = hook.get_first(sql)
        message = f"Hello {result['name']}"
        print(message)
        return message

當 operator 在 hook 物件上調用查詢時,如果連線不存在,則會建立一個新連線。hook 從 Airflow 後端檢索驗證參數,例如使用者名稱和密碼,並將參數傳遞給 airflow.hooks.base.BaseHook.get_connection()execute 方法或從 execute 呼叫的任何方法中,您才應該建立 hook。建構子會在 Airflow 剖析 DAG 時被呼叫,這會頻繁發生。並且在那裡實例化 hook 將導致許多不必要的資料庫連線。execute 僅在 DAG 執行期間被呼叫。

使用者介面

Airflow 也允許開發人員控制 operator 在 DAG UI 中的顯示方式。覆寫 ui_color 以變更 UI 中 operator 的背景顏色。覆寫 ui_fgcolor 以變更標籤的顏色。覆寫 custom_operator_name 以將顯示名稱變更為類別名稱以外的名稱。

class HelloOperator(BaseOperator):
    ui_color = "#ff0000"
    ui_fgcolor = "#000000"
    custom_operator_name = "Howdy"
    # ...

範本化

您可以使用 Jinja 範本 來參數化您的 operator。Airflow 在呈現 operator 時,會考量 template_fields 中存在的欄位名稱以進行範本化。

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("name",)

    def __init__(self, name: str, world: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.world = world

    def execute(self, context):
        message = f"Hello {self.world} it's {self.name}!"
        print(message)
        return message

您可以按如下所示使用範本

with dag:
    hello_task = HelloOperator(
        task_id="task_id_1",
        name="{{ task_instance.task_id }}",
        world="Earth",
    )

在此範例中,Jinja 尋找 name 參數,並將 {{ task_instance.task_id }} 替換為 task_id_1

參數也可以包含檔案名稱,例如 bash 腳本或 SQL 檔案。您需要在 template_ext 中新增檔案的副檔名。如果 template_field 包含以 template_ext 中提及的副檔名結尾的字串,則 Jinja 會讀取檔案的內容並將範本替換為實際值。請注意,Jinja 會替換 operator 屬性,而不是 args。

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("guest_name",)
    template_ext = ".sql"

    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.guest_name = name

在範例中,template_fields 應該是 ['guest_name'],而不是 ['name']

此外,您可以提供 template_fields_renderers 字典,其定義範本欄位中的值在 Web UI 中以何種樣式呈現。例如

class MyRequestOperator(BaseOperator):
    template_fields: Sequence[str] = ("request_body",)
    template_fields_renderers = {"request_body": "json"}

    def __init__(self, request_body: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.request_body = request_body

template_field 本身是字典的情況下,也可以指定以點分隔的索引鍵路徑,以適當地提取和呈現個別元素。例如

class MyConfigOperator(BaseOperator):
    template_fields: Sequence[str] = ("configuration",)
    template_fields_renderers = {
        "configuration": "json",
        "configuration.query.sql": "sql",
    }

    def __init__(self, configuration: dict, **kwargs) -> None:
        super().__init__(**kwargs)
        self.configuration = configuration

然後按如下所示使用此範本

with dag:
    config_task = MyConfigOperator(
        task_id="task_id_1",
        configuration={"query": {"job_id": "123", "sql": "select * from my_table"}},
    )

這將導致 UI 將 configuration 呈現為 json,此外,query.sql 中包含的值將使用 SQL 詞法分析器呈現。

../_images/template_field_renderer_path.png

目前可用的詞法分析器

  • bash

  • bash_command

  • doc

  • doc_json

  • doc_md

  • doc_rst

  • doc_yaml

  • doc_md

  • hql

  • html

  • jinja

  • json

  • md

  • mysql

  • postgresql

  • powershell

  • py

  • python_callable

  • rst

  • sql

  • tsql

  • yaml

如果您使用不存在的詞法分析器,則範本欄位的值將呈現為美觀列印的物件。

限制

為了防止濫用,在 operator 的建構子中定義和指派範本欄位時,必須遵守以下限制(當存在建構子時,否則 - 請參閱下方)

1. 傳遞到建構子的範本欄位的對應參數,其名稱必須與欄位完全相同。以下範例無效,因為傳遞到建構子的參數與範本欄位不同

class HelloOperator(BaseOperator):
    template_fields = "field_a"

    def __init__(field_a_id) -> None:  # <- should be def __init__(field_a)-> None
        self.field_a = field_a_id  # <- should be self.field_a = field_a

2. 範本欄位的實例成員必須使用建構子中的對應參數進行指派,可以透過直接指派或呼叫父系的建構子(在其中這些欄位被定義為 template_fields)並顯式指派參數來完成。以下範例無效,因為實例成員 self.field_a 完全未被指派,儘管它是範本欄位

class HelloOperator(BaseOperator):
    template_fields = ("field_a", "field_b")

    def __init__(field_a, field_b) -> None:
        self.field_b = field_b

以下範例也無效,因為 MyHelloOperator 的實例成員 self.field_a 是作為傳遞到其父系建構子的 kwargs 的一部分隱含地初始化的

class HelloOperator(BaseOperator):
    template_fields = "field_a"

    def __init__(field_a) -> None:
        self.field_a = field_a


class MyHelloOperator(HelloOperator):
    template_fields = ("field_a", "field_b")

    def __init__(field_b, **kwargs) -> None:  # <- should be def __init__(field_a, field_b, **kwargs)
        super().__init__(**kwargs)  # <- should be super().__init__(field_a=field_a, **kwargs)
        self.field_b = field_b

3. 在建構子中指派期間,不允許對參數套用動作。對值的任何動作都應在 execute() 方法中套用。因此,以下範例無效

class HelloOperator(BaseOperator):
    template_fields = "field_a"

    def __init__(field_a) -> None:
        self.field_a = field_a.lower()  # <- assignment should be only self.field_a = field_a

當 operator 從基礎 operator 繼承並且沒有定義自己的建構子時,以上限制不適用。但是,範本欄位必須根據這些限制在父系中正確設定。

因此,以下範例有效

class HelloOperator(BaseOperator):
    template_fields = "field_a"

    def __init__(field_a) -> None:
        self.field_a = field_a


class MyHelloOperator(HelloOperator):
    template_fields = "field_a"

以上限制由名為 'validate-operators-init' 的 pre-commit 強制執行。

透過子類別新增範本欄位

建立自訂 operator 的常見使用案例是簡單地擴增現有的 template_fields。可能會有這樣的情況:您想要使用的 operator 未將某些參數定義為範本,但您希望能夠動態地將引數作為 Jinja 運算式傳遞。這可以透過快速子類別化現有的 operator 輕鬆實現。

假設您想要使用先前定義的 HelloOperator

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("name",)

    def __init__(self, name: str, world: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.world = world

    def execute(self, context):
        message = f"Hello {self.world} it's {self.name}!"
        print(message)
        return message

但是,您想要動態地參數化 world 引數。由於 template_fields 屬性保證為 Sequence[str] 類型(即字串的列表或元組),您可以子類別化 HelloOperator 以輕鬆地修改所需的 template_fields

class MyHelloOperator(HelloOperator):
    template_fields: Sequence[str] = (*HelloOperator.template_fields, "world")

現在您可以像這樣使用 MyHelloOperator

with dag:
    hello_task = MyHelloOperator(
        task_id="task_id_1",
        name="{{ task_instance.task_id }}",
        world="{{ var.value.my_world }}",
    )

在此範例中,world 引數將透過 Jinja 運算式動態設定為名為 “my_world” 的 Airflow 變數的值。

Sensors

Airflow 為一種特殊類型的 operator 提供了一個基本元件,其目的是定期輪詢某些狀態(例如檔案是否存在),直到滿足成功標準。

您可以透過擴展 airflow.sensors.base.BaseSensorOperator 來建立您想要的任何 sensor,並定義一個 poke 方法來輪詢您的外部狀態並評估成功標準。

Sensors 具有一個強大的功能,稱為 'reschedule' 模式,該模式允許重新排程 sensor 的任務,而不是在輪詢之間阻止 worker 插槽。當您可以容忍更長的輪詢間隔並預期長時間輪詢時,這非常有用。

重新排程模式有一個注意事項,即您的 sensor 無法在重新排程的執行之間維護內部狀態。在這種情況下,您應該使用 airflow.sensors.base.poke_mode_only() 裝飾您的 sensor。這將讓使用者知道您的 sensor 不適合與重新排程模式一起使用。

保留內部狀態且無法與重新排程模式一起使用的 sensor 範例是 airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor。它輪詢前綴下的物件數量(此數量是 sensor 的內部狀態),並在經過一定時間且物件數量沒有變更時成功。

此條目是否有幫助?