定義運算子額外連結¶
如果您想為運算子新增更多連結,您可以透過外掛程式或供應商套件來定義它們。額外連結將顯示在網格視圖的任務詳細資訊頁面中。

以下程式碼示範如何透過外掛程式為運算子新增額外連結
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
class GoogleLink(BaseOperatorLink):
name = "Google"
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
return "https://www.google.com"
class MyFirstOperator(BaseOperator):
operator_extra_links = (GoogleLink(),)
def __init__(self, **kwargs):
super().__init__(**kwargs)
def execute(self, context):
self.log.info("Hello World!")
# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [
GoogleLink(),
]
注意
運算子額外連結應透過 Airflow 外掛程式或自訂 Airflow 供應商註冊才能運作。
您也可以新增全域運算子額外連結,該連結將透過 Airflow 外掛程式或 Airflow 供應商提供給所有運算子使用。您可以在外掛程式介面和供應商套件中了解更多相關資訊。
您可以在額外連結中查看透過社群管理的供應商提供的所有額外連結。
新增或覆寫現有運算子的連結¶
您也可以透過 Airflow 外掛程式或自訂供應商為現有運算子新增(或覆寫)額外連結。
例如,以下 Airflow 外掛程式將在使用 GCSToS3Operator
運算子的所有任務上新增運算子連結。
將運算子連結新增至現有運算子 plugins/extra_link.py
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
class S3LogLink(BaseOperatorLink):
name = "S3"
# Add list of all the operators to which you want to add this OperatorLinks
# Example: operators = [GCSToS3Operator, GCSToBigQueryOperator]
operators = [GCSToS3Operator]
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
# Invalid bucket name because upper case letters and underscores are used
# This will not be a valid bucket in any region
bucket_name = "Invalid_Bucket_Name"
return "https://s3.amazonaws.com/airflow-logs/{bucket_name}/{dag_id}/{task_id}/{run_id}".format(
bucket_name=bucket_name,
dag_id=operator.dag_id,
task_id=operator.task_id,
run_id=ti_key.run_id,
)
# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [
S3LogLink(),
]
覆寫現有運算子的運算子連結:
也可以透過外掛程式替換運算子上的內建連結。例如,BigQueryExecuteQueryOperator
包含一個指向 Google Cloud Console 的連結,但如果我們想變更該連結,我們可以
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator
# Change from https to http just to display the override
BIGQUERY_JOB_DETAILS_LINK_FMT = "http://console.cloud.google.com/bigquery?j={job_id}"
class BigQueryConsoleLink(BaseOperatorLink):
"""
Helper class for constructing BigQuery link.
"""
name = "BigQuery Console"
operators = [BigQueryOperator]
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
job_id = XCom.get_one(ti_key=ti_key, key="job_id")
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [
BigQueryConsoleLink(),
]
透過供應商新增運算子連結
如供應商套件中所述,當您建立自己的 Airflow 供應商時,您可以指定提供額外連結功能的運算子清單。這透過在供應商套件中繼資料中儲存的 provider-info
資訊中包含運算子類別名稱來完成
您的 provider-info 字典中需要的範例中繼資料(這是目前 apache-airflow-providers-google
供應商傳回的中繼資料的一部分
extra-links:
- airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
- airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
- airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink
您可以根據需要包含任意數量的具有額外連結的運算子。