airflow.models.dag
¶
模組內容¶
類別¶
DAG(有向無環圖)是由具有方向性依賴關係的任務集合組成。 |
|
每個 DAG 的標籤名稱,允許在 DAG 視圖中快速篩選。 |
|
定義不同擁有者屬性的表格。 |
|
包含 DAG 屬性的表格。 |
|
DAG 上下文用於在使用 DAG 作為 ContextManager 時保持目前的 DAG。 |
函數¶
|
從 |
|
傳回 DAG 的最後一次 DAG 執行,如果沒有則傳回 None。 |
|
取得 dag_ids 清單的下一次執行資訊。 |
|
Python DAG 裝飾器,將函數包裝成 Airflow DAG。 |
屬性¶
- airflow.models.dag.DEFAULT_VIEW_PRESETS = ['grid', 'graph', 'duration', 'gantt', 'landing_times'][來源]¶
- exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[來源]¶
基底類別:
airflow.exceptions.AirflowException
當模型不正確地填入資料間隔欄位時引發的例外。
資料間隔欄位應全部為 None(對於在 AIP-39 之前排程的執行),或全部為 datetime(對於在實作 AIP-39 之後排程的執行)。如果正好有一個欄位為 None,則會引發此例外。
- airflow.models.dag.get_last_dagrun(dag_id, session, include_externally_triggered=False)[來源]¶
傳回 DAG 的最後一次 DAG 執行,如果沒有則傳回 None。
最後一次 DAG 執行可以是任何類型的執行,例如排程或回填。覆寫的 DagRun 會被忽略。
- airflow.models.dag.get_dataset_triggered_next_run_info(dag_ids, *, session)[來源]¶
取得 dag_ids 清單的下一次執行資訊。
給定 dag_ids 清單,取得字串,表示任何由資料集觸發的 DAG 距離下一次執行的接近程度,例如「已更新 2 個資料集中的 1 個」。
- class airflow.models.dag.DAG(dag_id, description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=airflow_conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=airflow_conf.getint('core', 'max_active_runs_per_dag'), max_consecutive_failed_dag_runs=airflow_conf.getint('core', 'max_consecutive_failed_dag_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=airflow_conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=airflow_conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=airflow_conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True, fail_stop=False, dag_display_name=None)[來源]¶
基底類別:
airflow.utils.log.logging_mixin.LoggingMixin
DAG(有向無環圖)是由具有方向性依賴關係的任務集合組成。
DAG 也具有排程、開始日期和結束日期(選用)。對於每個排程(例如每日或每小時),DAG 需要在滿足其依賴關係時執行每個個別任務。某些任務具有依賴於自身過去的屬性,這表示它們在先前的排程(和上游任務)完成之前無法執行。
DAG 本質上充當任務的命名空間。一個 task_id 只能新增到一個 DAG 一次。
請注意,如果您計劃使用時區,則提供的所有日期都應為 pendulum 日期。請參閱時區感知 DAG。
2.4 版本新增: schedule 參數用於指定基於時間的排程邏輯 (timetable),或資料集驅動的觸發器。
自 2.4 版本起已棄用: 參數 schedule_interval 和 timetable。它們的功能已合併到新的 schedule 參數中。
- 參數
dag_id (str) – DAG 的 ID;必須完全由字母數字字元、破折號、點和底線(所有 ASCII)組成
description (str | None) – DAG 的描述,例如顯示在網頁伺服器上
schedule (ScheduleArg) – 定義 DAG 執行排程的規則。可以接受 cron 字串、timedelta 物件、Timetable 或 Dataset 物件清單。如果未提供此參數,則 DAG 將設定為預設排程
timedelta(days=1)
。另請參閱 使用 Timetable 自訂 DAG 排程。start_date (datetime.datetime | None) – 排程器將嘗試回填的時間戳記
end_date (datetime.datetime | None) – 您的 DAG 不會在其之後運行的日期,留為 None 表示無限期排程
template_searchpath (str | Iterable[str] | None) – 此資料夾清單(非相對路徑)定義 jinja 將在其中尋找範本的位置。順序很重要。請注意,jinja/airflow 預設包含您的 DAG 檔案的路徑
template_undefined (type[jinja2.StrictUndefined]) – 範本未定義類型。
user_defined_macros (dict | None) – 將在您的 jinja 範本中公開的巨集字典。例如,將
dict(foo='bar')
傳遞給此參數可讓您在與此 DAG 相關的所有 jinja 範本中使用{{ foo }}
。請注意,您可以在此處傳遞任何類型的物件。user_defined_filters (dict | None) – 將在您的 jinja 範本中公開的篩選器字典。例如,將
dict(hello=lambda name: 'Hello %s' % name)
傳遞給此參數可讓您在與此 DAG 相關的所有 jinja 範本中使用{{ 'world' | hello }}
。default_args (dict | None) – 在初始化運算子時用作建構子關鍵字參數的預設參數字典。請注意,運算子具有相同的 hook,並且優先於此處定義的 hook,這表示如果您的字典在此處包含 ‘depends_on_past’: True,並且在運算子的呼叫 default_args 中包含 ‘depends_on_past’: False,則實際值將為 False。
params (collections.abc.MutableMapping | None) – DAG 層級參數的字典,可在範本中存取,命名空間位於 params 下。這些參數可以在任務層級覆寫。
max_active_tasks (int) – 允許同時執行的任務實例數
max_active_runs (int) – 最大活動 DAG 執行次數,超出此數量的 DAG 執行處於執行狀態,排程器將不會建立新的活動 DAG 執行
max_consecutive_failed_dag_runs (int) – (實驗性)連續失敗 DAG 執行的最大次數,超出此次數排程器將停用 DAG
dagrun_timeout (datetime.timedelta | None) – 指定 DagRun 應在逾時/失敗之前運行的時間長度,以便可以建立新的 DagRun。
sla_miss_callback (None | SLAMissCallback | list[SLAMissCallback]) – 指定在報告 SLA 逾時時要呼叫的函數或函數清單。請參閱 sla_miss_callback,以取得有關函數簽名和傳遞給回呼的參數的更多資訊。
default_view (str) – 指定 DAG 預設視圖(grid、graph、duration、gantt、landing_times),預設為 grid
orientation (str) – 在圖形視圖中指定 DAG 方向(LR、TB、RL、BT),預設為 LR
catchup (bool) – 執行排程器補追(還是僅執行最新版本)?預設為 True
on_failure_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – 當此 DAG 的 DagRun 失敗時要呼叫的函數或函數清單。上下文字典作為單一參數傳遞給此函數。
on_success_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – 與
on_failure_callback
非常相似,不同之處在於它在 DAG 成功時執行。access_control (dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None) – 指定選用的 DAG 層級動作,例如「{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}」,或者如果存在 DAG 執行資源,則可以指定資源名稱,例如「{'role1': {'DAG Runs': {'can_create'}}, 'role2': {'DAGs': {'can_read', 'can_edit', 'can_delete'}}}」
is_paused_upon_creation (bool | None) – 指定首次建立 DAG 時是否暫停。如果 DAG 已存在,則會忽略此旗標。如果未指定此選用參數,則將使用全域配置設定。
jinja_environment_kwargs (dict | None) –
要傳遞到 Jinja
Environment
以進行範本呈現的其他配置選項範例:避免 Jinja 從範本字串中移除尾隨換行符
DAG( dag_id="my-dag", jinja_environment_kwargs={ "keep_trailing_newline": True, # some other jinja2 Environment options here }, )
render_template_as_native_obj (bool) – 如果為 True,則使用 Jinja
NativeEnvironment
將範本呈現為原生 Python 類型。如果為 False,則使用 JinjaEnvironment
將範本呈現為字串值。tags (list[str] | None) – 標籤清單,有助於在 UI 中篩選 DAG。
owner_links (dict[str, str] | None) – 擁有者及其連結的字典,這些連結將在 DAG 視圖 UI 上可點擊。可以用作 HTTP 連結(例如指向您的 Slack 頻道的連結),或 mailto 連結。例如:{“dag_owner”: “https://airflow.dev.org.tw/”}
auto_register (bool) – 在
with
區塊中使用此 DAG 時自動註冊fail_stop (bool) – 當 DAG 中的任務失敗時,使目前正在執行的任務失敗。警告:fail stop DAG 只能包含具有預設觸發規則(「all_success」)的任務。如果 fail stop DAG 中的任何任務具有非預設觸發規則,則會擲回例外。
dag_display_name (str | None) – DAG 的顯示名稱,會出現在 UI 上。
- property relative_fileloc: pathlib.Path[source]¶
可匯入的 dag ‘file’ 檔案位置,相對於已設定的 DAGs 資料夾。
- property task: airflow.decorators.TaskDecoratorCollection[source]¶
- is_fixed_time_schedule()[source]¶
判斷排程是否具有固定時間(例如每天凌晨 3 點)。
透過「查看」接下來兩個 cron 觸發時間來進行偵測;如果這兩個時間具有相同的分鐘和小時值,則排程是固定的,我們*不需要*執行 DST 修正。
這假設 DST 發生在整分鐘變更時(例如 12:59 -> 12:00)。
不要試圖理解這實際上意味著什麼。這是舊邏輯,不應在任何地方使用。
- next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]¶
取得此 DAG 在
date_last_automated_dagrun
之後的下一個 DagRun 的相關資訊。這會根據 DAG 的時間表、start_date、end_date 等,計算下一個 DagRun 應運作的時間間隔(其執行日期)以及何時可以排程。這不會檢查最大活動執行次數或任何其他「max_active_tasks」類型限制,而僅根據此 DAG 及其任務的各種日期和間隔欄位執行計算。
- 參數
last_automated_dagrun (None | datetime.datetime | airflow.timetables.base.DataInterval) – 此 DAG 現有「自動」DagRun(排程或回填,但非手動)的
max(execution_date)
。restricted (bool) – 如果設定為 False(預設為 True),則忽略 DAG 或任務上指定的
start_date
、end_date
和catchup
。
- 傳回值
下一個 dagrun 的 DagRunInfo,如果 dagrun 不會被排程,則為 None。
- 傳回類型
- iter_dagrun_infos_between(earliest, latest, *, align=True)[source]¶
在使用此 DAG 的時間表在給定間隔之間產生 DagRunInfo。
如果 DagRunInfo 實例的
logical_date
不早於earliest
,也不晚於latest
,則會產生這些實例。這些實例會依其logical_date
從最早到最晚排序。如果
align
為False
,則第一次執行將立即在earliest
上發生,即使它沒有落在邏輯時間表排程上。預設值為True
,但為了向後相容性,子 DAG 將忽略此值,並且始終表現得好像此值設定為False
。範例:DAG 排程為每天午夜執行 (
0 0 * * *
)。如果earliest
為2021-06-03 23:00:00
,則如果align=False
,第一個 DagRunInfo 將為2021-06-03 23:00:00
,如果align=True
,則為2021-06-04 00:00:00
。
- get_run_dates(start_date, end_date=None)[source]¶
使用此 DAG 的排程間隔,傳回在收到的參數間隔之間的日期列表。
傳回的日期可以用於執行日期。
- 參數
start_date – 間隔的開始日期。
end_date – 間隔的結束日期。預設為
timezone.utcnow()
。
- 傳回值
在間隔內遵循 DAG 排程的日期列表。
- 傳回類型
- param(name, default=NOTSET)[source]¶
傳回目前 DAG 的 DagParam 物件。
- 參數
name (str) – DAG 參數名稱。
default (Any) – DAG 參數的回退值。
- 傳回值
指定名稱和目前 DAG 的 DagParam 實例。
- 傳回類型
- static fetch_callback(dag, dag_run_id, success=True, reason=None, *, session=NEW_SESSION)[source]¶
根據 success 的值,提取適當的回調函數。
此方法獲取此 DagRun 中單個 TaskInstance 的上下文,並將其與回調函數列表一起返回。
- 參數
dag (DAG) – DAG 物件
dag_run_id (str) – DAG 執行 ID
success (bool) – 標記,用於指定是否應調用失敗或成功回調函數
reason (str | None) – 完成原因
session (sqlalchemy.orm.session.Session) – 資料庫會話
- handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]¶
根據情況觸發 on_failure_callback 或 on_success_callback。
此方法獲取此 DagRun 中單個 TaskInstance 的上下文,並將其與 'reason' 一起傳遞給可調用物件,主要用於區分 DagRun 失敗。
- 參數
dagrun (airflow.models.dagrun.DagRun) – DagRun 物件
success – 標記,用於指定是否應調用失敗或成功回調函數
reason – 完成原因
session – 資料庫會話
- get_num_active_runs(external_trigger=None, only_running=True, session=NEW_SESSION)[source]¶
返回處於 “running” 狀態的活動 dag run 的數量。
- 參數
external_trigger – 若為外部觸發的活動 dag run,則為 True
session –
- 傳回值
活動 dag run 的數量,大於 0
- static fetch_dagrun(dag_id, execution_date=None, run_id=None, session=NEW_SESSION)[source]¶
如果存在,則返回給定執行日期或 run_id 的 dag run,否則返回 None。
- 參數
dag_id (str) – 要尋找的 DAG 的 dag_id。
execution_date (datetime.datetime | None) – 要尋找的 DagRun 的執行日期。
run_id (str | None) – 要尋找的 DagRun 的 run_id。
session (sqlalchemy.orm.session.Session) –
- 傳回值
如果找到 DagRun,則返回 DagRun,否則返回 None。
- 傳回類型
airflow.models.dagrun.DagRun | airflow.serialization.pydantic.dag_run.DagRunPydantic
- get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]¶
返回 start_date(包含)和 end_date(包含)之間的 dag run 列表。
- 參數
start_date – 要尋找的 DagRun 的起始執行日期。
end_date – 要尋找的 DagRun 的結束執行日期。
session –
- 傳回值
找到的 DagRun 列表。
- get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]¶
取得
base_date
之前(包含base_date
)的num
個任務實例。傳回的列表可能包含對應於任何 DagRunType 的剛好
num
個任務實例。如果base_date
之前排定的 DAG 執行次數少於num
次,則列表中的任務實例可能會更少。
- set_task_instance_state(*, task_id, map_indexes=None, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]¶
設定 TaskInstance 的狀態,並清除處於 failed 或 upstream_failed 狀態的下游任務。
- 參數
task_id (str) – TaskInstance 的任務 ID
map_indexes (Collection[int] | None) – 僅在 TaskInstance 的 map_index 符合時才設定 TaskInstance。如果為 None(預設值),則會設定任務的所有映射 TaskInstance。
execution_date (datetime.datetime | None) – TaskInstance 的執行日期
run_id (str | None) – TaskInstance 的 run_id
state (airflow.utils.state.TaskInstanceState) – 要將 TaskInstance 設定為的狀態
upstream (bool) – 包含給定 task_id 的所有上游任務
downstream (bool) – 包含給定 task_id 的所有下游任務
future (bool) – 包含給定 task_id 的所有未來 TaskInstance
commit (bool) – 提交變更
past (bool) – 包含給定 task_id 的所有過去 TaskInstance
- set_task_group_state(*, group_id, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]¶
將 TaskGroup 設定為給定狀態,並清除處於 failed 或 upstream_failed 狀態的下游任務。
- 參數
group_id (str) – TaskGroup 的 group_id
execution_date (datetime.datetime | None) – TaskInstance 的執行日期
run_id (str | None) – TaskInstance 的 run_id
state (airflow.utils.state.TaskInstanceState) – 要將 TaskInstance 設定為的狀態
upstream (bool) – 包含給定 task_id 的所有上游任務
downstream (bool) – 包含給定 task_id 的所有下游任務
future (bool) – 包含給定 task_id 的所有未來 TaskInstance
commit (bool) – 提交變更
past (bool) – 包含給定 task_id 的所有過去 TaskInstance
session (sqlalchemy.orm.session.Session) – 新會話
- topological_sort(include_subdag_tasks=False)[source]¶
以拓撲順序排序任務,使任務在任何上游依賴項之後出現。
已棄用,改用
task_group.topological_sort
- set_dag_runs_state(state=DagRunState.RUNNING, session=NEW_SESSION, start_date=None, end_date=None, dag_ids=None)[source]¶
- clear(task_ids=None, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state=DagRunState.QUEUED, dry_run=False, session=NEW_SESSION, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, exclude_task_ids=frozenset())[source]¶
清除與目前 dag 相關聯的一組任務實例,針對指定的日期範圍。
- 參數
task_ids (Collection[str | tuple[str, int]] | None) – 要清除的任務 ID 或 (
task_id
,map_index
) 元組的列表start_date (datetime.datetime | None) – 要清除的最小 execution_date
end_date (datetime.datetime | None) – 要清除的最大 execution_date
only_failed (bool) – 僅清除失敗的任務
only_running (bool) – 僅清除正在運行的任務。
confirm_prompt (bool) – 詢問確認
include_subdags (bool) – 清除子 DAG 中的任務,並清除 ExternalTaskMarker 指示的外部任務
include_parentdag (bool) – 清除子 DAG 的父 DAG 中的任務。
dag_run_state (airflow.utils.state.DagRunState) – 要將 DagRun 設定為的狀態。如果設定為 False,則不會變更 dagrun 狀態。
dry_run (bool) – 尋找要清除的任務,但不清除它們。
session (sqlalchemy.orm.session.Session) – 要使用的 sqlalchemy 會話
dag_bag (airflow.models.dagbag.DagBag | None) – 用於尋找 DAG 子 DAG 的 DagBag(選用)
exclude_task_ids (frozenset[str] | frozenset[tuple[str, int]] | None) – 不應清除的
task_id
或 (task_id
,map_index
) 元組的集合
- classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]¶
- partial_subset(task_ids_or_regex, include_downstream=False, include_upstream=True, include_direct_upstream=False)[source]¶
根據與一個或多個任務匹配的 regex,傳回目前 dag 的子集。
根據應與一個或多個任務匹配的 regex,傳回目前 dag 的子集,作為目前 dag 的深層副本,並根據傳遞的標記包含上游和下游鄰居。
- add_tasks(tasks)[source]¶
將任務列表新增至 DAG。
- 參數
tasks (Iterable[airflow.models.operator.Operator]) – 您想要新增的任務列表
- run(start_date=None, end_date=None, mark_success=False, local=False, donot_pickle=airflow_conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, continue_on_failures=False, disable_retry=False)[source]¶
執行 DAG。
- 參數
start_date – 要執行的範圍的開始日期
end_date – 要執行的範圍的結束日期
mark_success – 若為 True,則將作業標記為成功,而無需實際執行
local – 若為 True,則使用 LocalExecutor 執行任務
executor – 用於執行任務的 Executor 實例
donot_pickle – 若為 True,則避免序列化 DAG 物件並傳送至 worker
ignore_task_deps – 若為 True,則跳過上游任務
ignore_first_depends_on_past – 若為 True,則僅針對第一組任務忽略 depends_on_past 依賴項
pool – 要使用的資源池
delay_on_limit_secs – 當達到 max_active_runs 限制時,在下次嘗試執行 dag run 之前等待的秒數
verbose – 使日誌輸出更詳細
conf – 從 CLI 傳遞的使用者定義字典
rerun_failed_tasks –
run_backwards –
run_at_least_once – 若為 true,即使在時間範圍內沒有邏輯執行,也始終至少執行 DAG 一次。
- test(execution_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[source]¶
針對給定的 DAG 和執行日期,執行單個 DagRun。
- 參數
execution_date (datetime.datetime | None) – DAG 執行的執行日期
conn_file_path (str | None) – yaml 或 json 格式的連線檔案路徑
variable_file_path (str | None) – yaml 或 json 格式的變數檔案路徑
use_executor (bool) – 若設定,則使用 executor 測試 DAG
mark_success_pattern (Pattern | str | None) – 要標記為成功而非執行的 task_ids 的正則表達式
session (sqlalchemy.orm.session.Session) – 資料庫連線 (選用)
- create_dagrun(state, execution_date=None, run_id=None, start_date=None, external_trigger=False, conf=None, run_type=None, session=NEW_SESSION, dag_hash=None, creating_job_id=None, data_interval=None)[source]¶
從此 dag 建立 dag run,包含與此 dag 關聯的任務。
返回 dag run。
- 參數
run_id (str | None) – 定義此 dag run 的 run id
run_type (airflow.utils.types.DagRunType | None) – DagRun 的類型
execution_date (datetime.datetime | None) – 此 dag run 的執行日期
state (airflow.utils.state.DagRunState) – dag run 的狀態
start_date (datetime.datetime | None) – 應評估此 dag run 的日期
external_trigger (bool | None) – 此 dag run 是否為外部觸發
conf (dict | None) – 包含要傳遞給 DAG 的組態/參數的字典
creating_job_id (int | None) – 建立此 DagRun 的作業 ID
session (sqlalchemy.orm.session.Session) – 資料庫 session
dag_hash (str | None) – 序列化 DAG 的雜湊值
data_interval (tuple[datetime.datetime, datetime.datetime] | None) – DagRun 的資料間隔
- classmethod bulk_sync_to_db(dags, session=NEW_SESSION)[source]¶
使用 airflow.models.DAG.bulk_write_to_db,此方法已棄用。
- classmethod bulk_write_to_db(dags, processor_subdir=None, session=NEW_SESSION)[source]¶
確保給定 dags 的 DagModel 列在 DB 的 dag 表格中是最新的。
請注意,此方法可以針對 DAG 和 SubDAG 呼叫。SubDag 實際上是 SubDagOperator。
- 參數
dags (Collection[DAG]) – 要儲存到 DB 的 DAG 物件
- 傳回值
無
- sync_to_db(processor_subdir=None, session=NEW_SESSION)[source]¶
將關於此 DAG 的屬性儲存到 DB。
請注意,此方法可以針對 DAG 和 SubDAG 呼叫。SubDag 實際上是 SubDagOperator。
- 傳回值
無
- static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]¶
給定已知 DAG 的列表,停用在 ORM 中標記為活動的任何其他 DAG。
- 參數
active_dag_ids – 活動的 DAG ID 列表
- 傳回值
無
- static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]¶
停用排程器在過期日期之前最後一次觸及的任何 DAG。
這些 DAG 很可能已被刪除。
- 參數
expiration_date – 設定在此時間之前觸及的非活動 DAG
- 傳回值
無
- static get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION)[source]¶
返回給定 DAG 中任務實例的數量。
- 參數
session – ORM session
dag_id – 要取得任務並行性的 DAG 的 ID
run_id – 要取得任務並行性的 DAG run 的 ID
task_ids – 給定 DAG 的有效任務 ID 列表
states – 如果提供,則要篩選的狀態列表
- 傳回值
正在執行的任務數量
- 傳回類型
- set_edge_info(upstream_task_id, downstream_task_id, info)[source]¶
在 DAG 上設定給定的邊緣資訊。
請注意,這將覆寫,而不是與現有資訊合併。
- class airflow.models.dag.DagTag(name, doc)[source]¶
繼承自:
airflow.models.base.Base
每個 DAG 的標籤名稱,允許在 DAG 視圖中快速篩選。
- class airflow.models.dag.DagOwnerAttributes(name, doc)[source]¶
繼承自:
airflow.models.base.Base
定義不同擁有者屬性的表格。
例如,擁有者的連結將作為超連結傳遞到「DAGs」視圖。
- class airflow.models.dag.DagModel(concurrency=None, **kwargs)[source]¶
繼承自:
airflow.models.base.Base
包含 DAG 屬性的表格。
- property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[source]¶
- property relative_fileloc: pathlib.Path | None[source]¶
可匯入的 dag ‘file’ 檔案位置,相對於已設定的 DAGs 資料夾。
- set_is_paused(is_paused, including_subdags=True, session=NEW_SESSION)[source]¶
暫停/取消暫停 DAG。
- 參數
is_paused (bool) – DAG 是否已暫停
including_subdags (bool) – 是否包含 DAG 的子 DAG
session – session
- classmethod deactivate_deleted_dags(alive_dag_filelocs, processor_subdir, session=NEW_SESSION)[source]¶
針對 DAG 檔案已被移除的 DAG,設定
is_active=False
。- 參數
alive_dag_filelocs (Container[str]) – 存活 DAG 的檔案路徑
processor_subdir (str) – DAG 處理器子目錄
session (sqlalchemy.orm.session.Session) – ORM Session
- classmethod dags_needing_dagruns(session)[source]¶
回傳(並鎖定)一個 DAG 物件列表,這些物件即將建立新的 DagRun。
這將回傳一個結果集,其中包含使用 “SELECT … FOR UPDATE” 查詢進行列級鎖定的列,您應確保所有排程決策都在單一交易中完成 – 一旦交易提交,它將被解鎖。
- airflow.models.dag.dag(dag_id='', description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=airflow_conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=airflow_conf.getint('core', 'max_active_runs_per_dag'), max_consecutive_failed_dag_runs=airflow_conf.getint('core', 'max_consecutive_failed_dag_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=airflow_conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=airflow_conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=airflow_conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True, fail_stop=False, dag_display_name=None)[source]¶
Python DAG 裝飾器,將函數包裝成 Airflow DAG。
接受操作器 kwargs 的 kwargs。可用於參數化 DAG。
- 參數
dag_args – DAG 物件的引數
dag_kwargs – DAG 物件的 Kwargs。