airflow.models.dag

模組內容

類別

DAG

DAG(有向無環圖)是由具有方向性依賴關係的任務集合組成。

DagTag

每個 DAG 的標籤名稱,允許在 DAG 視圖中快速篩選。

DagOwnerAttributes

定義不同擁有者屬性的表格。

DagModel

包含 DAG 屬性的表格。

DagContext

DAG 上下文用於在使用 DAG 作為 ContextManager 時保持目前的 DAG。

函數

create_timetable(interval, timezone)

schedule_interval 參數建立 Timetable 實例。

get_last_dagrun(dag_id, session[, ...])

傳回 DAG 的最後一次 DAG 執行,如果沒有則傳回 None。

get_dataset_triggered_next_run_info(dag_ids, *, session)

取得 dag_ids 清單的下一次執行資訊。

dag([dag_id, description, schedule, ...])

Python DAG 裝飾器,將函數包裝成 Airflow DAG。

屬性

log

DEFAULT_VIEW_PRESETS

ORIENTATION_PRESETS

TAG_MAX_LEN

DagStateChangeCallback

ScheduleInterval

ScheduleIntervalArg

ScheduleArg

SLAMissCallback

DEFAULT_SCHEDULE_INTERVAL

DAG_ARGS_EXPECTED_TYPES

airflow.models.dag.log[來源]
airflow.models.dag.DEFAULT_VIEW_PRESETS = ['grid', 'graph', 'duration', 'gantt', 'landing_times'][來源]
airflow.models.dag.ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT'][來源]
airflow.models.dag.TAG_MAX_LEN = 100[來源]
airflow.models.dag.DagStateChangeCallback[來源]
airflow.models.dag.ScheduleInterval[來源]
airflow.models.dag.ScheduleIntervalArg[來源]
airflow.models.dag.ScheduleArg[來源]
airflow.models.dag.SLAMissCallback[來源]
airflow.models.dag.DEFAULT_SCHEDULE_INTERVAL[來源]
exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[來源]

基底類別:airflow.exceptions.AirflowException

當模型不正確地填入資料間隔欄位時引發的例外。

資料間隔欄位應全部為 None(對於在 AIP-39 之前排程的執行),或全部為 datetime(對於在實作 AIP-39 之後排程的執行)。如果正好有一個欄位為 None,則會引發此例外。

__str__()[來源]

傳回 str(self)。

airflow.models.dag.create_timetable(interval, timezone)[來源]

schedule_interval 參數建立 Timetable 實例。

airflow.models.dag.get_last_dagrun(dag_id, session, include_externally_triggered=False)[來源]

傳回 DAG 的最後一次 DAG 執行,如果沒有則傳回 None。

最後一次 DAG 執行可以是任何類型的執行,例如排程或回填。覆寫的 DagRun 會被忽略。

airflow.models.dag.get_dataset_triggered_next_run_info(dag_ids, *, session)[來源]

取得 dag_ids 清單的下一次執行資訊。

給定 dag_ids 清單,取得字串,表示任何由資料集觸發的 DAG 距離下一次執行的接近程度,例如「已更新 2 個資料集中的 1 個」。

airflow.models.dag.DAG_ARGS_EXPECTED_TYPES[來源]
class airflow.models.dag.DAG(dag_id, description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=airflow_conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=airflow_conf.getint('core', 'max_active_runs_per_dag'), max_consecutive_failed_dag_runs=airflow_conf.getint('core', 'max_consecutive_failed_dag_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=airflow_conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=airflow_conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=airflow_conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True, fail_stop=False, dag_display_name=None)[來源]

基底類別:airflow.utils.log.logging_mixin.LoggingMixin

DAG(有向無環圖)是由具有方向性依賴關係的任務集合組成。

DAG 也具有排程、開始日期和結束日期(選用)。對於每個排程(例如每日或每小時),DAG 需要在滿足其依賴關係時執行每個個別任務。某些任務具有依賴於自身過去的屬性,這表示它們在先前的排程(和上游任務)完成之前無法執行。

DAG 本質上充當任務的命名空間。一個 task_id 只能新增到一個 DAG 一次。

請注意,如果您計劃使用時區,則提供的所有日期都應為 pendulum 日期。請參閱時區感知 DAG

2.4 版本新增: schedule 參數用於指定基於時間的排程邏輯 (timetable),或資料集驅動的觸發器。

自 2.4 版本起已棄用: 參數 schedule_intervaltimetable。它們的功能已合併到新的 schedule 參數中。

參數
  • dag_id (str) – DAG 的 ID;必須完全由字母數字字元、破折號、點和底線(所有 ASCII)組成

  • description (str | None) – DAG 的描述,例如顯示在網頁伺服器上

  • schedule (ScheduleArg) – 定義 DAG 執行排程的規則。可以接受 cron 字串、timedelta 物件、Timetable 或 Dataset 物件清單。如果未提供此參數,則 DAG 將設定為預設排程 timedelta(days=1)。另請參閱 使用 Timetable 自訂 DAG 排程

  • start_date (datetime.datetime | None) – 排程器將嘗試回填的時間戳記

  • end_date (datetime.datetime | None) – 您的 DAG 不會在其之後運行的日期,留為 None 表示無限期排程

  • template_searchpath (str | Iterable[str] | None) – 此資料夾清單(非相對路徑)定義 jinja 將在其中尋找範本的位置。順序很重要。請注意,jinja/airflow 預設包含您的 DAG 檔案的路徑

  • template_undefined (type[jinja2.StrictUndefined]) – 範本未定義類型。

  • user_defined_macros (dict | None) – 將在您的 jinja 範本中公開的巨集字典。例如,將 dict(foo='bar') 傳遞給此參數可讓您在與此 DAG 相關的所有 jinja 範本中使用 {{ foo }}。請注意,您可以在此處傳遞任何類型的物件。

  • user_defined_filters (dict | None) – 將在您的 jinja 範本中公開的篩選器字典。例如,將 dict(hello=lambda name: 'Hello %s' % name) 傳遞給此參數可讓您在與此 DAG 相關的所有 jinja 範本中使用 {{ 'world' | hello }}

  • default_args (dict | None) – 在初始化運算子時用作建構子關鍵字參數的預設參數字典。請注意,運算子具有相同的 hook,並且優先於此處定義的 hook,這表示如果您的字典在此處包含 ‘depends_on_past’: True,並且在運算子的呼叫 default_args 中包含 ‘depends_on_past’: False,則實際值將為 False

  • params (collections.abc.MutableMapping | None) – DAG 層級參數的字典,可在範本中存取,命名空間位於 params 下。這些參數可以在任務層級覆寫。

  • max_active_tasks (int) – 允許同時執行的任務實例數

  • max_active_runs (int) – 最大活動 DAG 執行次數,超出此數量的 DAG 執行處於執行狀態,排程器將不會建立新的活動 DAG 執行

  • max_consecutive_failed_dag_runs (int) – (實驗性)連續失敗 DAG 執行的最大次數,超出此次數排程器將停用 DAG

  • dagrun_timeout (datetime.timedelta | None) – 指定 DagRun 應在逾時/失敗之前運行的時間長度,以便可以建立新的 DagRun。

  • sla_miss_callback (None | SLAMissCallback | list[SLAMissCallback]) – 指定在報告 SLA 逾時時要呼叫的函數或函數清單。請參閱 sla_miss_callback,以取得有關函數簽名和傳遞給回呼的參數的更多資訊。

  • default_view (str) – 指定 DAG 預設視圖(grid、graph、duration、gantt、landing_times),預設為 grid

  • orientation (str) – 在圖形視圖中指定 DAG 方向(LR、TB、RL、BT),預設為 LR

  • catchup (bool) – 執行排程器補追(還是僅執行最新版本)?預設為 True

  • on_failure_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – 當此 DAG 的 DagRun 失敗時要呼叫的函數或函數清單。上下文字典作為單一參數傳遞給此函數。

  • on_success_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – 與 on_failure_callback 非常相似,不同之處在於它在 DAG 成功時執行。

  • access_control (dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None) – 指定選用的 DAG 層級動作,例如「{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}」,或者如果存在 DAG 執行資源,則可以指定資源名稱,例如「{'role1': {'DAG Runs': {'can_create'}}, 'role2': {'DAGs': {'can_read', 'can_edit', 'can_delete'}}}」

  • is_paused_upon_creation (bool | None) – 指定首次建立 DAG 時是否暫停。如果 DAG 已存在,則會忽略此旗標。如果未指定此選用參數,則將使用全域配置設定。

  • jinja_environment_kwargs (dict | None) –

    要傳遞到 Jinja Environment 以進行範本呈現的其他配置選項

    範例:避免 Jinja 從範本字串中移除尾隨換行符

    DAG(
        dag_id="my-dag",
        jinja_environment_kwargs={
            "keep_trailing_newline": True,
            # some other jinja2 Environment options here
        },
    )
    

    請參閱Jinja Environment 文件

  • render_template_as_native_obj (bool) – 如果為 True,則使用 Jinja NativeEnvironment 將範本呈現為原生 Python 類型。如果為 False,則使用 Jinja Environment 將範本呈現為字串值。

  • tags (list[str] | None) – 標籤清單,有助於在 UI 中篩選 DAG。

  • owner_links (dict[str, str] | None) – 擁有者及其連結的字典,這些連結將在 DAG 視圖 UI 上可點擊。可以用作 HTTP 連結(例如指向您的 Slack 頻道的連結),或 mailto 連結。例如:{“dag_owner”: “https://airflow.dev.org.tw/”}

  • auto_register (bool) – 在 with 區塊中使用此 DAG 時自動註冊

  • fail_stop (bool) – 當 DAG 中的任務失敗時,使目前正在執行的任務失敗。警告:fail stop DAG 只能包含具有預設觸發規則(「all_success」)的任務。如果 fail stop DAG 中的任何任務具有非預設觸發規則,則會擲回例外。

  • dag_display_name (str | None) – DAG 的顯示名稱,會出現在 UI 上。

property dag_id: str[來源]
property is_subdag: bool[來源]
property concurrency: int[來源]
property max_active_tasks: int[source]
property access_control[source]
property dag_display_name: str[source]
property description: str | None[source]
property default_view: str[source]
property pickle_id: int | None[source]
property tasks: list[airflow.models.operator.Operator][source]
property task_ids: list[str][source]
property teardowns: list[airflow.models.operator.Operator][source]
property tasks_upstream_of_teardowns: list[airflow.models.operator.Operator][source]
property task_group: airflow.utils.task_group.TaskGroup[source]
property relative_fileloc: pathlib.Path[source]

可匯入的 dag ‘file’ 檔案位置,相對於已設定的 DAGs 資料夾。

property folder: str[source]

DAG 物件被實例化的資料夾位置。

property owner: str[source]

傳回在 DAG 任務中找到的所有擁有者列表。

傳回值

DAG 任務中以逗號分隔的擁有者列表

傳回類型

str

property allow_future_exec_dates: bool[source]
property concurrency_reached[source]

請使用 airflow.models.DAG.get_concurrency_reached,此屬性已過時。

property is_paused[source]

請使用 airflow.models.DAG.get_is_paused,此屬性已過時。

property normalized_schedule_interval: ScheduleInterval[source]
property latest_execution_date[source]

請使用 airflow.models.DAG.get_latest_execution_date,此屬性已過時。

property subdags[source]

傳回與此 DAG 關聯的子 DAG 物件列表。

property roots: list[airflow.models.operator.Operator][source]

傳回沒有父節點的節點。這些節點最先執行,稱為根節點。

property leaves: list[airflow.models.operator.Operator][source]

傳回沒有子節點的節點。這些節點最後執行,稱為葉節點。

property task: airflow.decorators.TaskDecoratorCollection[source]
fileloc: str[source]

載入此 DAG 或子 DAG 需要匯入的檔案路徑。

當 DAG 從 ZIP 檔案或其他 DAG 發佈格式載入時,這可能不是磁碟上的實際檔案。

parent_dag: DAG | None[source]
get_doc_md(doc_md)[source]
validate()[source]

驗證 DAG 是否具有一致的設定。

這會在 DAG bag 在打包 DAG 之前被呼叫。

validate_executor_field()[source]
__repr__()[source]

傳回 repr(self)。

__eq__(other)[source]

傳回 self==value。

__ne__(other)[source]

傳回 self!=value。

__lt__(other)[source]

傳回 self<value。

__hash__()[source]

傳回 hash(self)。

__enter__()[source]
__exit__(_type, _value, _tb)[source]
date_range(start_date, num=None, end_date=None)[source]
is_fixed_time_schedule()[source]

判斷排程是否具有固定時間(例如每天凌晨 3 點)。

透過「查看」接下來兩個 cron 觸發時間來進行偵測;如果這兩個時間具有相同的分鐘和小時值,則排程是固定的,我們*不需要*執行 DST 修正。

這假設 DST 發生在整分鐘變更時(例如 12:59 -> 12:00)。

不要試圖理解這實際上意味著什麼。這是舊邏輯,不應在任何地方使用。

following_schedule(dttm)[source]

計算此 DAG 在 UTC 中的後續排程。

參數

dttm – utc datetime

傳回值

utc datetime

previous_schedule(dttm)[source]
next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]

取得此 DAG 在 date_last_automated_dagrun 之後的下一個 DagRun 的相關資訊。

這會根據 DAG 的時間表、start_date、end_date 等,計算下一個 DagRun 應運作的時間間隔(其執行日期)以及何時可以排程。這不會檢查最大活動執行次數或任何其他「max_active_tasks」類型限制,而僅根據此 DAG 及其任務的各種日期和間隔欄位執行計算。

參數
  • last_automated_dagrun (None | datetime.datetime | airflow.timetables.base.DataInterval) – 此 DAG 現有「自動」DagRun(排程或回填,但非手動)的 max(execution_date)

  • restricted (bool) – 如果設定為 False(預設為 True),則忽略 DAG 或任務上指定的 start_dateend_datecatchup

傳回值

下一個 dagrun 的 DagRunInfo,如果 dagrun 不會被排程,則為 None。

傳回類型

airflow.timetables.base.DagRunInfo | None

next_dagrun_after_date(date_last_automated_dagrun)[source]
iter_dagrun_infos_between(earliest, latest, *, align=True)[source]

在使用此 DAG 的時間表在給定間隔之間產生 DagRunInfo。

如果 DagRunInfo 實例的 logical_date 不早於 earliest,也不晚於 latest,則會產生這些實例。這些實例會依其 logical_date 從最早到最晚排序。

如果 alignFalse,則第一次執行將立即在 earliest 上發生,即使它沒有落在邏輯時間表排程上。預設值為 True,但為了向後相容性,子 DAG 將忽略此值,並且始終表現得好像此值設定為 False

範例:DAG 排程為每天午夜執行 (0 0 * * *)。如果 earliest2021-06-03 23:00:00,則如果 align=False,第一個 DagRunInfo 將為 2021-06-03 23:00:00,如果 align=True,則為 2021-06-04 00:00:00

get_run_dates(start_date, end_date=None)[source]

使用此 DAG 的排程間隔,傳回在收到的參數間隔之間的日期列表。

傳回的日期可以用於執行日期。

參數
  • start_date – 間隔的開始日期。

  • end_date – 間隔的結束日期。預設為 timezone.utcnow()

傳回值

在間隔內遵循 DAG 排程的日期列表。

傳回類型

list

normalize_schedule(dttm)[source]
get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)[source]
has_dag_runs(session=NEW_SESSION, include_externally_triggered=True)[source]
param(name, default=NOTSET)[source]

傳回目前 DAG 的 DagParam 物件。

參數
  • name (str) – DAG 參數名稱。

  • default (Any) – DAG 參數的回退值。

傳回值

指定名稱和目前 DAG 的 DagParam 實例。

傳回類型

airflow.models.param.DagParam

get_concurrency_reached(session=NEW_SESSION)[source]

傳回布林值,指示是否已達到此 DAG 的 max_active_tasks 限制。

get_is_active(session=NEW_SESSION)[source]

傳回布林值,指示此 DAG 是否為活動狀態。

get_is_paused(session=NEW_SESSION)[source]

傳回布林值,指示此 DAG 是否已暫停。

static fetch_callback(dag, dag_run_id, success=True, reason=None, *, session=NEW_SESSION)[source]

根據 success 的值,提取適當的回調函數。

此方法獲取此 DagRun 中單個 TaskInstance 的上下文,並將其與回調函數列表一起返回。

參數
  • dag (DAG) – DAG 物件

  • dag_run_id (str) – DAG 執行 ID

  • success (bool) – 標記,用於指定是否應調用失敗或成功回調函數

  • reason (str | None) – 完成原因

  • session (sqlalchemy.orm.session.Session) – 資料庫會話

handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]

根據情況觸發 on_failure_callback 或 on_success_callback。

此方法獲取此 DagRun 中單個 TaskInstance 的上下文,並將其與 'reason' 一起傳遞給可調用物件,主要用於區分 DagRun 失敗。

參數
  • dagrun (airflow.models.dagrun.DagRun) – DagRun 物件

  • success – 標記,用於指定是否應調用失敗或成功回調函數

  • reason – 完成原因

  • session – 資料庫會話

classmethod execute_callback(callbacks, context, dag_id)[source]

使用給定的上下文觸發回調函數。

參數
  • callbacks (list[Callable] | None) – 要調用的回調函數列表

  • context (airflow.models.taskinstance.Context | None) – 要傳遞給所有回調函數的上下文

  • dag_id (str) – 要尋找的 DAG 的 dag_id。

get_active_runs()[source]

返回目前正在運行的 dag run 執行日期的列表。

傳回值

執行日期列表

get_num_active_runs(external_trigger=None, only_running=True, session=NEW_SESSION)[source]

返回處於 “running” 狀態的活動 dag run 的數量。

參數
  • external_trigger – 若為外部觸發的活動 dag run,則為 True

  • session

傳回值

活動 dag run 的數量,大於 0

static fetch_dagrun(dag_id, execution_date=None, run_id=None, session=NEW_SESSION)[source]

如果存在,則返回給定執行日期或 run_id 的 dag run,否則返回 None。

參數
傳回值

如果找到 DagRun,則返回 DagRun,否則返回 None。

傳回類型

airflow.models.dagrun.DagRun | airflow.serialization.pydantic.dag_run.DagRunPydantic

get_dagrun(execution_date=None, run_id=None, session=NEW_SESSION)[source]
get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]

返回 start_date(包含)和 end_date(包含)之間的 dag run 列表。

參數
  • start_date – 要尋找的 DagRun 的起始執行日期。

  • end_date – 要尋找的 DagRun 的結束執行日期。

  • session

傳回值

找到的 DagRun 列表。

get_latest_execution_date(session=NEW_SESSION)[source]

返回至少存在一個 dag run 的最新日期。

resolve_template_files()[source]
get_template_env(*, force_sandboxed=False)[source]

建立 Jinja2 環境。

set_dependency(upstream_task_id, downstream_task_id)[source]

在使用 add_task() 新增到 DAG 的兩個任務之間設定依賴關係。

get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]

取得 base_date 之前(包含 base_date)的 num 個任務實例。

傳回的列表可能包含對應於任何 DagRunType 的剛好 num 個任務實例。如果 base_date 之前排定的 DAG 執行次數少於 num 次,則列表中的任務實例可能會更少。

get_task_instances(start_date=None, end_date=None, state=None, session=NEW_SESSION)[source]
set_task_instance_state(*, task_id, map_indexes=None, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

設定 TaskInstance 的狀態,並清除處於 failed 或 upstream_failed 狀態的下游任務。

參數
  • task_id (str) – TaskInstance 的任務 ID

  • map_indexes (Collection[int] | None) – 僅在 TaskInstance 的 map_index 符合時才設定 TaskInstance。如果為 None(預設值),則會設定任務的所有映射 TaskInstance。

  • execution_date (datetime.datetime | None) – TaskInstance 的執行日期

  • run_id (str | None) – TaskInstance 的 run_id

  • state (airflow.utils.state.TaskInstanceState) – 要將 TaskInstance 設定為的狀態

  • upstream (bool) – 包含給定 task_id 的所有上游任務

  • downstream (bool) – 包含給定 task_id 的所有下游任務

  • future (bool) – 包含給定 task_id 的所有未來 TaskInstance

  • commit (bool) – 提交變更

  • past (bool) – 包含給定 task_id 的所有過去 TaskInstance

set_task_group_state(*, group_id, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

將 TaskGroup 設定為給定狀態,並清除處於 failed 或 upstream_failed 狀態的下游任務。

參數
  • group_id (str) – TaskGroup 的 group_id

  • execution_date (datetime.datetime | None) – TaskInstance 的執行日期

  • run_id (str | None) – TaskInstance 的 run_id

  • state (airflow.utils.state.TaskInstanceState) – 要將 TaskInstance 設定為的狀態

  • upstream (bool) – 包含給定 task_id 的所有上游任務

  • downstream (bool) – 包含給定 task_id 的所有下游任務

  • future (bool) – 包含給定 task_id 的所有未來 TaskInstance

  • commit (bool) – 提交變更

  • past (bool) – 包含給定 task_id 的所有過去 TaskInstance

  • session (sqlalchemy.orm.session.Session) – 新會話

topological_sort(include_subdag_tasks=False)[source]

以拓撲順序排序任務,使任務在任何上游依賴項之後出現。

已棄用,改用 task_group.topological_sort

set_dag_runs_state(state=DagRunState.RUNNING, session=NEW_SESSION, start_date=None, end_date=None, dag_ids=None)[source]
clear(task_ids=None, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state=DagRunState.QUEUED, dry_run=False, session=NEW_SESSION, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, exclude_task_ids=frozenset())[source]

清除與目前 dag 相關聯的一組任務實例,針對指定的日期範圍。

參數
  • task_ids (Collection[str | tuple[str, int]] | None) – 要清除的任務 ID 或 (task_id, map_index) 元組的列表

  • start_date (datetime.datetime | None) – 要清除的最小 execution_date

  • end_date (datetime.datetime | None) – 要清除的最大 execution_date

  • only_failed (bool) – 僅清除失敗的任務

  • only_running (bool) – 僅清除正在運行的任務。

  • confirm_prompt (bool) – 詢問確認

  • include_subdags (bool) – 清除子 DAG 中的任務,並清除 ExternalTaskMarker 指示的外部任務

  • include_parentdag (bool) – 清除子 DAG 的父 DAG 中的任務。

  • dag_run_state (airflow.utils.state.DagRunState) – 要將 DagRun 設定為的狀態。如果設定為 False,則不會變更 dagrun 狀態。

  • dry_run (bool) – 尋找要清除的任務,但不清除它們。

  • session (sqlalchemy.orm.session.Session) – 要使用的 sqlalchemy 會話

  • dag_bag (airflow.models.dagbag.DagBag | None) – 用於尋找 DAG 子 DAG 的 DagBag(選用)

  • exclude_task_ids (frozenset[str] | frozenset[tuple[str, int]] | None) – 不應清除的 task_id 或 (task_id, map_index) 元組的集合

classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]
__deepcopy__(memo)[source]
sub_dag(*args, **kwargs)[source]

使用 airflow.models.DAG.partial_subset,此方法已棄用。

partial_subset(task_ids_or_regex, include_downstream=False, include_upstream=True, include_direct_upstream=False)[source]

根據與一個或多個任務匹配的 regex,傳回目前 dag 的子集。

根據應與一個或多個任務匹配的 regex,傳回目前 dag 的子集,作為目前 dag 的深層副本,並根據傳遞的標記包含上游和下游鄰居。

參數
  • task_ids_or_regex (str | Pattern | Iterable[str]) – 任務 ID 列表,或用於比對任務 ID 的 regex(作為字串或編譯的 regex 模式)。

  • include_downstream – 除了匹配的任務外,還包含所有匹配任務的下游任務。

  • include_upstream – 除了匹配的任務外,還包含所有匹配任務的上游任務。

  • include_direct_upstream – 包含匹配任務和下游任務(如果 include_downstream = True)的所有直接上游任務

has_task(task_id)[source]
has_task_group(task_group_id)[source]
task_group_dict()[source]
get_task(task_id, include_subdags=False)[source]
pickle_info()[source]
pickle(session=NEW_SESSION)[source]
tree_view()[source]

列印 DAG 的 ASCII 樹狀結構表示。

get_tree_view()[source]

返回 DAG 的 ASCII 樹狀結構表示。

add_task(task)[source]

將任務新增至 DAG。

參數

task (airflow.models.operator.Operator) – 您想要新增的任務

add_tasks(tasks)[source]

將任務列表新增至 DAG。

參數

tasks (Iterable[airflow.models.operator.Operator]) – 您想要新增的任務列表

run(start_date=None, end_date=None, mark_success=False, local=False, donot_pickle=airflow_conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, continue_on_failures=False, disable_retry=False)[source]

執行 DAG。

參數
  • start_date – 要執行的範圍的開始日期

  • end_date – 要執行的範圍的結束日期

  • mark_success – 若為 True,則將作業標記為成功,而無需實際執行

  • local – 若為 True,則使用 LocalExecutor 執行任務

  • executor – 用於執行任務的 Executor 實例

  • donot_pickle – 若為 True,則避免序列化 DAG 物件並傳送至 worker

  • ignore_task_deps – 若為 True,則跳過上游任務

  • ignore_first_depends_on_past – 若為 True,則僅針對第一組任務忽略 depends_on_past 依賴項

  • pool – 要使用的資源池

  • delay_on_limit_secs – 當達到 max_active_runs 限制時,在下次嘗試執行 dag run 之前等待的秒數

  • verbose – 使日誌輸出更詳細

  • conf – 從 CLI 傳遞的使用者定義字典

  • rerun_failed_tasks

  • run_backwards

  • run_at_least_once – 若為 true,即使在時間範圍內沒有邏輯執行,也始終至少執行 DAG 一次。

cli()[source]

公開此 DAG 特有的 CLI。

test(execution_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[source]

針對給定的 DAG 和執行日期,執行單個 DagRun。

參數
  • execution_date (datetime.datetime | None) – DAG 執行的執行日期

  • run_conf (dict[str, Any] | None) – 要傳遞給新建立的 dagrun 的組態

  • conn_file_path (str | None) – yaml 或 json 格式的連線檔案路徑

  • variable_file_path (str | None) – yaml 或 json 格式的變數檔案路徑

  • use_executor (bool) – 若設定,則使用 executor 測試 DAG

  • mark_success_pattern (Pattern | str | None) – 要標記為成功而非執行的 task_ids 的正則表達式

  • session (sqlalchemy.orm.session.Session) – 資料庫連線 (選用)

create_dagrun(state, execution_date=None, run_id=None, start_date=None, external_trigger=False, conf=None, run_type=None, session=NEW_SESSION, dag_hash=None, creating_job_id=None, data_interval=None)[source]

從此 dag 建立 dag run,包含與此 dag 關聯的任務。

返回 dag run。

參數
  • run_id (str | None) – 定義此 dag run 的 run id

  • run_type (airflow.utils.types.DagRunType | None) – DagRun 的類型

  • execution_date (datetime.datetime | None) – 此 dag run 的執行日期

  • state (airflow.utils.state.DagRunState) – dag run 的狀態

  • start_date (datetime.datetime | None) – 應評估此 dag run 的日期

  • external_trigger (bool | None) – 此 dag run 是否為外部觸發

  • conf (dict | None) – 包含要傳遞給 DAG 的組態/參數的字典

  • creating_job_id (int | None) – 建立此 DagRun 的作業 ID

  • session (sqlalchemy.orm.session.Session) – 資料庫 session

  • dag_hash (str | None) – 序列化 DAG 的雜湊值

  • data_interval (tuple[datetime.datetime, datetime.datetime] | None) – DagRun 的資料間隔

classmethod bulk_sync_to_db(dags, session=NEW_SESSION)[source]

使用 airflow.models.DAG.bulk_write_to_db,此方法已棄用。

classmethod bulk_write_to_db(dags, processor_subdir=None, session=NEW_SESSION)[source]

確保給定 dags 的 DagModel 列在 DB 的 dag 表格中是最新的。

請注意,此方法可以針對 DAG 和 SubDAG 呼叫。SubDag 實際上是 SubDagOperator。

參數

dags (Collection[DAG]) – 要儲存到 DB 的 DAG 物件

傳回值

sync_to_db(processor_subdir=None, session=NEW_SESSION)[source]

將關於此 DAG 的屬性儲存到 DB。

請注意,此方法可以針對 DAG 和 SubDAG 呼叫。SubDag 實際上是 SubDagOperator。

傳回值

get_default_view()[source]

允許向後相容的 jinja2 模板。

static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]

給定已知 DAG 的列表,停用在 ORM 中標記為活動的任何其他 DAG。

參數

active_dag_ids – 活動的 DAG ID 列表

傳回值

static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]

停用排程器在過期日期之前最後一次觸及的任何 DAG。

這些 DAG 很可能已被刪除。

參數

expiration_date – 設定在此時間之前觸及的非活動 DAG

傳回值

static get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION)[source]

返回給定 DAG 中任務實例的數量。

參數
  • session – ORM session

  • dag_id – 要取得任務並行性的 DAG 的 ID

  • run_id – 要取得任務並行性的 DAG run 的 ID

  • task_ids – 給定 DAG 的有效任務 ID 列表

  • states – 如果提供,則要篩選的狀態列表

傳回值

正在執行的任務數量

傳回類型

整數

classmethod get_serialized_fields()[source]

字串化的 DAG 和運算符號正好包含這些欄位。

get_edge_info(upstream_task_id, downstream_task_id)[source]

返回給定任務對的邊緣資訊,如果沒有資訊,則返回空邊緣。

set_edge_info(upstream_task_id, downstream_task_id, info)[source]

在 DAG 上設定給定的邊緣資訊。

請注意,這將覆寫,而不是與現有資訊合併。

validate_schedule_and_params()[source]

當 DAG 定義了排程時,驗證 Param 值。

如果存在任何無法透過其 schema 定義解析的 Param,則引發例外。

解析給定的連結,並驗證它是否為有效的 URL 或 'mailto' 連結。

返回無效的 (擁有者, 連結) 對的迭代器。

class airflow.models.dag.DagTag(name, doc)[source]

繼承自: airflow.models.base.Base

每個 DAG 的標籤名稱,允許在 DAG 視圖中快速篩選。

__tablename__ = 'dag_tag'[source]
name[source]
dag_id[source]
__table_args__ = ()[source]
__repr__()[source]

傳回 repr(self)。

class airflow.models.dag.DagOwnerAttributes(name, doc)[source]

繼承自: airflow.models.base.Base

定義不同擁有者屬性的表格。

例如,擁有者的連結將作為超連結傳遞到「DAGs」視圖。

__tablename__ = 'dag_owner_attributes'[source]
dag_id[source]
owner[source]
__repr__()[source]

傳回 repr(self)。

classmethod get_all(session)[source]
class airflow.models.dag.DagModel(concurrency=None, **kwargs)[source]

繼承自: airflow.models.base.Base

包含 DAG 屬性的表格。

property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[source]
property timezone[source]
property safe_dag_id[source]
property relative_fileloc: pathlib.Path | None[source]

可匯入的 dag ‘file’ 檔案位置,相對於已設定的 DAGs 資料夾。

__tablename__ = 'dag'[source]

這些項目儲存在資料庫中,用於與狀態相關的資訊

dag_id[source]
root_dag_id[source]
is_paused_at_creation[source]
is_paused[source]
is_subdag[source]
is_active[source]
last_parsed_time[source]
last_pickled[source]
last_expired[source]
scheduler_lock[source]
pickle_id[source]
fileloc[source]
processor_subdir[source]
owners[source]
description[source]
default_view[source]
schedule_interval[source]
timetable_description[source]
dataset_expression[source]
tags[source]
max_active_tasks[source]
max_active_runs[source]
max_consecutive_failed_dag_runs[source]
has_task_concurrency_limits[source]
has_import_errors[source]
next_dagrun[source]
next_dagrun_data_interval_start[source]
next_dagrun_data_interval_end[source]
next_dagrun_create_after[source]
__table_args__ = ()[source]
parent_dag[source]
schedule_dataset_references[source]
schedule_dataset_alias_references[source]
schedule_datasets[source]
task_outlet_dataset_references[source]
NUM_DAGS_PER_DAGRUN_QUERY[source]
__repr__()[source]

傳回 repr(self)。

static get_dagmodel(dag_id, session=NEW_SESSION)[source]
classmethod get_current(dag_id, session=NEW_SESSION)[source]
get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)[source]
get_is_paused(*, session=None)[source]

提供與 ‘DAG’ 的介面相容性。

get_is_active(*, session=None)[source]

提供與 ‘DAG’ 的介面相容性。

static get_paused_dag_ids(dag_ids, session=NEW_SESSION)[source]

給定 dag_ids 列表,取得一組已暫停的 DAG ID。

參數
  • dag_ids (list[str]) – DAG ID 列表

  • session (sqlalchemy.orm.session.Session) – ORM Session

傳回值

已暫停的 Dag_ids

傳回類型

set[str]

get_default_view()[source]

取得預設 DAG 視圖,如果 DagModel 沒有值,則回傳預設組態值。

set_is_paused(is_paused, including_subdags=True, session=NEW_SESSION)[source]

暫停/取消暫停 DAG。

參數
  • is_paused (bool) – DAG 是否已暫停

  • including_subdags (bool) – 是否包含 DAG 的子 DAG

  • session – session

dag_display_name()[source]
classmethod deactivate_deleted_dags(alive_dag_filelocs, processor_subdir, session=NEW_SESSION)[source]

針對 DAG 檔案已被移除的 DAG,設定 is_active=False

參數
  • alive_dag_filelocs (Container[str]) – 存活 DAG 的檔案路徑

  • processor_subdir (str) – DAG 處理器子目錄

  • session (sqlalchemy.orm.session.Session) – ORM Session

classmethod dags_needing_dagruns(session)[source]

回傳(並鎖定)一個 DAG 物件列表,這些物件即將建立新的 DagRun。

這將回傳一個結果集,其中包含使用 “SELECT … FOR UPDATE” 查詢進行列級鎖定的列,您應確保所有排程決策都在單一交易中完成 – 一旦交易提交,它將被解鎖。

calculate_dagrun_date_fields(dag, last_automated_dag_run)[source]

計算 next_dagrunnext_dagrun_create_after`

參數
  • dag (DAG) – DAG 物件

  • last_automated_dag_run (None | datetime.datetime | airflow.timetables.base.DataInterval) – 此 DAG 最近一次執行的 DataInterval(或 datetime),如果尚未排程則為 None。

get_dataset_triggered_next_run_info(*, session=NEW_SESSION)[source]
airflow.models.dag.dag(dag_id='', description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=airflow_conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=airflow_conf.getint('core', 'max_active_runs_per_dag'), max_consecutive_failed_dag_runs=airflow_conf.getint('core', 'max_consecutive_failed_dag_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=airflow_conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=airflow_conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=airflow_conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True, fail_stop=False, dag_display_name=None)[source]

Python DAG 裝飾器,將函數包裝成 Airflow DAG。

接受操作器 kwargs 的 kwargs。可用於參數化 DAG。

參數
  • dag_args – DAG 物件的引數

  • dag_kwargs – DAG 物件的 Kwargs。

class airflow.models.dag.DagContext[source]

DAG 上下文用於在使用 DAG 作為 ContextManager 時保持目前的 DAG。

您可以將 DAG 作為上下文使用

with DAG(
    dag_id="example_dag",
    default_args=default_args,
    schedule="0 0 * * *",
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    ...

如果您這樣做,上下文將儲存 DAG,並且每當建立新任務時,它將使用此儲存的 DAG 作為父 DAG。

autoregistered_dags: set[tuple[DAG, types.ModuleType]][source]
current_autoregister_module_name: str | None[source]
classmethod push_context_managed_dag(dag)[source]
classmethod pop_context_managed_dag()[source]
classmethod get_current_dag()[source]

這個條目有幫助嗎?