airflow.models.taskinstance
¶
模組內容¶
類別¶
任務實例儲存任務實例的狀態。 |
|
簡化的任務實例。 |
|
用於儲存關於任務實例的任意註解。 |
函式¶
|
將目前的執行環境設定為提供的環境物件。 |
|
清除一組任務實例,但確保正在執行的實例被終止。 |
屬性¶
- airflow.models.taskinstance.set_current_context(context)[原始碼]¶
將目前的執行環境設定為提供的環境物件。
此方法應在每次任務執行時呼叫一次,在呼叫 operator.execute 之前。
- airflow.models.taskinstance.clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state=DagRunState.QUEUED)[原始碼]¶
清除一組任務實例,但確保正在執行的實例被終止。
同時將 Dagrun 的 state 設定為 QUEUED,並將 start_date 設定為執行時間。但僅適用於已完成的 DR(SUCCESS 和 FAILED)。不會清除正在執行的 DR(QUEUED 和 RUNNING)的 state 和 start_date,因為清除已在執行的 DR 的狀態是多餘的,而清除 `start_date` 會影響 DR 的持續時間。
- 參數
tis (任務實例的列表) – 任務實例的列表
session (sqlalchemy.orm.session.Session) – 目前的 session
dag_run_state (airflow.utils.state.DagRunState | airflow.typing_compat.Literal[False]) – 設定已完成 DagRun 的狀態。若設定為 False,則 DagRun 狀態不會變更。
dag (airflow.models.dag.DAG | None) – DAG 物件
activate_dag_runs (None) – 已棄用的參數,請勿傳遞
- class airflow.models.taskinstance.TaskInstance(task, execution_date=None, run_id=None, state=None, map_index=-1)[原始碼]¶
繼承自:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
任務實例儲存任務實例的狀態。
此表為任務已執行及其所處狀態的權威和單一事實來源。
SqlAlchemy 模型刻意不使用 SqlAlchemy 外鍵連接到任務或 dag 模型,以便更精確地控制交易。
在此表上的資料庫交易應確保雙重觸發,並消除關於哪些任務實例準備好執行或未準備好執行的任何混淆,即使在多個排程器可能觸發任務實例的情況下。
map_index 中值 -1 代表以下任一情況:沒有對應任務的 TI;具有對應任務但尚未展開的 TI(state=pending);具有對應任務但展開為空列表的 TI(state=skipped)。
- property key: airflow.models.taskinstancekey.TaskInstanceKey[原始碼]¶
傳回唯一識別任務實例的元組。
- property previous_ti: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[原始碼]¶
此屬性已棄用。
請使用
airflow.models.taskinstance.TaskInstance.get_previous_ti
。
- property previous_ti_success: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[原始碼]¶
此屬性已棄用。
請使用
airflow.models.taskinstance.TaskInstance.get_previous_ti
。
- property previous_start_date_success: pendulum.DateTime | None[原始碼]¶
此屬性已棄用。
請使用
airflow.models.taskinstance.TaskInstance.get_previous_start_date
。
- dag_model: airflow.models.dag.DagModel[source]¶
- command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]¶
返回一個可以在任何安裝了 Airflow 的地方執行的命令。
此命令是 Orchestrator 發送給 Executor 的訊息的一部分。
- static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None, cfg_path=None, map_index=-1)[source]¶
產生執行此任務實例所需的 shell 命令。
- 參數
dag_id (str) – DAG ID
task_id (str) – 任務 ID
run_id (str) – 此任務的 DagRun 的執行 ID
mark_success (bool) – 是否將任務標記為成功
ignore_all_deps (bool) – 忽略所有可忽略的依賴項。 覆蓋其他 ignore_* 參數。
ignore_depends_on_past (bool) – 忽略 DAG 的 depends_on_past 參數 (例如,對於回填)
wait_for_past_depends_before_skipping (bool) – 在將 ti 標記為跳過之前,等待過去的依賴項
ignore_task_deps (bool) – 忽略特定於任務的依賴項,例如 depends_on_past 和觸發規則
ignore_ti_state (bool) – 忽略任務實例先前的失敗/成功
local (bool) – 是否在本機執行任務
pickle_id (int | None) – 如果 DAG 已序列化到資料庫,則為與 pickled DAG 關聯的 ID
file_path (pathlib.PurePath | str | None) – 包含 DAG 定義的檔案路徑
raw (bool) – 原始模式 (需要更多詳細資訊)
job_id (str | None) – 任務 ID (需要更多詳細資訊)
pool (str | None) – 任務應在其中執行的 Airflow pool
cfg_path (str | None) – 組態檔案的路徑
- 返回
可用於執行任務實例的 shell 命令
- 返回類型
- current_state(session=NEW_SESSION)[source]¶
從資料庫取得最新的狀態。
如果傳遞了 Session,我們會使用它,並且查找狀態將成為 Session 的一部分,否則將使用新的 Session。
這裡使用 sqlalchemy.inspect 來取得主鍵,以確保如果它們更改,它也不會退化
- 參數
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- error(session=NEW_SESSION)[source]¶
強制將任務實例的狀態設定為資料庫中的 FAILED。
- 參數
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]¶
- refresh_from_db(session=NEW_SESSION, lock_for_update=False)[source]¶
根據主鍵從資料庫重新整理任務實例。
- 參數
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
lock_for_update (bool) – 如果為 True,表示資料庫應鎖定 TaskInstance (發出 FOR UPDATE 子句),直到 Session 提交。
- refresh_from_task(task, pool_override=None)[source]¶
從給定的任務複製通用屬性。
- 參數
task (airflow.models.operator.Operator) – 要複製的任務物件
pool_override (str | None) – 使用 pool_override 而不是任務的 pool
- set_state(state, session=NEW_SESSION)[source]¶
設定 TaskInstance 狀態。
- 參數
state (str | None) – 要為 TI 設定的狀態
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- 返回
狀態是否已變更
- 返回類型
- are_dependents_done(session=NEW_SESSION)[source]¶
檢查此任務實例的直接依賴項是否已成功或已跳過。
這旨在供 wait_for_downstream 使用。
當您不希望在依賴項完成之前開始處理任務的下一個排程時,這非常有用。 例如,如果任務 DROP 並重新建立表格。
- 參數
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- get_previous_dagrun(state=None, session=None)[source]¶
返回在此任務實例的 DagRun 之前執行的 DagRun。
- 參數
state (airflow.utils.state.DagRunState | None) – 如果傳遞,則僅考慮特定狀態的實例。
session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM Session。
- get_previous_ti(state=None, session=NEW_SESSION)[source]¶
返回在此任務實例之前執行的任務的任務實例。
- 參數
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
state (airflow.utils.state.DagRunState | None) – 如果傳遞,則僅考慮特定狀態的實例。
- get_previous_execution_date(state=None, session=NEW_SESSION)[source]¶
從屬性 previous_ti_success 返回執行日期。
- 參數
state (airflow.utils.state.DagRunState | None) – 如果傳遞,則僅考慮特定狀態的實例。
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- get_previous_start_date(state=None, session=NEW_SESSION)[source]¶
從屬性 previous_ti_success 返回開始日期。
- 參數
state (airflow.utils.state.DagRunState | None) – 如果傳遞,則僅考慮特定狀態的實例。
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[source]¶
給定依賴項的上下文,是否滿足執行此任務實例的所有條件。
(例如,從 UI 強制執行的任務實例將忽略某些依賴項)。
- 參數
dep_context (airflow.ti_deps.dep_context.DepContext | None) – 決定應評估的依賴項的執行上下文。
session (sqlalchemy.orm.session.Session) – 資料庫 Session
verbose (bool) – 是否在 info 或 debug 日誌級別記錄有關失敗依賴項的詳細資訊
- get_dagrun(session=NEW_SESSION)[source]¶
返回此 TaskInstance 的 DagRun。
- 參數
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- 返回
DagRun
- 返回類型
- check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, external_executor_id=None, session=NEW_SESSION)[source]¶
- emit_state_change_metric(new_state)[source]¶
傳送時間指標 (metric),表示給定狀態轉換所花費的時間長度。
先前的狀態和指標名稱會從任務所處的狀態推導而來。
- 參數
new_state (airflow.utils.state.TaskInstanceState) – 此任務剛剛設定的狀態。我們不使用 self.state,因為有時狀態會直接在資料庫中更新,而不是在本地 TaskInstance 物件中更新。支援的狀態:QUEUED 和 RUNNING
- defer_task(exception, session=NEW_SESSION)[source]¶
將任務標記為延遲 (deferred),並設定在 TaskDeferred 引發時恢復任務所需的觸發器 (trigger)。
- run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]¶
執行 TaskInstance。
- classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_stop=False)[source]¶
處理 TaskInstance 的失敗。
- 參數
fail_stop (bool) – 若為 true,則停止 DAG 中剩餘的任務
- handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]¶
處理任務實例 (task instance) 的失敗。
- 參數
error (None | str | BaseException) – 如果指定,則記錄拋出的特定異常
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
test_mode (bool | None) – 若為 True,則不在資料庫中記錄成功或失敗
context (airflow.utils.context.Context | None) – Jinja2 上下文
force_fail (bool) – 若為 True,則任務不會重試
- get_template_context(session=None, ignore_param_exceptions=True)[source]¶
傳回 TI 上下文。
- 參數
session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM Session
ignore_param_exceptions (bool) – 標記,用於抑制初始化 ParamsDict 時的值異常
- get_rendered_template_fields(session=NEW_SESSION)[source]¶
更新任務,使其包含呈現後的範本欄位,以便在 UI 中呈現。
如果任務已執行,將從資料庫中提取;否則將進行呈現。
- render_templates(context=None, jinja_env=None)[source]¶
呈現運算子 (operator) 欄位中的範本。
如果任務最初是映射 (mapped) 的,這可能會將
self.task
替換為未映射、完全呈現的 BaseOperator。原始的self.task
在替換之前會被傳回。
- get_email_subject_content(exception, task=None)[source]¶
取得異常的電子郵件主旨內容。
- 參數
exception (BaseException) – 電子郵件中傳送的異常
task (airflow.models.baseoperator.BaseOperator | None) –
- email_alert(exception, task)[source]¶
傳送包含異常資訊的警報電子郵件。
- 參數
exception – 異常
task (airflow.models.baseoperator.BaseOperator) – 與異常相關的任務
- xcom_push(key, value, execution_date=None, session=NEW_SESSION)[source]¶
建立一個可供任務提取 (pull) 的 XCom。
- 參數
key (str) – 儲存值的鍵。
value (Any) – 要儲存的值。可能的類型取決於
enable_xcom_pickling
是否為 true。如果是,則可以是任何可 pickle 的物件;否則只能使用可 JSON 序列化的物件。execution_date (datetime.datetime | None) – 已棄用的參數,不起作用。
- xcom_pull(task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=False, session=NEW_SESSION, *, map_indexes=None, default=None)[source]¶
提取 (Pull) 符合特定條件的 XCom(可選)。
- 參數
key (str) – XCom 的鍵。如果提供,則僅傳回具有相符鍵的 XCom。預設鍵為
'return_value'
,也可作為常數XCOM_RETURN_KEY
使用。此鍵會自動賦予由任務傳回的 XCom(相對於手動推送 (push) 的 XCom)。若要移除篩選器,請傳遞 None。task_ids (str | Iterable[str] | None) – 僅提取來自具有相符 ID 的任務的 XCom。傳遞 None 以移除篩選器。
dag_id (str | None) – 如果提供,則僅提取來自此 DAG 的 XCom。如果為 None (預設值),則使用呼叫任務的 DAG。
map_indexes (int | Iterable[int] | None) – 如果提供,則僅提取具有相符索引的 XCom。如果為 None (預設值),則會從正在提取的任務推斷 (詳情請參閱下文)。
include_prior_dates (bool) – 如果為 False,則僅傳回來自當前 execution_date 的 XCom。如果為 True,則也會傳回來自先前日期的 XCom。
當提取單個任務(
task_id
為 None 或字串)且未指定map_indexes
時,傳回值會從指定的任務是否已映射來推斷。如果未映射,則傳回來自單個任務實例的值。如果要提取的任務已映射,則傳回一個迭代器 (iterator)(而非列表),該迭代器產生來自映射任務實例的 XCom。在任何一種情況下,如果找不到相符的 XCom,則傳回default
(如果未指定,則為 None)。當提取多個任務時(即
task_id
或map_index
為非字串的可迭代物件),會傳回相符 XCom 的列表。列表中的元素會依task_id
和map_index
中的項目順序排序。
- get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]¶
推斷與此 TI「相關」的上游 (upstream) 的映射索引 (map index)。
邏輯的主要部分存在是為了解決以下範例描述的問題,其中 ‘val’ 必須根據參考的使用位置解析為不同的值
@task def this_task(v): # This is self.task. return v * 2 @task_group def tg1(inp): val = upstream(inp) # This is the upstream task. this_task(val) # When inp is 1, val here should resolve to 2. return val # This val is the same object returned by tg1. val = tg1.expand(inp=[1, 2, 3]) @task_group def tg2(inp): another_task(inp, val) # val here should resolve to [2, 4, 6]. tg2.expand(inp=["a", "b"])
檢查
upstream
和self.task
周圍的映射任務群組,以尋找共同的「祖先」。如果找到這樣的祖先,我們需要傳回特定的映射索引,以從上游 XCom 提取部分值。
- class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None)[source]¶
簡化的任務實例。
用於透過佇列 (Queues) 在進程之間傳送資料。