TaskFlow¶
2.0 版本新增功能。
如果您主要使用純 Python 程式碼而不是運算子來編寫 DAG,那麼 TaskFlow API 將使您更容易編寫乾淨的 DAG,而無需額外的樣板程式碼,全部都使用 @task
裝飾器。
TaskFlow 負責處理您的任務之間輸入和輸出的移動,為您使用 XComs,以及自動計算依賴關係 - 當您在 DAG 檔案中呼叫 TaskFlow 函數時,您將獲得一個代表結果 XCom 的物件(XComArg
),而不是執行它,然後您可以將其用作下游任務或運算子的輸入。 例如
from airflow.decorators import task
from airflow.operators.email import EmailOperator
@task
def get_ip():
return my_ip_service.get_main_ip()
@task(multiple_outputs=True)
def compose_email(external_ip):
return {
'subject':f'Server connected from {external_ip}',
'body': f'Your server executing Airflow is connected from the external IP {external_ip}<br>'
}
email_info = compose_email(get_ip())
EmailOperator(
task_id='send_email_notification',
to='example@example.com',
subject=email_info['subject'],
html_content=email_info['body']
)
在這裡,有三個任務 - get_ip
、compose_email
和 send_email_notification
。
前兩個是使用 TaskFlow 宣告的,並自動將 get_ip
的傳回值傳遞到 compose_email
,不僅連結了 XCom,還自動宣告 compose_email
是 get_ip
的下游。
send_email_notification
是一個更傳統的運算子,但即使如此,它也可以使用 compose_email
的傳回值來設定其參數,並且再次自動計算出它必須是 compose_email
的下游。
您也可以使用純值或變數來呼叫 TaskFlow 函數 - 例如,這將如您預期的那樣工作(但當然,在 DAG 執行之前不會執行任務內部的程式碼 - name
值會作為任務參數持續存在,直到那時)
@task
def hello_name(name: str):
print(f'Hello {name}!')
hello_name('Airflow users')
如果您想了解更多關於使用 TaskFlow 的資訊,您應該查閱TaskFlow 教學。
上下文¶
您可以透過將 Airflow 上下文變數 作為關鍵字參數添加,如下例所示
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None): print(f"Run ID: {task_instance.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {task_instance.duration}") # Duration: 0.972019 print(f"DAG Run queued at: {dag_run.queued_at}") # 2023-08-10 00:00:01+02:20
或者,您可以將 **kwargs
添加到任務的簽名中,所有 Airflow 上下文變數都可以在 kwargs
字典中存取
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(**kwargs): ti: TaskInstance = kwargs["task_instance"] print(f"Run ID: {ti.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {ti.duration}") # Duration: 0.972019 dr: DagRun = kwargs["dag_run"] print(f"DAG Run queued at: {dr.queued_at}") # 2023-08-10 00:00:01+02:20
有關上下文變數的完整列表,請參閱上下文變數。
日誌記錄¶
若要從您的任務函數中使用日誌記錄,只需匯入並使用 Python 的日誌記錄系統
logger = logging.getLogger("airflow.task")
以這種方式建立的每個日誌記錄行都將記錄在任務日誌中。
將任意物件作為參數傳遞¶
2.5.0 版本新增功能。
如前所述,TaskFlow 使用 XCom 將變數傳遞到每個任務。 這要求用作參數的變數需要能夠被序列化。 Airflow 開箱即用支援所有內建類型(例如 int 或 str),並且它支援使用 @dataclass
或 @attr.define
裝飾的物件。 以下範例顯示了 Dataset
的使用,它是用 @attr.define
裝飾的,與 TaskFlow 一起使用。
注意
使用 Dataset
的另一個好處是,如果它用作輸入參數,它會自動註冊為 inlet
。 如果任務的傳回值是 dataset
或 list[Dataset]]
,它也會自動註冊為 outlet
。
import json
import pendulum
import requests
from airflow import Dataset
from airflow.decorators import dag, task
SRC = Dataset(
"https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json"
)
now = pendulum.now()
@dag(start_date=now, schedule="@daily", catchup=False)
def etl():
@task()
def retrieve(src: Dataset) -> dict:
resp = requests.get(url=src.uri)
data = resp.json()
return data["data"]
@task()
def to_fahrenheit(temps: dict[int, float]) -> dict[int, float]:
ret: dict[int, float] = {}
for year, celsius in temps.items():
ret[year] = float(celsius) * 1.8 + 32
return ret
@task()
def load(fahrenheit: dict[int, float]) -> Dataset:
filename = "/tmp/fahrenheit.json"
s = json.dumps(fahrenheit)
f = open(filename, "w")
f.write(s)
f.close()
return Dataset(f"file:///{filename}")
data = retrieve(SRC)
fahrenheit = to_fahrenheit(data)
load(fahrenheit)
etl()
自訂物件¶
您可能想要傳遞自訂物件。 通常,您會使用 @dataclass
或 @attr.define
裝飾您的類別,而 Airflow 將會弄清楚它需要做什麼。 有時您可能想要自己控制序列化。 若要執行此操作,請將 serialize()
方法新增至您的類別,並將靜態方法 deserialize(data: dict, version: int)
新增至您的類別。 像這樣
from typing import ClassVar
class MyCustom:
__version__: ClassVar[int] = 1
def __init__(self, x):
self.x = x
def serialize(self) -> dict:
return dict({"x": self.x})
@staticmethod
def deserialize(data: dict, version: int):
if version > 1:
raise TypeError(f"version > {MyCustom.version}")
return MyCustom(data["x"])
物件版本控制¶
對將在序列化中使用的物件進行版本控制是一個好習慣。 若要執行此操作,請將 __version__: ClassVar[int] = <x>
新增至您的類別。 Airflow 假設您的類別是向後相容的,因此版本 2 能夠反序列化版本 1。 如果您需要自訂邏輯進行反序列化,請確保指定 deserialize(data: dict, version: int)
。
注意
__version__
的類型是必需的,並且需要是 ClassVar[int]
歷史¶
TaskFlow API 是 Airflow 2.0 的新功能,您可能會遇到為先前版本的 Airflow 編寫的 DAG,這些 DAG 反而使用 PythonOperator
來實現類似目標,儘管程式碼量更多。
關於 TaskFlow API 的新增和設計的更多上下文,可以在其 Airflow 改善提案 AIP-31:「TaskFlow API」用於更清晰/更簡單的 DAG 定義 中找到