BashOperator¶
使用 BashOperator
在 Bash shell 中執行命令。要執行的 Bash 命令或腳本由以下決定:
使用
BashOperator
時的bash_command
參數,或如果使用 TaskFlow 裝飾器
@task.bash
,則從裝飾的可調用物件返回的非空字串值。
提示
建議使用 @task.bash
裝飾器,而不是傳統的 BashOperator
來執行 Bash 命令。
@task.bash
def run_after_loop() -> str:
return "echo https://airflow.dev.org.tw/"
run_this = run_after_loop()
run_this = BashOperator(
task_id="run_after_loop",
bash_command="echo https://airflow.dev.org.tw/",
)
範本化¶
您可以使用 Jinja 範本 來參數化 Bash 命令。
@task.bash
def also_run_this() -> str:
return 'echo "ti_key={{ task_instance_key_str }}"'
also_this = also_run_this()
also_run_this = BashOperator(
task_id="also_run_this",
bash_command='echo "ti_key={{ task_instance_key_str }}"',
)
使用 @task.bash
TaskFlow 裝飾器可讓您返回格式化的字串,並利用所有 可直接存取裝飾任務的執行上下文變數。
@task.bash
def also_run_this_again(task_instance_key_str) -> str:
return f'echo "ti_key={task_instance_key_str}"'
also_this_again = also_run_this_again()
建議您利用這種方法,因為它非常符合整體 TaskFlow 範例。
注意
當在 Bash 命令中使用 Jinja 範本時,應謹慎處理「使用者」輸入,因為不會對 Bash 命令執行跳脫字元和清理。
這主要適用於使用 dag_run.conf
,因為它可以透過 Web UI 中的使用者提交。大多數預設範本變數沒有風險。
例如,不要 執行
@task.bash
def bash_task() -> str:
return 'echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"'
# Or directly accessing `dag_run.conf`
@task.bash
def bash_task(dag_run) -> str:
message = dag_run.conf["message"] if dag_run.conf else ""
return f'echo "here is the message: {message}"'
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"',
)
相反地,您應該透過 env
kwarg 傳遞此內容,並在 Bash 命令內使用雙引號。
@task.bash(env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'})
def bash_task() -> str:
return "echo \"here is the message: '$message'\""
bash_task = BashOperator(
task_id="bash_task",
bash_command="echo \"here is the message: '$message'\"",
env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'},
)
跳過¶
一般來說,非零的退出代碼會產生 AirflowException,從而導致任務失敗。在希望任務以 skipped
狀態結束的情況下,您可以退出代碼 99
(或如果您傳遞 skip_on_exit_code
,則使用另一個退出代碼)。
@task.bash
def this_will_skip() -> str:
return 'echo "hello world"; exit 99;'
this_skips = this_will_skip()
this_will_skip = BashOperator(
task_id="this_will_skip",
bash_command='echo "hello world"; exit 99;',
dag=dag,
)
輸出處理器¶
output_processor
參數可讓您指定一個 lambda 函數,該函數在 Bash 腳本的輸出作為 XCom 推送之前處理它。此功能對於直接在 BashOperator 中操作腳本的輸出特別有用,而無需額外的運算子或任務。
例如,考慮 Bash 腳本的輸出是 JSON 字串的情況。使用 output_processor
,您可以將此字串轉換為 JSON 物件,然後再將其儲存在 XCom 中。這簡化了工作流程,並確保下游任務以所需的格式接收處理後的資料。
以下是如何將 result_processor 與 BashOperator 一起使用
@task.bash(output_processor=lambda output: json.loads(output))
def bash_task() -> str:
return """
jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
example.json
"""
bash_task = BashOperator(
task_id="filter_today_changes",
bash_command="""
jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
example.json
""",
output_processor=lambda output: json.loads(output),
)
從檔案執行命令¶
BashOperator
和 @task.bash
TaskFlow 裝飾器都允許您執行儲存在檔案中的 Bash 命令。這些檔案必須具有 .sh
或 .bash
副檔名。
使用 Jinja 範本¶
您可以執行包含 Jinja 範本的 Bash 腳本。當您執行此操作時,Airflow 會載入檔案的內容、呈現範本,並將呈現的腳本寫入臨時檔案。預設情況下,該檔案會放置在臨時目錄中(在 /tmp
下)。您可以使用 cwd
參數變更此位置。
注意
Airflow 必須具有對 /tmp
或 cwd
目錄的寫入權限,才能夠將臨時檔案寫入磁碟。
若要執行 Bash 腳本,請將其放置在相對於包含 DAG 檔案的目錄的位置。因此,如果您的 DAG 檔案位於 /usr/local/airflow/dags/test_dag.py
中,您可以將 test.sh
檔案移動到 /usr/local/airflow/dags/
下的任何位置(例如:/usr/local/airflow/dags/scripts/test.sh
),並將相對路徑傳遞給 bash_command
,如下所示
@task.bash
def bash_example():
# "scripts" folder is under "/usr/local/airflow/dags"
return "scripts/test.sh"
t2 = BashOperator(
task_id="bash_example",
# "scripts" folder is under "/usr/local/airflow/dags"
bash_command="scripts/test.sh",
)
為 Bash 腳本建立單獨的資料夾可能出於許多原因而變得理想,例如分離腳本的邏輯和管線程式碼、允許在以不同語言組成的檔案中進行適當的程式碼醒目提示,以及在建構管線時的總體靈活性。
也可以將您的 template_searchpath
定義為指向 DAG 建構函式呼叫中的任何資料夾位置。
@dag(..., template_searchpath="/opt/scripts")
def example_bash_dag():
@task.bash
def bash_example():
return "test.sh "
with DAG("example_bash_dag", ..., template_searchpath="/opt/scripts"):
t2 = BashOperator(
task_id="bash_example",
bash_command="test.sh ",
)
不使用 Jinja 範本¶
如果您的腳本不包含任何 Jinja 範本,請在腳本名稱後新增一個空格以停用 Airflow 的呈現。
@task.bash
def run_command_from_script() -> str:
return "$AIRFLOW_HOME/scripts/example.sh "
run_script = run_command_from_script()
run_script = BashOperator(
task_id="run_command_from_script",
bash_command="$AIRFLOW_HOME/scripts/example.sh ",
)
找不到 Jinja 範本¶
如果您在嘗試執行 Bash 腳本時遇到「找不到範本」例外,請在腳本名稱後新增一個空格。這是因為 Airflow 嘗試對其套用 Jinja 範本,這將會失敗。
@task.bash
def bash_example():
# This fails with 'Jinja template not found' error
# return "/home/batcher/test.sh",
# This works (has a space after)
return "/home/batcher/test.sh "
BashOperator(
task_id="bash_example",
# This fails with 'Jinja template not found' error
# bash_command="/home/batcher/test.sh",
# This works (has a space after)
bash_command="/home/batcher/test.sh ",
)
但是,如果您想在 Bash 腳本中使用範本,請不要新增空格,而是查看 具有 Jinja 範本的 Bash 腳本 章節。
使用 Python 豐富 Bash¶
@task.bash
TaskFlow 裝飾器可讓您將 Bash 和 Python 結合到任務中的強大組合中。
在 @task.bash
任務中使用 Python 條件式、其他函數呼叫等,可以幫助定義、擴充甚至建構要執行的 Bash 命令。
例如,使用條件邏輯來判斷任務行為
@task.bash
def sleep_in(day: str) -> str:
if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
return f"sleep {60 * 60}"
else:
raise AirflowSkipException("No sleeping in today!")
sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")
或呼叫函數來幫助建構 Bash 命令
def _get_files_in_cwd() -> list[str]:
from pathlib import Path
dir_contents = Path.cwd().glob("airflow/example_dags/*.py")
files = [str(elem) for elem in dir_contents if elem.is_file()]
return files
@task.bash
def get_file_stats() -> str:
from shlex import join
files = _get_files_in_cwd()
cmd = join(["stat", *files])
return cmd
get_file_stats()
這種執行前豐富化具有許多可能性。
BashSensor¶
使用 BashSensor
使用任意命令進行感測。命令應在成功時返回 0,否則返回任何其他值。
t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0")
t4 = BashSensor(task_id="Sensor_fails_after_3_seconds", timeout=3, soft_fail=True, bash_command="exit 1")