airflow.providers.google.cloud.openlineage.utils

模組內容

類別

BigQueryJobRunFacet

表示 bigquery 執行相關統計資訊的 Facet。

函數

extract_ds_name_from_gcs_path(path)

從給定的路徑中提取並處理資料集名稱。

get_facets_from_bq_table(table)

從 BigQuery 表格物件取得 Facet。

get_identity_column_lineage_facet(dest_field_names, ...)

取得身分轉換的欄沿襲 Facet。

get_from_nullable_chain(source, chain)

從物件的巢狀結構中取得物件,其中不保證巢狀結構中的所有鍵都存在。

inject_openlineage_properties_into_dataproc_job(job, ...)

將 OpenLineage 屬性注入到 Spark 工作定義中。

屬性

log

BIGQUERY_NAMESPACE

BIGQUERY_URI

WILDCARD

airflow.providers.google.cloud.openlineage.utils.log[原始碼]
airflow.providers.google.cloud.openlineage.utils.BIGQUERY_NAMESPACE = 'bigquery'[原始碼]
airflow.providers.google.cloud.openlineage.utils.BIGQUERY_URI = 'bigquery'[原始碼]
airflow.providers.google.cloud.openlineage.utils.WILDCARD = '*'[原始碼]
airflow.providers.google.cloud.openlineage.utils.extract_ds_name_from_gcs_path(path)[原始碼]

從給定的路徑中提取並處理資料集名稱。

Args

path: 要處理的路徑,例如 gcs 檔案的路徑。

Returns

已處理的資料集名稱。

範例
>>> extract_ds_name_from_gcs_path("/dir/file.*")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/pre_")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/file.txt")
'dir/file.txt'
>>> extract_ds_name_from_gcs_path("/dir/file.")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/")
'dir'
>>> extract_ds_name_from_gcs_path("")
'/'
>>> extract_ds_name_from_gcs_path("/")
'/'
>>> extract_ds_name_from_gcs_path(".")
'/'
airflow.providers.google.cloud.openlineage.utils.get_facets_from_bq_table(table)[原始碼]

從 BigQuery 表格物件取得 Facet。

airflow.providers.google.cloud.openlineage.utils.get_identity_column_lineage_facet(dest_field_names, input_datasets)[原始碼]

取得身分轉換的欄沿襲 Facet。

此函數產生一個簡單的欄沿襲 Facet,其中每個目標欄位都由來自所有輸入資料集中具有該欄位的同名來源欄位組成。沿襲假設未套用任何轉換,表示欄位在來源和目標資料集之間保留其身分。

Args

dest_field_names: 應決定沿襲的目標欄位名稱列表。 input_datasets: 具有綱要 Facet 的輸入資料集列表。

Returns
包含單一鍵 columnLineage 的字典,對應到 ColumnLineageDatasetFacet

如果無法決定欄沿襲,則會傳回空字典 - 請參閱下方的注意事項。

注意事項
  • 如果任何輸入資料集缺少綱要 Facet,函數會立即傳回空字典。

  • 如果來源資料集綱要中的任何欄位未出現在目標表格中,函數會傳回空字典。目標表格可以包含額外的欄位,但所有來源欄位都應出現在目標表格中。

  • 如果沒有任何目標欄位可以與輸入資料集欄位比對,則會傳回空字典。

  • 目標表格中不存在於輸入資料集中的額外欄位會被忽略並在沿襲 Facet 中跳過,因為它們無法追溯到來源欄位。

  • 此函數假設未套用任何轉換,表示欄位在來源和目標資料集之間保留其身分。

class airflow.providers.google.cloud.openlineage.utils.BigQueryJobRunFacet[原始碼]

基底類別: airflow.providers.common.compat.openlineage.facet.RunFacet

表示 bigquery 執行相關統計資訊的 Facet。

此 Facet 用於提供關於 bigquery 執行的統計資訊。

參數
  • cached – BigQuery 快取查詢結果。對於快取查詢,不會提供其餘的統計資訊。

  • billedBytes – BigQuery 計費的位元組數。

  • properties – BigQUery 執行的完整屬性樹狀結構。

cached: bool[原始碼]
billedBytes: int | None[原始碼]
properties: str | None[原始碼]
airflow.providers.google.cloud.openlineage.utils.get_from_nullable_chain(source, chain)[原始碼]

從物件的巢狀結構中取得物件,其中不保證巢狀結構中的所有鍵都存在。

旨在取代 dict.get() 陳述式的鏈。

使用範例

if (
    not job._properties.get("statistics")
    or not job._properties.get("statistics").get("query")
    or not job._properties.get("statistics").get("query").get("referencedTables")
):
    return None
result = job._properties.get("statistics").get("query").get("referencedTables")

變成

result = get_from_nullable_chain(properties, ["statistics", "query", "queryPlan"])
if not result:
    return None
airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_job(job, context, inject_parent_job_info)[原始碼]

將 OpenLineage 屬性注入到 Spark 工作定義中。

此函數除了在 Dataproc 工作定義中新增所需的 OpenLineage 屬性(如果尚不存在)之外,不會移除任何組態或以任何其他方式修改工作。

注意
如果發生以下情況,將會跳過對工作的任何修改
  • 無法存取 OpenLineage 提供者。

  • 不支援工作類型。

  • 停用了自動父系工作資訊注入。

  • Spark 工作定義中已存在任何包含父系工作資訊的 OpenLineage 屬性。

Args

job: 原始 Dataproc 工作定義。 context: 工作執行的 Airflow 內容。 inject_parent_job_info: 指示是否注入父系工作資訊的旗標。

Returns

修改後的工作定義,其中注入了 OpenLineage 屬性(如果適用)。

此條目是否有幫助?