使用 TaskFlow

本教學課程建立在常規 Airflow 教學課程的基礎上,特別著重於使用 TaskFlow API 範例撰寫資料管線,TaskFlow API 範例作為 Airflow 2.0 的一部分引入,並與使用傳統範例撰寫的 DAG 進行比較。

此處選擇的資料管線是一個簡單的模式,包含三個獨立的「提取 (Extract)」、「轉換 (Transform)」和「載入 (Load)」任務。

範例「TaskFlow API」管線

以下是一個非常簡單的使用 TaskFlow API 範例的管線。更詳細的說明如下。

airflow/example_dags/tutorial_taskflow_api.py[原始碼]


import json

import pendulum

from airflow.decorators import dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.dev.org.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
tutorial_taskflow_api()

這是一個 DAG 定義檔

如果您是第一次查看 DAG 檔案,請注意這個 Python 腳本是由 Airflow 解譯的,並且是您的資料管線的組態檔。如需 DAG 檔案的完整介紹,請查看核心基礎教學課程,其中廣泛涵蓋了 DAG 結構和定義。

實例化 DAG

我們正在建立一個 DAG,它是任務的集合,任務之間存在依賴關係。這是一個非常簡單的定義,因為我們只希望在我們使用 Airflow 進行設定時執行 DAG,而無需任何重試或複雜的排程。在本範例中,請注意我們正在使用 @dag 裝飾器建立此 DAG,如下所示,Python 函數名稱充當 DAG 識別碼。

airflow/example_dags/tutorial_taskflow_api.py[原始碼]

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.dev.org.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

現在,為了實際啟用它作為 DAG 執行,我們調用使用 @dag 裝飾器先前設定的 Python 函數 tutorial_taskflow_api,如下所示。

airflow/example_dags/tutorial_taskflow_api.py[原始碼]

tutorial_taskflow_api()

在版本 2.4 中變更:如果 DAG 在 with 區塊內使用,或者如果是 @dag 裝飾函數的結果,則不再需要將 DAG「註冊」到全域變數中,Airflow 才能偵測到 DAG。

任務

在這個資料管線中,任務是根據使用 @task 裝飾器的 Python 函數建立的,如下所示。函數名稱充當任務的唯一識別碼。

airflow/example_dags/tutorial_taskflow_api.py[原始碼]

@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict

傳回值(在本例中是一個字典)將可用於後續任務中。

「轉換」和「載入」任務的建立方式與上面顯示的「提取」任務相同。

DAG 的主要流程

現在我們已經根據 Python 函數定義了「提取」、「轉換」和「載入」任務,我們可以繼續 DAG 的主要部分。

airflow/example_dags/tutorial_taskflow_api.py[原始碼]

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

就這樣,我們完成了!我們調用了「提取」任務,從中獲取了訂單資料,並將其發送到「轉換」任務進行摘要,然後使用摘要資料調用了「載入」任務。任務之間的依賴關係以及在這些任務之間傳遞資料(這些任務可能在網路上不同節點上的不同工作人員上執行)都由 Airflow 處理。

現在,為了實際啟用它作為 DAG 執行,我們調用使用 @dag 裝飾器先前設定的 Python 函數 tutorial_taskflow_api,如下所示。

airflow/example_dags/tutorial_taskflow_api.py[原始碼]

tutorial_taskflow_api()

但是如何運作?

對於經驗豐富的 Airflow DAG 作者來說,這簡單得令人驚訝!讓我們將其與 Airflow 2.0 之前必須編寫此 DAG 的方式進行比較,如下所示

airflow/example_dags/tutorial_dag.py[原始碼]


import json
import textwrap

import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
with DAG(
    "tutorial_dag",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={"retries": 2},
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    dag.doc_md = __doc__
    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)
    def transform(**kwargs):
        ti = kwargs["ti"]
        extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)
    def load(**kwargs):
        ti = kwargs["ti"]
        total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
        total_order_value = json.loads(total_value_string)

        print(total_order_value)
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    extract_task.doc_md = textwrap.dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    transform_task.doc_md = textwrap.dedent(
        """\
    #### Transform task
    A simple Transform task which takes in the collection of order data from xcom
    and computes the total order value.
    This computed value is then put into xcom, so that it can be processed by the next task.
    """
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )
    load_task.doc_md = textwrap.dedent(
        """\
    #### Load task
    A simple Load task which takes in the result of the Transform task, by reading it
    from xcom and instead of saving it to end user review, just prints it out.
    """
    )

    extract_task >> transform_task >> load_task

上面顯示的所有處理都在新的 Airflow 2.0 DAG 中完成,但它都從 DAG 開發人員那裡抽象出來了。

讓我們透過隔離查看「轉換」任務來詳細檢查這一點,因為它位於資料管線的中間。在 Airflow 1.x 中,此任務的定義如下所示

airflow/example_dags/tutorial_dag.py[原始碼]

def transform(**kwargs):
    ti = kwargs["ti"]
    extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
    order_data = json.loads(extract_data_string)

    total_order_value = 0
    for value in order_data.values():
        total_order_value += value

    total_value = {"total_order_value": total_order_value}
    total_value_json_string = json.dumps(total_value)
    ti.xcom_push("total_order_value", total_value_json_string)

正如我們在這裡看到的,在「轉換」函數中處理的資料是使用 XCom 變數傳遞給它的。反過來,「轉換」函數中的摘要資料也放入另一個 XCom 變數中,然後「載入」任務將使用該變數。

將其與 Airflow 2.0 中的 TaskFlow API 進行比較,如下所示。

airflow/example_dags/tutorial_taskflow_api.py[原始碼]

@task(multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}

在 Airflow 2.0 中,所有用於在這些任務之間傳遞資料的 XCom 使用方式都從 DAG 作者那裡抽象出來了。但是,XCom 變數在幕後使用,並且可以使用 Airflow UI 檢視,以根據需要進行偵錯或 DAG 監控。

同樣,任務依賴關係會在 TaskFlow 內根據任務的功能調用自動產生。在 Airflow 1.x 中,必須顯式建立任務並指定依賴關係,如下所示。

airflow/example_dags/tutorial_dag.py[原始碼]

extract_task = PythonOperator(
    task_id="extract",
    python_callable=extract,
)
extract_task.doc_md = textwrap.dedent(
    """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
)

transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform,
)
transform_task.doc_md = textwrap.dedent(
    """\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
)

load_task = PythonOperator(
    task_id="load",
    python_callable=load,
)
load_task.doc_md = textwrap.dedent(
    """\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
)

extract_task >> transform_task >> load_task

相反,使用 Airflow 2.0 中的 TaskFlow API,調用本身會自動產生依賴關係,如下所示。

airflow/example_dags/tutorial_taskflow_api.py[原始碼]

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

重複使用已裝飾的任務

已裝飾的任務非常靈活。您可以在多個 DAG 中重複使用已裝飾的任務,覆寫任務參數,例如 task_idqueuepool 等。

以下是如何在多個 DAG 中重複使用已裝飾任務的範例

from airflow.decorators import task, dag
from datetime import datetime


@task
def add_task(x, y):
    print(f"Task args: x={x}, y={y}")
    return x + y


@dag(start_date=datetime(2022, 1, 1))
def mydag():
    start = add_task.override(task_id="start")(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"add_start_{i}")(start, i)


@dag(start_date=datetime(2022, 1, 1))
def mydag2():
    start = add_task(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"new_add_task_{i}")(start, i)


first_dag = mydag()
second_dag = mydag2()

您也可以匯入上述 add_task 並在另一個 DAG 檔案中使用它。假設 add_task 程式碼位於名為 common.py 的檔案中。您可以執行此操作

from common import add_task
from airflow.decorators import dag
from datetime import datetime


@dag(start_date=datetime(2022, 1, 1))
def use_add_task():
    start = add_task.override(priority_weight=3)(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"new_add_task_{i}", retries=4)(start, i)


created_dag = use_add_task()

將 TaskFlow API 與複雜/衝突的 Python 依賴項搭配使用

如果您的任務需要複雜或衝突的需求,那麼您將能夠將 TaskFlow API 與 Python 虛擬環境 (自 2.0.2 起)、Docker 容器 (自 2.2.0 起)、ExternalPythonOperator (自 2.4.0 起) 或 KubernetesPodOperator (自 2.4.0 起) 搭配使用。

此功能允許 TaskFlow API 更全面的使用案例,因為您不受 Airflow 工作人員的套件和系統程式庫的限制。對於下面描述的已裝飾函數的所有情況,您都必須確保函數是可序列化的,並且它們僅對您使用的其他依賴項使用本機匯入。這些匯入的其他程式庫必須在目標環境中可用 - 它們不需要在主要的 Airflow 環境中可用。

您應該使用哪個運算子取決於幾個因素

  • 您是否正在執行可以存取 Docker 引擎或 Kubernetes 的 Airflow

  • 您是否可以承擔動態建立具有新依賴項的虛擬環境的額外負擔

  • 您是否可以為所有 Airflow 組件部署預先存在的、不可變的 Python 環境。

這些選項應該可以讓希望保持工作流程更簡單、更 Pythonic 的使用者擁有更大的靈活性 - 並讓您可以將 DAG 的完整邏輯保留在 DAG 本身中。

您也可以在處理衝突/複雜 Python 依賴項的最佳實務中取得更多關於管理衝突依賴項方法的上下文,包括對每個選項的界限和後果的更詳細說明

為每個任務動態建立 Virtualenv

最簡單的方法是在同一部機器上動態建立(每次執行任務時)一個單獨的虛擬環境,您可以使用 @task.virtualenv 裝飾器。該裝飾器允許您動態建立一個新的 virtualenv,其中包含自訂程式庫,甚至可以使用不同的 Python 版本來執行您的函數。

範例(動態建立的 virtualenv)

airflow/example_dags/example_python_operator.py[原始碼]

    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")

    virtualenv_task = PythonVirtualenvOperator(
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
    )

使用具有預先安裝依賴項的 Python 環境

更複雜的 @task.external_python 裝飾器允許您在預先定義的、不可變的 virtualenv(或在系統層級安裝且沒有 virtualenv 的 Python 二進位檔)中執行 Airflow 任務。這個 virtualenv 或系統 python 也可以安裝不同的自訂程式庫集,並且必須在可以執行相同位置任務的所有工作人員中提供。

使用 @task.external_python 的範例(使用不可變的、預先存在的 virtualenv)

airflow/example_dags/example_python_operator.py[原始碼]

    def callable_external_python():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print("Please wait...", flush=True)
            sleep(1)
        print("Finished")

    external_python_task = ExternalPythonOperator(
        task_id="external_python",
        python_callable=callable_external_python,
        python=PATH_TO_PYTHON_BINARY,
    )

使用 Docker Operator 進行依賴項隔離

如果您的 Airflow 工作人員可以存取 docker 引擎,您可以改為使用 DockerOperator 並新增任何需要的引數以正確執行任務。請注意,docker 映像必須安裝可運作的 Python,並將 bash 命令作為 command 引數。

值得注意的是,Python 原始碼(從已裝飾的函數中提取)和任何可調用引數都透過(編碼和 pickle 化的)環境變數傳送到容器,因此這些引數的長度不是無限的(確切的限制取決於系統設定)。

以下是使用 @task.docker 裝飾器執行 Python 任務的範例。

tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py[原始碼]

@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}

關於使用運算子的注意事項

注意

在較早版本的 Airflow 中使用 @task.docker 裝飾器

由於 @task.docker 裝飾器在 docker 提供者中可用,您可能會想在 2.2 之前的 Airflow 版本中使用它,但這不會起作用。如果您嘗試,您將收到此錯誤

AttributeError: '_TaskDecorator' object has no attribute 'docker'

您應該升級到 Airflow 2.2 或以上版本才能使用它。

使用 Kubernetes Pod Operator 進行依賴項隔離

如果您的 Airflow 工作人員可以存取 Kubernetes,您可以改為使用 KubernetesPodOperator 並新增任何需要的引數以正確執行任務。

以下是使用 @task.kubernetes 裝飾器執行 Python 任務的範例。

tests/system/providers/cncf/kubernetes/example_kubernetes_decorator.py[原始碼]

@task.kubernetes(
    image="python:3.8-slim-buster",
    name="k8s_test",
    namespace="default",
    in_cluster=False,
    config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
    import time

    print("Hello from k8s pod")
    time.sleep(2)

@task.kubernetes(image="python:3.8-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
    n = 5
    for i in range(n):
        # inner loop to handle number of columns
        # values changing acc. to outer loop
        for _ in range(i + 1):
            # printing stars
            print("* ", end="")

        # ending line after each row
        print("\r")

execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()

execute_in_k8s_pod_instance >> print_pattern_instance

關於使用運算子的注意事項

注意

在較早版本的 Airflow 中使用 @task.kubernetes 裝飾器

由於 @task.kubernetes 裝飾器在 docker 提供者中可用,您可能會想在 2.4 之前的 Airflow 版本中使用它,但這不會起作用。如果您嘗試,您將收到此錯誤

AttributeError: '_TaskDecorator' object has no attribute 'kubernetes'

您應該升級到 Airflow 2.4 或以上版本才能使用它。

將 TaskFlow API 與 Sensor 運算子搭配使用

您可以應用 @task.sensor 裝飾器將常規 Python 函數轉換為 BaseSensorOperator 類別的實例。Python 函數實作了 poke 邏輯,並傳回 PokeReturnValue 類別的實例,如同 BaseSensorOperator 中的 poke() 方法一樣。在 Airflow 2.3 中,sensor 運算子將能夠傳回 XCOM 值。這是透過在 poke() 方法的末尾傳回 PokeReturnValue 物件的實例來實現的

from airflow.sensors.base import PokeReturnValue


class SensorWithXcomValue(BaseSensorOperator):
    def poke(self, context: Context) -> Union[bool, PokeReturnValue]:
        # ...
        is_done = ...  # set to true if the sensor should stop poking.
        xcom_value = ...  # return value of the sensor operator to be pushed to XCOM.
        return PokeReturnValue(is_done, xcom_value)

若要實作推送 XCOM 值並同時支援 2.3 版和 2.3 之前版本的 sensor 運算子,您需要明確推送 XCOM 值(如果版本為 2.3 之前版本)。

try:
    from airflow.sensors.base import PokeReturnValue
except ImportError:
    PokeReturnValue = None


class SensorWithXcomValue(BaseSensorOperator):
    def poke(self, context: Context) -> bool:
        # ...
        is_done = ...  # set to true if the sensor should stop poking.
        xcom_value = ...  # return value of the sensor operator to be pushed to XCOM.
        if PokeReturnValue is not None:
            return PokeReturnValue(is_done, xcom_value)
        else:
            if is_done:
                context["ti"].xcom_push(key="xcom_key", value=xcom_value)
            return is_done

或者,在 sensor 不需要推送 XCOM 值的情況下:poke() 和封裝的函數都可以傳回類似布林值的值,其中 True 表示 sensor 的操作已完成,而 False 表示 sensor 的操作未完成。

airflow/example_dags/example_sensor_decorator.py[原始碼]


import pendulum

from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_sensor_decorator():
    # Using a sensor operator to wait for the upstream data to be ready.
    @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
    def wait_for_upstream() -> PokeReturnValue:
        return PokeReturnValue(is_done=True, xcom_value="xcom_value")
    @task
    def dummy_operator() -> None:
        pass
    wait_for_upstream() >> dummy_operator()
tutorial_etl_dag = example_sensor_decorator()

多個輸出推斷

任務也可以透過使用 dict Python 型別來推斷多個輸出。

@task
def identity_dict(x: int, y: int) -> dict[str, int]:
    return {"x": x, "y": y}

透過使用型別 dict 或任何其他符合 typing.Mapping 協定的類別作為函數傳回型別,multiple_outputs 參數會自動設定為 true。

注意,如果您手動設定 multiple_outputs 參數,則會停用推斷,並使用參數值。

在已裝飾任務和傳統任務之間新增依賴關係

上述教學課程說明如何在 TaskFlow 函數之間建立依賴關係。但是,也可以在傳統任務(例如 BashOperatorFileSensor)和 TaskFlow 函數之間設定依賴關係。

以下程式碼顯示了如何建立此依賴關係

@task()
def extract_from_file():
    """
    #### Extract from file task
    A simple Extract task to get data ready for the rest of the data
    pipeline, by reading the data from a file into a pandas dataframe
    """
    order_data_file = "/tmp/order_data.csv"
    order_data_df = pd.read_csv(order_data_file)
    return order_data_df


file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv")
order_data = extract_from_file()

file_task >> order_data

在上面的程式碼區塊中,新的 TaskFlow 函數定義為 extract_from_file,它從已知的檔案位置讀取資料。在主要 DAG 中,定義了一個新的 FileSensor 任務來檢查此檔案。請注意,這是一個 Sensor 任務,它等待檔案。TaskFlow 函數調用放在變數 order_data 中。最後,使用變數指定此 Sensor 任務和 TaskFlow 函數之間的依賴關係。

在已裝飾任務和傳統任務之間使用 XCom

如上所述,TaskFlow API 允許以從 DAG 作者那裡抽象出來的方式在任務之間使用或傳遞 XCom。本節將進一步深入探討詳細範例,說明這不僅在 TaskFlow 函數之間是可能的,而且在 TaskFlow 函數傳統任務之間也是可能的。

您可能會發現有必要使用來自傳統任務的 XCom 作為下游任務的輸入,無論是在任務執行期間推送的,還是透過其傳回值推送的。您可以透過使用為所有運算子公開的 .output 屬性來存取推送的 XCom(也稱為 XComArg)。

預設情況下,使用 .output 屬性來檢索 XCom 結果相當於

task_instance.xcom_pull(task_ids="my_task_id", key="return_value")

若要檢索金鑰不是 return_value 的 XCom 結果,您可以使用

my_op = MyOperator(...)
my_op_output = my_op.output["some_other_xcom_key"]
# OR
my_op_output = my_op.output.get("some_other_xcom_key")

注意

僅針對列為 template_field 的運算子參數,才支援使用 .output 屬性作為另一個任務的輸入。

在下面的程式碼範例中,HttpOperator 結果透過 XComs 擷取。然後,此 XCom 結果(即任務輸出)會傳遞到 TaskFlow 函數,該函數會將回應剖析為 JSON。

get_api_results_task = HttpOperator(
    task_id="get_api_results",
    endpoint="/api/query",
    do_xcom_push=True,
    http_conn_id="http",
)


@task
def parse_results(api_results):
    return json.loads(api_results)


parsed_results = parse_results(api_results=get_api_results_task.output)

反向操作也可以完成:將 TaskFlow 函數的輸出作為傳統任務的輸入傳遞。

@task(retries=3)
def create_queue():
    """This is a Python function that creates an SQS queue"""
    hook = SqsHook()
    result = hook.create_queue(queue_name="sample-queue")

    return result["QueueUrl"]


sqs_queue = create_queue()

publish_to_queue = SqsPublishOperator(
    task_id="publish_to_queue",
    sqs_queue=sqs_queue,
    message_content="{{ task_instance }}-{{ execution_date }}",
    message_attributes=None,
    delay_seconds=0,
)

請注意上面的程式碼範例,來自 create_queue TaskFlow 函數的輸出(新建立的 Amazon SQS 佇列的 URL)然後作為 sqs_queue 引數傳遞到 SqsPublishOperator 任務。

最後,您不僅可以使用傳統運算子輸出作為 TaskFlow 函數的輸入,還可以作為其他傳統運算子的輸入。在下面的範例中,來自 SalesforceToS3Operator 任務的輸出(即目的地檔案位置的 S3 URI)用作 S3CopyObjectOperator 任務的輸入,以將相同檔案複製到 S3 中按日期分割的儲存位置,以便在資料湖中長期儲存。

BASE_PATH = "salesforce/customers"
FILE_NAME = "customer_daily_extract_{{ ds_nodash }}.csv"


upload_salesforce_data_to_s3_landing = SalesforceToS3Operator(
    task_id="upload_salesforce_data_to_s3",
    salesforce_query="SELECT Id, Name, Company, Phone, Email, LastModifiedDate, IsActive FROM Customers",
    s3_bucket_name="landing-bucket",
    s3_key=f"{BASE_PATH}/{FILE_NAME}",
    salesforce_conn_id="salesforce",
    aws_conn_id="s3",
    replace=True,
)


store_to_s3_data_lake = S3CopyObjectOperator(
    task_id="store_to_s3_data_lake",
    aws_conn_id="s3",
    source_bucket_key=upload_salesforce_data_to_s3_landing.output,
    dest_bucket_name="data_lake",
    dest_bucket_key=f"""{BASE_PATH}/{"{{ execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
)

在已裝飾任務中存取上下文變數

執行可調用物件時,Airflow 將傳遞一組關鍵字引數,這些引數可用於您的函數中。這組 kwargs 完全對應於您可以在 Jinja 模板中使用的內容。為了使此功能運作,您可以新增您想要在函數中接收的上下文金鑰作為關鍵字引數。

例如,下面程式碼區塊中的可調用物件將取得 tinext_ds 上下文變數的值

@task
def my_python_callable(*, ti, next_ds):
    pass

在版本 2.8 中變更:先前,上下文金鑰引數必須提供預設值,例如 ti=None。現在不再需要這樣做了。

您也可以選擇使用 **kwargs 接收整個上下文。請注意,這可能會產生輕微的效能損失,因為 Airflow 將需要展開可能包含許多您實際上不需要的東西的整個上下文。因此,更建議您使用顯式引數,如上一段所示。

@task
def my_python_callable(**kwargs):
    ti = kwargs["ti"]
    next_ds = kwargs["next_ds"]

此外,有時您可能想要在堆疊深處存取上下文,但您不想從任務可調用物件傳遞上下文變數。您仍然可以透過 get_current_context 方法存取執行上下文。

from airflow.operators.python import get_current_context


def some_function_in_your_library():
    context = get_current_context()
    ti = context["ti"]

目前上下文僅在任務執行期間可存取。在 pre_executepost_execute 期間無法存取上下文。在執行上下文之外調用此方法將引發錯誤。

在已裝飾任務中使用模板

傳遞到已裝飾函數的引數會自動模板化。

您也可以使用 templates_exts 參數來模板化整個檔案。

@task(templates_exts=[".sql"])
def template_test(sql):
    print(f"sql: {sql}")


template_test(sql="sql/test.sql")

這將讀取 sql/test.sql 的內容並取代所有模板變數。您也可以傳遞檔案清單,所有檔案都將被模板化。

您可以透過params 參數將其他參數傳遞到模板引擎。

但是,params 參數必須傳遞給裝飾器,而不是直接傳遞給您的函數,例如 @task(templates_exts=['.sql'], params={'my_param'}),然後可以與 {{ params.my_param }} 一起在您的模板化檔案和函數參數中使用。

或者,您也可以使用 .override() 方法傳遞它

@task()
def template_test(input_var):
    print(f"input_var: {input_var}")


template_test.override(params={"my_param": "wow"})(
    input_var="my param is: {{ params.my_param }}",
)

最後,您也可以手動呈現模板

@task(params={"my_param": "wow"})
def template_test():
    template_str = "run_id: {{ run_id }}; params.my_param: {{ params.my_param }}"

    context = get_current_context()
    rendered_template = context["task"].render_template(
        template_str,
        context,
    )

以下是一個完整範例,示範了上述所有內容

airflow/example_dags/tutorial_taskflow_templates.py[原始碼]


import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
    params={"foobar": "param_from_dag", "other_param": "from_dag"},
)
def tutorial_taskflow_templates():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the templates in the TaskFlow API.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.dev.org.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task(
        # Causes variables that end with `.sql` to be read and templates
        # within to be rendered.
        templates_exts=[".sql"],
    )
    def template_test(sql, test_var, data_interval_end):
        context = get_current_context()

        # Will print...
        # select * from test_data
        # where 1=1
        #     and run_id = 'scheduled__2024-10-09T00:00:00+00:00'
        #     and something_else = 'param_from_task'
        print(f"sql: {sql}")

        # Will print `scheduled__2024-10-09T00:00:00+00:00`
        print(f"test_var: {test_var}")

        # Will print `2024-10-10 00:00:00+00:00`.
        # Note how we didn't pass this value when calling the task. Instead
        # it was passed by the decorator from the context
        print(f"data_interval_end: {data_interval_end}")

        # Will print...
        # run_id: scheduled__2024-10-09T00:00:00+00:00; params.other_param: from_dag
        template_str = "run_id: {{ run_id }}; params.other_param: {{ params.other_param }}"
        rendered_template = context["task"].render_template(
            template_str,
            context,
        )
        print(f"rendered template: {rendered_template}")

        # Will print the full context dict
        print(f"context: {context}")
    template_test.override(
        # Will be merged with the dict defined in the dag
        # and override existing parameters.
        #
        # Must be passed into the decorator's parameters
        # through `.override()` not into the actual task
        # function
        params={"foobar": "param_from_task"},
    )(
        sql="sql/test.sql",
        test_var="{{ run_id }}",
    )
tutorial_taskflow_templates()

有條件地跳過任務

run_if()skip_if() 是 TaskFlow 的語法糖,可讓您根據條件跳過 Task。您可以使用它們來簡單地設定執行條件,而無需變更 DAGTask 的結構。

它也允許您使用 Context 設定條件,這基本上與使用 pre_execute 相同。

run_if() 的範例用法如下

@task.run_if(lambda context: context["task_instance"].task_id == "run")
@task.bash()
def echo() -> str:
    return "echo 'run'"

僅當 task_idrun 時,才會執行上述程式碼中定義的 echo

如果您想在跳過任務時留下記錄,您有兩個選項。

@task.run_if(lambda context: context["task_instance"].task_id == "run", skip_message="only task_id is 'run'")
@task.bash()
def echo() -> str:
    return "echo 'run'"
@task.run_if(
    lambda context: (context["task_instance"].task_id == "run", f"{context['ts']}: only task_id is 'run'")
)
@task.bash()
def echo() -> str:
    return "echo 'run'"

還有一個 skip_if(),其運作方式與 run_if() 相反,並且以相同的方式使用。

@task.skip_if(lambda context: context["task_instance"].task_id == "skip")
@task.bash()
def echo() -> str:
    return "echo 'run'"

下一步?

您已經了解在 Airflow 2.0 中使用 TaskFlow API 範例撰寫 DAG 是多麼簡單。以下是您可能想要採取的幾個後續步驟

另請參閱

這個條目有幫助嗎?