airflow.models.dagrun

模組內容

類別

TISchedulingDecision

DagRun.task_instance_scheduling_decisions 的回傳類型。

DagRun

DAG 的調用實例。

DagRunNote

用於儲存關於 dagrun 實例的任意筆記。

屬性

CreatedTasks

RUN_ID_REGEX

airflow.models.dagrun.CreatedTasks[source]
airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|dataset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]
class airflow.models.dagrun.TISchedulingDecision[source]

基類:NamedTuple

DagRun.task_instance_scheduling_decisions 的回傳類型。

tis: list[airflow.models.taskinstance.TaskInstance][source]
schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]
changed_tis: bool[source]
unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]
finished_tis: list[airflow.models.taskinstance.TaskInstance][source]
class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, queued_at=NOTSET, execution_date=None, start_date=None, external_trigger=None, conf=None, state=None, run_type=None, dag_hash=None, creating_job_id=None, data_interval=None)[source]

基類:airflow.models.base.Baseairflow.utils.log.logging_mixin.LoggingMixin

DAG 的調用實例。

DAG 運行可以由排程器(即排程運行)或外部觸發器(即手動運行)建立。

property stats_tags: dict[str, str][source]
property logical_date: datetime.datetime[source]
property state[source]
property is_backfill: bool[source]
__tablename__ = 'dag_run'[source]
id[source]
dag_id[source]
queued_at[source]
execution_date[source]
start_date[source]
end_date[source]
run_id[source]
creating_job_id[source]
external_trigger[source]
run_type[source]
conf[source]
data_interval_start[source]
data_interval_end[source]
last_scheduling_decision[source]
dag_hash[source]
log_template_id[source]
updated_at[source]
clear_number[source]
__table_args__ = ()[source]
task_instances[source]
dag_model[source]
dag_run_note[source]
note[source]
DEFAULT_DAGRUNS_TO_EXAMINE[source]
__repr__()[source]

傳回 repr(self)。

validate_run_id(key, run_id)[source]
get_state()[source]
set_state(state)[source]

變更 DagRan 的狀態。

屬性的變更根據下表實作(列代表舊狀態,欄代表新狀態)

狀態轉換矩陣

已排隊

執行中

成功

失敗

queued_at = timezone.utcnow()

如果為空:start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

已排隊

queued_at = timezone.utcnow()

如果為空:start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

執行中

queued_at = timezone.utcnow() start_date = None end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

成功

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

失敗

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

refresh_from_db(session=NEW_SESSION)[source]

從資料庫重新載入目前的 dagrun。

參數

session (sqlalchemy.orm.Session) – 資料庫連線

classmethod active_runs_of_dags(dag_ids=None, only_running=False, session=NEW_SESSION)[source]

取得每個 DAG 的活動 dag run 數量。

classmethod next_dagruns_to_examine(state, session, max_number=None)[source]

傳回排程器應嘗試排程的下一個 DagRun。

這將傳回零或多個 DagRun 列,這些列已使用 “SELECT ... FOR UPDATE” 查詢進行列級鎖定,您應確保所有排程決策都在單一交易中完成 – 一旦交易提交,它將被解除鎖定。

classmethod find(dag_id=None, run_id=None, execution_date=None, state=None, external_trigger=None, no_backfills=False, run_type=None, session=NEW_SESSION, execution_start_date=None, execution_end_date=None)[source]

傳回符合給定搜尋條件的一組 dag run。

參數
  • dag_id (str | list[str] | None) – 要尋找 dag run 的 dag_id 或 dag_id 清單

  • run_id (Iterable[str] | None) – 定義此 dag run 的 run id

  • run_type (airflow.utils.types.DagRunType | None) – DagRun 的類型

  • execution_date (datetime.datetime | Iterable[datetime.datetime] | None) – 執行日期

  • state (airflow.utils.state.DagRunState | None) – dag run 的狀態

  • external_trigger (bool | None) – 此 dag run 是否為外部觸發

  • no_backfills (bool) – 不傳回回填 (True),傳回全部 (False)。預設為 False

  • session (sqlalchemy.orm.Session) – 資料庫連線

  • execution_start_date (datetime.datetime | None) – 從此日期開始執行的 dag run

  • execution_end_date (datetime.datetime | None) – 執行到此日期為止的 dag run

classmethod find_duplicate(dag_id, run_id, execution_date, session=NEW_SESSION)[source]

傳回 DAG 現有的 run,其具有特定的 run_id 或 execution_date。

如果找不到此 DAG run,則傳回 None

參數
static generate_run_id(run_type, execution_date)[source]

根據 Run Type 和 Execution Date 產生 Run ID。

static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]

傳回此 dag run 的 task instances。

get_task_instances(state=None, session=NEW_SESSION)[source]

傳回此 dag run 的 task instances。

重新導向至 DagRun.fetch_task_instances 方法。保留此方法是因為它在程式碼中廣泛使用。

get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]

傳回此 dag run 的 task_id 所指定的 task instance。

參數
static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]

傳回此 dag run 的 task_id 所指定的 task instance。

參數
get_dag()[source]

傳回與此 DagRun 關聯的 Dag。

傳回

DAG

傳回類型

airflow.models.dag.DAG

static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]

傳回先前的 DagRun(如果有的話)。

參數
static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]

返回先前的 SCHEDULED DagRun(如果有的話)。

參數
  • dag_run_id (int) – DAG 執行個體 ID

  • session(連線階段) (sqlalchemy.orm.Session) – SQLAlchemy ORM 連線階段

update_state(session=NEW_SESSION, execute_callbacks=True)[source]

根據其 TaskInstance 的狀態,判斷 DagRun 的整體狀態。

參數
  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

  • execute_callbacks(執行回呼) (bool) – 判斷是否應直接調用 dag 回呼(成功/失敗、SLA 等)(預設值:true),或將其記錄為 returned_callback 屬性中的待處理請求

傳回

元組,其中包含可在當前迴圈中排程的 tis,以及需要執行的 returned_callback

傳回類型

tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]

task_instance_scheduling_decisions(session=NEW_SESSION)[source]
notify_dagrun_state_changed(msg='')[source]
verify_integrity(*, session=NEW_SESSION)[source]

通過檢查已移除的任務或尚未在資料庫中的任務,來驗證 DagRun。

如果需要,它會將狀態設定為已移除或新增任務。

Missing_indexes(遺失的索引)

一個字典,其中包含任務與遺失的索引。

參數

session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

classmethod get_latest_runs(session=NEW_SESSION)[source]

返回每個 DAG 的最新 DagRun。

schedule_tis(schedulable_tis, session=NEW_SESSION, max_tis_per_query=None)[source]

將給定的任務實例設定為已排程狀態。

schedulable_tis 的每個元素都應已設定其 task 屬性。

任何沒有回呼或出口的 EmptyOperator 都會直接設定為成功狀態。

所有 TI 都應屬於此 DagRun,但此程式碼位於熱路徑中,因此未進行檢查 – 呼叫端有責任僅使用來自單個 dag 執行個體的 TI 來呼叫此函式。

get_log_template(*, session=NEW_SESSION)[source]
get_log_filename_template(*, session=NEW_SESSION)[source]
class airflow.models.dagrun.DagRunNote(content, user_id=None)[source]

繼承自:airflow.models.base.Base

用於儲存關於 dagrun 實例的任意筆記。

__tablename__ = 'dag_run_note'[source]
user_id[source]
dag_run_id[source]
content[source]
created_at[source]
updated_at[source]
dag_run[source]
__table_args__ = ()[source]
__repr__()[source]

傳回 repr(self)。

此條目是否有幫助?