Google Cloud Data Catalog 運算子¶
The Data Catalog 是一項完全託管且可擴展的中繼資料管理服務,可讓組織快速探索、管理和理解 Google Cloud 中的所有資料。它提供:
一個簡單易用的資料探索搜尋介面,由支援 Gmail 和 Drive 的相同 Google 搜尋技術提供支援
一個彈性且強大的編目系統,用於捕獲技術和業務中繼資料
一種用於敏感資料的自動標記機制,具有 DLP API 整合
先決條件任務¶
若要使用這些運算子,您必須執行幾件事:
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
啟用您專案的計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 函式庫。
pip install 'apache-airflow[google]'安裝的詳細資訊請參閱 Installation。
管理條目¶
運算子使用 Entry
來表示條目
取得條目¶
取得條目是透過 CloudDataCatalogGetEntryOperator
和 CloudDataCatalogLookupEntryOperator
運算子執行的。
CloudDataCatalogGetEntryOperator
使用專案 ID、條目群組 ID、條目 ID 來取得條目。
get_entry = CloudDataCatalogGetEntryOperator(
task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以使用 Jinja 模板,搭配 location
, entry_group
, entry
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
CloudDataCatalogLookupEntryOperator
使用資源名稱來取得條目。
current_entry_template = (
"//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
"entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
task_id="lookup_entry",
linked_resource=current_entry_template.format(
project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
),
)
您可以使用 Jinja 模板,搭配 linked_resource
, sql_resource
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
lookup_entry_result = BashOperator(
task_id="lookup_entry_result",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)
建立條目¶
CloudDataCatalogCreateEntryOperator
運算子會建立條目。
create_entry_gcs = CloudDataCatalogCreateEntryOperator(
task_id="create_entry_gcs",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
entry={
"display_name": ENTRY_NAME,
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
},
)
您可以使用 Jinja 模板,搭配 location
, entry_group
, entry_id
, entry
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
新建立的條目 ID 可以透過 entry_id
鍵讀取。
create_entry_gcs_result = BashOperator(
task_id="create_entry_gcs_result",
bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)
更新條目¶
CloudDataCatalogUpdateEntryOperator
運算子會更新條目。
update_entry = CloudDataCatalogUpdateEntryOperator(
task_id="update_entry",
entry={"display_name": f"{ENTRY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
)
您可以使用 Jinja 模板,搭配 entry
, update_mask
, location
, entry_group
, entry_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
刪除條目¶
CloudDataCatalogDeleteEntryOperator
運算子會刪除條目。
delete_entry = CloudDataCatalogDeleteEntryOperator(
task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以使用 Jinja 模板,搭配 location
, entry_group
, entry
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
管理條目群組¶
運算子使用 Entry
來表示條目群組。
建立條目群組¶
CloudDataCatalogCreateEntryGroupOperator
運算子會建立條目群組。
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
location=LOCATION,
entry_group_id=ENTRY_GROUP_ID,
entry_group={"display_name": ENTRY_GROUP_NAME},
)
您可以使用 Jinja 模板,搭配 location
, entry_group_id
, entry_group
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
新建立的條目群組 ID 可以透過 entry_group_id
鍵讀取。
create_entry_group_result = BashOperator(
task_id="create_entry_group_result",
bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)
取得條目群組¶
CloudDataCatalogGetEntryGroupOperator
運算子會取得條目群組。
get_entry_group = CloudDataCatalogGetEntryGroupOperator(
task_id="get_entry_group",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
read_mask=FieldMask(paths=["name", "display_name"]),
)
您可以使用 Jinja 模板,搭配 location
, entry_group
, read_mask
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
get_entry_group_result = BashOperator(
task_id="get_entry_group_result",
bash_command=f"echo {get_entry_group.output}",
)
刪除條目群組¶
CloudDataCatalogDeleteEntryGroupOperator
運算子會刪除條目群組。
delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)
您可以使用 Jinja 模板,搭配 location
, entry_group
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
管理標籤範本¶
運算子使用 TagTemplate
來表示標籤範本。
建立標籤範本¶
CloudDataCatalogCreateTagTemplateOperator
運算子會建立標籤範本。
create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
task_id="create_tag_template",
location=LOCATION,
tag_template_id=TEMPLATE_ID,
tag_template={
"display_name": TAG_TEMPLATE_DISPLAY_NAME,
"fields": {
FIELD_NAME_1: TagTemplateField(
display_name="first-field", type_=dict(primitive_type="STRING")
)
},
},
)
您可以使用 Jinja 模板,搭配 location
, tag_template_id
, tag_template
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
新建立的標籤範本 ID 可以透過 tag_template_id
鍵讀取。
create_tag_template_result = BashOperator(
task_id="create_tag_template_result",
bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)
刪除標籤範本¶
CloudDataCatalogDeleteTagTemplateOperator
運算子會刪除標籤範本。
delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)
您可以使用 Jinja 模板,搭配 location
, tag_template
, force
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
取得標籤範本¶
CloudDataCatalogGetTagTemplateOperator
運算子會取得標籤範本。
get_tag_template = CloudDataCatalogGetTagTemplateOperator(
task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)
您可以使用 Jinja 模板,搭配 location
, tag_template
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
get_tag_template_result = BashOperator(
task_id="get_tag_template_result",
bash_command=f"echo {get_tag_template.output}",
)
更新標籤範本¶
CloudDataCatalogUpdateTagTemplateOperator
運算子會更新標籤範本。
update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
task_id="update_tag_template",
tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template_id=TEMPLATE_ID,
)
您可以使用 Jinja 模板,搭配 tag_template
, update_mask
, location
, tag_template_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
管理標籤¶
運算子使用 Tag
來表示標籤。
在條目上建立標籤¶
CloudDataCatalogCreateTagOperator
運算子會建立標籤。
create_tag = CloudDataCatalogCreateTagOperator(
task_id="create_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
template_id=TEMPLATE_ID,
tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)
您可以使用 Jinja 模板,搭配 location
, entry_group
, entry
, tag
, template_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
新建立的標籤 ID 可以透過 tag_id
鍵讀取。
create_tag_result = BashOperator(
task_id="create_tag_result",
bash_command=f"echo {tag_id}",
)
更新標籤¶
CloudDataCatalogUpdateTagOperator
運算子會更新標籤。
update_tag = CloudDataCatalogUpdateTagOperator(
task_id="update_tag",
tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
update_mask={"paths": ["fields"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag_id=tag_id,
)
您可以使用 Jinja 模板,搭配 tag
, update_mask
, location
, entry_group
, entry
, tag_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
刪除標籤¶
CloudDataCatalogDeleteTagOperator
運算子會刪除標籤。
delete_tag = CloudDataCatalogDeleteTagOperator(
task_id="delete_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag=tag_id,
)
您可以使用 Jinja 模板,搭配 location
, entry_group
, entry
, tag
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
列出條目上的標籤¶
CloudDataCatalogListTagsOperator
運算子會取得條目上的標籤列表。
list_tags = CloudDataCatalogListTagsOperator(
task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以使用 Jinja 模板,搭配 location
, entry_group
, entry
, page_size
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
管理標籤範本欄位¶
運算子使用 TagTemplateField
來表示標籤範本欄位。
建立欄位¶
CloudDataCatalogCreateTagTemplateFieldOperator
運算子會建立標籤範本欄位。
create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
task_id="create_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_2,
tag_template_field=TagTemplateField(
display_name="second-field", type_=FieldType(primitive_type="STRING")
),
)
您可以使用 Jinja 模板,搭配 location
, tag_template
, tag_template_field_id
, tag_template_field
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
新建立的欄位 ID 可以透過 tag_template_field_id
鍵讀取。
create_tag_template_field_result = BashOperator(
task_id="create_tag_template_field_result",
bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)
重新命名欄位¶
CloudDataCatalogRenameTagTemplateFieldOperator
運算子會重新命名標籤範本欄位。
rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
task_id="rename_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_1,
new_tag_template_field_id=FIELD_NAME_3,
)
您可以使用 Jinja 模板,搭配 location
, tag_template
, field
, new_tag_template_field_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
更新欄位¶
CloudDataCatalogUpdateTagTemplateFieldOperator
運算子會更新標籤範本欄位。
update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
task_id="update_tag_template_field",
tag_template_field={"display_name": "Updated template field"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_1,
)
您可以使用 Jinja 模板,搭配 tag_template_field
, update_mask
, tag_template_field_name
, location
, tag_template
, tag_template_field_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
刪除欄位¶
CloudDataCatalogDeleteTagTemplateFieldOperator
運算子會刪除標籤範本欄位。
delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
task_id="delete_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_2,
force=True,
)
您可以使用 Jinja 模板,搭配 location
, tag_template
, field
, force
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
搜尋資源¶
CloudDataCatalogSearchCatalogOperator
運算子會在 Data Catalog 中搜尋符合查詢的多個資源,例如條目、標籤。
query 參數應使用 搜尋語法定義。
search_catalog = CloudDataCatalogSearchCatalogOperator(
task_id="search_catalog",
scope={"include_project_ids": [PROJECT_ID]},
query=f"name:{ENTRY_GROUP_NAME}",
)
您可以使用 Jinja 模板,搭配 scope
, query
, page_size
, order_by
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
參數,讓您動態決定值。
結果會儲存到 XCom,以便其他運算子使用。
search_catalog_result = BashOperator(
task_id="search_catalog_result",
bash_command=f"echo {search_catalog.output}",
)