建立自訂 Operator¶
Airflow 允許您建立新的 operators,以滿足您或您團隊的需求。這種可擴展性是使 Apache Airflow 強大的眾多功能之一。
您可以透過擴展 airflow.models.baseoperator.BaseOperator
來建立您想要的任何 operator
在衍生類別中,您需要覆寫兩種方法
建構子 - 定義 operator 所需的參數。您只需要指定 operator 特有的引數。您可以在 DAG 檔案中指定
default_args
。請參閱 預設引數 以取得更多詳細資訊。Execute - 當執行器呼叫 operator 時要執行的程式碼。此方法包含 Airflow 環境背景資訊作為參數,可用於讀取組態值。
注意
在實作自訂 operators 時,請勿在 __init__
方法中進行任何耗費資源的操作。operators 將在每個排程器週期中針對使用它們的每個任務實例化一次,並且進行資料庫呼叫可能會顯著減慢排程速度並浪費資源。
讓我們在一個新檔案 hello_operator.py
中實作一個範例 HelloOperator
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message
注意
為了使匯入能夠運作,您應該將檔案放置在 PYTHONPATH
環境變數中存在的目錄中。Airflow 預設會將 Airflow 首頁中的 dags/
、plugins/
和 config/
目錄新增至 PYTHONPATH
。例如,在我們的範例中,檔案放置在 custom_operator/
目錄中。請參閱 模組管理 以取得有關 Python 和 Airflow 如何管理模組的詳細資訊。
您現在可以按如下所示使用衍生的自訂 operator
from custom_operator.hello_operator import HelloOperator
with dag:
hello_task = HelloOperator(task_id="sample-task", name="foo_bar")
您也可以繼續使用您的 plugins 資料夾來儲存您的自訂 operators。如果您在 plugins 資料夾中有檔案 hello_operator.py
,您可以按如下所示匯入 operator
from hello_operator import HelloOperator
如果 operator 與外部服務(API、資料庫等)通訊,最好使用 Hooks 實作通訊層。這樣,實作的邏輯可以被其他使用者在不同的 operators 中重複使用。這種方法比為每個外部服務使用 CustomServiceBaseOperator
提供了更好的解耦和新增整合的利用率。
另一個考量是暫時狀態。如果操作需要記憶體內狀態(例如,應該在 on_kill
方法中用於取消請求的 job id),則狀態應保留在 operator 中,而不是在 hook 中。這樣,服務 hook 可以完全無狀態,並且操作的整個邏輯都在一個地方 - 在 operator 中。
Hooks¶
Hooks 作為介面,用於與 DAG 中的外部共用資源進行通訊。例如,DAG 中的多個任務可能需要存取 MySQL 資料庫。您可以從 hook 檢索連線並加以利用,而無需為每個任務建立連線。Hook 也有助於避免將連線驗證參數儲存在 DAG 中。請參閱 管理連線 以了解如何建立和管理連線,以及 Provider packages 以取得有關如何透過 providers 新增自訂連線類型的詳細資訊。
讓我們擴展先前的範例,以從 MySQL 提取名稱
class HelloDBOperator(BaseOperator):
def __init__(self, name: str, mysql_conn_id: str, database: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.mysql_conn_id = mysql_conn_id
self.database = database
def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database)
sql = "select name from user"
result = hook.get_first(sql)
message = f"Hello {result['name']}"
print(message)
return message
當 operator 在 hook 物件上調用查詢時,如果連線不存在,則會建立一個新連線。hook 從 Airflow 後端檢索驗證參數,例如使用者名稱和密碼,並將參數傳遞給 airflow.hooks.base.BaseHook.get_connection()
。execute
方法或從 execute
呼叫的任何方法中,您才應該建立 hook。建構子會在 Airflow 剖析 DAG 時被呼叫,這會頻繁發生。並且在那裡實例化 hook 將導致許多不必要的資料庫連線。execute
僅在 DAG 執行期間被呼叫。
使用者介面¶
Airflow 也允許開發人員控制 operator 在 DAG UI 中的顯示方式。覆寫 ui_color
以變更 UI 中 operator 的背景顏色。覆寫 ui_fgcolor
以變更標籤的顏色。覆寫 custom_operator_name
以將顯示名稱變更為類別名稱以外的名稱。
class HelloOperator(BaseOperator):
ui_color = "#ff0000"
ui_fgcolor = "#000000"
custom_operator_name = "Howdy"
# ...
範本化¶
您可以使用 Jinja 範本 來參數化您的 operator。Airflow 在呈現 operator 時,會考量 template_fields
中存在的欄位名稱以進行範本化。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
您可以按如下所示使用範本
with dag:
hello_task = HelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="Earth",
)
在此範例中,Jinja 尋找 name
參數,並將 {{ task_instance.task_id }}
替換為 task_id_1
。
參數也可以包含檔案名稱,例如 bash 腳本或 SQL 檔案。您需要在 template_ext
中新增檔案的副檔名。如果 template_field
包含以 template_ext
中提及的副檔名結尾的字串,則 Jinja 會讀取檔案的內容並將範本替換為實際值。請注意,Jinja 會替換 operator 屬性,而不是 args。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("guest_name",)
template_ext = ".sql"
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.guest_name = name
在範例中,template_fields
應該是 ['guest_name']
,而不是 ['name']
此外,您可以提供 template_fields_renderers
字典,其定義範本欄位中的值在 Web UI 中以何種樣式呈現。例如
class MyRequestOperator(BaseOperator):
template_fields: Sequence[str] = ("request_body",)
template_fields_renderers = {"request_body": "json"}
def __init__(self, request_body: str, **kwargs) -> None:
super().__init__(**kwargs)
self.request_body = request_body
在 template_field
本身是字典的情況下,也可以指定以點分隔的索引鍵路徑,以適當地提取和呈現個別元素。例如
class MyConfigOperator(BaseOperator):
template_fields: Sequence[str] = ("configuration",)
template_fields_renderers = {
"configuration": "json",
"configuration.query.sql": "sql",
}
def __init__(self, configuration: dict, **kwargs) -> None:
super().__init__(**kwargs)
self.configuration = configuration
然後按如下所示使用此範本
with dag:
config_task = MyConfigOperator(
task_id="task_id_1",
configuration={"query": {"job_id": "123", "sql": "select * from my_table"}},
)
這將導致 UI 將 configuration
呈現為 json,此外,query.sql
中包含的值將使用 SQL 詞法分析器呈現。

目前可用的詞法分析器
bash
bash_command
doc
doc_json
doc_md
doc_rst
doc_yaml
doc_md
hql
html
jinja
json
md
mysql
postgresql
powershell
py
python_callable
rst
sql
tsql
yaml
如果您使用不存在的詞法分析器,則範本欄位的值將呈現為美觀列印的物件。
限制¶
為了防止濫用,在 operator 的建構子中定義和指派範本欄位時,必須遵守以下限制(當存在建構子時,否則 - 請參閱下方)
1. 傳遞到建構子的範本欄位的對應參數,其名稱必須與欄位完全相同。以下範例無效,因為傳遞到建構子的參數與範本欄位不同
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a_id) -> None: # <- should be def __init__(field_a)-> None
self.field_a = field_a_id # <- should be self.field_a = field_a
2. 範本欄位的實例成員必須使用建構子中的對應參數進行指派,可以透過直接指派或呼叫父系的建構子(在其中這些欄位被定義為 template_fields
)並顯式指派參數來完成。以下範例無效,因為實例成員 self.field_a
完全未被指派,儘管它是範本欄位
class HelloOperator(BaseOperator):
template_fields = ("field_a", "field_b")
def __init__(field_a, field_b) -> None:
self.field_b = field_b
以下範例也無效,因為 MyHelloOperator
的實例成員 self.field_a
是作為傳遞到其父系建構子的 kwargs
的一部分隱含地初始化的
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a
class MyHelloOperator(HelloOperator):
template_fields = ("field_a", "field_b")
def __init__(field_b, **kwargs) -> None: # <- should be def __init__(field_a, field_b, **kwargs)
super().__init__(**kwargs) # <- should be super().__init__(field_a=field_a, **kwargs)
self.field_b = field_b
3. 在建構子中指派期間,不允許對參數套用動作。對值的任何動作都應在 execute()
方法中套用。因此,以下範例無效
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a.lower() # <- assignment should be only self.field_a = field_a
當 operator 從基礎 operator 繼承並且沒有定義自己的建構子時,以上限制不適用。但是,範本欄位必須根據這些限制在父系中正確設定。
因此,以下範例有效
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a
class MyHelloOperator(HelloOperator):
template_fields = "field_a"
以上限制由名為 'validate-operators-init' 的 pre-commit 強制執行。
透過子類別新增範本欄位¶
建立自訂 operator 的常見使用案例是簡單地擴增現有的 template_fields
。可能會有這樣的情況:您想要使用的 operator 未將某些參數定義為範本,但您希望能夠動態地將引數作為 Jinja 運算式傳遞。這可以透過快速子類別化現有的 operator 輕鬆實現。
假設您想要使用先前定義的 HelloOperator
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
但是,您想要動態地參數化 world
引數。由於 template_fields
屬性保證為 Sequence[str]
類型(即字串的列表或元組),您可以子類別化 HelloOperator
以輕鬆地修改所需的 template_fields
。
class MyHelloOperator(HelloOperator):
template_fields: Sequence[str] = (*HelloOperator.template_fields, "world")
現在您可以像這樣使用 MyHelloOperator
with dag:
hello_task = MyHelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="{{ var.value.my_world }}",
)
在此範例中,world
引數將透過 Jinja 運算式動態設定為名為 “my_world” 的 Airflow 變數的值。
Sensors¶
Airflow 為一種特殊類型的 operator 提供了一個基本元件,其目的是定期輪詢某些狀態(例如檔案是否存在),直到滿足成功標準。
您可以透過擴展 airflow.sensors.base.BaseSensorOperator
來建立您想要的任何 sensor,並定義一個 poke
方法來輪詢您的外部狀態並評估成功標準。
Sensors 具有一個強大的功能,稱為 'reschedule'
模式,該模式允許重新排程 sensor 的任務,而不是在輪詢之間阻止 worker 插槽。當您可以容忍更長的輪詢間隔並預期長時間輪詢時,這非常有用。
重新排程模式有一個注意事項,即您的 sensor 無法在重新排程的執行之間維護內部狀態。在這種情況下,您應該使用 airflow.sensors.base.poke_mode_only()
裝飾您的 sensor。這將讓使用者知道您的 sensor 不適合與重新排程模式一起使用。
保留內部狀態且無法與重新排程模式一起使用的 sensor 範例是 airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor
。它輪詢前綴下的物件數量(此數量是 sensor 的內部狀態),並在經過一定時間且物件數量沒有變更時成功。