airflow.models.dagrun
¶
模組內容¶
類別¶
DagRun.task_instance_scheduling_decisions 的回傳類型。 |
|
DAG 的調用實例。 |
|
用於儲存關於 dagrun 實例的任意筆記。 |
屬性¶
- airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|dataset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]¶
- class airflow.models.dagrun.TISchedulingDecision[source]¶
基類:
NamedTuple
DagRun.task_instance_scheduling_decisions 的回傳類型。
- schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- finished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, queued_at=NOTSET, execution_date=None, start_date=None, external_trigger=None, conf=None, state=None, run_type=None, dag_hash=None, creating_job_id=None, data_interval=None)[source]¶
基類:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
DAG 的調用實例。
DAG 運行可以由排程器(即排程運行)或外部觸發器(即手動運行)建立。
- property logical_date: datetime.datetime[source]¶
- set_state(state)[source]¶
變更 DagRan 的狀態。
屬性的變更根據下表實作(列代表舊狀態,欄代表新狀態)
狀態轉換矩陣¶ 已排隊
執行中
成功
失敗
無
queued_at = timezone.utcnow()
如果為空:start_date = timezone.utcnow() end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
已排隊
queued_at = timezone.utcnow()
如果為空:start_date = timezone.utcnow() end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
執行中
queued_at = timezone.utcnow() start_date = None end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
成功
queued_at = timezone.utcnow() start_date = None end_date = None
start_date = timezone.utcnow() end_date = None
失敗
queued_at = timezone.utcnow() start_date = None end_date = None
start_date = timezone.utcnow() end_date = None
- refresh_from_db(session=NEW_SESSION)[source]¶
從資料庫重新載入目前的 dagrun。
- 參數
session (sqlalchemy.orm.Session) – 資料庫連線
- classmethod active_runs_of_dags(dag_ids=None, only_running=False, session=NEW_SESSION)[source]¶
取得每個 DAG 的活動 dag run 數量。
- classmethod next_dagruns_to_examine(state, session, max_number=None)[source]¶
傳回排程器應嘗試排程的下一個 DagRun。
這將傳回零或多個 DagRun 列,這些列已使用 “SELECT ... FOR UPDATE” 查詢進行列級鎖定,您應確保所有排程決策都在單一交易中完成 – 一旦交易提交,它將被解除鎖定。
- classmethod find(dag_id=None, run_id=None, execution_date=None, state=None, external_trigger=None, no_backfills=False, run_type=None, session=NEW_SESSION, execution_start_date=None, execution_end_date=None)[source]¶
傳回符合給定搜尋條件的一組 dag run。
- 參數
dag_id (str | list[str] | None) – 要尋找 dag run 的 dag_id 或 dag_id 清單
run_id (Iterable[str] | None) – 定義此 dag run 的 run id
run_type (airflow.utils.types.DagRunType | None) – DagRun 的類型
execution_date (datetime.datetime | Iterable[datetime.datetime] | None) – 執行日期
state (airflow.utils.state.DagRunState | None) – dag run 的狀態
external_trigger (bool | None) – 此 dag run 是否為外部觸發
no_backfills (bool) – 不傳回回填 (True),傳回全部 (False)。預設為 False
session (sqlalchemy.orm.Session) – 資料庫連線
execution_start_date (datetime.datetime | None) – 從此日期開始執行的 dag run
execution_end_date (datetime.datetime | None) – 執行到此日期為止的 dag run
- classmethod find_duplicate(dag_id, run_id, execution_date, session=NEW_SESSION)[source]¶
傳回 DAG 現有的 run,其具有特定的 run_id 或 execution_date。
如果找不到此 DAG run,則傳回 None。
- 參數
dag_id (str) – 要尋找重複項的 dag_id
run_id (str) – 定義此 dag run 的 run id
execution_date (datetime.datetime) – 執行日期
session (sqlalchemy.orm.Session) – 資料庫連線
- static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]¶
傳回此 dag run 的 task instances。
- get_task_instances(state=None, session=NEW_SESSION)[source]¶
傳回此 dag run 的 task instances。
重新導向至 DagRun.fetch_task_instances 方法。保留此方法是因為它在程式碼中廣泛使用。
- get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]¶
傳回此 dag run 的 task_id 所指定的 task instance。
- 參數
task_id (str) – task id
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]¶
傳回此 dag run 的 task_id 所指定的 task instance。
- 參數
dag_id (str) – DAG id
dag_run_id (str) – DAG run id
task_id (str) – task id
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]¶
傳回先前的 DagRun(如果有的話)。
- 參數
dag_run (DagRun | airflow.serialization.pydantic.dag_run.DagRunPydantic) – dag run
session(連線階段) (sqlalchemy.orm.Session) – SQLAlchemy ORM 連線階段
state(狀態) (airflow.utils.state.DagRunState | None) – dag 執行個體的狀態
- static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]¶
返回先前的 SCHEDULED DagRun(如果有的話)。
- 參數
dag_run_id (int) – DAG 執行個體 ID
session(連線階段) (sqlalchemy.orm.Session) – SQLAlchemy ORM 連線階段
- update_state(session=NEW_SESSION, execute_callbacks=True)[source]¶
根據其 TaskInstance 的狀態,判斷 DagRun 的整體狀態。
- 參數
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
execute_callbacks(執行回呼) (bool) – 判斷是否應直接調用 dag 回呼(成功/失敗、SLA 等)(預設值:true),或將其記錄為
returned_callback
屬性中的待處理請求
- 傳回
元組,其中包含可在當前迴圈中排程的 tis,以及需要執行的 returned_callback
- 傳回類型
tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]
- verify_integrity(*, session=NEW_SESSION)[source]¶
通過檢查已移除的任務或尚未在資料庫中的任務,來驗證 DagRun。
如果需要,它會將狀態設定為已移除或新增任務。
- Missing_indexes(遺失的索引)
一個字典,其中包含任務與遺失的索引。
- 參數
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session