PythonOperator

使用 PythonOperator 來執行 Python 可呼叫物件。

提示

建議使用 @task 裝飾器,而不是傳統的 PythonOperator 來執行 Python 可呼叫物件。

airflow/example_dags/example_python_decorator.py[原始碼]

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"

run_this = print_context()

airflow/example_dags/example_python_operator.py[原始碼]

def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print("::group::All kwargs")
    pprint(kwargs)
    print("::endgroup::")
    print("::group::Context variable ds")
    print(ds)
    print("::endgroup::")
    return "Whatever you return gets printed in the logs"

run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)

傳入引數

如同使用一般的 Python 函數一樣,將額外引數傳遞給 @task 裝飾的函數。

airflow/example_dags/example_python_decorator.py[原始碼]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)

    run_this >> log_the_sql >> sleeping_task

airflow/example_dags/example_python_operator.py[原始碼]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = PythonOperator(
        task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
    )

    run_this >> log_the_sql >> sleeping_task

範本化

Airflow 傳入一組額外的關鍵字引數:每個 Jinja 範本變數 各一個,以及一個 templates_dict 引數。

templates_dictop_argsop_kwargs 引數皆為範本,因此字典中的每個值都會被評估為 Jinja 範本

airflow/example_dags/example_python_decorator.py[原始碼]

@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = log_sql()

airflow/example_dags/example_python_operator.py[原始碼]

def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = PythonOperator(
    task_id="log_sql_query",
    python_callable=log_sql,
    templates_dict={"query": "sql/sample.sql"},
    templates_exts=[".sql"],
)

上下文

Context 是一個字典物件,其中包含關於 DagRun 環境的資訊。例如,選擇 task_instance 將會取得目前正在執行的 TaskInstance 物件。

它可以隱含地使用,例如使用 **kwargs,但也可以使用 get_current_context() 明確地使用。在這種情況下,類型提示可以用於靜態分析。

airflow/example_dags/example_python_context_decorator.py

@task(task_id="print_the_context")
def print_context() -> str:
    """Print the Airflow context."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context = print_context()

airflow/example_dags/example_python_context_operator.py

def print_context() -> str:
    """Print the Airflow context."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context = PythonOperator(task_id="print_the_context", python_callable=print_context)

PythonVirtualenvOperator

使用 PythonVirtualenvOperator 裝飾器,在新的 Python 虛擬環境中執行 Python 可呼叫物件。virtualenv 套件需要安裝在執行 Airflow 的環境中(作為可選依賴項 pip install apache-airflow[virtualenv] --constraint ...)。

此外,cloudpickle 套件需要作為可選依賴項安裝,使用命令 pip install [cloudpickle] --constraint ...。此套件是目前使用的 dill 套件的替代品。Cloudpickle 在專注於標準 pickling 協議方面具有強大的優勢,確保了更廣泛的兼容性和更順暢的資料交換,同時仍然有效地處理函數內常見的 Python 物件和全域變數。

提示

建議使用 @task.virtualenv 裝飾器,而不是傳統的 PythonVirtualenvOperator,以在新的 Python 虛擬環境中執行 Python 可呼叫物件。

airflow/example_dags/example_python_decorator.py[原始碼]

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = callable_virtualenv()

airflow/example_dags/example_python_operator.py[原始碼]

def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)

傳入引數

如同使用一般的 Python 函數一樣,將額外引數傳遞給 @task.virtualenv 裝飾的函數。不幸的是,由於與底層程式庫不相容,Airflow 不支援序列化 vartitask_instance。對於 Airflow 上下文變數,請確保您透過將 system_site_packages 設定為 True 或將 apache-airflow 新增至 requirements 引數來存取 Airflow。否則,您將無法在 op_kwargs 中存取大多數 Airflow 的上下文變數。如果您想要與日期時間物件相關的上下文,例如 data_interval_start,您可以新增 pendulumlazy_object_proxy

重要

定義要執行的 Python 函數主體會從 DAG 中剪下到一個臨時檔案中,且不包含周圍的程式碼。如同範例所示,您需要再次新增所有 import,且不能依賴全域 Python 上下文中的變數。

如果您想要將變數傳遞到傳統的 PythonVirtualenvOperator 中,請使用 op_argsop_kwargs

如果需要套件安裝的其他參數,請透過 pip_install_options 參數傳遞它們,或使用 requirements.txt,如下面的範例所示

SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives

所有支援的選項都列在 requirements file format 中。

範本化

Jinja 範本化可以與 PythonOperator 中描述的方式相同使用。

虛擬環境設定選項

虛擬環境是根據 worker 上的全域 python pip 設定建立的。在您的環境中使用額外的 ENV 或調整一般 pip 設定,如 pip config 中所述。

如果您想要使用額外的任務特定私有 python 儲存庫來設定虛擬環境,您可以傳遞 index_urls 參數,這將調整 pip 安裝設定。傳遞的索引 URL 會取代標準系統設定的索引 URL 設定。為了防止將私有儲存庫的密碼新增到您的 DAG 程式碼中,您可以使用 Airflow 連線與 Hook。為此,可以使用連線類型 Package Index (Python)

在您想要防止遠端呼叫來設定虛擬環境的特殊情況下,請將 index_urls 作為空列表傳遞,即 index_urls=[],這會強制 pip 安裝程式使用 --no-index 選項。

快取和重複使用

虛擬環境的設定是針對每個任務執行在臨時目錄中完成的。執行後,虛擬環境會再次被刪除。確保您的 worker 上的 $tmp 資料夾有足夠的磁碟空間。通常(如果未進行不同的配置),將會使用本機 pip 快取,以防止每次執行都重新下載套件。

但是,為每次執行設定虛擬環境仍然需要一些時間。對於重複執行,您可以將選項 venv_cache_path 設定為 worker 上的檔案系統資料夾。在這種情況下,虛擬環境將設定一次並重複使用。如果使用虛擬環境快取,則在快取路徑中會為每個唯一的需求集建立不同的虛擬環境子資料夾。因此,根據您系統設定中 DAG 的變化,需要足夠的磁碟空間。

請注意,在快取模式下,不會進行自動清理。所有 worker 插槽都共用相同的虛擬環境,但如果任務在不同的 worker 上重複排程,則可能會發生在多個 worker 上個別建立虛擬環境的情況。此外,如果 worker 在 Kubernetes POD 中啟動,則重新啟動 worker 將會捨棄快取(假設 venv_cache_path 不在永久卷上)。

如果您在執行時遇到快取虛擬環境損壞的問題,您可以透過將 Airflow 變數 PythonVirtualenvOperator.cache_key 設定為任何文字來影響快取目錄雜湊。此變數的內容用於向量中,以計算快取目錄金鑰。

請注意,對快取虛擬環境的任何修改(例如二進位路徑中的臨時檔案、後安裝其他需求)都可能會污染快取虛擬環境,並且運算子不會維護或清理快取路徑。

上下文

在某些限制下,您也可以在虛擬環境中使用 Context

重要

在虛擬環境中使用 Context 有點挑戰,因為它涉及到程式庫依賴性和序列化問題。

您可以透過使用 Jinja 範本變數 並將其明確地作為參數傳遞,來在某種程度上繞過此問題。

您也可以使用 get_current_context(),其方式與之前相同,但有一些限制。

  • 需要 apache-airflow>=3.0.0

  • use_airflow_context 設定為 True 以在虛擬環境中呼叫 get_current_context()

  • system_site_packages 設定為 True 或將 expect_airflow 設定為 True

airflow/example_dags/example_python_context_decorator.py

@task.virtualenv(task_id="print_the_context_venv", use_airflow_context=True)
def print_context_venv() -> str:
    """Print the Airflow context in venv."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_venv = print_context_venv()

airflow/example_dags/example_python_context_operator.py

def print_context_venv() -> str:
    """Print the Airflow context in venv."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_venv = PythonVirtualenvOperator(
    task_id="print_the_context_venv", python_callable=print_context_venv, use_airflow_context=True
)

ExternalPythonOperator

ExternalPythonOperator 可以幫助您使用與其他任務(以及主 Airflow 環境)不同的一組 Python 程式庫來執行某些任務。這可能是虛擬環境或預先安裝且在 Airflow 任務執行的環境中可用的任何 Python 安裝。運算子將 Python 二進位檔作為 python 參數。請注意,即使在虛擬環境的情況下,python 路徑也應指向虛擬環境內的 python 二進位檔(通常在虛擬環境的 bin 子目錄中)。與虛擬環境的常規使用相反,不需要 activation 環境。僅使用 python 二進位檔會自動啟用它。在下面的兩個範例中,PATH_TO_PYTHON_BINARY 都是這樣一個路徑,指向可執行的 Python 二進位檔。

使用 ExternalPythonOperator 在預先定義的環境中執行 Python 可呼叫物件。virtualenv 套件應預先安裝在 Python 運行的環境中。如果使用 dill,則必須預先安裝在環境中(與主 Airflow 環境中安裝的版本相同)。

提示

建議使用 @task.external_python 裝飾器,而不是傳統的 ExternalPythonOperator,以在預先定義的 Python 環境中執行 Python 程式碼。

airflow/example_dags/example_python_decorator.py[原始碼]

@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = callable_external_python()

airflow/example_dags/example_python_operator.py[原始碼]

def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = ExternalPythonOperator(
    task_id="external_python",
    python_callable=callable_external_python,
    python=PATH_TO_PYTHON_BINARY,
)

傳入引數

如同使用一般的 Python 函數一樣,將額外引數傳遞給 @task.external_python 裝飾的函數。不幸的是,由於與底層程式庫不相容,Airflow 不支援序列化 varti / task_instance。對於 Airflow 上下文變數,請確保 Airflow 也作為 virtualenv 環境的一部分安裝,版本與任務運行的 Airflow 版本相同。否則,您將無法在 op_kwargs 中存取大多數 Airflow 的上下文變數。如果您想要與日期時間物件相關的上下文,例如 data_interval_start,您可以將 pendulumlazy_object_proxy 新增到您的虛擬環境中。

重要

定義要執行的 Python 函數主體會從 DAG 中剪下到一個臨時檔案中,且不包含周圍的程式碼。如同範例所示,您需要再次新增所有 import,且不能依賴全域 Python 上下文中的變數。

如果您想要將變數傳遞到傳統的 ExternalPythonOperator 中,請使用 op_argsop_kwargs

範本化

Jinja 範本化可以與 PythonOperator 中描述的方式相同使用。

上下文

您可以在與 PythonVirtualenvOperator 相同的條件下使用 Context

airflow/example_dags/example_python_context_decorator.py

@task.external_python(
    task_id="print_the_context_external", python=SOME_EXTERNAL_PYTHON, use_airflow_context=True
)
def print_context_external() -> str:
    """Print the Airflow context in external python."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_external = print_context_external()

airflow/example_dags/example_python_context_operator.py

def print_context_external() -> str:
    """Print the Airflow context in external python."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_external = ExternalPythonOperator(
    task_id="print_the_context_external",
    python_callable=print_context_external,
    python=SOME_EXTERNAL_PYTHON,
    use_airflow_context=True,
)

PythonBranchOperator

使用 PythonBranchOperator 來執行 Python 分支 任務。

提示

建議使用 @task.branch 裝飾器,而不是傳統的 PythonBranchOperator,以執行 Python 程式碼。

airflow/example_dags/example_branch_operator_decorator.py[原始碼]

@task.branch()
def branching(choices: list[str]) -> str:
    return f"branch_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[原始碼]

branching = BranchPythonOperator(
    task_id="branching",
    python_callable=lambda: f"branch_{random.choice(options)}",
)

傳入引數和範本化

引數傳遞和範本化選項與 PythonOperator 相同。

BranchPythonVirtualenvOperator

使用 BranchPythonVirtualenvOperator 裝飾器來執行 Python 分支 任務,並且是 PythonBranchOperator 與在虛擬環境中執行的混合體。

提示

建議使用 @task.branch_virtualenv 裝飾器,而不是傳統的 BranchPythonVirtualenvOperator,以執行 Python 程式碼。

airflow/example_dags/example_branch_operator_decorator.py[原始碼]

# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
#       Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = tempfile.gettempdir()

@task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
def branching_virtualenv(choices) -> str:
    import random

    import numpy as np

    print(f"Some numpy stuff: {np.arange(6)}")
    return f"venv_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[原始碼]

# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
#       Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())

def branch_with_venv(choices):
    import random

    import numpy as np

    print(f"Some numpy stuff: {np.arange(6)}")
    return f"venv_{random.choice(choices)}"

branching_venv = BranchPythonVirtualenvOperator(
    task_id="branching_venv",
    requirements=["numpy~=1.26.0"],
    venv_cache_path=VENV_CACHE_PATH,
    python_callable=branch_with_venv,
    op_args=[options],
)

傳入引數和範本化

引數傳遞和範本化選項與 PythonOperator 相同。

BranchExternalPythonOperator

使用 BranchExternalPythonOperator 來執行 Python 分支 任務,並且是 PythonBranchOperator 與在外部 Python 環境中執行的混合體。

提示

建議使用 @task.branch_external_python 裝飾器,而不是傳統的 BranchExternalPythonOperator,以執行 Python 程式碼。

airflow/example_dags/example_branch_operator_decorator.py[原始碼]

@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
def branching_ext_python(choices) -> str:
    import random

    return f"ext_py_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[原始碼]

def branch_with_external_python(choices):
    import random

    return f"ext_py_{random.choice(choices)}"

branching_ext_py = BranchExternalPythonOperator(
    task_id="branching_ext_python",
    python=PATH_TO_PYTHON_BINARY,
    python_callable=branch_with_external_python,
    op_args=[options],
)

傳入引數和範本化

引數傳遞和範本化選項與 PythonOperator 相同。

ShortCircuitOperator

使用 ShortCircuitOperator 來控制在條件滿足或獲得真值時,管線是否繼續。

此條件和真值的評估是透過可呼叫物件的輸出完成的。如果可呼叫物件傳回 True 或真值,則允許管線繼續,並且將推送輸出的 XCom。如果輸出為 False 或假值,則管線將根據配置的短路(稍後詳細介紹)進行短路。在下面的範例中,當「condition_is_true」任務之後的任務將執行時,「condition_is_false」任務的下游任務將被跳過。

提示

建議使用 @task.short_circuit 裝飾器,而不是傳統的 ShortCircuitOperator,以透過 Python 可呼叫物件來短路管線。

airflow/example_dags/example_short_circuit_decorator.py[原始碼]

@task.short_circuit()
def check_condition(condition):
    return condition

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)

chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)

airflow/example_dags/example_short_circuit_operator.py[原始碼]

cond_true = ShortCircuitOperator(
    task_id="condition_is_True",
    python_callable=lambda: True,
)

cond_false = ShortCircuitOperator(
    task_id="condition_is_False",
    python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)

「短路」可以配置為尊重或忽略為下游任務定義的 觸發規則。如果 ignore_downstream_trigger_rules 設定為 True(預設配置),則所有下游任務都將被跳過,而無需考慮為任務定義的 trigger_rule。如果此參數設定為 False,則直接下游任務將被跳過,但其他後續下游任務的指定 trigger_rule 將被尊重。在此短路配置中,運算子假設直接下游任務是故意要跳過的,但可能不是其他後續任務。如果只需要短路管線的 *部分*,而不是短路短路任務之後的所有任務,則此配置特別有用。

在下面的範例中,請注意「short_circuit」任務配置為尊重下游觸發規則。這表示雖然「short_circuit」任務之後的任務將被跳過,因為裝飾的函數傳回 False,但「task_7」仍將執行,因為它設定為在上游任務完成運行時執行,無論狀態如何(即 TriggerRule.ALL_DONE 觸發規則)。

airflow/example_dags/example_short_circuit_decorator.py[原始碼]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
    condition=False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

airflow/example_dags/example_short_circuit_operator.py[原始碼]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = ShortCircuitOperator(
    task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

傳入引數和範本化

引數傳遞和範本化選項與 PythonOperator 相同。

此條目是否有幫助?