如何建立您自己的供應商

自訂供應商套件

您可以開發和發布您自己的供應商。您的自訂運算子、Hook、感測器、傳輸運算子可以一起打包在標準 Airflow 套件中,並使用相同的機制安裝。此外,它們也可以使用相同的機制來擴充 Airflow Core,包括驗證後端、自訂連線、日誌記錄、密鑰後端和額外運算子連結,如前一章所述。

供應商文件中所述,自訂供應商可以擴充 Airflow 核心 - 它們可以為運算子新增額外連結以及自訂連線。如果您想將此機制用於您自己的自訂供應商,您可以使用建置您自己的供應商並將它們作為套件安裝。

如何建立供應商

將供應商新增到 Airflow 只是建置 Python 套件並將正確的中繼資料新增到套件中的問題。我們使用標準的 Python 機制來定義進入點。您的套件需要定義適當的進入點 apache_airflow_provider,該進入點必須指向由您的套件實作的可調用物件,並傳回一個字典,其中包含您的套件的可探索功能列表。該字典必須遵循json-schema 規範

大多數 schema 為文件提供了擴充點(您可能也想將其用於自己的目的),但從可擴充性的角度來看,重要的欄位是這些

在 CLI/API 中顯示套件資訊

  • package-name - 供應商的套件名稱。

  • name - 供應商的人性化名稱。

  • description - 供應商的額外描述。

  • version - 套件版本列表(依時間倒序排列)。列表中的第一個版本是目前的套件版本。它取自已安裝套件的版本,而不是來自 provider_info 資訊。

向 Airflow 的核心公開自訂功能

  • extra-links - 此欄位應包含所有新增額外連結功能的運算子類別名稱列表。有關如何為您的運算子新增額外連結功能的描述,請參閱定義運算子額外連結

  • connection-types - 此欄位應包含所有連線類型以及實作這些自訂連線類型的 Hook 類別名稱(提供自訂額外欄位和自訂欄位行為)的列表。此欄位自 Airflow 2.2.0 起可用,它取代了已棄用的 hook-class-names。有關更多詳細資訊,請參閱管理連線

  • secret-backends - 此欄位應包含供應商提供的所有密鑰後端類別名稱的列表。有關如何新增的描述,請參閱密鑰後端

  • task-decorators - 此欄位應包含名稱/路徑字典的列表,其中裝飾器可用。有關如何新增自訂裝飾器的描述,請參閱建立自訂 @task 裝飾器

  • logging - 此欄位應包含供應商提供的所有日誌處理常式類別名稱的列表。有關日誌處理常式的描述,請參閱任務日誌記錄

  • auth-backends - 此欄位應包含 API/UI 的驗證後端模組名稱。有關驗證後端的描述,請參閱API

  • notifications - 此欄位應包含通知類別。有關通知的描述,請參閱建立通知器

  • executors - 此欄位應包含執行器類別名稱。有關執行器的描述,請參閱執行器

  • config - 此欄位應包含符合 airflow/config_templates/config.yml.schema.json 的字典,其中包含供應商提供的組態。有關設定組態的詳細資訊,請參閱設定組態選項

  • filesystems - 此欄位應包含所有檔案系統模組名稱的列表。有關檔案系統的描述,請參閱物件儲存

  • dataset-uris - 此欄位應包含 URI 方案列表以及實作正規化函數的類別名稱。有關資料集 URI 的描述,請參閱資料感知排程

注意

已棄用值

  • hook-class-names (已棄用) - 此欄位應包含所有 Hook 類別名稱的列表,這些名稱提供具有自訂額外欄位和欄位行為的自訂連線類型。hook-class-names 陣列已自 Airflow 2.2.0 起棄用(基於最佳化原因),並將在 Airflow 3 中移除。如果您的供應商目標是 Airflow 2.2.0+,則您不必包含 hook-class-names 陣列,如果您也想以較早版本的 Airflow 2 為目標,則應同時包含 hook-class-namesconnection-types 陣列。有關更多詳細資訊,請參閱管理連線

安裝供應商後,您可以使用 airflow providers 命令查詢已安裝的供應商及其功能。這樣,您可以驗證您的供應商是否已正確識別,以及它們是否正確定義了擴充功能。有關可用 CLI 子命令的詳細資訊,請參閱命令列介面和環境變數參考

當您編寫自己的供應商時,請考慮遵循供應商套件的命名慣例

特殊考量

選用的供應商功能

在 2.3.0 版本中新增: 此功能在 Airflow 2.3+ 版本中可用。

某些供應商可能會提供選用功能,這些功能僅在安裝某些套件或程式庫時才可用。這些功能通常會導致 ImportErrors;但是,這些匯入錯誤應靜默忽略,而不是用錯誤警告污染 Airflow 的日誌。錯誤警告是一種非常糟糕的模式,因為它們往往會變成盲點,因此鼓勵避免錯誤警告。但是,在 Airflow 2.3 之前,Airflow 沒有機制可以選擇性地忽略「已知」的 ImportError。因此,Airflow 2.1 和 2.2 靜默忽略了來自供應商的所有 ImportError,實際上導致忽略了重要的匯入錯誤 - 而沒有向 Airflow 使用者提供有關供應商依賴項中缺少某些內容的線索。

將供應商與動態任務映射搭配使用

Airflow 2.3 新增了動態任務映射,並新增了為每個任務分配唯一鍵的可能性。這表示當此類動態映射任務想要從 XCom 檢索值時(例如,在應計算額外連結的情況下),它應始終檢查傳遞的 ti_key 值是否為 None,然後僅使用 XCom.get_value 檢索 XCom 值。這允許與較早版本的 Airflow 保持向後相容性。

想要保持向後相容性的供應商中存取 XCom 值的典型程式碼應如下所示(請注意 if ti_key is not None: 條件)。

def get_link(
    self,
    operator: BaseOperator,
    dttm: datetime | None = None,
    ti_key: "TaskInstanceKey" | None = None,
):
    if ti_key is not None:
        job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
    else:
        assert dttm is not None
        job_ids = XCom.get_one(
            key="job_id",
            dag_id=operator.dag.dag_id,
            task_id=operator.task_id,
            execution_date=dttm,
        )
    if not job_ids:
        return None
    if len(job_ids) < self.index:
        return None
    job_id = job_ids[self.index]
    return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)

自訂供應商常見問題

當我編寫自己的供應商時,我是否需要做任何特殊的事情才能使其對其他人可用?

除了建立 apache_airflow_provider 進入點以傳回正確格式化的中繼資料(包含 extra-linksconnection-types 欄位(以及已棄用的 hook-class-names 欄位,如果您也以 2.2.0 之前的 Airflow 版本為目標)的字典)之外,您不需要做任何特殊的事情。

在已安裝您的 Python 套件的環境中執行 Airflow 的任何人都可以將該套件用作供應商套件。

我應該專門命名我的供應商,還是應該在 ``airflow.providers`` 套件中建立它?

我們有相當多的(>80 個)由社群管理的供應商,我們將與 Apache Airflow 一起維護它們。所有這些供應商都有明確定義的結構,並遵循我們定義的命名慣例,並且它們都在 airflow.providers 套件中。如果您的目的是貢獻您的供應商,那麼您應該遵循這些慣例,並向 Apache Airflow 發出 PR 以進行貢獻。但是您可以自由使用任何套件名稱,只要與其他名稱沒有衝突,因此最好選擇在您的「網域」中的套件。

我需要做什麼才能將套件變成供應商?

您需要執行以下操作才能將現有的 Python 套件變成供應商(請參閱下面的範例)

  • pyproject.toml 檔案中新增 apache_airflow_provider 進入點 - 這會告訴 Airflow 從哪裡取得所需的供應商中繼資料

  • 建立您在第一步中引用的函數作為套件的一部分:此函數傳回一個字典,其中包含有關您的供應商套件的所有中繼資料

  • 如果您希望 Airflow 在供應商頁面中連結到您的供應商文件,請確保將「project-url/documentation」中繼資料新增到您的套件。這也會在 PyPI 中新增指向您文件的連結。

  • 請注意,該字典應符合 airflow/provider_info.schema.json JSON-schema 規範。社群管理的供應商在那裡有更多用於建立文件的欄位,但執行時期資訊的要求僅包含 schema 中定義的幾個欄位

airflow/provider_info.schema.json

{
    "$schema": "https://json-schema.dev.org.tw/draft-07/schema#",
    "type": "object",
    "properties": {
        "package-name": {
            "description": "Package name available under which the package is available in the PyPI repository.",
            "type": "string"
        },
        "name": {
            "description": "Provider name",
            "type": "string"
        },
        "description": {
            "description": "Information about the package in RST format",
            "type": "string"
        },
        "hook-class-names": {
            "type": "array",
            "description": "Hook class names that provide connection types to core (deprecated by connection-types)",
            "items": {
                "type": "string"
            },
            "deprecated": {
                "description": "The hook-class-names property has been deprecated in favour of connection-types which is more performant version allowing to only import individual Hooks rather than all hooks at once",
                "deprecatedVersion": "2.2.0"
            }
        },
        "filesystems": {
            "type": "array",
            "description": "Filesystem module names",
            "items": {
                "type": "string"
            }
        },
        "transfers": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "how-to-guide": {
                        "description": "Path to how-to-guide for the transfer. The path must start with '/docs/'",
                        "type": "string"
                    },
                    "source-integration-name": {
                        "type": "string",
                        "description": "Integration name. It must have a matching item in the 'integration' section of any provider."
                    },
                    "target-integration-name": {
                        "type": "string",
                        "description": "Target integration name. It must have a matching item in the 'integration' section of any provider."
                    },
                    "python-module": {
                        "type": "string",
                        "description": "List of python modules containing the transfers."
                    }
                },
                "additionalProperties": false,
                "required": [
                    "source-integration-name",
                    "target-integration-name",
                    "python-module"
                ]
            }
        },
        "triggers": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "integration-name": {
                        "type": "string",
                        "description": "Integration name. It must have a matching item in the 'integration' section of any provider."
                    },
                    "python-modules": {
                        "description": "List of Python modules containing the triggers.",
                        "type": "array",
                        "items": {
                            "type": "string"
                        }
                    }
                },
                "additionalProperties": false,
                "required": [
                    "integration-name",
                    "python-modules"
                ]
            }
        },
        "connection-types": {
            "type": "array",
            "description": "Map of connection types mapped to hook class names.",
            "items": {
                "type": "object",
                "properties": {
                    "connection-type": {
                        "description": "Type of connection defined by the provider",
                        "type": "string"
                    },
                    "hook-class-name": {
                        "description": "Hook class name that implements the connection type",
                        "type": "string"
                    }
                },
                "required": [
                    "connection-type",
                    "hook-class-name"
                ]
            }
        },
        "extra-links": {
            "type": "array",
            "description": "Operator class names that provide extra link functionality",
            "items": {
                "type": "string"
            }
        },
        "secrets-backends": {
            "type": "array",
            "description": "Secrets Backend class names",
            "items": {
                "type": "string"
            }
        },
        "logging": {
            "type": "array",
            "description": "Logging Task Handlers class names",
            "items": {
                "type": "string"
            }
        },
        "auth-backends": {
            "type": "array",
            "description": "API Auth Backend module names",
            "items": {
                "type": "string"
            }
        },
        "auth-managers": {
            "type": "array",
            "description": "Auth managers module names",
            "items": {
                "type": "string"
            }
        },
        "notifications": {
            "type": "array",
            "description": "Notification class names",
            "items": {
                "type": "string"
            }
        },
        "executors": {
            "type": "array",
            "description": "Executor class names",
            "items": {
                "type": "string"
            }
        },
        "config": {
            "type": "object",
            "additionalProperties": {
                "type": "object",
                "properties": {
                    "description": {
                        "type": [
                            "string",
                            "null"
                        ]
                    },
                    "options": {
                        "type": "object",
                        "additionalProperties": {
                            "$ref": "#/definitions/option"
                        }
                    },
                    "renamed": {
                        "type": "object",
                        "properties": {
                            "previous_name": {
                                "type": "string"
                            },
                            "version": {
                                "type": "string"
                            }
                        }
                    }
                },
                "required": [
                    "description",
                    "options"
                ],
                "additionalProperties": false
            }
        },
        "task-decorators": {
            "type": "array",
            "description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
            "items": {
                "name": {
                    "type": "string"
                },
                "path": {
                    "type": "string"
                }
            }
        }
    },
    "definitions": {
        "option": {
            "type": "object",
            "properties": {
                "description": {
                    "type": [
                        "string",
                        "null"
                    ]
                },
                "version_added": {
                    "type": [
                        "string",
                        "null"
                    ]
                },
                "type": {
                    "type": "string",
                    "enum": [
                        "string",
                        "boolean",
                        "integer",
                        "float"
                    ]
                },
                "example": {
                    "type": [
                        "string",
                        "null",
                        "number"
                    ]
                },
                "default": {
                    "type": [
                        "string",
                        "null",
                        "number"
                    ]
                },
                "sensitive": {
                    "type": "boolean",
                    "description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
                }
            },
            "required": [
                "description",
                "version_added",
                "type",
                "example",
                "default"
            ],
            "additional_properties": false
        }
    },
    "required": [
        "name",
        "description"
    ]
}

範例 pyproject.toml

[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.myproviderpackage.get_provider_info:get_provider_info"

範例 myproviderpackage/get_provider_info.py

def get_provider_info():
    return {
        "package-name": "my-package-name",
        "name": "name",
        "description": "a description",
        "hook-class-names": [
            "myproviderpackage.hooks.source.SourceHook",
        ],
    }

連線 ID 和類型是否有慣例?

很好的問題。很高興您問了。我們通常遵循 <NAME>_default 作為連線 ID,僅使用 <NAME> 作為連線類型的慣例。一些範例

  • google_cloud_default ID 和 google_cloud_platform 類型

  • aws_default ID 和 aws 類型

您應該遵循此慣例。重要的是,連線類型使用唯一的名稱,因此對於您的供應商而言應該是唯一的。如果兩個供應商嘗試新增具有相同類型的連線,則只有其中一個會成功。

我可以將自己的供應商貢獻給 Apache Airflow 嗎?

答案取決於供應商。我們在 PROVIDERS.rst 開發人員文件中對此制定了政策。

我可以在 Apache Airflow 使用者中宣傳自己的供應商,並在 PyPI 中作為套件與其他人分享嗎?

當然可以!我們的網站上有一個生態系統區域,我們在其中分享非社群管理的擴充功能和 Airflow 的工作。歡迎隨時向該頁面發出 PR 並新增,當我們看到這樣的供應商對 Airflow 使用者社群有用時,我們將評估並合併它。

我可以使用我的供應商收費嗎?

這超出了我們的控制範圍和領域。作為一個 Apache 專案,我們對商業友好,並且圍繞 Apache Airflow 和許多其他 Apache 專案建立了許多業務。作為一個社群,我們免費提供所有軟體,這永遠不會改變。第三方開發人員正在做的事情不在 Apache Airflow 社群的控制之下。

此條目是否有幫助?