建立通知器

BaseNotifier 是一個抽象類別,為在 Airflow 中使用各種 on_*__callback 發送通知提供基本結構。它旨在讓供應商擴展並針對其特定需求進行自訂。

若要擴展 BaseNotifier 類別,您需要建立一個新的類別來繼承它。在這個新的類別中,您應該覆寫 notify 方法,並使用您自己的實作來發送通知。notify 方法接受單一參數,即 Airflow 環境,其中包含有關當前任務和執行的資訊。

您也可以設定 template_fields 屬性,以指定哪些屬性應呈現為範本。

以下是如何建立 Notifier 類別的範例

from airflow.notifications.basenotifier import BaseNotifier
from my_provider import send_message


class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message):
        self.message = message

    def notify(self, context):
        # Send notification here, below is an example
        title = f"Task {context['task_instance'].task_id} failed"
        send_message(title, self.message)

使用通知器

一旦您有了通知器實作,您就可以在 DAG 定義中使用它,方法是將其作為參數傳遞給 on_*_callbacks。例如,您可以將其與 on_success_callbackon_failure_callback 一起使用,以根據任務或 DAG 執行的狀態發送通知。

以下是使用上述通知器的範例

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

from myprovider.notifier import MyNotifier

with DAG(
    dag_id="example_notifier",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    on_success_callback=MyNotifier(message="Success!"),
    on_failure_callback=MyNotifier(message="Failure!"),
):
    task = BashOperator(
        task_id="example_task",
        bash_command="exit 1",
        on_success_callback=MyNotifier(message="Task Succeeded!"),
    )

如需社群管理的通知器列表,請參閱 通知

此條目是否有幫助?