Google Cloud Stackdriver 運算子

先決條件任務

若要使用這些運算子,您必須執行幾件事

StackdriverListAlertPoliciesOperator

使用 StackdriverListAlertPoliciesOperator 以擷取由指定篩選器識別的所有警示政策。

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

list_alert_policies = StackdriverListAlertPoliciesOperator(
    task_id="list-alert-policies",
)

StackdriverEnableAlertPoliciesOperator

使用 StackdriverEnableAlertPoliciesOperator 以啟用由指定篩選器識別的警示政策。

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

enable_alert_policy = StackdriverEnableAlertPoliciesOperator(
    task_id="enable-alert-policies",
    filter_=f'(displayName="{ALERT_1_NAME}" OR displayName="{ALERT_2_NAME}")',
)

StackdriverDisableAlertPoliciesOperator

使用 StackdriverDisableAlertPoliciesOperator 以停用由指定篩選器識別的警示政策。

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

disable_alert_policy = StackdriverDisableAlertPoliciesOperator(
    task_id="disable-alert-policies",
    filter_=f'displayName="{ALERT_1_NAME}"',
)

StackdriverUpsertAlertOperator

使用 StackdriverUpsertAlertOperator 以更新插入由給定篩選器 JSON 字串識別的警示政策。如果具有給定名稱的警示已存在,則運算子會更新現有的政策,否則會建立新的政策。

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

create_alert_policy = StackdriverUpsertAlertOperator(
    task_id="create-alert-policies",
    alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
)

StackdriverDeleteAlertOperator

使用 StackdriverDeleteAlertOperator 以刪除由給定名稱識別的警示政策。

使用運算子

要刪除的警示名稱應以 projects/<PROJECT_NAME>/alertPolicies/<ALERT_NAME> 格式給定

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

delete_alert_policy = StackdriverDeleteAlertOperator(
    task_id="delete-alert-policy",
    name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
)

StackdriverListNotificationChannelsOperator

使用 StackdriverListNotificationChannelsOperator 以擷取由給定篩選器識別的所有通知管道。

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

list_notification_channel = StackdriverListNotificationChannelsOperator(
    task_id="list-notification-channel", filter_='type="pubsub"'
)

StackdriverEnableNotificationChannelsOperator

使用 StackdriverEnableNotificationChannelsOperator 以啟用由給定篩選器識別的通知管道。

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

enable_notification_channel = StackdriverEnableNotificationChannelsOperator(
    task_id="enable-notification-channel", filter_='type="pubsub"'
)

StackdriverDisableNotificationChannelsOperator

使用 StackdriverDisableNotificationChannelsOperator 以停用由給定篩選器識別的通知管道。

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverUpsertNotificationChannelOperator

使用 StackdriverUpsertNotificationChannelOperator 以更新插入由給定管道 JSON 字串識別的通知管道。如果具有給定名稱的管道已存在,則運算子會更新現有的管道,否則會建立新的管道。

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverDeleteNotificationChannelOperator

要刪除的警示名稱應以 projects/<PROJECT_NAME>/notificationChannels/<CHANNEL_NAME> 格式給定

使用運算子

您可以使用此運算子,無論是否帶有專案 ID,來擷取所有警示政策。如果缺少專案 ID,將會從使用的 Google Cloud 連線中擷取。

tests/system/google/cloud/stackdriver/example_stackdriver.py[原始碼]

delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
    task_id="delete-notification-channel",
    name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
)

這篇文章是否有幫助?