使用命令列介面

本文檔旨在概述使用 CLI 時的所有常見任務。

注意

有關 CLI 命令的更多資訊,請參閱 命令列介面與環境變數參考

設定 Bash/Zsh 自動完成

當使用 bash (或 zsh) 作為您的 shell 時,airflow 可以使用 argcomplete 進行自動完成。

對於所有啟用 argcomplete 的 python 應用程式的 全域啟用,請執行

sudo activate-global-python-argcomplete

對於永久性(但非全域性)airflow 啟用,請使用

register-python-argcomplete airflow >> ~/.bashrc

對於僅針對 airflow 的 argcomplete 一次性啟用,請使用

eval "$(register-python-argcomplete airflow)"
../_images/cli_completion.gif

如果您使用 zsh,請將以下內容新增至您的 .zshrc

autoload bashcompinit
bashcompinit
eval "$(register-python-argcomplete airflow)"

建立連線

有關使用 CLI 建立連線的資訊,請參閱 從 CLI 建立連線

將 DAG 結構匯出為圖片

Airflow 可讓您將 DAG 結構列印或儲存為圖片。這對於記錄或分享您的 DAG 結構很有用。您需要安裝 Graphviz

例如,要將 example_complex DAG 列印到終端機

airflow dags show example_complex

這會將渲染的 DAG 結構(類似於 圖形)以 DOT 格式列印到螢幕上。

支援多種檔案格式。若要使用它們,請新增參數 --save [filename].[format]

要將 example_complex DAG 儲存為 PNG 檔案

airflow dags show example_complex --save example_complex.png

這會將以下圖片儲存為檔案

../_images/usage_cli_export.png

DAG 表示範例

支援以下檔案格式

  • bmp

  • canondotgvxdotxdot1.2xdot1.4

  • cgimage

  • cmap

  • eps

  • exr

  • fig

  • gdgd2

  • gif

  • gtk

  • ico

  • imapcmapx

  • imap_npcmapx_np

  • ismap

  • jp2

  • jpgjpegjpe

  • jsonjson0dot_jsonxdot_json

  • pctpict

  • pdf

  • pic

  • plainplain-ext

  • png

  • pov

  • ps

  • ps2

  • psd

  • sgi

  • svgsvgz

  • tga

  • tiftiff

  • tk

  • vmlvmlz

  • vrml

  • wbmp

  • webp

  • xlib

  • x11

預設情況下,Airflow 會在 airflow.cfg 檔案的 [core] 區段中 dags_folder 選項指定的目錄中尋找 DAG。您可以使用 --subdir 參數選取新的目錄。

顯示 DAG 結構

有時您會處理包含複雜依賴關係的 DAG。預覽 DAG 以查看其是否正確會很有幫助。

如果您有 macOS,您可以將 iTerm2imgcat 腳本一起使用,以在主控台中顯示 DAG 結構。您也需要安裝 Graphviz

其他終端機不支援顯示高品質圖形。您可以將圖片轉換為文字形式,但其解析度會讓您無法閱讀。

若要執行此操作,您應該在 airflow dags show 命令中使用 --imgcat 切換。例如,如果您想要顯示 example_bash_operator DAG,則可以使用以下命令

airflow dags show example_bash_operator --imgcat

您會看到與以下螢幕截圖類似的結果。

../_images/usage_cli_imgcat.png

iTerm2 中的 DAG 預覽

格式化命令輸出

某些 Airflow 命令(例如 airflow dags listairflow tasks states-for-dag-run)支援 --output 旗標,允許使用者變更命令輸出的格式。可能的選項

  • table - 將資訊渲染為純文字表格

  • simple - 將資訊渲染為簡單表格,可由標準 linux 公用程式解析

  • json - 以 json 字串形式渲染資訊

  • yaml - 以有效的 yaml 形式渲染資訊

jsonyaml 格式都可讓您更輕鬆地使用命令列工具(例如 jqyq)來操作資料。例如

airflow tasks states-for-dag-run example_complex 2020-11-13T00:00:00+00:00 --output json | jq ".[] | {sd: .start_date, ed: .end_date}"
{
  "sd": "2020-11-29T14:53:46.811030+00:00",
  "ed": "2020-11-29T14:53:46.974545+00:00"
}
{
  "sd": "2020-11-29T14:53:56.926441+00:00",
  "ed": "2020-11-29T14:53:57.118781+00:00"
}
{
  "sd": "2020-11-29T14:53:56.915802+00:00",
  "ed": "2020-11-29T14:53:57.125230+00:00"
}
{
  "sd": "2020-11-29T14:53:56.922131+00:00",
  "ed": "2020-11-29T14:53:57.129091+00:00"
}
{
  "sd": "2020-11-29T14:53:56.931243+00:00",
  "ed": "2020-11-29T14:53:57.126306+00:00"
}

從元數據資料庫清除歷史記錄

注意

強烈建議您在執行 db clean 命令之前備份元數據資料庫。

db clean 命令的工作方式是從每個表格中刪除早於提供的 --clean-before-timestamp 的記錄。

您可以選擇性地提供要執行刪除的表格清單。如果未提供表格清單,則將包含所有表格。

您可以使用 --dry-run 選項來列印要清除的主要表格中的列計數。

預設情況下,db clean 會將清除的列封存到 _airflow_deleted__<table>__<timestamp> 形式的表格中。如果您不希望以這種方式保留資料,您可以提供參數 --skip-archive

從封存表格匯出已清除的記錄

db export-archived 命令會將封存表格(由 db clean 命令建立)的內容匯出為指定的格式,預設為 CSV 檔案。匯出的檔案將包含在 db clean 過程中從主要表格中清除的記錄。

您可以使用 --export-format 選項指定匯出格式。預設格式為 csv,目前也是唯一支援的格式。

您還必須使用 --output-path 選項指定要將資料匯出到的路徑位置。此位置必須存在。

其他選項包括:--tables 用於指定要匯出的表格,--drop-archives 用於在匯出後刪除封存表格。

刪除封存表格

如果在 db clean 過程中,您未使用 --skip-archive 選項(會刪除封存表格),您仍然可以使用 db drop-archived 命令刪除封存表格。此操作不可逆,建議您在使用 db export-archived 命令將表格備份到磁碟後再刪除它們。

您可以使用 --tables 選項指定要刪除的表格。如果未指定表格,則將刪除所有封存表格。

注意級聯刪除

請記住,某些表格具有使用 ON DELETE CASCADE 定義的外鍵關係,因此在一個表格中刪除可能會觸發其他表格中的刪除。例如,task_instance 表格鍵至 dag_run 表格,因此如果刪除 DagRun 記錄,則也會刪除其所有相關聯的任務執行個體。

DAG 執行個體的特殊處理

通常,Airflow 會透過查詢最新的 DagRun 來判斷下一個要執行的 DagRun。如果您刪除所有 DAG 執行個體,Airflow 可能會排程已完成的舊 DAG 執行個體,例如,如果您已設定 catchup=True。因此,db clean 命令將保留最新的非手動觸發 DAG 執行個體,以維持排程的連續性。

可回填 DAG 的考量

並非所有 DAG 都設計用於 Airflow 的回填命令。但是對於那些 DAG,需要特別注意。如果您刪除 DAG 執行個體,並且您在包含已刪除 DAG 執行個體的日期範圍內執行回填,則這些執行個體將會重新建立並再次執行。因此,如果您的 DAG 屬於此類別,您可能需要避免刪除 DAG 執行個體,而僅清除其他大型表格,例如任務執行個體和日誌等。

升級 Airflow

執行 airflow db migrate --help 以取得使用詳細資訊。

手動執行遷移

如果需要,您可以產生升級的 sql 陳述式,並一次手動套用每個升級遷移。若要執行此操作,您可以將 --range(適用於 Airflow 版本)或 --revision-range(適用於 Alembic 修訂版本)選項與 db migrate 一起使用。請勿略過執行 Alembic 修訂版本 ID 更新命令;這是 Airflow 下次需要升級時知道您從哪裡升級的方式。請參閱 資料庫遷移參考,以取得修訂版本與版本之間的對應關係。

降級 Airflow

注意

建議您在執行 db downgrade 或任何其他資料庫操作之前備份資料庫。

您可以使用 db downgrade 命令降級到特定的 Airflow 版本。或者,您可以提供 Alembic 修訂版本 ID 以降級到該版本。

如果您想要預覽命令但不執行它們,請使用選項 --show-sql-only

選項 --from-revision--from-version 只能與 --show-sql-only 選項結合使用,因為在實際執行遷移時,我們應始終從目前的修訂版本降級。

有關 Airflow 版本與 Alembic 修訂版本之間的對應關係,請參閱 資料庫遷移參考

匯出連線

您可以使用 CLI 從資料庫匯出連線。支援的檔案格式為 jsonyamlenv

您可以將目標檔案指定為參數

airflow connections export connections.json

或者,您可以指定 file-format 參數以覆寫檔案格式

airflow connections export /tmp/connections --file-format yaml

您也可以為 STDOUT 指定 -

airflow connections export -

JSON 格式包含一個物件,其中鍵包含連線 ID,值包含連線的定義。在此格式中,連線定義為 JSON 物件。以下是 JSON 檔案範例。

{
  "airflow_db": {
    "conn_type": "mysql",
    "host": "mysql",
    "login": "root",
    "password": "plainpassword",
    "schema": "airflow",
    "port": null,
    "extra": null
  },
  "druid_broker_default": {
    "conn_type": "druid",
    "host": "druid-broker",
    "login": null,
    "password": null,
    "schema": null,
    "port": 8082,
    "extra": "{\"endpoint\": \"druid/v2/sql\"}"
  }
}

YAML 檔案結構與 JSON 的結構類似。連線 ID 和一個或多個連線的定義的鍵值對。在此格式中,連線定義為 YAML 物件。以下是 YAML 檔案範例。

airflow_db:
  conn_type: mysql
  extra: null
  host: mysql
  login: root
  password: plainpassword
  port: null
  schema: airflow
druid_broker_default:
  conn_type: druid
  extra: '{"endpoint": "druid/v2/sql"}'
  host: druid-broker
  login: null
  password: null
  port: 8082
  schema: null

您也可以使用 .env 格式匯出連線。鍵是連線 ID,值是連線的序列化表示法,使用 Airflow 的連線 URI 格式或 JSON。若要使用 JSON,請提供選項 --serialization-format=json,否則將使用 Airflow 連線 URI 格式。以下是使用兩種格式的 .env 檔案範例。

URI 範例

airflow_db=mysql://root:plainpassword@mysql/airflow
druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql

JSON 範例輸出

airflow_db={"conn_type": "mysql", "login": "root", "password": "plainpassword", "host": "mysql", "schema": "airflow"}
druid_broker_default={"conn_type": "druid", "host": "druid-broker", "port": 8082, "extra": "{\"endpoint\": \"druid/v2/sql\"}"}

測試 DAG 匯入錯誤

CLI 可用於透過 list-import-errors 子命令檢查任何已發現的 DAG 是否有匯入錯誤。可以建立自動化步驟,如果任何 DAG 無法匯入,則透過檢查命令輸出而失敗,尤其是在與 --output 一起使用以產生標準檔案格式時。例如,當沒有錯誤時,預設輸出為 No data found,而 json 輸出為 []。然後可以在 CI 或 pre-commit 中執行檢查,以加速審查程序和測試。

如果存在任何錯誤,則失敗的範例命令,使用 jq 來解析輸出

airflow dags list-import-errors --output=json | jq -e 'select(type=="array" and length == 0)'

該行可以直接新增至自動化,或者如果您想要列印輸出,則可以使用 tee

airflow dags list-import-errors | tee import_errors.txt && jq -e 'select(type=="array" and length == 0)' import_errors.txt

Jenkins 管道中的範例

stage('All DAGs are loadable') {
    steps {
        sh 'airflow dags list-import-errors | tee import_errors.txt && jq -e \'select(type=="array" and length == 0)\' import_errors.txt'
    }
}

注意

為了使此功能準確運作,您必須確保 Airflow 不會在 stdout 中記錄任何其他文字。例如,您可能需要修正任何棄用警告、將 2>/dev/null 新增至您的命令,或者如果您有在外掛程式載入時產生日誌的外掛程式,則在 Airflow 組態中設定 lazy_load_plugins = True

此條目是否有幫助?