如何建立您自己的供應商¶
自訂供應商套件¶
您可以開發和發布您自己的供應商。您的自訂運算子、Hook、感測器、傳輸運算子可以一起打包在標準 Airflow 套件中,並使用相同的機制安裝。此外,它們也可以使用相同的機制來擴充 Airflow Core,包括驗證後端、自訂連線、日誌記錄、密鑰後端和額外運算子連結,如前一章所述。
如供應商文件中所述,自訂供應商可以擴充 Airflow 核心 - 它們可以為運算子新增額外連結以及自訂連線。如果您想將此機制用於您自己的自訂供應商,您可以使用建置您自己的供應商並將它們作為套件安裝。
如何建立供應商¶
將供應商新增到 Airflow 只是建置 Python 套件並將正確的中繼資料新增到套件中的問題。我們使用標準的 Python 機制來定義進入點。您的套件需要定義適當的進入點 apache_airflow_provider
,該進入點必須指向由您的套件實作的可調用物件,並傳回一個字典,其中包含您的套件的可探索功能列表。該字典必須遵循json-schema 規範。
大多數 schema 為文件提供了擴充點(您可能也想將其用於自己的目的),但從可擴充性的角度來看,重要的欄位是這些
在 CLI/API 中顯示套件資訊
package-name
- 供應商的套件名稱。name
- 供應商的人性化名稱。description
- 供應商的額外描述。version
- 套件版本列表(依時間倒序排列)。列表中的第一個版本是目前的套件版本。它取自已安裝套件的版本,而不是來自 provider_info 資訊。
向 Airflow 的核心公開自訂功能
extra-links
- 此欄位應包含所有新增額外連結功能的運算子類別名稱列表。有關如何為您的運算子新增額外連結功能的描述,請參閱定義運算子額外連結。connection-types
- 此欄位應包含所有連線類型以及實作這些自訂連線類型的 Hook 類別名稱(提供自訂額外欄位和自訂欄位行為)的列表。此欄位自 Airflow 2.2.0 起可用,它取代了已棄用的hook-class-names
。有關更多詳細資訊,請參閱管理連線。secret-backends
- 此欄位應包含供應商提供的所有密鑰後端類別名稱的列表。有關如何新增的描述,請參閱密鑰後端。task-decorators
- 此欄位應包含名稱/路徑字典的列表,其中裝飾器可用。有關如何新增自訂裝飾器的描述,請參閱建立自訂 @task 裝飾器。logging
- 此欄位應包含供應商提供的所有日誌處理常式類別名稱的列表。有關日誌處理常式的描述,請參閱任務日誌記錄。auth-backends
- 此欄位應包含 API/UI 的驗證後端模組名稱。有關驗證後端的描述,請參閱API。notifications
- 此欄位應包含通知類別。有關通知的描述,請參閱建立通知器。executors
- 此欄位應包含執行器類別名稱。有關執行器的描述,請參閱執行器。config
- 此欄位應包含符合airflow/config_templates/config.yml.schema.json
的字典,其中包含供應商提供的組態。有關設定組態的詳細資訊,請參閱設定組態選項。filesystems
- 此欄位應包含所有檔案系統模組名稱的列表。有關檔案系統的描述,請參閱物件儲存。
dataset-uris
- 此欄位應包含 URI 方案列表以及實作正規化函數的類別名稱。有關資料集 URI 的描述,請參閱資料感知排程。
注意
已棄用值
hook-class-names
(已棄用) - 此欄位應包含所有 Hook 類別名稱的列表,這些名稱提供具有自訂額外欄位和欄位行為的自訂連線類型。hook-class-names
陣列已自 Airflow 2.2.0 起棄用(基於最佳化原因),並將在 Airflow 3 中移除。如果您的供應商目標是 Airflow 2.2.0+,則您不必包含hook-class-names
陣列,如果您也想以較早版本的 Airflow 2 為目標,則應同時包含hook-class-names
和connection-types
陣列。有關更多詳細資訊,請參閱管理連線。
安裝供應商後,您可以使用 airflow providers
命令查詢已安裝的供應商及其功能。這樣,您可以驗證您的供應商是否已正確識別,以及它們是否正確定義了擴充功能。有關可用 CLI 子命令的詳細資訊,請參閱命令列介面和環境變數參考。
當您編寫自己的供應商時,請考慮遵循供應商套件的命名慣例
特殊考量¶
選用的供應商功能¶
在 2.3.0 版本中新增: 此功能在 Airflow 2.3+ 版本中可用。
某些供應商可能會提供選用功能,這些功能僅在安裝某些套件或程式庫時才可用。這些功能通常會導致 ImportErrors
;但是,這些匯入錯誤應靜默忽略,而不是用錯誤警告污染 Airflow 的日誌。錯誤警告是一種非常糟糕的模式,因為它們往往會變成盲點,因此鼓勵避免錯誤警告。但是,在 Airflow 2.3 之前,Airflow 沒有機制可以選擇性地忽略「已知」的 ImportError。因此,Airflow 2.1 和 2.2 靜默忽略了來自供應商的所有 ImportError,實際上導致忽略了重要的匯入錯誤 - 而沒有向 Airflow 使用者提供有關供應商依賴項中缺少某些內容的線索。
將供應商與動態任務映射搭配使用¶
Airflow 2.3 新增了動態任務映射,並新增了為每個任務分配唯一鍵的可能性。這表示當此類動態映射任務想要從 XCom 檢索值時(例如,在應計算額外連結的情況下),它應始終檢查傳遞的 ti_key 值是否為 None,然後僅使用 XCom.get_value 檢索 XCom 值。這允許與較早版本的 Airflow 保持向後相容性。
想要保持向後相容性的供應商中存取 XCom 值的典型程式碼應如下所示(請注意 if ti_key is not None:
條件)。
def get_link( self, operator: BaseOperator, dttm: datetime | None = None, ti_key: "TaskInstanceKey" | None = None, ): if ti_key is not None: job_ids = XCom.get_value(key="job_id", ti_key=ti_key) else: assert dttm is not None job_ids = XCom.get_one( key="job_id", dag_id=operator.dag.dag_id, task_id=operator.task_id, execution_date=dttm, ) if not job_ids: return None if len(job_ids) < self.index: return None job_id = job_ids[self.index] return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
自訂供應商常見問題¶
當我編寫自己的供應商時,我是否需要做任何特殊的事情才能使其對其他人可用?
除了建立 apache_airflow_provider
進入點以傳回正確格式化的中繼資料(包含 extra-links
和 connection-types
欄位(以及已棄用的 hook-class-names
欄位,如果您也以 2.2.0 之前的 Airflow 版本為目標)的字典)之外,您不需要做任何特殊的事情。
在已安裝您的 Python 套件的環境中執行 Airflow 的任何人都可以將該套件用作供應商套件。
我應該專門命名我的供應商,還是應該在 ``airflow.providers`` 套件中建立它?
我們有相當多的(>80 個)由社群管理的供應商,我們將與 Apache Airflow 一起維護它們。所有這些供應商都有明確定義的結構,並遵循我們定義的命名慣例,並且它們都在 airflow.providers
套件中。如果您的目的是貢獻您的供應商,那麼您應該遵循這些慣例,並向 Apache Airflow 發出 PR 以進行貢獻。但是您可以自由使用任何套件名稱,只要與其他名稱沒有衝突,因此最好選擇在您的「網域」中的套件。
我需要做什麼才能將套件變成供應商?
您需要執行以下操作才能將現有的 Python 套件變成供應商(請參閱下面的範例)
在
pyproject.toml
檔案中新增apache_airflow_provider
進入點 - 這會告訴 Airflow 從哪裡取得所需的供應商中繼資料建立您在第一步中引用的函數作為套件的一部分:此函數傳回一個字典,其中包含有關您的供應商套件的所有中繼資料
如果您希望 Airflow 在供應商頁面中連結到您的供應商文件,請確保將「project-url/documentation」中繼資料新增到您的套件。這也會在 PyPI 中新增指向您文件的連結。
請注意,該字典應符合
airflow/provider_info.schema.json
JSON-schema 規範。社群管理的供應商在那裡有更多用於建立文件的欄位,但執行時期資訊的要求僅包含 schema 中定義的幾個欄位
airflow/provider_info.schema.json
{
"$schema": "https://json-schema.dev.org.tw/draft-07/schema#",
"type": "object",
"properties": {
"package-name": {
"description": "Package name available under which the package is available in the PyPI repository.",
"type": "string"
},
"name": {
"description": "Provider name",
"type": "string"
},
"description": {
"description": "Information about the package in RST format",
"type": "string"
},
"hook-class-names": {
"type": "array",
"description": "Hook class names that provide connection types to core (deprecated by connection-types)",
"items": {
"type": "string"
},
"deprecated": {
"description": "The hook-class-names property has been deprecated in favour of connection-types which is more performant version allowing to only import individual Hooks rather than all hooks at once",
"deprecatedVersion": "2.2.0"
}
},
"filesystems": {
"type": "array",
"description": "Filesystem module names",
"items": {
"type": "string"
}
},
"transfers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"how-to-guide": {
"description": "Path to how-to-guide for the transfer. The path must start with '/docs/'",
"type": "string"
},
"source-integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"target-integration-name": {
"type": "string",
"description": "Target integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-module": {
"type": "string",
"description": "List of python modules containing the transfers."
}
},
"additionalProperties": false,
"required": [
"source-integration-name",
"target-integration-name",
"python-module"
]
}
},
"triggers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-modules": {
"description": "List of Python modules containing the triggers.",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false,
"required": [
"integration-name",
"python-modules"
]
}
},
"connection-types": {
"type": "array",
"description": "Map of connection types mapped to hook class names.",
"items": {
"type": "object",
"properties": {
"connection-type": {
"description": "Type of connection defined by the provider",
"type": "string"
},
"hook-class-name": {
"description": "Hook class name that implements the connection type",
"type": "string"
}
},
"required": [
"connection-type",
"hook-class-name"
]
}
},
"extra-links": {
"type": "array",
"description": "Operator class names that provide extra link functionality",
"items": {
"type": "string"
}
},
"secrets-backends": {
"type": "array",
"description": "Secrets Backend class names",
"items": {
"type": "string"
}
},
"logging": {
"type": "array",
"description": "Logging Task Handlers class names",
"items": {
"type": "string"
}
},
"auth-backends": {
"type": "array",
"description": "API Auth Backend module names",
"items": {
"type": "string"
}
},
"auth-managers": {
"type": "array",
"description": "Auth managers module names",
"items": {
"type": "string"
}
},
"notifications": {
"type": "array",
"description": "Notification class names",
"items": {
"type": "string"
}
},
"executors": {
"type": "array",
"description": "Executor class names",
"items": {
"type": "string"
}
},
"config": {
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"options": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/option"
}
},
"renamed": {
"type": "object",
"properties": {
"previous_name": {
"type": "string"
},
"version": {
"type": "string"
}
}
}
},
"required": [
"description",
"options"
],
"additionalProperties": false
}
},
"task-decorators": {
"type": "array",
"description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
}
},
"definitions": {
"option": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"version_added": {
"type": [
"string",
"null"
]
},
"type": {
"type": "string",
"enum": [
"string",
"boolean",
"integer",
"float"
]
},
"example": {
"type": [
"string",
"null",
"number"
]
},
"default": {
"type": [
"string",
"null",
"number"
]
},
"sensitive": {
"type": "boolean",
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
}
},
"required": [
"description",
"version_added",
"type",
"example",
"default"
],
"additional_properties": false
}
},
"required": [
"name",
"description"
]
}
範例 pyproject.toml
[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.myproviderpackage.get_provider_info:get_provider_info"
範例 myproviderpackage/get_provider_info.py
def get_provider_info():
return {
"package-name": "my-package-name",
"name": "name",
"description": "a description",
"hook-class-names": [
"myproviderpackage.hooks.source.SourceHook",
],
}
連線 ID 和類型是否有慣例?
很好的問題。很高興您問了。我們通常遵循 <NAME>_default
作為連線 ID,僅使用 <NAME>
作為連線類型的慣例。一些範例
google_cloud_default
ID 和google_cloud_platform
類型aws_default
ID 和aws
類型
您應該遵循此慣例。重要的是,連線類型使用唯一的名稱,因此對於您的供應商而言應該是唯一的。如果兩個供應商嘗試新增具有相同類型的連線,則只有其中一個會成功。
我可以將自己的供應商貢獻給 Apache Airflow 嗎?
答案取決於供應商。我們在 PROVIDERS.rst 開發人員文件中對此制定了政策。
我可以在 Apache Airflow 使用者中宣傳自己的供應商,並在 PyPI 中作為套件與其他人分享嗎?
當然可以!我們的網站上有一個生態系統區域,我們在其中分享非社群管理的擴充功能和 Airflow 的工作。歡迎隨時向該頁面發出 PR 並新增,當我們看到這樣的供應商對 Airflow 使用者社群有用時,我們將評估並合併它。
我可以使用我的供應商收費嗎?
這超出了我們的控制範圍和領域。作為一個 Apache 專案,我們對商業友好,並且圍繞 Apache Airflow 和許多其他 Apache 專案建立了許多業務。作為一個社群,我們免費提供所有軟體,這永遠不會改變。第三方開發人員正在做的事情不在 Apache Airflow 社群的控制之下。