airflow.models.dagbag

模組內容

類別

FileLoadStat

關於單一檔案的資訊。

DagBag

dagbag 是一個 DAG 的集合,從資料夾樹狀結構中解析出來,並具有高階組態設定。

DagPriorityParsingRequest

模型用於儲存 DAG 解析請求,這些請求將在解析檔案時優先處理。

函式

generate_md5_hash(context)

class airflow.models.dagbag.FileLoadStat[source]

基底類別: NamedTuple

關於單一檔案的資訊。

參數
  • file – 載入的檔案。

  • duration – 處理檔案所花費的時間。

  • dag_num – 此檔案中載入的 DAG 總數。

  • task_num – 此檔案中載入的任務總數。

  • dags – 此檔案中載入的 DAG 名稱。

  • warning_num – 從處理此檔案捕獲的警告總數。

file: str[source]
duration: datetime.timedelta[source]
dag_num: int[source]
task_num: int[source]
dags: str[source]
warning_num: int[source]
class airflow.models.dagbag.DagBag(dag_folder=None, include_examples=NOTSET, safe_mode=NOTSET, read_dags_from_db=False, store_serialized_dags=None, load_op_links=True, collect_dags=True)[source]

基底類別: airflow.utils.log.logging_mixin.LoggingMixin

dagbag 是一個 DAG 的集合,從資料夾樹狀結構中解析出來,並具有高階組態設定。

一些可能的設定是作為後端的資料庫,以及用於啟動任務的執行器。這使得為生產和開發、測試或不同團隊或安全設定檔運行不同的環境變得更容易。原本是系統級別的設定現在是 dagbag 級別,以便一個系統可以運行多個獨立的設定集。

參數
  • dag_folder (str | pathlib.Path | None) – 要掃描以尋找 DAG 的資料夾

  • include_examples (bool | airflow.utils.types.ArgNotSet) – 是否包含 Airflow 附帶的範例

  • safe_mode (bool | airflow.utils.types.ArgNotSet) – 當 False 時,掃描所有 Python 模組以尋找 DAG。當 True 時,使用啟發式方法(包含 DAGairflow 字串的檔案)來篩選要掃描 DAG 的 Python 模組。

  • read_dags_from_db (bool) – 如果傳遞 True,則從資料庫讀取 DAG。如果 False,則從 Python 檔案讀取 DAG。

  • store_serialized_dags (bool | None) – 已棄用的參數,與 read_dags_from_db 效果相同

  • load_op_links (bool) – 在反序列化 DAG 時,是否應透過外掛程式載入額外的運算子連結?此標誌在排程器中設定為 False,以便不載入額外運算子連結,從而不在排程器中運行使用者程式碼。

  • collect_dags (bool) – 當為 True 時,在類別初始化期間收集 DAG。

property store_serialized_dags: bool[source]

是否從資料庫讀取 DAG。

property dag_ids: list[str][source]

取得 DAG ID。

返回

此 bag 中 DAG ID 的列表

返回類型

list[str]

size()[source]
返回

此 dagbag 中包含的 DAG 數量

返回類型

int

get_dag(dag_id, session=None)[source]

從字典中取得 DAG,如果已過期,則刷新它。

參數

dag_id – DAG ID

process_file(filepath, only_if_updated=True, safe_mode=True)[source]

給定 Python 模組或 zip 檔案的路徑,匯入模組並在其中尋找 DAG 物件。

bag_dag(dag, root_dag)[source]

將 DAG 新增到 bag 中,遞迴到子 DAG。

引發

如果在此 DAG 或其子 DAG 中檢測到循環,則引發 AirflowDagCycleException。

引發

如果此 DAG 或其子 DAG 已存在於 bag 中,則引發 AirflowDagDuplicatedIdException。

collect_dags(dag_folder=None, only_if_updated=True, include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'))[source]

在給定路徑中尋找 Python 模組,匯入它們,並將它們新增到 dagbag 集合中。

請注意,如果在處理目錄時找到 .airflowignore 檔案,它的行為將非常像 .gitignore,忽略與檔案中指定的任何模式匹配的檔案。

注意.airflowignore 中的模式被解釋為未錨定的正規表示式或類似 gitignore 的 glob 運算式,具體取決於 DAG_IGNORE_FILE_SYNTAX 組態參數。

collect_dags_from_db()[source]

從資料庫收集 DAG。

dagbag_report()[source]

列印關於 DagBag 載入統計資訊的報告。

sync_to_db(processor_subdir=None, session=NEW_SESSION)[source]
airflow.models.dagbag.generate_md5_hash(context)[source]
class airflow.models.dagbag.DagPriorityParsingRequest(fileloc)[source]

基底類別: airflow.models.base.Base

模型用於儲存 DAG 解析請求,這些請求將在解析檔案時優先處理。

__tablename__ = 'dag_priority_parsing_request'[source]
id[source]
fileloc[source]
__repr__()[source]

返回 repr(self)。

此條目是否對您有幫助?