airflow.operators.bash

模組內容

類別

BashOperator

執行 Bash 指令稿、命令或一組命令。

class airflow.operators.bash.BashOperator(*, bash_command, env=None, append_env=False, output_encoding='utf-8', skip_exit_code=None, skip_on_exit_code=99, cwd=None, output_processor=lambda result: ..., **kwargs)[原始碼]

基底類別: airflow.models.baseoperator.BaseOperator

執行 Bash 指令稿、命令或一組命令。

另請參閱

如需關於如何使用此 operator 的更多資訊,請參閱指南: BashOperator

如果 BaseOperator.do_xcom_push 為 True,則當 bash 命令完成時,寫入 stdout 的最後一行也將被推送到 XCom

參數
  • bash_command (str | airflow.utils.types.ArgNotSet) – 要執行的命令、一組命令或 Bash 指令稿的參考(必須為 '.sh' 或 '.bash')。 (已套用範本)

  • env (dict[str, str] | None) – 如果 env 不是 None,則它必須是一個字典,用於定義新程序的環境變數;這些變數將取代繼承當前程序環境,這是預設行為。(已套用範本)

  • append_env (bool) – 如果為 False(預設),則使用 env 參數中傳遞的環境變數,且不繼承當前程序的環境。如果為 True,則繼承來自當前程序的環境變數,然後使用者傳遞的環境變數將更新現有的繼承環境變數,或將新變數附加到其中

  • output_encoding (str) – Bash 命令的輸出編碼

  • skip_on_exit_code (int | Container[int] | None) – 如果任務以此退出代碼退出,則將任務保持在 skipped 狀態(預設值:99)。如果設定為 None,則任何非零退出代碼都將被視為失敗。

  • cwd (str | None) – 在其中執行命令的工作目錄(已套用範本)。如果為 None(預設值),則命令在臨時目錄中執行。若要使用當前 DAG 資料夾作為工作目錄,您可以設定範本 {{ dag_run.dag.folder }}。當 bash_command 是 '.sh' 或 '.bash' 檔案時,Airflow 必須具有對工作目錄的寫入權限。指令稿將在此目錄中呈現(Jinja 範本)到一個新的臨時檔案中。

  • output_processor (Callable[[str], Any]) – 用於進一步處理 bash 指令稿輸出的函式(預設值為 lambda output: output)。

Airflow 將評估 Bash 命令的退出代碼。一般來說,非零退出代碼將導致任務失敗,而零將導致任務成功。退出代碼 99 (或在 skip_on_exit_code 中設定的另一個代碼)將拋出 airflow.exceptions.AirflowSkipException,這將使任務處於 skipped 狀態。您可以將所有非零退出代碼視為失敗,方法是設定 skip_on_exit_code=None

退出代碼

行為

0

成功

skip_on_exit_code (預設值:99)

引發 airflow.exceptions.AirflowSkipException

否則

引發 airflow.exceptions.AirflowException

注意

除非整個 shell 以非零退出代碼退出,否則 Airflow 將無法識別非零退出代碼。如果非零退出代碼來自子命令,則可能會出現問題。解決此問題的最簡單方法是在命令前加上 set -e;

bash_command = "set -e; python3 script.py '{{ next_execution_date }}'"

注意

若要簡單地執行 .sh.bash 指令稿(不使用任何 Jinja 範本),請在指令稿名稱 bash_command 參數後新增一個空格 – 例如 bash_command="my_script.sh "。這是因為當檔案以 .sh.bash 結尾時,Airflow 會嘗試載入此檔案並將其作為 Jinja 範本處理。

如果您的指令稿中有 Jinja 範本,請勿放置任何空格。並在 DAG 的 template_searchpath 中新增指令稿的目錄。如果您指定 cwd,則 Airflow 必須具有對此目錄的寫入權限。指令稿將在此目錄中呈現(Jinja 範本)到一個新的臨時檔案中。

警告

使用「使用者」輸入或在 bash_command 中使用 Jinja 範本時應謹慎,因為此 bash operator 不會對命令執行任何跳脫或清理。

這主要適用於使用「dag_run」conf,因為這可以透過 Web UI 中的使用者提交。大多數預設範本變數沒有風險。

例如,不要 這樣做

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
)

相反地,您應該透過 env kwarg 傳遞此內容,並在 bash_command 內部使用雙引號,如下所示

bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo \"here is the message: '$message'\"",
    env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
)

2.10.0 版本新增: output_processor 參數。

template_fields: Sequence[str] = ('bash_command', 'env', 'cwd')[原始碼]
template_fields_renderers[原始碼]
template_ext: Sequence[str] = ('.sh', '.bash')[原始碼]
ui_color = '#f0ede4'[原始碼]
subprocess_hook()[原始碼]

傳回用於執行 bash 命令的 hook。

get_env(context)[原始碼]

建立要為 bash 命令公開的環境變數集。

execute(context)[原始碼]

在建立 operator 時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

on_kill()[原始碼]

覆寫此方法以在任務實例被終止時清理子程序。

在 operator 中使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下孤立程序。

此條目是否有幫助?