序列化

為了支援資料交換,例如任務之間的引數,Airflow 需要序列化要交換的資料,並在下游任務需要時再次反序列化。序列化也會發生,以便 webserver 和排程器(相對於 DAG 處理器)不需要讀取 DAG 檔案。這樣做是為了安全性和效率。

序列化是一項出乎意料地困難的工作。Python 開箱即用僅支援基本型別的序列化,例如 strint,並且它會遍歷可迭代物件。當事情變得更複雜時,就需要自訂序列化。

Airflow 開箱即用支援三種自訂序列化方式。基本型別會原樣傳回,不進行額外編碼,例如,str 仍然是 str。當它不是基本型別(或其可迭代物件)時,Airflow 會在 airflow.serialization.serializers 的命名空間中尋找已註冊的序列化器和反序列化器。如果找不到,它將在類別中尋找 serialize() 方法,或者在反序列化的情況下尋找 deserialize(data, version: int) 方法。最後,如果類別使用 @dataclass@attr.define 裝飾,它將使用這些裝飾器的公開方法。

如果您希望使用新的序列化器擴展 Airflow,最好了解何時選擇哪種序列化方式。受 Airflow 控制的物件,即位於 airflow.* 命名空間下的物件,例如 airflow.model.dag.DAG 或受開發人員控制的物件,例如 my.company.Foo,應首先檢查它們是否可以使用 @attr.define@dataclass 裝飾。如果不可能,則應實作 serializedeserialize 方法。serialize 方法應傳回基本型別或字典。它不需要序列化字典中的值,這將會被處理,但鍵應為基本型別形式。

不受 Airflow 控制的物件,例如 numpy.int16,將需要已註冊的序列化器和反序列化器。需要版本控制。基本型別,不包括 bytes,可以原樣傳回,字典也可以。同樣,dict 值不需要序列化,但其鍵需要為基本型別形式。如果您正在實作已註冊的序列化器,請特別注意不要有循環匯入。通常,可以透過使用 str 來填充序列化器列表來避免這種情況。像這樣:serializers = ["my.company.Foo"] 而不是 serializers = [Foo]

注意

序列化和反序列化取決於速度。盡可能使用內建函式,例如 dict,並遠離使用類別和其他複雜結構。

Airflow 物件

from typing import Any, ClassVar


class Foo:
    __version__: ClassVar[int] = 1

    def __init__(self, a, v) -> None:
        self.a = a
        self.b = {"x": v}

    def serialize(self) -> dict[str, Any]:
        return {
            "a": self.a,
            "b": self.b,
        }

    @staticmethod
    def deserialize(data: dict[str, Any], version: int):
        f = Foo(a=data["a"])
        f.b = data["b"]
        return f

已註冊

from __future__ import annotations

from decimal import Decimal
from typing import TYPE_CHECKING

from airflow.utils.module_loading import qualname

if TYPE_CHECKING:
    from airflow.serialization.serde import U


serializers = [
    Decimal
]  # this can be a type or a fully qualified str. Str can be used to prevent circular imports
deserializers = serializers  # in some cases you might not have a deserializer (e.g. k8s pod)

__version__ = 1  # required


# the serializer expects output, classname, version, is_serialized?
def serialize(o: object) -> tuple[U, str, int, bool]:
    if isinstance(o, Decimal):
        name = qualname(o)
        _, _, exponent = o.as_tuple()
        if exponent >= 0:  # No digits after the decimal point.
            return int(o), name, __version__, True
            # Technically lossy due to floating point errors, but the best we
            # can do without implementing a custom encode function.
        return float(o), name, __version__, True

    return "", "", 0, False


# the deserializer sanitizes the data for you, so you do not need to deserialize values yourself
def deserialize(classname: str, version: int, data: object) -> Decimal:
    # always check version compatibility
    if version > __version__:
        raise TypeError(f"serialized {version} of {classname} > {__version__}")

    if classname != qualname(Decimal):
        raise TypeError(f"{classname} != {qualname(Decimal)}")

    return Decimal(str(data))

這個條目有幫助嗎?