airflow.providers.standard.operators.python
¶
模組內容¶
類別¶
執行 Python 可呼叫物件。 |
|
工作流程可以在此任務執行後「分支」或遵循路徑。 |
|
允許管線根據 |
|
在自動建立和銷毀的 virtualenv 中執行函式。 |
|
工作流程可以在虛擬環境中執行此任務後「分支」或遵循路徑。 |
|
在不會重新建立的 virtualenv 中執行函式。 |
|
工作流程可以在此任務執行後「分支」或遵循路徑。 |
函式¶
檢索執行上下文字典,而不變更使用者方法的簽章。 |
屬性¶
- class airflow.providers.standard.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[原始碼]¶
基底類別:
airflow.models.baseoperator.BaseOperator
執行 Python 可呼叫物件。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: PythonOperator
執行可呼叫物件時,Airflow 將傳遞一組關鍵字引數,這些引數可用於您的函式中。這組 kwargs 完全對應於您可以在 Jinja 範本中使用的內容。為了使其運作,您需要在函式標頭中定義
**kwargs
,或者您可以直接新增您想要取得的關鍵字引數 - 例如,使用以下程式碼,您的可呼叫物件將取得ti
上下文變數的值。使用明確引數
def my_python_callable(ti): pass
使用 kwargs
def my_python_callable(**kwargs): ti = kwargs["ti"]
- 參數
python_callable (Callable) – 對可呼叫物件的參考
op_args (collections.abc.Collection[Any] | None) – 位置引數的列表,將在呼叫您的可呼叫物件時解包
op_kwargs (collections.abc.Mapping[str, Any] | None) – 關鍵字引數的字典,將在您的函式中解包
templates_dict (dict[str, Any] | None) – 字典,其中的值是範本,這些範本將在
__init__
和execute
之間由 Airflow 引擎進行範本化,並在套用範本後在您的可呼叫物件的上下文中提供。(已範本化)templates_exts (collections.abc.Sequence[str] | None) – 檔案副檔名列表,用於在處理範本化欄位時解析,例如
['.sql', '.hql']
show_return_value_in_logs (bool) – 布林值,指示是否顯示 return_value 日誌。預設為 True,允許 return value 日誌輸出。可以設定為 False,以防止在您傳回大量資料(例如將大量 XCom 傳輸到 TaskAPI)時記錄 return value 輸出。
-
template_fields: collections.abc.Sequence[str] =
('templates_dict', 'op_args', 'op_kwargs')
- template_fields_renderers
-
BLUE: =
'#ffefeb'
- ui_color
-
shallow_copy_attrs: collections.abc.Sequence[str] =
('python_callable', 'op_kwargs')
- execute(context)
在建立運算子時衍生。
上下文是與呈現 Jinja 範本時使用的字典相同的字典。
請參閱 get_template_context 以取得更多上下文。
- determine_kwargs(context)
- execute_callable()
使用給定的引數呼叫 python 可呼叫物件。
- 傳回
呼叫的傳回值。
- 傳回類型
Any
- class airflow.providers.standard.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)
基底類別:
PythonOperator
,airflow.operators.branch.BranchMixIn
工作流程可以在此任務執行後「分支」或遵循路徑。
它衍生自 PythonOperator,並預期一個 Python 函式,該函式傳回單個 task_id、單個 task_group_id 或要遵循的 task_id 和/或 task_group_id 列表。傳回的 task_id(s) 和/或 task_group_id(s) 應指向直接位於 {self} 下游的任務或任務群組。所有其他「分支」或直接下游任務都標記為
skipped
狀態,以便這些路徑無法向前移動。skipped
狀態會向下游傳播,以允許填滿 DAG 狀態並推斷 DAG 執行狀態。- execute(context)
在建立運算子時衍生。
上下文是與呈現 Jinja 範本時使用的字典相同的字典。
請參閱 get_template_context 以取得更多上下文。
- class airflow.providers.standard.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)
基底類別:
PythonOperator
,airflow.models.skipmixin.SkipMixin
允許管線根據
python_callable
的結果繼續執行。ShortCircuitOperator 衍生自 PythonOperator,並評估
python_callable
的結果。如果傳回的結果為 False 或 falsy 值,則管線將會短路。下游任務將根據設定的短路模式標記為「skipped」狀態。如果傳回的結果為 True 或 truthy 值,則下游任務會照常進行,並推送傳回結果的XCom
。可以設定短路以遵循或忽略為下游任務設定的
trigger_rule
。如果ignore_downstream_trigger_rules
設定為 True(預設設定),則會跳過所有下游任務,而無需考慮為任務定義的trigger_rule
。但是,如果此參數設定為 False,則會跳過直接下游任務,但會遵循其他後續下游任務的指定trigger_rule
。在此模式下,運算子假設直接下游任務是有意要跳過的,但可能不是其他後續任務。另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: ShortCircuitOperator
- 參數
ignore_downstream_trigger_rules (bool) – 如果設定為 True,則將跳過此運算子任務的所有下游任務。這是預設行為。如果設定為 False,則將跳過直接下游任務,但將遵循為所有其他下游任務定義的
trigger_rule
。
- execute(context)
在建立運算子時衍生。
上下文是與呈現 Jinja 範本時使用的字典相同的字典。
請參閱 get_template_context 以取得更多上下文。
- class airflow.providers.standard.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)
基底類別:
_BasePythonVirtualenvOperator
在自動建立和銷毀的 virtualenv 中執行函式。
函式(具有某些注意事項)必須使用 def 定義,而不是類別的一部分。所有匯入都必須在函式內發生,並且不得參考範圍外的任何變數。名為 virtualenv_string_args 的全域範圍變數將可用(由 string_args 填入)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用傳回值。請注意,如果您的 virtualenv 在與 Airflow 不同的 Python 主要版本中執行,則您無法使用傳回值、op_args、op_kwargs,或使用透過外掛程式提供給 Airflow 的任何巨集。但是,您可以使用 string_args。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: PythonVirtualenvOperator
- 參數
python_callable (Callable) – 一個 Python 函式,不參考外部變數,使用 def 定義,將在虛擬環境中執行。
requirements (None | collections.abc.Iterable[str] | str) – 需求字串列表,或 pip 指定的(範本化)「需求檔案」。
python_version (str | None) – 執行虛擬環境的 Python 版本。請注意,2 和 2.7 都是可接受的形式。
serializer (_SerializerTypeDef | None) –
用於序列化引數和結果的序列化器。它可以是以下其中之一
"pickle"
:(預設) 使用 pickle 進行序列化。包含在 Python 標準函式庫中。"cloudpickle"
:使用 cloudpickle 序列化更複雜的類型,這需要將 cloudpickle 包含在您的需求中。"dill"
:使用 dill 序列化更複雜的類型,這需要將 dill 包含在您的需求中。
system_site_packages (bool) – 是否在您的虛擬環境中包含 system_site_packages。有關更多資訊,請參閱 virtualenv 文件。
pip_install_options (list[str] | None) – 安裝需求時的 pip 安裝選項列表。請參閱「pip install -h」以取得可用選項
op_args (collections.abc.Collection[Any] | None) – 要傳遞給 python_callable 的位置引數列表。
op_kwargs (collections.abc.Mapping[str, Any] | None) – 要傳遞給 python_callable 的關鍵字引數字典。
string_args (collections.abc.Iterable[str] | None) – 存在於全域變數 virtualenv_string_args 中的字串,在執行時以 list[str] 的形式提供給 python_callable。請注意,引數以換行符號分隔。
templates_dict (dict | None) – 字典,其中的值是範本,這些範本將在
__init__
和execute
之間由 Airflow 引擎進行範本化,並在套用範本後在您的可呼叫物件的上下文中提供templates_exts (list[str] | None) – 檔案副檔名列表,用於在處理範本化欄位時解析,例如
['.sql', '.hql']
expect_airflow (bool) – 預期目標環境中安裝了 Airflow。如果為 true,如果未安裝 Airflow,運算子將引發警告,並在啟動時嘗試載入 Airflow 巨集。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此結束代碼結束,則將任務保持在
skipped
狀態(預設值:None)。如果設定為None
,則任何非零結束代碼都將被視為失敗。index_urls (None | collections.abc.Collection[str] | str) – 可選的索引 URL 列表,用於從中載入 Python 套件。如果未提供,則將使用系統 pip conf 從來源套件。
venv_cache_path (None | os.PathLike[str]) – 虛擬環境父資料夾的可選路徑,其中將快取虛擬環境,建立子資料夾 venv-{hash},其中 hash 將替換為需求的校驗和。如果未提供,則將在每次執行時在暫存資料夾中建立和刪除虛擬環境。
env_vars (dict[str, str] | None) – 字典,其中包含在執行虛擬環境時要為其設定的其他環境變數。
inherit_env (bool) – 在執行虛擬環境時是否繼承目前的環境變數。如果設定為
True
,則虛擬環境將繼承父進程 (os.environ
) 的環境變數。如果設定為False
,則將使用乾淨的環境執行虛擬環境。use_airflow_context (bool) – 是否向 python_callable 提供
get_current_context()
。尚未實作 - 等待 AIP-72 上下文序列化。
- template_fields: collections.abc.Sequence[str]
-
template_ext: collections.abc.Sequence[str] =
('.txt',)
- execute_callable()
使用給定的引數呼叫 python 可呼叫物件。
- 傳回
呼叫的傳回值。
- class airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)
基底類別:
PythonVirtualenvOperator
,airflow.operators.branch.BranchMixIn
工作流程可以在虛擬環境中執行此任務後「分支」或遵循路徑。
它衍生自 PythonVirtualenvOperator,並預期一個 Python 函式,該函式傳回單個 task_id、單個 task_group_id 或要遵循的 task_id 和/或 task_group_id 列表。傳回的 task_id(s) 和/或 task_group_id(s) 應指向直接位於 {self} 下游的任務或任務群組。所有其他「分支」或直接下游任務都標記為
skipped
狀態,以便這些路徑無法向前移動。skipped
狀態會向下游傳播,以允許填滿 DAG 狀態並推斷 DAG 執行狀態。另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: BranchPythonVirtualenvOperator
- class airflow.providers.standard.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]¶
基底類別:
_BasePythonVirtualenvOperator
在不會重新建立的 virtualenv 中執行函式。
重複使用,無需建立虛擬環境的額外負擔 (但有一些注意事項)。
該函數必須使用 def 定義,且不能是類別的一部分。所有 import 必須在函數內部進行,且不能引用範圍外的變數。名為 virtualenv_string_args 的全域範圍變數將會可用 (由 string_args 填充)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用傳回值。請注意,如果您的虛擬環境運行的 Python 主要版本與 Airflow 不同,則您無法使用傳回值、op_args、op_kwargs,或使用透過外掛程式提供給 Airflow 的任何巨集。但是您可以使用 string_args。
如果 Airflow 安裝在外部環境中的版本與運算子使用的版本不同,則運算子將會失敗。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: ExternalPythonOperator
- 參數
python (str) – 指向虛擬環境內應使用的 Python 二進位檔的完整路徑字串 (特定於檔案系統) (在
VENV/bin
資料夾中)。應為絕對路徑 (因此通常以 “/” 或 “X:/” 開頭,取決於使用的檔案系統/作業系統)。python_callable (Callable) – 一個 Python 函式,不參考外部變數,使用 def 定義,將在虛擬環境中執行。
serializer (_SerializerTypeDef | None) –
用於序列化引數和結果的序列化器。它可以是以下其中之一
"pickle"
:(預設) 使用 pickle 進行序列化。包含在 Python 標準函式庫中。"cloudpickle"
:使用 cloudpickle 序列化更複雜的類型,這需要將 cloudpickle 包含在您的需求中。"dill"
:使用 dill 序列化更複雜的類型,這需要將 dill 包含在您的需求中。
op_args (collections.abc.Collection[Any] | None) – 要傳遞給 python_callable 的位置引數列表。
op_kwargs (collections.abc.Mapping[str, Any] | None) – 要傳遞給 python_callable 的關鍵字引數字典。
string_args (collections.abc.Iterable[str] | None) – 存在於全域變數 virtualenv_string_args 中的字串,在執行時以 list[str] 的形式提供給 python_callable。請注意,引數以換行符號分隔。
templates_dict (dict | None) – 字典,其中的值是範本,這些範本將在
__init__
和execute
之間由 Airflow 引擎進行範本化,並在套用範本後在您的可呼叫物件的上下文中提供templates_exts (list[str] | None) – 檔案副檔名列表,用於在處理範本化欄位時解析,例如
['.sql', '.hql']
expect_airflow (bool) – 預期目標環境中安裝了 Airflow。如果為 true,如果未安裝 Airflow,運算子將引發警告,並在啟動時嘗試載入 Airflow 巨集。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此結束代碼結束,則將任務保持在
skipped
狀態(預設值:None)。如果設定為None
,則任何非零結束代碼都將被視為失敗。env_vars (dict[str, str] | None) – 字典,其中包含在執行虛擬環境時要為其設定的其他環境變數。
inherit_env (bool) – 在執行虛擬環境時是否繼承目前的環境變數。如果設定為
True
,則虛擬環境將繼承父進程 (os.environ
) 的環境變數。如果設定為False
,則將使用乾淨的環境執行虛擬環境。use_airflow_context (bool) – 是否向 python_callable 提供
get_current_context()
。尚未實作 - 等待 AIP-72 上下文序列化。
- template_fields: collections.abc.Sequence[str][source]¶
- class airflow.providers.standard.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]¶
繼承自:
ExternalPythonOperator
,airflow.operators.branch.BranchMixIn
工作流程可以在此任務執行後「分支」或遵循路徑。
擴展了 ExternalPythonOperator,因此期望取得 Python:應使用的虛擬環境 (在
VENV/bin
資料夾中)。應為絕對路徑,以便它可以像 ExternalPythonOperator 一樣在單獨的虛擬環境中運行。另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: BranchExternalPythonOperator
- airflow.providers.standard.operators.python.get_current_context()[source]¶
檢索執行上下文字典,而無需更改使用者方法的簽章。
這是檢索執行上下文字典的最簡單方法。
舊樣式
def my_task(**context): ti = context["ti"]
新樣式
from airflow.providers.standard.operators.python import get_current_context def my_task(): context = get_current_context() ti = context["ti"]
僅當在運算子開始執行後呼叫此方法時,當前上下文才會有值。