使用 TaskFlow¶
本教學課程建立在常規 Airflow 教學課程的基礎上,特別著重於使用 TaskFlow API 範例撰寫資料管線,TaskFlow API 範例作為 Airflow 2.0 的一部分引入,並與使用傳統範例撰寫的 DAG 進行比較。
此處選擇的資料管線是一個簡單的模式,包含三個獨立的「提取 (Extract)」、「轉換 (Transform)」和「載入 (Load)」任務。
範例「TaskFlow API」管線¶
以下是一個非常簡單的使用 TaskFlow API 範例的管線。更詳細的說明如下。
import json
import pendulum
from airflow.decorators import dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.dev.org.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
"""
#### Load task
A simple Load task which takes in the result of the Transform task and
instead of saving it to end user review, just prints it out.
"""
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_taskflow_api()
這是一個 DAG 定義檔¶
如果您是第一次查看 DAG 檔案,請注意這個 Python 腳本是由 Airflow 解譯的,並且是您的資料管線的組態檔。如需 DAG 檔案的完整介紹,請查看核心基礎教學課程,其中廣泛涵蓋了 DAG 結構和定義。
實例化 DAG¶
我們正在建立一個 DAG,它是任務的集合,任務之間存在依賴關係。這是一個非常簡單的定義,因為我們只希望在我們使用 Airflow 進行設定時執行 DAG,而無需任何重試或複雜的排程。在本範例中,請注意我們正在使用 @dag
裝飾器建立此 DAG,如下所示,Python 函數名稱充當 DAG 識別碼。
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.dev.org.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
現在,為了實際啟用它作為 DAG 執行,我們調用使用 @dag
裝飾器先前設定的 Python 函數 tutorial_taskflow_api
,如下所示。
tutorial_taskflow_api()
在版本 2.4 中變更:如果 DAG 在 with
區塊內使用,或者如果是 @dag
裝飾函數的結果,則不再需要將 DAG「註冊」到全域變數中,Airflow 才能偵測到 DAG。
任務¶
在這個資料管線中,任務是根據使用 @task
裝飾器的 Python 函數建立的,如下所示。函數名稱充當任務的唯一識別碼。
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
傳回值(在本例中是一個字典)將可用於後續任務中。
「轉換」和「載入」任務的建立方式與上面顯示的「提取」任務相同。
DAG 的主要流程¶
現在我們已經根據 Python 函數定義了「提取」、「轉換」和「載入」任務,我們可以繼續 DAG 的主要部分。
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
就這樣,我們完成了!我們調用了「提取」任務,從中獲取了訂單資料,並將其發送到「轉換」任務進行摘要,然後使用摘要資料調用了「載入」任務。任務之間的依賴關係以及在這些任務之間傳遞資料(這些任務可能在網路上不同節點上的不同工作人員上執行)都由 Airflow 處理。
現在,為了實際啟用它作為 DAG 執行,我們調用使用 @dag
裝飾器先前設定的 Python 函數 tutorial_taskflow_api
,如下所示。
tutorial_taskflow_api()
但是如何運作?¶
對於經驗豐富的 Airflow DAG 作者來說,這簡單得令人驚訝!讓我們將其與 Airflow 2.0 之前必須編寫此 DAG 的方式進行比較,如下所示
import json
import textwrap
import pendulum
# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG
# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
with DAG(
"tutorial_dag",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={"retries": 2},
description="DAG tutorial",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
dag.doc_md = __doc__
def extract(**kwargs):
ti = kwargs["ti"]
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
ti.xcom_push("order_data", data_string)
def transform(**kwargs):
ti = kwargs["ti"]
extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
order_data = json.loads(extract_data_string)
total_order_value = 0
for value in order_data.values():
total_order_value += value
total_value = {"total_order_value": total_order_value}
total_value_json_string = json.dumps(total_value)
ti.xcom_push("total_order_value", total_value_json_string)
def load(**kwargs):
ti = kwargs["ti"]
total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
total_order_value = json.loads(total_value_string)
print(total_order_value)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task.doc_md = textwrap.dedent(
"""\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform,
)
transform_task.doc_md = textwrap.dedent(
"""\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
)
load_task = PythonOperator(
task_id="load",
python_callable=load,
)
load_task.doc_md = textwrap.dedent(
"""\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
)
extract_task >> transform_task >> load_task
上面顯示的所有處理都在新的 Airflow 2.0 DAG 中完成,但它都從 DAG 開發人員那裡抽象出來了。
讓我們透過隔離查看「轉換」任務來詳細檢查這一點,因為它位於資料管線的中間。在 Airflow 1.x 中,此任務的定義如下所示
def transform(**kwargs):
ti = kwargs["ti"]
extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
order_data = json.loads(extract_data_string)
total_order_value = 0
for value in order_data.values():
total_order_value += value
total_value = {"total_order_value": total_order_value}
total_value_json_string = json.dumps(total_value)
ti.xcom_push("total_order_value", total_value_json_string)
正如我們在這裡看到的,在「轉換」函數中處理的資料是使用 XCom 變數傳遞給它的。反過來,「轉換」函數中的摘要資料也放入另一個 XCom 變數中,然後「載入」任務將使用該變數。
將其與 Airflow 2.0 中的 TaskFlow API 進行比較,如下所示。
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
在 Airflow 2.0 中,所有用於在這些任務之間傳遞資料的 XCom 使用方式都從 DAG 作者那裡抽象出來了。但是,XCom 變數在幕後使用,並且可以使用 Airflow UI 檢視,以根據需要進行偵錯或 DAG 監控。
同樣,任務依賴關係會在 TaskFlow 內根據任務的功能調用自動產生。在 Airflow 1.x 中,必須顯式建立任務並指定依賴關係,如下所示。
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task.doc_md = textwrap.dedent(
"""\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform,
)
transform_task.doc_md = textwrap.dedent(
"""\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
)
load_task = PythonOperator(
task_id="load",
python_callable=load,
)
load_task.doc_md = textwrap.dedent(
"""\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
)
extract_task >> transform_task >> load_task
相反,使用 Airflow 2.0 中的 TaskFlow API,調用本身會自動產生依賴關係,如下所示。
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
重複使用已裝飾的任務¶
已裝飾的任務非常靈活。您可以在多個 DAG 中重複使用已裝飾的任務,覆寫任務參數,例如 task_id
、queue
、pool
等。
以下是如何在多個 DAG 中重複使用已裝飾任務的範例
from airflow.decorators import task, dag
from datetime import datetime
@task
def add_task(x, y):
print(f"Task args: x={x}, y={y}")
return x + y
@dag(start_date=datetime(2022, 1, 1))
def mydag():
start = add_task.override(task_id="start")(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"add_start_{i}")(start, i)
@dag(start_date=datetime(2022, 1, 1))
def mydag2():
start = add_task(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"new_add_task_{i}")(start, i)
first_dag = mydag()
second_dag = mydag2()
您也可以匯入上述 add_task
並在另一個 DAG 檔案中使用它。假設 add_task
程式碼位於名為 common.py
的檔案中。您可以執行此操作
from common import add_task
from airflow.decorators import dag
from datetime import datetime
@dag(start_date=datetime(2022, 1, 1))
def use_add_task():
start = add_task.override(priority_weight=3)(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"new_add_task_{i}", retries=4)(start, i)
created_dag = use_add_task()
將 TaskFlow API 與複雜/衝突的 Python 依賴項搭配使用¶
如果您的任務需要複雜或衝突的需求,那麼您將能夠將 TaskFlow API 與 Python 虛擬環境 (自 2.0.2 起)、Docker 容器 (自 2.2.0 起)、ExternalPythonOperator (自 2.4.0 起) 或 KubernetesPodOperator (自 2.4.0 起) 搭配使用。
此功能允許 TaskFlow API 更全面的使用案例,因為您不受 Airflow 工作人員的套件和系統程式庫的限制。對於下面描述的已裝飾函數的所有情況,您都必須確保函數是可序列化的,並且它們僅對您使用的其他依賴項使用本機匯入。這些匯入的其他程式庫必須在目標環境中可用 - 它們不需要在主要的 Airflow 環境中可用。
您應該使用哪個運算子取決於幾個因素
您是否正在執行可以存取 Docker 引擎或 Kubernetes 的 Airflow
您是否可以承擔動態建立具有新依賴項的虛擬環境的額外負擔
您是否可以為所有 Airflow 組件部署預先存在的、不可變的 Python 環境。
這些選項應該可以讓希望保持工作流程更簡單、更 Pythonic 的使用者擁有更大的靈活性 - 並讓您可以將 DAG 的完整邏輯保留在 DAG 本身中。
您也可以在處理衝突/複雜 Python 依賴項的最佳實務中取得更多關於管理衝突依賴項方法的上下文,包括對每個選項的界限和後果的更詳細說明
為每個任務動態建立 Virtualenv¶
最簡單的方法是在同一部機器上動態建立(每次執行任務時)一個單獨的虛擬環境,您可以使用 @task.virtualenv
裝飾器。該裝飾器允許您動態建立一個新的 virtualenv,其中包含自訂程式庫,甚至可以使用不同的 Python 版本來執行您的函數。
範例(動態建立的 virtualenv)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=["colorama==0.4.0"],
system_site_packages=False,
)
使用具有預先安裝依賴項的 Python 環境¶
更複雜的 @task.external_python
裝飾器允許您在預先定義的、不可變的 virtualenv(或在系統層級安裝且沒有 virtualenv 的 Python 二進位檔)中執行 Airflow 任務。這個 virtualenv 或系統 python 也可以安裝不同的自訂程式庫集,並且必須在可以執行相同位置任務的所有工作人員中提供。
使用 @task.external_python
的範例(使用不可變的、預先存在的 virtualenv)
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = ExternalPythonOperator(
task_id="external_python",
python_callable=callable_external_python,
python=PATH_TO_PYTHON_BINARY,
)
使用 Docker Operator 進行依賴項隔離¶
如果您的 Airflow 工作人員可以存取 docker 引擎,您可以改為使用 DockerOperator
並新增任何需要的引數以正確執行任務。請注意,docker 映像必須安裝可運作的 Python,並將 bash 命令作為 command
引數。
值得注意的是,Python 原始碼(從已裝飾的函數中提取)和任何可調用引數都透過(編碼和 pickle 化的)環境變數傳送到容器,因此這些引數的長度不是無限的(確切的限制取決於系統設定)。
以下是使用 @task.docker
裝飾器執行 Python 任務的範例。
@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
關於使用運算子的注意事項
注意
在較早版本的 Airflow 中使用 @task.docker
裝飾器
由於 @task.docker
裝飾器在 docker 提供者中可用,您可能會想在 2.2 之前的 Airflow 版本中使用它,但這不會起作用。如果您嘗試,您將收到此錯誤
AttributeError: '_TaskDecorator' object has no attribute 'docker'
您應該升級到 Airflow 2.2 或以上版本才能使用它。
使用 Kubernetes Pod Operator 進行依賴項隔離¶
如果您的 Airflow 工作人員可以存取 Kubernetes,您可以改為使用 KubernetesPodOperator
並新增任何需要的引數以正確執行任務。
以下是使用 @task.kubernetes
裝飾器執行 Python 任務的範例。
@task.kubernetes(
image="python:3.8-slim-buster",
name="k8s_test",
namespace="default",
in_cluster=False,
config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
import time
print("Hello from k8s pod")
time.sleep(2)
@task.kubernetes(image="python:3.8-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
n = 5
for i in range(n):
# inner loop to handle number of columns
# values changing acc. to outer loop
for _ in range(i + 1):
# printing stars
print("* ", end="")
# ending line after each row
print("\r")
execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()
execute_in_k8s_pod_instance >> print_pattern_instance
關於使用運算子的注意事項
注意
在較早版本的 Airflow 中使用 @task.kubernetes
裝飾器
由於 @task.kubernetes
裝飾器在 docker 提供者中可用,您可能會想在 2.4 之前的 Airflow 版本中使用它,但這不會起作用。如果您嘗試,您將收到此錯誤
AttributeError: '_TaskDecorator' object has no attribute 'kubernetes'
您應該升級到 Airflow 2.4 或以上版本才能使用它。
將 TaskFlow API 與 Sensor 運算子搭配使用¶
您可以應用 @task.sensor
裝飾器將常規 Python 函數轉換為 BaseSensorOperator 類別的實例。Python 函數實作了 poke 邏輯,並傳回 PokeReturnValue
類別的實例,如同 BaseSensorOperator 中的 poke()
方法一樣。在 Airflow 2.3 中,sensor 運算子將能夠傳回 XCOM 值。這是透過在 poke()
方法的末尾傳回 PokeReturnValue
物件的實例來實現的
from airflow.sensors.base import PokeReturnValue class SensorWithXcomValue(BaseSensorOperator): def poke(self, context: Context) -> Union[bool, PokeReturnValue]: # ... is_done = ... # set to true if the sensor should stop poking. xcom_value = ... # return value of the sensor operator to be pushed to XCOM. return PokeReturnValue(is_done, xcom_value)
若要實作推送 XCOM 值並同時支援 2.3 版和 2.3 之前版本的 sensor 運算子,您需要明確推送 XCOM 值(如果版本為 2.3 之前版本)。
try: from airflow.sensors.base import PokeReturnValue except ImportError: PokeReturnValue = None class SensorWithXcomValue(BaseSensorOperator): def poke(self, context: Context) -> bool: # ... is_done = ... # set to true if the sensor should stop poking. xcom_value = ... # return value of the sensor operator to be pushed to XCOM. if PokeReturnValue is not None: return PokeReturnValue(is_done, xcom_value) else: if is_done: context["ti"].xcom_push(key="xcom_key", value=xcom_value) return is_done
或者,在 sensor 不需要推送 XCOM 值的情況下:poke()
和封裝的函數都可以傳回類似布林值的值,其中 True
表示 sensor 的操作已完成,而 False
表示 sensor 的操作未完成。
import pendulum
from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def example_sensor_decorator():
# Using a sensor operator to wait for the upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
return PokeReturnValue(is_done=True, xcom_value="xcom_value")
@task
def dummy_operator() -> None:
pass
wait_for_upstream() >> dummy_operator()
tutorial_etl_dag = example_sensor_decorator()
多個輸出推斷¶
任務也可以透過使用 dict Python 型別來推斷多個輸出。
@task
def identity_dict(x: int, y: int) -> dict[str, int]:
return {"x": x, "y": y}
透過使用型別 dict
或任何其他符合 typing.Mapping
協定的類別作為函數傳回型別,multiple_outputs
參數會自動設定為 true。
注意,如果您手動設定 multiple_outputs
參數,則會停用推斷,並使用參數值。
在已裝飾任務和傳統任務之間新增依賴關係¶
上述教學課程說明如何在 TaskFlow 函數之間建立依賴關係。但是,也可以在傳統任務(例如 BashOperator
或 FileSensor
)和 TaskFlow 函數之間設定依賴關係。
以下程式碼顯示了如何建立此依賴關係
@task()
def extract_from_file():
"""
#### Extract from file task
A simple Extract task to get data ready for the rest of the data
pipeline, by reading the data from a file into a pandas dataframe
"""
order_data_file = "/tmp/order_data.csv"
order_data_df = pd.read_csv(order_data_file)
return order_data_df
file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv")
order_data = extract_from_file()
file_task >> order_data
在上面的程式碼區塊中,新的 TaskFlow 函數定義為 extract_from_file
,它從已知的檔案位置讀取資料。在主要 DAG 中,定義了一個新的 FileSensor
任務來檢查此檔案。請注意,這是一個 Sensor 任務,它等待檔案。TaskFlow 函數調用放在變數 order_data
中。最後,使用變數指定此 Sensor 任務和 TaskFlow 函數之間的依賴關係。
在已裝飾任務和傳統任務之間使用 XCom¶
如上所述,TaskFlow API 允許以從 DAG 作者那裡抽象出來的方式在任務之間使用或傳遞 XCom。本節將進一步深入探討詳細範例,說明這不僅在 TaskFlow 函數之間是可能的,而且在 TaskFlow 函數和傳統任務之間也是可能的。
您可能會發現有必要使用來自傳統任務的 XCom 作為下游任務的輸入,無論是在任務執行期間推送的,還是透過其傳回值推送的。您可以透過使用為所有運算子公開的 .output
屬性來存取推送的 XCom(也稱為 XComArg
)。
預設情況下,使用 .output
屬性來檢索 XCom 結果相當於
task_instance.xcom_pull(task_ids="my_task_id", key="return_value")
若要檢索金鑰不是 return_value
的 XCom 結果,您可以使用
my_op = MyOperator(...)
my_op_output = my_op.output["some_other_xcom_key"]
# OR
my_op_output = my_op.output.get("some_other_xcom_key")
注意
僅針對列為 template_field
的運算子參數,才支援使用 .output
屬性作為另一個任務的輸入。
在下面的程式碼範例中,HttpOperator
結果透過 XComs 擷取。然後,此 XCom 結果(即任務輸出)會傳遞到 TaskFlow 函數,該函數會將回應剖析為 JSON。
get_api_results_task = HttpOperator(
task_id="get_api_results",
endpoint="/api/query",
do_xcom_push=True,
http_conn_id="http",
)
@task
def parse_results(api_results):
return json.loads(api_results)
parsed_results = parse_results(api_results=get_api_results_task.output)
反向操作也可以完成:將 TaskFlow 函數的輸出作為傳統任務的輸入傳遞。
@task(retries=3)
def create_queue():
"""This is a Python function that creates an SQS queue"""
hook = SqsHook()
result = hook.create_queue(queue_name="sample-queue")
return result["QueueUrl"]
sqs_queue = create_queue()
publish_to_queue = SqsPublishOperator(
task_id="publish_to_queue",
sqs_queue=sqs_queue,
message_content="{{ task_instance }}-{{ execution_date }}",
message_attributes=None,
delay_seconds=0,
)
請注意上面的程式碼範例,來自 create_queue
TaskFlow 函數的輸出(新建立的 Amazon SQS 佇列的 URL)然後作為 sqs_queue
引數傳遞到 SqsPublishOperator
任務。
最後,您不僅可以使用傳統運算子輸出作為 TaskFlow 函數的輸入,還可以作為其他傳統運算子的輸入。在下面的範例中,來自 SalesforceToS3Operator
任務的輸出(即目的地檔案位置的 S3 URI)用作 S3CopyObjectOperator
任務的輸入,以將相同檔案複製到 S3 中按日期分割的儲存位置,以便在資料湖中長期儲存。
BASE_PATH = "salesforce/customers"
FILE_NAME = "customer_daily_extract_{{ ds_nodash }}.csv"
upload_salesforce_data_to_s3_landing = SalesforceToS3Operator(
task_id="upload_salesforce_data_to_s3",
salesforce_query="SELECT Id, Name, Company, Phone, Email, LastModifiedDate, IsActive FROM Customers",
s3_bucket_name="landing-bucket",
s3_key=f"{BASE_PATH}/{FILE_NAME}",
salesforce_conn_id="salesforce",
aws_conn_id="s3",
replace=True,
)
store_to_s3_data_lake = S3CopyObjectOperator(
task_id="store_to_s3_data_lake",
aws_conn_id="s3",
source_bucket_key=upload_salesforce_data_to_s3_landing.output,
dest_bucket_name="data_lake",
dest_bucket_key=f"""{BASE_PATH}/{"{{ execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
)
在已裝飾任務中存取上下文變數¶
執行可調用物件時,Airflow 將傳遞一組關鍵字引數,這些引數可用於您的函數中。這組 kwargs 完全對應於您可以在 Jinja 模板中使用的內容。為了使此功能運作,您可以新增您想要在函數中接收的上下文金鑰作為關鍵字引數。
例如,下面程式碼區塊中的可調用物件將取得 ti
和 next_ds
上下文變數的值
@task
def my_python_callable(*, ti, next_ds):
pass
在版本 2.8 中變更:先前,上下文金鑰引數必須提供預設值,例如 ti=None
。現在不再需要這樣做了。
您也可以選擇使用 **kwargs
接收整個上下文。請注意,這可能會產生輕微的效能損失,因為 Airflow 將需要展開可能包含許多您實際上不需要的東西的整個上下文。因此,更建議您使用顯式引數,如上一段所示。
@task
def my_python_callable(**kwargs):
ti = kwargs["ti"]
next_ds = kwargs["next_ds"]
此外,有時您可能想要在堆疊深處存取上下文,但您不想從任務可調用物件傳遞上下文變數。您仍然可以透過 get_current_context
方法存取執行上下文。
from airflow.operators.python import get_current_context
def some_function_in_your_library():
context = get_current_context()
ti = context["ti"]
目前上下文僅在任務執行期間可存取。在 pre_execute
或 post_execute
期間無法存取上下文。在執行上下文之外調用此方法將引發錯誤。
在已裝飾任務中使用模板¶
傳遞到已裝飾函數的引數會自動模板化。
您也可以使用 templates_exts
參數來模板化整個檔案。
@task(templates_exts=[".sql"])
def template_test(sql):
print(f"sql: {sql}")
template_test(sql="sql/test.sql")
這將讀取 sql/test.sql
的內容並取代所有模板變數。您也可以傳遞檔案清單,所有檔案都將被模板化。
您可以透過params 參數將其他參數傳遞到模板引擎。
但是,params
參數必須傳遞給裝飾器,而不是直接傳遞給您的函數,例如 @task(templates_exts=['.sql'], params={'my_param'})
,然後可以與 {{ params.my_param }}
一起在您的模板化檔案和函數參數中使用。
或者,您也可以使用 .override()
方法傳遞它
@task()
def template_test(input_var):
print(f"input_var: {input_var}")
template_test.override(params={"my_param": "wow"})(
input_var="my param is: {{ params.my_param }}",
)
最後,您也可以手動呈現模板
@task(params={"my_param": "wow"})
def template_test():
template_str = "run_id: {{ run_id }}; params.my_param: {{ params.my_param }}"
context = get_current_context()
rendered_template = context["task"].render_template(
template_str,
context,
)
以下是一個完整範例,示範了上述所有內容
import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
@dag(
schedule="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
params={"foobar": "param_from_dag", "other_param": "from_dag"},
)
def tutorial_taskflow_templates():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the templates in the TaskFlow API.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.dev.org.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
@task(
# Causes variables that end with `.sql` to be read and templates
# within to be rendered.
templates_exts=[".sql"],
)
def template_test(sql, test_var, data_interval_end):
context = get_current_context()
# Will print...
# select * from test_data
# where 1=1
# and run_id = 'scheduled__2024-10-09T00:00:00+00:00'
# and something_else = 'param_from_task'
print(f"sql: {sql}")
# Will print `scheduled__2024-10-09T00:00:00+00:00`
print(f"test_var: {test_var}")
# Will print `2024-10-10 00:00:00+00:00`.
# Note how we didn't pass this value when calling the task. Instead
# it was passed by the decorator from the context
print(f"data_interval_end: {data_interval_end}")
# Will print...
# run_id: scheduled__2024-10-09T00:00:00+00:00; params.other_param: from_dag
template_str = "run_id: {{ run_id }}; params.other_param: {{ params.other_param }}"
rendered_template = context["task"].render_template(
template_str,
context,
)
print(f"rendered template: {rendered_template}")
# Will print the full context dict
print(f"context: {context}")
template_test.override(
# Will be merged with the dict defined in the dag
# and override existing parameters.
#
# Must be passed into the decorator's parameters
# through `.override()` not into the actual task
# function
params={"foobar": "param_from_task"},
)(
sql="sql/test.sql",
test_var="{{ run_id }}",
)
tutorial_taskflow_templates()
有條件地跳過任務¶
run_if()
和 skip_if()
是 TaskFlow 的語法糖,可讓您根據條件跳過 Task
。您可以使用它們來簡單地設定執行條件,而無需變更 DAG
或 Task
的結構。
它也允許您使用 Context
設定條件,這基本上與使用 pre_execute
相同。
run_if()
的範例用法如下
@task.run_if(lambda context: context["task_instance"].task_id == "run")
@task.bash()
def echo() -> str:
return "echo 'run'"
僅當 task_id
為 run
時,才會執行上述程式碼中定義的 echo
。
如果您想在跳過任務時留下記錄,您有兩個選項。
@task.run_if(lambda context: context["task_instance"].task_id == "run", skip_message="only task_id is 'run'")
@task.bash()
def echo() -> str:
return "echo 'run'"
@task.run_if(
lambda context: (context["task_instance"].task_id == "run", f"{context['ts']}: only task_id is 'run'")
)
@task.bash()
def echo() -> str:
return "echo 'run'"
還有一個 skip_if()
,其運作方式與 run_if()
相反,並且以相同的方式使用。
@task.skip_if(lambda context: context["task_instance"].task_id == "skip")
@task.bash()
def echo() -> str:
return "echo 'run'"
下一步?¶
您已經了解在 Airflow 2.0 中使用 TaskFlow API 範例撰寫 DAG 是多麼簡單。以下是您可能想要採取的幾個後續步驟
另請參閱
繼續教學課程的下一步:建置執行中的管線
閱讀概念章節,以詳細了解 Airflow 概念,例如 DAG、任務、運算子等等
檢視關於TaskFlow API 和
@task
裝飾器的章節。