PythonOperator

使用 PythonOperator 來執行 Python callable。

提示

建議使用 @task decorator 而非傳統的 PythonOperator 來執行 Python callable。

airflow/example_dags/example_python_decorator.py[原始碼]

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"

run_this = print_context()

airflow/example_dags/example_python_operator.py[原始碼]

def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print("::group::All kwargs")
    pprint(kwargs)
    print("::endgroup::")
    print("::group::Context variable ds")
    print(ds)
    print("::endgroup::")
    return "Whatever you return gets printed in the logs"

run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)

傳遞引數

將額外引數傳遞到 @task 裝飾的函式,就像使用一般的 Python 函式一樣。

airflow/example_dags/example_python_decorator.py[原始碼]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)

    run_this >> log_the_sql >> sleeping_task

airflow/example_dags/example_python_operator.py[原始碼]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = PythonOperator(
        task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
    )

    run_this >> log_the_sql >> sleeping_task

範本化

Airflow 傳入一組額外的關鍵字引數:每個 Jinja 範本變數 各一個,以及一個 templates_dict 引數。

templates_dict 引數是範本化的,因此字典中的每個值都會被評估為 Jinja 範本

airflow/example_dags/example_python_decorator.py[原始碼]

@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = log_sql()

airflow/example_dags/example_python_operator.py[原始碼]

def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = PythonOperator(
    task_id="log_sql_query",
    python_callable=log_sql,
    templates_dict={"query": "sql/sample.sql"},
    templates_exts=[".sql"],
)

PythonVirtualenvOperator

使用 PythonVirtualenvOperator decorator 在新的 Python 虛擬環境中執行 Python callable。virtualenv 套件需要安裝在執行 Airflow 的環境中(作為選用依賴項 pip install apache-airflow[virtualenv] --constraint ...)。

此外,cloudpickle 套件需要使用命令 pip install [cloudpickle] --constraint ... 作為選用依賴項安裝。此套件是目前使用的 dill 套件的替代品。Cloudpickle 在專注於標準 pickling 協議方面具有強大的優勢,確保更廣泛的相容性和更流暢的資料交換,同時仍然有效地處理常見的 Python 物件和函式中的全域變數。

提示

建議使用 @task.virtualenv decorator 而非傳統的 PythonVirtualenvOperator 在新的 Python 虛擬環境中執行 Python callable。

airflow/example_dags/example_python_decorator.py[原始碼]

    @task.virtualenv(
        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
    )
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")

    virtualenv_task = callable_virtualenv()

airflow/example_dags/example_python_operator.py[原始碼]

    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")

    virtualenv_task = PythonVirtualenvOperator(
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
    )

傳遞引數

將額外引數傳遞到 @task.virtualenv 裝飾的函式,就像使用一般的 Python 函式一樣。遺憾的是,由於與底層程式庫不相容,Airflow 不支援序列化 vartitask_instance。對於 Airflow 上下文變數,請確保您可以透過將 system_site_packages 設定為 True 或將 apache-airflow 新增到 requirements 引數來存取 Airflow。否則,您將無法在 op_kwargs 中存取 Airflow 的大多數上下文變數。如果您想要與 datetime 物件相關的上下文,例如 data_interval_start,您可以新增 pendulumlazy_object_proxy

重要事項

定義要執行的 Python 函式主體會從 DAG 中剪出到一個臨時檔案,而沒有周圍的程式碼。如同範例中所示,您需要再次新增所有 import,並且不能依賴全域 Python 上下文中的變數。

如果您想要將變數傳遞到傳統的 PythonVirtualenvOperator,請使用 op_argsop_kwargs

如果需要套件安裝的其他參數,請透過 pip_install_options 參數傳遞它們,或使用 requirements.txt,如下例所示

SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives

所有支援的選項都列在 requirements file format 中。

虛擬環境設定選項

虛擬環境是根據 worker 上的全域 python pip 組態建立的。在您的環境中使用其他 ENVs 或調整一般 pip 組態,如 pip config 中所述。

如果您想要使用其他特定於任務的私有 python 儲存庫來設定虛擬環境,您可以傳遞 index_urls 參數,這將調整 pip 安裝組態。傳遞的 index urls 會取代標準系統設定的 index url 設定。為了防止將機密資訊新增到 DAG 程式碼中的私有儲存庫,您可以使用 Airflow 連線與 Hooks。為此,可以使用連線類型 Package Index (Python)

在您想要防止遠端呼叫以設定虛擬環境的特殊情況下,將 index_urls 作為空列表傳遞,例如 index_urls=[],這會強制 pip 安裝程式使用 --no-index 選項。

快取與重複使用

虛擬環境的設定是針對每個任務執行在臨時目錄中進行的。執行後,虛擬環境會再次刪除。確保您的 worker 上的 $tmp 資料夾有足夠的磁碟空間。通常(如果未另行設定),將會使用本機 pip 快取,以防止每次執行都重新下載套件。

但為每次執行設定虛擬環境仍然需要一些時間。對於重複執行,您可以將選項 venv_cache_path 設定為 worker 上的檔案系統資料夾。在這種情況下,虛擬環境將設定一次並重複使用。如果使用虛擬環境快取,則會在快取路徑中為每個唯一的需求集建立不同的虛擬環境子資料夾。因此,根據您系統設定中 DAG 的變化,需要足夠的磁碟空間。

請注意,在快取模式下,不會進行自動清理。所有 worker slots 共享相同的虛擬環境,但如果任務在不同的 worker 上重複排程,則可能會在多個 worker 上個別建立虛擬環境。此外,如果 worker 在 Kubernetes POD 中啟動,則重新啟動 worker 將會丟棄快取(假設 venv_cache_path 不在持久性卷上)。

如果您在執行時遇到快取虛擬環境損壞的問題,您可以透過將 Airflow 變數 PythonVirtualenvOperator.cache_key 設定為任何文字來影響快取目錄雜湊。此變數的內容用於向量中,以計算快取目錄金鑰。

請注意,對快取虛擬環境的任何修改(例如二進位路徑中的臨時檔案、後安裝其他需求)都可能會污染快取虛擬環境,並且 operator 不會維護或清理快取路徑。

ExternalPythonOperator

ExternalPythonOperator 可以幫助您使用與其他任務(以及主要 Airflow 環境)不同的一組 Python 程式庫來執行某些任務。這可能是虛擬環境或預先安裝且在執行 Airflow 任務的環境中可用的任何 Python 安裝。operator 以 python 參數接收 Python 二進位檔。請注意,即使在虛擬環境的情況下,python 路徑也應指向虛擬環境內的 python 二進位檔(通常在虛擬環境的 bin 子目錄中)。與虛擬環境的常規使用相反,不需要 activation 環境。僅僅使用 python 二進位檔就會自動啟用它。在下面的兩個範例中,PATH_TO_PYTHON_BINARY 都是這樣一個路徑,指向可執行的 Python 二進位檔。

使用 ExternalPythonOperator 在預定義的環境中執行 Python callable。virtualenv 套件應預先安裝在執行 Python 的環境中。如果使用 dill,則必須預先安裝在環境中(與主要 Airflow 環境中安裝的版本相同)。

提示

建議使用 @task.external_python decorator 而非傳統的 ExternalPythonOperator 在預定義的 Python 環境中執行 Python 程式碼。

airflow/example_dags/example_python_decorator.py[原始碼]

    @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
    def callable_external_python():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print("Please wait...", flush=True)
            sleep(1)
        print("Finished")

    external_python_task = callable_external_python()

airflow/example_dags/example_python_operator.py[原始碼]

    def callable_external_python():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print("Please wait...", flush=True)
            sleep(1)
        print("Finished")

    external_python_task = ExternalPythonOperator(
        task_id="external_python",
        python_callable=callable_external_python,
        python=PATH_TO_PYTHON_BINARY,
    )

傳遞引數

將額外引數傳遞到 @task.external_python 裝飾的函式,就像使用一般的 Python 函式一樣。遺憾的是,由於與底層程式庫不相容,Airflow 不支援序列化 varti / task_instance。對於 Airflow 上下文變數,請確保 Airflow 也作為 virtualenv 環境的一部分安裝,版本與執行任務的 Airflow 版本相同。否則,您將無法在 op_kwargs 中存取 Airflow 的大多數上下文變數。如果您想要與 datetime 物件相關的上下文,例如 data_interval_start,您可以將 pendulumlazy_object_proxy 新增到您的虛擬環境中。

重要事項

定義要執行的 Python 函式主體會從 DAG 中剪出到一個臨時檔案,而沒有周圍的程式碼。如同範例中所示,您需要再次新增所有 import,並且不能依賴全域 Python 上下文中的變數。

如果您想要將變數傳遞到傳統的 ExternalPythonOperator,請使用 op_argsop_kwargs

PythonBranchOperator

使用 PythonBranchOperator 執行 Python 分支 任務。

提示

建議使用 @task.branch decorator 而非傳統的 PythonBranchOperator 執行 Python 程式碼。

airflow/example_dags/example_branch_operator_decorator.py[原始碼]

    @task.branch()
    def branching(choices: list[str]) -> str:
        return f"branch_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[原始碼]

    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=lambda: f"branch_{random.choice(options)}",
    )

引數傳遞和範本化選項與 PythonOperator 相同。

BranchPythonVirtualenvOperator

使用 BranchPythonVirtualenvOperator decorator 執行 Python 分支 任務,它是 PythonBranchOperator 與在虛擬環境中執行的混合體。

提示

建議使用 @task.branch_virtualenv decorator 而非傳統的 BranchPythonVirtualenvOperator 執行 Python 程式碼。

airflow/example_dags/example_branch_operator_decorator.py[原始碼]

    # Note: Passing a caching dir allows to keep the virtual environment over multiple runs
    #       Run the example a second time and see that it re-uses it and is faster.
    VENV_CACHE_PATH = tempfile.gettempdir()

    @task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
    def branching_virtualenv(choices) -> str:
        import random

        import numpy as np

        print(f"Some numpy stuff: {np.arange(6)}")
        return f"venv_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[原始碼]

    # Note: Passing a caching dir allows to keep the virtual environment over multiple runs
    #       Run the example a second time and see that it re-uses it and is faster.
    VENV_CACHE_PATH = Path(tempfile.gettempdir())

    def branch_with_venv(choices):
        import random

        import numpy as np

        print(f"Some numpy stuff: {np.arange(6)}")
        return f"venv_{random.choice(choices)}"

    branching_venv = BranchPythonVirtualenvOperator(
        task_id="branching_venv",
        requirements=["numpy~=1.26.0"],
        venv_cache_path=VENV_CACHE_PATH,
        python_callable=branch_with_venv,
        op_args=[options],
    )

引數傳遞和範本化選項與 PythonVirtualenvOperator 相同。

BranchExternalPythonOperator

使用 BranchExternalPythonOperator 執行 Python 分支 任務,它是 PythonBranchOperator 與在外部 Python 環境中執行的混合體。

提示

建議使用 @task.branch_external_python decorator 而非傳統的 BranchExternalPythonOperator 執行 Python 程式碼。

airflow/example_dags/example_branch_operator_decorator.py[原始碼]

    @task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
    def branching_ext_python(choices) -> str:
        import random

        return f"ext_py_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[原始碼]

    def branch_with_external_python(choices):
        import random

        return f"ext_py_{random.choice(choices)}"

    branching_ext_py = BranchExternalPythonOperator(
        task_id="branching_ext_python",
        python=PATH_TO_PYTHON_BINARY,
        python_callable=branch_with_external_python,
        op_args=[options],
    )

引數傳遞和範本化選項與 ExternalPythonOperator 相同。

ShortCircuitOperator

使用 ShortCircuitOperator 來控制在滿足條件或獲得真值時,管線是否繼續。

此條件和真值的評估是透過 callable 的輸出完成的。如果 callable 返回 True 或真值,則允許管線繼續,並且將推送輸出的 XCom。如果輸出為 False 或假值,則管線將根據設定的短路(稍後詳細介紹)進行短路。在下面的範例中,在 "condition_is_true" 任務之後的任務將會執行,而 "condition_is_false" 任務下游的任務將會被跳過。

提示

建議使用 @task.short_circuit decorator 而非傳統的 ShortCircuitOperator,透過 Python callable 短路管線。

airflow/example_dags/example_short_circuit_decorator.py[原始碼]

@task.short_circuit()
def check_condition(condition):
    return condition

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)

chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)

airflow/example_dags/example_short_circuit_operator.py[原始碼]

cond_true = ShortCircuitOperator(
    task_id="condition_is_True",
    python_callable=lambda: True,
)

cond_false = ShortCircuitOperator(
    task_id="condition_is_False",
    python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)

“短路” 可以設定為尊重或忽略為下游任務定義的 觸發規則。如果 ignore_downstream_trigger_rules 設定為 True(預設組態),則所有下游任務都會被跳過,而無需考慮為任務定義的 trigger_rule。如果此參數設定為 False,則直接下游任務會被跳過,但其他後續下游任務的指定 trigger_rule 會被尊重。在此短路組態中,operator 假設直接下游任務是有意要跳過的,但可能不是其他後續任務。如果只需要短路部分管線,而不是所有跟在短路任務之後的任務,則此組態特別有用。

在下面的範例中,請注意 "short_circuit" 任務設定為尊重下游觸發規則。這表示雖然跟在 "short_circuit" 任務之後的任務將會被跳過,因為裝飾的函式返回 False,但 "task_7" 仍然會執行,因為它設定為在上游任務完成執行後執行,而無論狀態如何(即 TriggerRule.ALL_DONE 觸發規則)。

airflow/example_dags/example_short_circuit_decorator.py[原始碼]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
    condition=False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

airflow/example_dags/example_short_circuit_operator.py[原始碼]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = ShortCircuitOperator(
    task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

傳遞引數

將額外引數傳遞到 @task.short_circuit 裝飾的函式,就像使用一般的 Python 函式一樣。

範本化

Jinja 範本化可以以與 PythonOperator 描述相同的方式使用。

PythonSensor

PythonSensor 執行任意 callable 並等待其傳回值為 True。

提示

建議使用 @task.sensor decorator 而非傳統的 PythonSensor 執行 Python callable 以檢查 True 條件。

airflow/example_dags/example_sensor_decorator.py[原始碼]

# Using a sensor operator to wait for the upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
    return PokeReturnValue(is_done=True, xcom_value="xcom_value")

airflow/example_dags/example_sensors.py[原始碼]

t9 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)

t10 = PythonSensor(
    task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
)

此條目是否有幫助?