airflow.models.taskinstance

模組內容

類別

TaskInstance

任務實例儲存任務實例的狀態。

SimpleTaskInstance

簡化的任務實例。

TaskInstanceNote

用於儲存關於任務實例的任意註解。

函式

set_current_context(context)

將目前的執行環境設定為提供的環境物件。

clear_task_instances(tis, session[, ...])

清除一組任務實例,但確保正在執行的實例被終止。

屬性

TR

日誌

PAST_DEPENDS_MET

TaskInstanceStateType

airflow.models.taskinstance.TR[原始碼]
airflow.models.taskinstance.log[原始碼]
airflow.models.taskinstance.PAST_DEPENDS_MET = 'past_depends_met'[原始碼]
airflow.models.taskinstance.set_current_context(context)[原始碼]

將目前的執行環境設定為提供的環境物件。

此方法應在每次任務執行時呼叫一次,在呼叫 operator.execute 之前。

airflow.models.taskinstance.clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state=DagRunState.QUEUED)[原始碼]

清除一組任務實例,但確保正在執行的實例被終止。

同時將 Dagrun 的 state 設定為 QUEUED,並將 start_date 設定為執行時間。但僅適用於已完成的 DR(SUCCESS 和 FAILED)。不會清除正在執行的 DR(QUEUED 和 RUNNING)的 statestart_date,因為清除已在執行的 DR 的狀態是多餘的,而清除 `start_date` 會影響 DR 的持續時間。

參數
  • tis (任務實例的列表) – 任務實例的列表

  • session (sqlalchemy.orm.session.Session) – 目前的 session

  • dag_run_state (airflow.utils.state.DagRunState | airflow.typing_compat.Literal[False]) – 設定已完成 DagRun 的狀態。若設定為 False,則 DagRun 狀態不會變更。

  • dag (airflow.models.dag.DAG | None) – DAG 物件

  • activate_dag_runs (None) – 已棄用的參數,請勿傳遞

class airflow.models.taskinstance.TaskInstance(task, execution_date=None, run_id=None, state=None, map_index=-1)[原始碼]

繼承自: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

任務實例儲存任務實例的狀態。

此表為任務已執行及其所處狀態的權威和單一事實來源。

SqlAlchemy 模型刻意不使用 SqlAlchemy 外鍵連接到任務或 dag 模型,以便更精確地控制交易。

在此表上的資料庫交易應確保雙重觸發,並消除關於哪些任務實例準備好執行或未準備好執行的任何混淆,即使在多個排程器可能觸發任務實例的情況下。

map_index 中值 -1 代表以下任一情況:沒有對應任務的 TI;具有對應任務但尚未展開的 TI(state=pending);具有對應任務但展開為空列表的 TI(state=skipped)。

property stats_tags: dict[str, str][原始碼]

傳回任務實例標籤。

property next_try_number: int[原始碼]
property operator_name: str | None[原始碼]

@property:若有設定,則為 operator 使用更友善的顯示名稱。

property log_url: str[原始碼]

TaskInstance 的日誌 URL。

property mark_success_url: str[原始碼]

標記 TI 成功的 URL。

property key: airflow.models.taskinstancekey.TaskInstanceKey[原始碼]

傳回唯一識別任務實例的元組。

property is_premature: bool[原始碼]

傳回任務是否處於 UP_FOR_RETRY 狀態且重試間隔已過。

property previous_ti: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[原始碼]

此屬性已棄用。

請使用 airflow.models.taskinstance.TaskInstance.get_previous_ti

property previous_ti_success: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[原始碼]

此屬性已棄用。

請使用 airflow.models.taskinstance.TaskInstance.get_previous_ti

property previous_start_date_success: pendulum.DateTime | None[原始碼]

此屬性已棄用。

請使用 airflow.models.taskinstance.TaskInstance.get_previous_start_date

__tablename__ = 'task_instance'[原始碼]
task_id[原始碼]
dag_id[原始碼]
run_id[原始碼]
map_index[原始碼]
start_date[原始碼]
end_date[原始碼]
duration[原始碼]
state[原始碼]
try_number[原始碼]
max_tries[原始碼]
hostname[原始碼]
unixname[原始碼]
job_id[原始碼]
pool[原始碼]
pool_slots[原始碼]
queue[原始碼]
priority_weight[原始碼]
operator[原始碼]
custom_operator_name[原始碼]
queued_dttm[原始碼]
queued_by_job_id[原始碼]
pid[原始碼]
executor[原始碼]
executor_config[原始碼]
updated_at[原始碼]
rendered_map_index[原始碼]
external_executor_id[原始碼]
trigger_id[原始碼]
trigger_timeout[原始碼]
next_method[原始碼]
next_kwargs[原始碼]
__table_args__ = ()[原始碼]
dag_model: airflow.models.dag.DagModel[source]
trigger[source]
triggerer_job[source]
dag_run[source]
rendered_task_instance_fields[source]
execution_date[source]
task_instance_note[source]
note[source]
task: airflow.models.operator.Operator | None[source]
test_mode: bool = False[source]
is_trigger_log_context: bool = False[source]
run_as_user: str | None[source]
__hash__()[source]

返回 hash(self)。

init_on_load()[source]

初始化未儲存在資料庫中的屬性。

task_display_name()[source]
command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]

返回一個可以在任何安裝了 Airflow 的地方執行的命令。

此命令是 Orchestrator 發送給 Executor 的訊息的一部分。

static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None, cfg_path=None, map_index=-1)[source]

產生執行此任務實例所需的 shell 命令。

參數
  • dag_id (str) – DAG ID

  • task_id (str) – 任務 ID

  • run_id (str) – 此任務的 DagRun 的執行 ID

  • mark_success (bool) – 是否將任務標記為成功

  • ignore_all_deps (bool) – 忽略所有可忽略的依賴項。 覆蓋其他 ignore_* 參數。

  • ignore_depends_on_past (bool) – 忽略 DAG 的 depends_on_past 參數 (例如,對於回填)

  • wait_for_past_depends_before_skipping (bool) – 在將 ti 標記為跳過之前,等待過去的依賴項

  • ignore_task_deps (bool) – 忽略特定於任務的依賴項,例如 depends_on_past 和觸發規則

  • ignore_ti_state (bool) – 忽略任務實例先前的失敗/成功

  • local (bool) – 是否在本機執行任務

  • pickle_id (int | None) – 如果 DAG 已序列化到資料庫,則為與 pickled DAG 關聯的 ID

  • file_path (pathlib.PurePath | str | None) – 包含 DAG 定義的檔案路徑

  • raw (bool) – 原始模式 (需要更多詳細資訊)

  • job_id (str | None) – 任務 ID (需要更多詳細資訊)

  • pool (str | None) – 任務應在其中執行的 Airflow pool

  • cfg_path (str | None) – 組態檔案的路徑

返回

可用於執行任務實例的 shell 命令

返回類型

list[str]

current_state(session=NEW_SESSION)[source]

從資料庫取得最新的狀態。

如果傳遞了 Session,我們會使用它,並且查找狀態將成為 Session 的一部分,否則將使用新的 Session。

這裡使用 sqlalchemy.inspect 來取得主鍵,以確保如果它們更改,它也不會退化

參數

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

error(session=NEW_SESSION)[source]

強制將任務實例的狀態設定為資料庫中的 FAILED。

參數

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]
refresh_from_db(session=NEW_SESSION, lock_for_update=False)[source]

根據主鍵從資料庫重新整理任務實例。

參數
  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • lock_for_update (bool) – 如果為 True,表示資料庫應鎖定 TaskInstance (發出 FOR UPDATE 子句),直到 Session 提交。

refresh_from_task(task, pool_override=None)[source]

從給定的任務複製通用屬性。

參數
  • task (airflow.models.operator.Operator) – 要複製的任務物件

  • pool_override (str | None) – 使用 pool_override 而不是任務的 pool

clear_xcom_data(session=NEW_SESSION)[source]
set_state(state, session=NEW_SESSION)[source]

設定 TaskInstance 狀態。

參數
返回

狀態是否已變更

返回類型

布林值

are_dependents_done(session=NEW_SESSION)[source]

檢查此任務實例的直接依賴項是否已成功或已跳過。

這旨在供 wait_for_downstream 使用。

當您不希望在依賴項完成之前開始處理任務的下一個排程時,這非常有用。 例如,如果任務 DROP 並重新建立表格。

參數

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

get_previous_dagrun(state=None, session=None)[source]

返回在此任務實例的 DagRun 之前執行的 DagRun。

參數
get_previous_ti(state=None, session=NEW_SESSION)[source]

返回在此任務實例之前執行的任務的任務實例。

參數
get_previous_execution_date(state=None, session=NEW_SESSION)[source]

從屬性 previous_ti_success 返回執行日期。

參數
get_previous_start_date(state=None, session=NEW_SESSION)[source]

從屬性 previous_ti_success 返回開始日期。

參數
are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[source]

給定依賴項的上下文,是否滿足執行此任務實例的所有條件。

(例如,從 UI 強制執行的任務實例將忽略某些依賴項)。

參數
  • dep_context (airflow.ti_deps.dep_context.DepContext | None) – 決定應評估的依賴項的執行上下文。

  • session (sqlalchemy.orm.session.Session) – 資料庫 Session

  • verbose (bool) – 是否在 info 或 debug 日誌級別記錄有關失敗依賴項的詳細資訊

get_failed_dep_statuses(dep_context=None, session=NEW_SESSION)[source]

取得失敗的依賴項。

__repr__()[source]

返回 repr(self)。

next_retry_datetime()[source]

如果任務實例失敗,則取得下次重試的日期時間。

對於指數退避,retry_delay 用作基準,並將轉換為秒。

ready_for_retry()[source]

檢查任務實例是否處於正確的狀態和時間範圍以進行重試。

get_dagrun(session=NEW_SESSION)[source]

返回此 TaskInstance 的 DagRun。

參數

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

返回

DagRun

返回類型

airflow.models.dagrun.DagRun

classmethod ensure_dag(task_instance, session=NEW_SESSION)[source]

確保任務具有關聯的 dag 物件,可能已透過序列化移除。

check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, external_executor_id=None, session=NEW_SESSION)[source]
emit_state_change_metric(new_state)[source]

傳送時間指標 (metric),表示給定狀態轉換所花費的時間長度。

先前的狀態和指標名稱會從任務所處的狀態推導而來。

參數

new_state (airflow.utils.state.TaskInstanceState) – 此任務剛剛設定的狀態。我們不使用 self.state,因為有時狀態會直接在資料庫中更新,而不是在本地 TaskInstance 物件中更新。支援的狀態:QUEUED 和 RUNNING

clear_next_method_args()[source]

確保我們取消設定 next_method 和 next_kwargs,以確保任何重試都不會重複使用它們。

defer_task(exception, session=NEW_SESSION)[source]

將任務標記為延遲 (deferred),並設定在 TaskDeferred 引發時恢復任務所需的觸發器 (trigger)。

run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]

執行 TaskInstance。

dry_run()[source]

僅呈現 (Render) TI 的範本 (Templates)。

classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_stop=False)[source]

處理 TaskInstance 的失敗。

參數

fail_stop (bool) – 若為 true,則停止 DAG 中剩餘的任務

static save_to_db(ti, session=NEW_SESSION)[source]
handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]

處理任務實例 (task instance) 的失敗。

參數
  • error (None | str | BaseException) – 如果指定,則記錄拋出的特定異常

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • test_mode (bool | None) – 若為 True,則不在資料庫中記錄成功或失敗

  • context (airflow.utils.context.Context | None) – Jinja2 上下文

  • force_fail (bool) – 若為 True,則任務不會重試

is_eligible_to_retry()[source]

任務實例是否符合重試的條件。

get_template_context(session=None, ignore_param_exceptions=True)[source]

傳回 TI 上下文。

參數
get_rendered_template_fields(session=NEW_SESSION)[source]

更新任務,使其包含呈現後的範本欄位,以便在 UI 中呈現。

如果任務已執行,將從資料庫中提取;否則將進行呈現。

overwrite_params_with_dag_run_conf(params, dag_run)[source]

使用 DagRun.conf 覆寫任務參數 (Task Params)。

render_templates(context=None, jinja_env=None)[source]

呈現運算子 (operator) 欄位中的範本。

如果任務最初是映射 (mapped) 的,這可能會將 self.task 替換為未映射、完全呈現的 BaseOperator。原始的 self.task 在替換之前會被傳回。

render_k8s_pod_yaml()[source]

呈現 k8s pod yaml。

get_rendered_k8s_spec(session=NEW_SESSION)[source]

呈現 k8s pod yaml。

get_email_subject_content(exception, task=None)[source]

取得異常的電子郵件主旨內容。

參數
email_alert(exception, task)[source]

傳送包含異常資訊的警報電子郵件。

參數
set_duration()[source]

設定任務實例的持續時間。

xcom_push(key, value, execution_date=None, session=NEW_SESSION)[source]

建立一個可供任務提取 (pull) 的 XCom。

參數
  • key (str) – 儲存值的鍵。

  • value (Any) – 要儲存的值。可能的類型取決於 enable_xcom_pickling 是否為 true。如果是,則可以是任何可 pickle 的物件;否則只能使用可 JSON 序列化的物件。

  • execution_date (datetime.datetime | None) – 已棄用的參數,不起作用。

xcom_pull(task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=False, session=NEW_SESSION, *, map_indexes=None, default=None)[source]

提取 (Pull) 符合特定條件的 XCom(可選)。

參數
  • key (str) – XCom 的鍵。如果提供,則僅傳回具有相符鍵的 XCom。預設鍵為 'return_value',也可作為常數 XCOM_RETURN_KEY 使用。此鍵會自動賦予由任務傳回的 XCom(相對於手動推送 (push) 的 XCom)。若要移除篩選器,請傳遞 None

  • task_ids (str | Iterable[str] | None) – 僅提取來自具有相符 ID 的任務的 XCom。傳遞 None 以移除篩選器。

  • dag_id (str | None) – 如果提供,則僅提取來自此 DAG 的 XCom。如果為 None (預設值),則使用呼叫任務的 DAG。

  • map_indexes (int | Iterable[int] | None) – 如果提供,則僅提取具有相符索引的 XCom。如果為 None (預設值),則會從正在提取的任務推斷 (詳情請參閱下文)。

  • include_prior_dates (bool) – 如果為 False,則僅傳回來自當前 execution_date 的 XCom。如果為 True,則也會傳回來自先前日期的 XCom。

當提取單個任務(task_idNone 或字串)且未指定 map_indexes 時,傳回值會從指定的任務是否已映射來推斷。如果未映射,則傳回來自單個任務實例的值。如果要提取的任務已映射,則傳回一個迭代器 (iterator)(而非列表),該迭代器產生來自映射任務實例的 XCom。在任何一種情況下,如果找不到相符的 XCom,則傳回 default(如果未指定,則為 None)。

當提取多個任務時(即 task_idmap_index 為非字串的可迭代物件),會傳回相符 XCom 的列表。列表中的元素會依 task_idmap_index 中的項目順序排序。

get_num_running_task_instances(session, same_dagrun=False)[source]

從資料庫傳回正在執行的 TI 數量。

init_run_context(raw=False)[source]

設定日誌上下文。

static filter_for_tis(tis)[source]

傳回 SQLAlchemy 篩選器,以查詢選定的任務實例。

schedule_downstream_tasks(session=NEW_SESSION, max_tis_per_query=None)[source]

排程此任務實例的下游任務。

get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]

推斷與此 TI「相關」的上游 (upstream) 的映射索引 (map index)。

邏輯的主要部分存在是為了解決以下範例描述的問題,其中 ‘val’ 必須根據參考的使用位置解析為不同的值

@task
def this_task(v):  # This is self.task.
    return v * 2


@task_group
def tg1(inp):
    val = upstream(inp)  # This is the upstream task.
    this_task(val)  # When inp is 1, val here should resolve to 2.
    return val


# This val is the same object returned by tg1.
val = tg1.expand(inp=[1, 2, 3])


@task_group
def tg2(inp):
    another_task(inp, val)  # val here should resolve to [2, 4, 6].


tg2.expand(inp=["a", "b"])

檢查 upstreamself.task 周圍的映射任務群組,以尋找共同的「祖先」。如果找到這樣的祖先,我們需要傳回特定的映射索引,以從上游 XCom 提取部分值。

參數
  • upstream (airflow.models.operator.Operator) – 被參考的上游任務。

  • ti_count (int | None) – 此任務由排程器擴展的任務實例總數,即範本上下文中的 expanded_ti_count

返回

要提取的特定映射索引或映射索引,或者如果我們想要「整個」傳回值(即不涉及映射任務群組),則為 None

返回類型

int | range | None

airflow.models.taskinstance.TaskInstanceStateType[source]
class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None)[source]

簡化的任務實例。

用於透過佇列 (Queues) 在進程之間傳送資料。

__repr__()[source]

返回 repr(self)。

__eq__(other)[source]

傳回 self==value。

as_dict()[source]
classmethod from_ti(ti)[source]
classmethod from_dict(obj_dict)[source]
class airflow.models.taskinstance.TaskInstanceNote(content, user_id=None)[source]

繼承自: airflow.models.base.TaskInstanceDependencies

用於儲存關於任務實例的任意註解。

__tablename__ = 'task_instance_note'[source]
user_id[source]
task_id[source]
dag_id[source]
run_id[source]
map_index[source]
content[source]
created_at[source]
updated_at[原始碼]
task_instance[原始碼]
__table_args__ = ()[原始碼]
__repr__()[原始碼]

返回 repr(self)。

這個條目對您有幫助嗎?