動態任務映射

動態任務映射允許工作流程根據當前資料在執行時建立許多任務,而不是 DAG 作者必須預先知道需要多少任務。

這類似於在 for 迴圈中定義您的任務,但調度器可以根據先前任務的輸出執行此操作,而不是讓 DAG 檔案擷取資料並自行執行。在執行映射任務之前,調度器將建立 n 個任務副本,每個輸入一個副本。

也可以讓任務對映射任務的收集輸出進行操作,通常稱為 map 和 reduce。

簡單映射

在其最簡單的形式中,您可以使用 expand() 函數,而不是直接呼叫您的任務,來映射直接在您的 DAG 檔案中定義的清單。

如果您想查看動態任務映射的簡單用法,您可以查看下面

airflow/example_dags/example_dynamic_task_mapping.py[原始碼]

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping."""

from __future__ import annotations

from datetime import datetime

from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

這將在執行時在任務日誌中顯示 Total was 9

這是產生的 DAG 結構

../_images/mapping-simple-graph.png

網格視圖也提供在詳細資訊面板中查看您的映射任務

../_images/mapping-simple-grid.png

注意

只允許將關鍵字引數傳遞給 expand()

注意

從映射任務傳遞的值是延遲代理

在上面的範例中,sum_it 收到的 values 是由每個 add_one 的映射實例傳回的所有值的聚合。但是,由於不可能預先知道我們將有多少 add_one 的實例,因此 values 不是普通的清單,而是一個「延遲序列」,僅在被要求時才檢索每個單獨的值。因此,如果您直接執行 print(values),您會得到類似這樣的東西

LazySelectSequence([15 items])

您可以在此物件上使用正常的序列語法(例如 values[0]),或使用 for 迴圈正常迭代它。list(values) 將為您提供一個「真實」的 list,但由於這會從所有引用的上游映射任務中急切地載入值,如果映射數量很大,您必須注意潛在的效能影響。

請注意,當您將此代理物件推送到 XCom 時,情況也一樣。Airflow 嘗試變得聰明並自動強制轉換該值,但會為此發出警告,以便您了解這一點。例如

@task
def forward_values(values):
    return values  # This is a lazy proxy!

將發出類似這樣的警告

Coercing mapped lazy proxy return value from task forward_values to list, which may degrade
performance. Review resource requirements for this operation, and call list() explicitly to suppress this message. See Dynamic Task Mapping documentation for more information about lazy proxy objects.

可以透過像這樣修改任務來抑制該訊息

@task
def forward_values(values):
    return list(values)

注意

不需要 reduce 任務。

雖然我們在這裡展示了一個「reduce」任務 (sum_it),但您不必擁有一個,即使映射任務沒有下游任務,它們仍然會被執行。

任務產生的映射

我們展示的以上範例都可以透過 DAG 檔案中的 for 迴圈實現,但動態任務映射的真正威力來自能夠讓任務產生要迭代的清單。

@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    print(arg)


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

make_list 任務作為普通任務執行,並且必須傳回清單或 dict(請參閱哪些資料類型可以擴展?),然後將呼叫 consumer 任務四次,每次使用 make_list 的傳回值中的一個值。

警告

任務產生的映射不能與 TriggerRule.ALWAYS 一起使用

不允許在任務產生的映射中指派 trigger_rule=TriggerRule.ALWAYS,因為擴展參數在任務立即執行時是未定義的。這在 DAG 解析時強制執行,對於任務和映射任務群組都是如此,如果您嘗試使用它,將會引發錯誤。在最近的範例中,在 consumer 任務中設定 trigger_rule=TriggerRule.ALWAYS 將引發錯誤,因為 make_list 是任務產生的映射。

重複映射

一個映射任務的結果也可以用作下一個映射任務的輸入。

with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    first = add_one.expand(x=[1, 2, 3])
    second = add_one.expand(x=first)

這將產生 [3, 4, 5] 的結果。

新增不擴展的參數

除了傳遞在執行時擴展的引數之外,還可以傳遞不變的引數——為了清楚地區分這兩種類型,我們使用不同的函數,expand() 用於映射引數,而 partial() 用於未映射的引數。

@task
def add(x: int, y: int):
    return x + y


added_values = add.partial(y=10).expand(x=[1, 2, 3])
# This results in add function being expanded to
# add(x=1, y=10)
# add(x=2, y=10)
# add(x=3, y=10)

這將產生 11、12 和 13 的值。

這也適用於將連線 ID、資料庫表格名稱或儲存桶名稱等內容傳遞給任務。

多個參數的映射

除了單個參數之外,還可以傳遞多個參數進行擴展。這將產生「交叉乘積」的效果,使用參數的每個組合呼叫映射任務。

@task
def add(x: int, y: int):
    return x + y


added_values = add.expand(x=[2, 4, 8], y=[5, 10])
# This results in the add function being called with
# add(x=2, y=5)
# add(x=2, y=10)
# add(x=4, y=5)
# add(x=4, y=10)
# add(x=8, y=5)
# add(x=8, y=10)

這將導致 add 任務被呼叫 6 次。但是請注意,不保證擴展的順序。

具名映射

預設情況下,映射任務會被指派一個整數索引。可以使用基於任務輸入的名稱來覆寫 Airflow UI 中每個映射任務的整數索引。這是透過為任務提供帶有 map_index_template 的 Jinja 範本來完成的。當擴展看起來像 .expand(<property>=...) 時,這通常看起來像 map_index_template="{{ task.<property> }}"。此範本在每個擴展任務執行後使用任務上下文呈現。這表示您可以像這樣參考任務上的屬性

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


# The two expanded task instances will be named "2024-01-01" and "2024-01-02".
SQLExecuteQueryOperator.partial(
    ...,
    sql="SELECT * FROM data WHERE date = %(date)s",
    map_index_template="""{{ task.parameters['date'] }}""",
).expand(
    parameters=[{"date": "2024-01-01"}, {"date": "2024-01-02"}],
)

在上面的範例中,擴展的任務實例將被命名為「2024-01-01」和「2024-01-02」。這些名稱會顯示在 Airflow UI 中,而不是「0」和「1」。

由於範本是在主執行區塊之後呈現的,因此也可以動態注入到呈現上下文中。當呈現理想名稱的邏輯難以在 Jinja 範本語法中表達時,尤其是在 taskflow 函數中,這非常有用。例如

from airflow.operators.python import get_current_context


@task(map_index_template="{{ my_variable }}")
def my_task(my_value: str):
    context = get_current_context()
    context["my_variable"] = my_value * 3
    ...  # Normal execution...


# The task instances will be named "aaa" and "bbb".
my_task.expand(my_value=["a", "b"])

使用非 TaskFlow 運算子的映射

也可以將 partialexpand 與經典樣式的運算子一起使用。某些引數不可映射,並且必須傳遞給 partial(),例如 task_idqueuepool 以及 BaseOperator 的大多數其他引數。

airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py[原始碼]

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping with non-TaskFlow operators."""

from __future__ import annotations

from datetime import datetime

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG


class AddOneOperator(BaseOperator):
    """A custom operator that adds one to the input."""

    def __init__(self, value, **kwargs):
        super().__init__(**kwargs)
        self.value = value

    def execute(self, context):
        return self.value + 1


class SumItOperator(BaseOperator):
    """A custom operator that sums the input."""

    template_fields = ("values",)

    def __init__(self, values, **kwargs):
        super().__init__(**kwargs)
        self.values = values

    def execute(self, context):
        total = sum(self.values)
        print(f"Total was {total}")
        return total


with DAG(
    dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
    schedule=None,
    start_date=datetime(2022, 3, 4),
    catchup=False,
):
    # map the task to a list of values
    add_one_task = AddOneOperator.partial(task_id="add_one").expand(value=[1, 2, 3])

    # aggregate (reduce) the mapped tasks results
    sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)

注意

只允許將關鍵字引數傳遞給 partial()

經典運算子結果的映射

如果您想映射經典運算子的結果,您應該明確參考輸出,而不是運算子本身。

# Create a list of data inputs.
extract = ExtractOperator(task_id="extract")

# Expand the operator to transform each input.
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)

# Collect the transformed inputs, expand the operator to load each one of them to the target.
load = LoadOperator.partial(task_id="load").expand(input=transform.output)

混合 TaskFlow 和經典運算子

在此範例中,您有一個到 S3 儲存桶的常規資料傳遞,並且想要對每次到達的每個檔案應用相同的處理,無論每次到達多少個檔案。

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator


with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
    list_filenames = S3ListOperator(
        task_id="get_input",
        bucket="example-bucket",
        prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
    )

    @task
    def count_lines(aws_conn_id, bucket, filename):
        hook = S3Hook(aws_conn_id=aws_conn_id)

        return len(hook.read_key(filename, bucket).splitlines())

    @task
    def total(lines):
        return sum(lines)

    counts = count_lines.partial(aws_conn_id="aws_default", bucket=list_filenames.bucket).expand(
        filename=list_filenames.output
    )

    total(lines=counts)

將多個參數指派給非 TaskFlow 運算子

有時,上游需要為下游運算子指定多個引數。為此,您可以使用 expand_kwargs 函數,該函數接受要映射的映射序列。

BashOperator.partial(task_id="bash").expand_kwargs(
    [
        {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
        {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
    ],
)

這會在執行時產生兩個任務實例,分別印出 12

此外,也可以混合 expand_kwargs 與大多數運算子引數,例如 PythonOperator 的 op_kwargs

def print_args(x, y):
    print(x)
    print(y)
    return x + y


PythonOperator.partial(task_id="task-1", python_callable=print_args).expand_kwargs(
    [
        {"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
        {"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
    ]
)

expand 類似,您也可以針對傳回 dict 清單的 XCom 或每個都傳回 dict 的 XCom 清單進行映射。重新使用上面的 S3 範例,您可以使用映射任務來執行「分支」並將檔案複製到不同的儲存桶

list_filenames = S3ListOperator(...)  # Same as the above example.


@task
def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        dest_bucket_name = "my_text_bucket"
    else:
        dest_bucket_name = "my_other_bucket"
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": dest_bucket_name,
    }


copy_kwargs = create_copy_kwargs.expand(filename=list_filenames.output)

# Copy files to another bucket, based on the file's extension.
copy_filenames = S3CopyObjectOperator.partial(
    task_id="copy_files", source_bucket_name=list_filenames.bucket
).expand_kwargs(copy_kwargs)

任務群組的映射

與 TaskFlow 任務類似,您也可以在 @task_group 裝飾的函數上呼叫 expandexpand_kwargs,以建立映射任務群組

注意

為了簡潔起見,此章節中省略了個別任務的實作。

@task_group
def file_transforms(filename):
    return convert_to_yaml(filename)


file_transforms.expand(filename=["data1.json", "data2.json"])

在上面的範例中,任務 convert_to_yaml 在執行時擴展為兩個任務實例。第一個擴展將接收 "data1.json" 作為輸入,第二個接收 "data2.json"

任務群組函數中的值參考

任務函數 (@task) 和任務群組函數 (@task_group) 之間的一個重要區別是,由於任務群組沒有關聯的工作人員,因此任務群組函數中的程式碼無法解析傳遞給它的引數;只有當參考傳遞給任務時,真實值才會被解析。

例如,此程式碼將無法運作

@task
def my_task(value):
    print(value)


@task_group
def my_task_group(value):
    if not value:  # DOES NOT work as you'd expect!
        task_a = EmptyOperator(...)
    else:
        task_a = PythonOperator(...)
    task_a << my_task(value)


my_task_group.expand(value=[0, 1, 2])

當執行 my_task_group 中的程式碼時,value 仍然只是一個參考,而不是真實值,因此 if not value 分支將無法如您所願地運作。但是,如果您將該參考傳遞給任務,它將在執行任務時被解析,因此三個 my_task 實例將分別接收 1、2 和 3。

因此,重要的是要記住,如果您打算對傳遞到任務群組函數中的值執行任何邏輯,您必須始終使用任務來執行邏輯,例如 @task.branch(或 BranchPythonOperator)用於條件,以及任務映射方法用於迴圈。

注意

不允許在映射任務群組中進行任務映射

目前不允許在映射任務群組內進行巢狀任務映射。雖然此功能的技術方面並非特別困難,但我們已決定有意省略此功能,因為它增加了相當大的 UI 複雜性,並且對於一般使用案例可能不是必要的。此限制可能會根據使用者回饋在未來重新檢視。

深度優先執行

如果映射任務群組包含多個任務,則群組中的所有任務都會針對相同的輸入「一起」擴展。例如

@task_group
def file_transforms(filename):
    converted = convert_to_yaml(filename)
    return replace_defaults(converted)


file_transforms.expand(filename=["data1.json", "data2.json"])

由於群組 file_transforms 擴展為兩個,因此任務 convert_to_yamlreplace_defaults 將在執行時各自變成兩個實例。

透過像這樣分別擴展這兩個任務,可以實現類似的效果

converted = convert_to_yaml.expand(filename=["data1.json", "data2.json"])
replace_defaults.expand(filename=converted)

但是,不同之處在於,任務群組允許內部的每個任務僅依賴其「相關輸入」。對於上面的範例,replace_defaults 將僅依賴於同一擴展群組的 convert_to_yaml,而不是同一任務的實例,而是在不同的群組中。這種策略稱為深度優先執行(與簡單的無群組廣度優先執行相反),允許更邏輯的任務分離、細粒度的依賴規則和準確的資源分配——使用上面的範例,第一個 replace_defaults 將能夠在 convert_to_yaml("data2.json") 完成之前執行,並且不需要關心它是否成功。

依賴映射任務群組的輸出

與映射任務群組類似,依賴映射任務群組的輸出也將自動聚合群組的結果

@task_group
def add_to(value):
    value = add_one(value)
    return double(value)


results = add_to.expand(value=[1, 2, 3])
consumer(results)  # Will receive [4, 6, 8].

也可以執行與普通映射任務的結果相同的任何操作。

在映射任務群組的輸出上進行分支

雖然無法在映射任務的結果上實作分支邏輯(例如使用 @task.branch),但可以根據任務群組的輸入進行分支。以下範例示範了根據映射任務群組的輸入執行三個任務之一。

inputs = ["a", "b", "c"]


@task_group(group_id="my_task_group")
def my_task_group(input):
    @task.branch
    def branch(element):
        if "a" in element:
            return "my_task_group.a"
        elif "b" in element:
            return "my_task_group.b"
        else:
            return "my_task_group.c"

    @task
    def a():
        print("a")

    @task
    def b():
        print("b")

    @task
    def c():
        print("c")

    branch(input) >> [a(), b(), c()]


my_task_group.expand(input=inputs)

從映射任務中篩選項目

映射任務可以透過傳回 None 從傳遞到其下游任務的任何元素中移除元素。例如,如果我們只想將具有特定副檔名的檔案從 S3 儲存桶複製到另一個儲存桶,我們可以像這樣實作 create_copy_kwargs

@task
def create_copy_kwargs(filename):
    # Skip files not ending with these suffixes.
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        return None
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


# copy_kwargs and copy_files are implemented the same.

這使得 copy_files 僅針對 .json.yml 檔案進行擴展,同時忽略其餘檔案。

轉換擴展資料

由於通常需要轉換任務映射的輸出資料格式,尤其是從非 TaskFlow 運算子,其中輸出格式是預先決定的並且不容易轉換(例如上述範例中的 create_copy_kwargs),因此可以使用特殊的 map() 函數來輕鬆執行此類轉換。因此,可以像這樣修改上面的範例

from airflow.exceptions import AirflowSkipException

list_filenames = S3ListOperator(...)  # Unchanged.


def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        raise AirflowSkipException(f"skipping {filename!r}; unexpected suffix")
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


copy_kwargs = list_filenames.output.map(create_copy_kwargs)

# Unchanged.
copy_filenames = S3CopyObjectOperator.partial(...).expand_kwargs(copy_kwargs)

有幾件事需要注意

  1. map() 的可呼叫引數(範例中的 create_copy_kwargs不得是任務,而是一個普通的 Python 函數。轉換是下游任務(即 copy_files)的「預處理」的一部分,而不是 DAG 中的獨立任務。

  2. 可呼叫物件始終只接受一個位置引數。對於用於任務映射的可迭代物件中的每個項目,都會呼叫此函數,類似於 Python 的內建 map() 的運作方式。

  3. 由於可呼叫物件是作為下游任務的一部分執行的,因此您可以使用任何現有技術來編寫任務函數。例如,若要將元件標記為已跳過,您應該引發 AirflowSkipException。請注意,在這裡傳回 None 不起作用

合併上游資料(又稱「壓縮」)

通常也需要將多個輸入來源合併到一個任務映射可迭代物件中。這通常稱為「壓縮」(類似於 Python 的內建 zip() 函數),並且也作為下游任務的預處理執行。

這對於任務映射中的條件邏輯特別有用。例如,如果您想從 S3 下載檔案,但重新命名這些檔案,則可以執行類似以下的操作

list_filenames_a = S3ListOperator(
    task_id="list_files_in_a",
    bucket="bucket",
    prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = ["rename_1", "rename_2", "rename_3", ...]

filenames_a_b = list_filenames_a.output.zip(list_filenames_b)


@task
def download_filea_from_a_rename(filenames_a_b):
    fn_a, fn_b = filenames_a_b
    S3Hook().download_file(fn_a, local_path=fn_b)


download_filea_from_a_rename.expand(filenames_a_b=filenames_a_b)

與內建 zip() 類似,您可以將任意數量的可迭代物件壓縮在一起,以取得位置引數計數的元組的可迭代物件。預設情況下,壓縮的可迭代物件的長度與壓縮的可迭代物件中最短的長度相同,並丟棄多餘的項目。可以傳遞一個選用的關鍵字引數 default,以切換行為以符合 Python 的 itertools.zip_longest()——壓縮的可迭代物件的長度將與壓縮的可迭代物件中最長的長度相同,遺失的項目將以 default 提供的數值填滿。

串連多個上游

合併輸入來源的另一種常見模式是針對多個可迭代物件執行相同的任務。當然,對於每個可迭代物件分別執行相同的程式碼是完全有效的,例如

list_filenames_a = S3ListOperator(
    task_id="list_files_in_a",
    bucket="bucket",
    prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = S3ListOperator(
    task_id="list_files_in_b",
    bucket="bucket",
    prefix="incoming/provider_b/{{ data_interval_start|ds }}",
)


@task
def download_file(filename):
    S3Hook().download_file(filename)
    # process file...


download_file.override(task_id="download_file_a").expand(filename=list_filenames_a.output)
download_file.override(task_id="download_file_b").expand(filename=list_filenames_b.output)

但是,如果任務可以合併為一個,則 DAG 將更具可擴展性且更易於檢查。這可以使用 concat 完成

# Tasks list_filenames_a and list_filenames_b, and download_file stay unchanged.

list_filenames_concat = list_filenames_a.concat(list_filenames_b)
download_file.expand(filename=list_filenames_concat)

這會建立一個單一任務來針對兩個清單進行擴展。您可以將任意數量的可迭代物件 concat 在一起(例如 foo.concat(bar, rex));或者,由於傳回值也是 XCom 參考,因此可以鏈結 concat 呼叫(例如 foo.concat(bar).concat(rex))以達到相同的結果:一個單一的可迭代物件,依序串連所有這些物件,類似於 Python 的 itertools.chain()

哪些資料類型可以擴展?

目前只能針對 dict、清單或儲存在 XCom 中作為任務結果的這些類型之一進行映射。

如果上游任務傳回不可映射的類型,則映射任務將在執行時失敗,並顯示 UnmappableXComTypePushed 例外。例如,您不能讓上游任務傳回純字串 – 它必須是清單或 dict。

範本欄位和映射引數如何互動?

運算子的所有引數都可以映射,即使是那些不接受範本參數的引數。

如果欄位標記為範本化並且已映射,則不會被範本化

例如,這將印出 {{ ds }} 而不是日期戳記

@task
def make_list():
    return ["{{ ds }}"]


@task
def printer(val):
    print(val)


printer.expand(val=make_list())

如果您想內插值,請自行呼叫 task.render_template,或使用內插

@task
def make_list(ds=None):
    return [ds]


@task
def make_list(**context):
    return [context["task"].render_template("{{ ds }}", context)]

對映射任務設定限制

您可以對任務設定兩個限制

  1. 可以建立作為擴展結果的映射任務實例的數量。

  2. 可以同時執行的映射任務的數量。

  • 限制映射任務的數量

    [core] max_map_length 組態選項是 expand 可以建立的最大任務數 – 預設值為 1024。

    如果來源任務(我們先前範例中的 make_list)傳回的清單長度超過此值,則會導致任務失敗。

  • 限制映射任務的平行副本

    如果您不希望大型映射任務佔用所有可用的執行器插槽,您可以使用任務上的 max_active_tis_per_dag 設定來限制可以同時執行的數量。

    但是請注意,這適用於針對所有活動 DagRun 的該任務的所有副本,而不僅僅是針對此特定 DagRun。

    @task(max_active_tis_per_dag=16)
    def add_one(x: int):
        return x + 1
    
    
    BashOperator.partial(task_id="my_task", max_active_tis_per_dag=16).expand(bash_command=commands)
    

自動跳過零長度映射

如果輸入為空(零長度),則不會建立新任務,並且映射任務將被標記為 SKIPPED

此條目是否有幫助?