Google Cloud Data Catalog 運算子

The Data Catalog 是一項完全託管且可擴展的中繼資料管理服務,可讓組織快速探索、管理和理解 Google Cloud 中的所有資料。它提供:

  • 一個簡單易用的資料探索搜尋介面,由支援 Gmail 和 Drive 的相同 Google 搜尋技術提供支援

  • 一個彈性且強大的編目系統,用於捕獲技術和業務中繼資料

  • 一種用於敏感資料的自動標記機制,具有 DLP API 整合

先決條件任務

若要使用這些運算子,您必須執行幾件事:

管理條目

運算子使用 Entry 來表示條目

取得條目

取得條目是透過 CloudDataCatalogGetEntryOperatorCloudDataCatalogLookupEntryOperator 運算子執行的。

CloudDataCatalogGetEntryOperator 使用專案 ID、條目群組 ID、條目 ID 來取得條目。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

get_entry = CloudDataCatalogGetEntryOperator(
    task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

您可以使用 Jinja 模板,搭配 location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")

CloudDataCatalogLookupEntryOperator 使用資源名稱來取得條目。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

current_entry_template = (
    "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
    "entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
    task_id="lookup_entry",
    linked_resource=current_entry_template.format(
        project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
    ),
)

您可以使用 Jinja 模板,搭配 linked_resource, sql_resource, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

lookup_entry_result = BashOperator(
    task_id="lookup_entry_result",
    bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)

建立條目

CloudDataCatalogCreateEntryOperator 運算子會建立條目。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

create_entry_gcs = CloudDataCatalogCreateEntryOperator(
    task_id="create_entry_gcs",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
    entry={
        "display_name": ENTRY_NAME,
        "type_": "FILESET",
        "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
    },
)

您可以使用 Jinja 模板,搭配 location, entry_group, entry_id, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

新建立的條目 ID 可以透過 entry_id 鍵讀取。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

create_entry_gcs_result = BashOperator(
    task_id="create_entry_gcs_result",
    bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)

更新條目

CloudDataCatalogUpdateEntryOperator 運算子會更新條目。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

update_entry = CloudDataCatalogUpdateEntryOperator(
    task_id="update_entry",
    entry={"display_name": f"{ENTRY_NAME} UPDATED"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
)

您可以使用 Jinja 模板,搭配 entry, update_mask, location, entry_group, entry_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

刪除條目

CloudDataCatalogDeleteEntryOperator 運算子會刪除條目。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

delete_entry = CloudDataCatalogDeleteEntryOperator(
    task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

您可以使用 Jinja 模板,搭配 location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

管理條目群組

運算子使用 Entry 來表示條目群組。

建立條目群組

CloudDataCatalogCreateEntryGroupOperator 運算子會建立條目群組。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    location=LOCATION,
    entry_group_id=ENTRY_GROUP_ID,
    entry_group={"display_name": ENTRY_GROUP_NAME},
)

您可以使用 Jinja 模板,搭配 location, entry_group_id, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

新建立的條目群組 ID 可以透過 entry_group_id 鍵讀取。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

create_entry_group_result = BashOperator(
    task_id="create_entry_group_result",
    bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)

取得條目群組

CloudDataCatalogGetEntryGroupOperator 運算子會取得條目群組。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

get_entry_group = CloudDataCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    read_mask=FieldMask(paths=["name", "display_name"]),
)

您可以使用 Jinja 模板,搭配 location, entry_group, read_mask, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

get_entry_group_result = BashOperator(
    task_id="get_entry_group_result",
    bash_command=f"echo {get_entry_group.output}",
)

刪除條目群組

CloudDataCatalogDeleteEntryGroupOperator 運算子會刪除條目群組。

tests/system/google/datacatalog/example_datacatalog_entries.py[原始碼]

delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)

您可以使用 Jinja 模板,搭配 location, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

管理標籤範本

運算子使用 TagTemplate 來表示標籤範本。

建立標籤範本

CloudDataCatalogCreateTagTemplateOperator 運算子會建立標籤範本。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
    task_id="create_tag_template",
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
    tag_template={
        "display_name": TAG_TEMPLATE_DISPLAY_NAME,
        "fields": {
            FIELD_NAME_1: TagTemplateField(
                display_name="first-field", type_=dict(primitive_type="STRING")
            )
        },
    },
)

您可以使用 Jinja 模板,搭配 location, tag_template_id, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

新建立的標籤範本 ID 可以透過 tag_template_id 鍵讀取。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

create_tag_template_result = BashOperator(
    task_id="create_tag_template_result",
    bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)

刪除標籤範本

CloudDataCatalogDeleteTagTemplateOperator 運算子會刪除標籤範本。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
    task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)

您可以使用 Jinja 模板,搭配 location, tag_template, force, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

取得標籤範本

CloudDataCatalogGetTagTemplateOperator 運算子會取得標籤範本。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

get_tag_template = CloudDataCatalogGetTagTemplateOperator(
    task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)

您可以使用 Jinja 模板,搭配 location, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

get_tag_template_result = BashOperator(
    task_id="get_tag_template_result",
    bash_command=f"echo {get_tag_template.output}",
)

更新標籤範本

CloudDataCatalogUpdateTagTemplateOperator 運算子會更新標籤範本。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
    task_id="update_tag_template",
    tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
)

您可以使用 Jinja 模板,搭配 tag_template, update_mask, location, tag_template_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

管理標籤

運算子使用 Tag 來表示標籤。

在條目上建立標籤

CloudDataCatalogCreateTagOperator 運算子會建立標籤。

tests/system/google/datacatalog/example_datacatalog_tags.py[原始碼]

create_tag = CloudDataCatalogCreateTagOperator(
    task_id="create_tag",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    template_id=TEMPLATE_ID,
    tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)

您可以使用 Jinja 模板,搭配 location, entry_group, entry, tag, template_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

新建立的標籤 ID 可以透過 tag_id 鍵讀取。

tests/system/google/datacatalog/example_datacatalog_tags.py[原始碼]

create_tag_result = BashOperator(
    task_id="create_tag_result",
    bash_command=f"echo {tag_id}",
)

更新標籤

CloudDataCatalogUpdateTagOperator 運算子會更新標籤。

tests/system/google/datacatalog/example_datacatalog_tags.py[原始碼]

update_tag = CloudDataCatalogUpdateTagOperator(
    task_id="update_tag",
    tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
    update_mask={"paths": ["fields"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    tag_id=tag_id,
)

您可以使用 Jinja 模板,搭配 tag, update_mask, location, entry_group, entry, tag_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

刪除標籤

CloudDataCatalogDeleteTagOperator 運算子會刪除標籤。

tests/system/google/datacatalog/example_datacatalog_tags.py[原始碼]

delete_tag = CloudDataCatalogDeleteTagOperator(
    task_id="delete_tag",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    tag=tag_id,
)

您可以使用 Jinja 模板,搭配 location, entry_group, entry, tag, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

列出條目上的標籤

CloudDataCatalogListTagsOperator 運算子會取得條目上的標籤列表。

tests/system/google/datacatalog/example_datacatalog_tags.py[原始碼]

list_tags = CloudDataCatalogListTagsOperator(
    task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

您可以使用 Jinja 模板,搭配 location, entry_group, entry, page_size, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

tests/system/google/datacatalog/example_datacatalog_tags.py[原始碼]

list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")

管理標籤範本欄位

運算子使用 TagTemplateField 來表示標籤範本欄位。

建立欄位

CloudDataCatalogCreateTagTemplateFieldOperator 運算子會建立標籤範本欄位。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
    task_id="create_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_2,
    tag_template_field=TagTemplateField(
        display_name="second-field", type_=FieldType(primitive_type="STRING")
    ),
)

您可以使用 Jinja 模板,搭配 location, tag_template, tag_template_field_id, tag_template_field, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

新建立的欄位 ID 可以透過 tag_template_field_id 鍵讀取。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

create_tag_template_field_result = BashOperator(
    task_id="create_tag_template_field_result",
    bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)

重新命名欄位

CloudDataCatalogRenameTagTemplateFieldOperator 運算子會重新命名標籤範本欄位。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
    task_id="rename_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_1,
    new_tag_template_field_id=FIELD_NAME_3,
)

您可以使用 Jinja 模板,搭配 location, tag_template, field, new_tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

更新欄位

CloudDataCatalogUpdateTagTemplateFieldOperator 運算子會更新標籤範本欄位。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
    task_id="update_tag_template_field",
    tag_template_field={"display_name": "Updated template field"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_1,
)

您可以使用 Jinja 模板,搭配 tag_template_field, update_mask, tag_template_field_name, location, tag_template, tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

刪除欄位

CloudDataCatalogDeleteTagTemplateFieldOperator 運算子會刪除標籤範本欄位。

tests/system/google/datacatalog/example_datacatalog_tag_templates.py[原始碼]

delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
    task_id="delete_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_2,
    force=True,
)

您可以使用 Jinja 模板,搭配 location, tag_template, field, force, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

搜尋資源

CloudDataCatalogSearchCatalogOperator 運算子會在 Data Catalog 中搜尋符合查詢的多個資源,例如條目、標籤。

query 參數應使用 搜尋語法定義。

tests/system/google/datacatalog/example_datacatalog_search_catalog.py[原始碼]

search_catalog = CloudDataCatalogSearchCatalogOperator(
    task_id="search_catalog",
    scope={"include_project_ids": [PROJECT_ID]},
    query=f"name:{ENTRY_GROUP_NAME}",
)

您可以使用 Jinja 模板,搭配 scope, query, page_size, order_by, retry, timeout, metadata, gcp_conn_id, impersonation_chain 參數,讓您動態決定值。

結果會儲存到 XCom,以便其他運算子使用。

tests/system/google/datacatalog/example_datacatalog_search_catalog.py[原始碼]

search_catalog_result = BashOperator(
    task_id="search_catalog_result",
    bash_command=f"echo {search_catalog.output}",
)

參考文獻

如需更多資訊,請參閱:

請問此條目是否有幫助?