Opsgenie Alert Notifier

Use OpsgenieNotifier to send alerts to Opsgenie.

Using the notifier

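Send an alert with a specific message to Opsgenie. The callback can be attached at the DAG level, at the task level, or both, as in the example below.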

tests/system/opsgenie/example_opsgenie_notifier.py

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.opsgenie.notifications.opsgenie import send_opsgenie_notification
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    "opsgenie_notifier",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    on_failure_callback=[send_opsgenie_notification(payload={"message": "Something went wrong!"})],
) as dag:
    BashOperator(
        task_id="mytask",
        bash_command="fail",
        on_failure_callback=[send_opsgenie_notification(payload={"message": "Something went wrong!"})],
    )
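The payload in the example carries only a message. As a minimal sketch of a richer payload, the helper below assembles a dictionary with a few additional alert fields. `build_alert_payload` and `MAX_MESSAGE_LENGTH` are hypothetical names introduced for illustration, not part of the provider. The field names (`description`, `priority`, `tags`), the P1 to P5 priority levels, and the 130-character message limit are assumptions based on the Opsgenie Alert API and should be checked against its documentation.

```python
from __future__ import annotations

# Assumption: Opsgenie limits the alert "message" field to 130 characters.
MAX_MESSAGE_LENGTH = 130

# Assumption: Opsgenie priorities range from P1 (critical) to P5 (informational).
VALID_PRIORITIES = {"P1", "P2", "P3", "P4", "P5"}


def build_alert_payload(dag_id: str, task_id: str, priority: str = "P3") -> dict:
    """Hypothetical helper: assemble an alert payload for a failed task."""
    if priority not in VALID_PRIORITIES:
        raise ValueError(f"Unknown Opsgenie priority: {priority!r}")
    message = f"Airflow task {dag_id}.{task_id} failed"
    return {
        # Truncate so the message stays within the assumed length limit.
        "message": message[:MAX_MESSAGE_LENGTH],
        "description": f"Task {task_id!r} in DAG {dag_id!r} did not complete successfully.",
        "priority": priority,
        "tags": ["airflow", dag_id],
    }


payload = build_alert_payload("opsgenie_notifier", "mytask", priority="P2")
print(payload["message"])

# Assumed usage, mirroring the example above:
# on_failure_callback=[send_opsgenie_notification(payload=payload)]
```

Building the payload in one place keeps the DAG-level and task-level callbacks consistent when both are set.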
