airflow.providers.standard.operators.bash

模組內容

類別

BashOperator

執行 Bash 腳本、命令或一組命令。

class airflow.providers.standard.operators.bash.BashOperator(*, bash_command, env=None, append_env=False, output_encoding='utf-8', skip_on_exit_code=99, cwd=None, output_processor=lambda result: ..., **kwargs)[原始碼]

基底: airflow.models.baseoperator.BaseOperator

執行 Bash 腳本、命令或一組命令。

另請參閱

如需更多關於如何使用此運算子的資訊,請查看指南: BashOperator

如果 BaseOperator.do_xcom_push 為 True,當 bash 命令完成時,寫入 stdout 的最後一行也會被推送到 XCom

參數
  • bash_command (str | airflow.utils.types.ArgNotSet) – 要執行的命令、一組命令或對 Bash 腳本的參考(必須是 '.sh' 或 '.bash')。 (樣板化)

  • env (dict[str, str] | None) – 如果 env 不是 None,則它必須是一個字典,用於定義新進程的環境變數;這些變數將取代繼承當前進程環境的方式,後者是預設行為。(樣板化)

  • append_env (bool) – 如果為 False(預設值),則使用在 env 參數中傳遞的環境變數,並且不繼承當前進程環境。如果為 True,則繼承來自當前傳遞的環境變數,然後使用者傳遞的環境變數將更新現有的繼承環境變數,或將新變數附加到其中

  • output_encoding (str) – Bash 命令的輸出編碼

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果任務以此退出代碼退出,則將任務保持在 skipped 狀態(預設值:99)。如果設定為 None,任何非零退出代碼都將被視為失敗。

  • cwd (str | None) – 在其中執行命令的工作目錄(樣板化)。如果為 None(預設值),則命令在臨時目錄中執行。若要使用目前的 DAG 資料夾作為工作目錄,您可以設定樣板 {{ dag_run.dag.folder }}。當 bash_command 是 '.sh' 或 '.bash' 檔案時,Airflow 必須具有對工作目錄的寫入權限。腳本將在此目錄中呈現(Jinja 樣板)到一個新的臨時檔案中。

  • output_processor (Callable[[str], Any]) – 進一步處理 bash 腳本輸出的函數(預設值為 lambda output: output)。

Airflow 將評估 Bash 命令的退出代碼。一般來說,非零退出代碼將導致任務失敗,而零將導致任務成功。退出代碼 99(或在 skip_on_exit_code 中設定的另一個代碼)將拋出 airflow.exceptions.AirflowSkipException,這將使任務處於 skipped 狀態。您可以透過設定 skip_on_exit_code=None 將所有非零退出代碼視為失敗。

退出代碼

行為

0

成功

skip_on_exit_code (預設值:99)

引發 airflow.exceptions.AirflowSkipException

否則

引發 airflow.exceptions.AirflowException

注意

除非整個 shell 以非零退出代碼退出,否則 Airflow 將無法識別非零退出代碼。如果非零退出代碼來自子命令,則可能會出現此問題。解決此問題的最簡單方法是在命令前加上 set -e;

bash_command = "set -e; python3 script.py '{{ data_interval_end }}'"

注意

若要簡單地執行 .sh.bash 腳本(不使用任何 Jinja 樣板),請在腳本名稱 bash_command 參數後方新增一個空格 - 例如 bash_command="my_script.sh "。這是因為當 Airflow 檔案以 .sh.bash 結尾時,會嘗試載入並將其作為 Jinja 樣板處理。

如果您的腳本中有 Jinja 樣板,請勿放置任何空白字元。並將腳本的目錄新增至 DAG 的 template_searchpath。如果您指定 cwd,Airflow 必須具有對此目錄的寫入權限。腳本將在此目錄中呈現(Jinja 樣板)到一個新的臨時檔案中。

警告

應注意「使用者」輸入或在 bash_command 中使用 Jinja 樣板,因為此 bash 運算子不會對命令執行任何跳脫或清理。

這主要適用於使用「dag_run」conf,因為可以透過 Web UI 中的使用者提交。大多數預設樣板變數沒有風險。

例如,不要 這樣做

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
)

相反地,您應該透過 env kwarg 傳遞此值,並在 bash_command 內使用雙引號,如下所示

bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo \"here is the message: '$message'\"",
    env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
)

在版本 2.10.0 中新增: output_processor 參數。

template_fields: collections.abc.Sequence[str] = ('bash_command', 'env', 'cwd')[原始碼]
template_fields_renderers[原始碼]
template_ext: collections.abc.Sequence[str] = ('.sh', '.bash')[原始碼]
ui_color = '#f0ede4'[原始碼]
subprocess_hook()[原始碼]

傳回用於執行 bash 命令的 hook。

get_env(context)[原始碼]

建立要為 bash 命令公開的環境變數集。

execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 樣板時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

on_kill()[原始碼]

覆寫此方法以在任務實例被終止時清理子進程。

運算子內任何對 threading、subprocess 或 multiprocessing 模組的使用都需要清理,否則會留下幽靈進程。

此條目是否有幫助?