Google Cloud Bigtable Operators¶
先決條件任務¶
若要使用這些 operators,您必須完成幾件事
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
依照 Google Cloud 文件中的說明,為您的專案啟用計費功能。
依照 Cloud Console 文件中的說明,啟用 API。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'安裝 提供詳細資訊。
BigtableCreateInstanceOperator¶
使用 BigtableCreateInstanceOperator
建立 Google Cloud Bigtable 執行個體。
如果具有指定 ID 的 Cloud Bigtable 執行個體已存在,operator 不會比較其組態,並立即成功。不會對現有的執行個體進行任何變更。
使用 operator¶
您可以建立包含或不含專案 ID 的 operator。如果缺少專案 ID,將會從使用的 Google Cloud 連線中檢索。兩種變體都顯示如下
create_instance_task = BigtableCreateInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=CBT_INSTANCE_TYPE, # type: ignore[arg-type]
instance_labels=CBT_INSTANCE_LABELS,
cluster_nodes=None,
cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, # type: ignore[arg-type]
task_id="create_instance_task",
)
BigtableUpdateInstanceOperator¶
使用 BigtableUpdateInstanceOperator
更新現有的 Google Cloud Bigtable 執行個體。
只有以下組態可以針對現有的執行個體進行更新:instance_display_name、instance_type 和 instance_labels。
使用 operator¶
您可以建立包含或不含專案 ID 的 operator。如果缺少專案 ID,將會從使用的 Google Cloud 連線中檢索。兩種變體都顯示如下
update_instance_task = BigtableUpdateInstanceOperator(
instance_id=CBT_INSTANCE_ID_1,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
instance_type=CBT_INSTANCE_TYPE_PROD,
instance_labels=CBT_INSTANCE_LABELS_UPDATED,
task_id="update_instance_task",
)
BigtableDeleteInstanceOperator¶
使用 BigtableDeleteInstanceOperator
刪除 Google Cloud Bigtable 執行個體。
使用 operator¶
您可以建立包含或不含專案 ID 的 operator。如果缺少專案 ID,將會從使用的 Google Cloud 連線中檢索。兩種變體都顯示如下
delete_instance_task = BigtableDeleteInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
task_id="delete_instance_task",
)
BigtableUpdateClusterOperator¶
使用 BigtableUpdateClusterOperator
修改 Cloud Bigtable 叢集中節點的數量。
使用 operator¶
您可以建立包含或不含專案 ID 的 operator。如果缺少專案 ID,將會從使用的 Google Cloud 連線中檢索。兩種變體都顯示如下
cluster_update_task = BigtableUpdateClusterOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
cluster_id=CBT_CLUSTER_ID,
nodes=CBT_CLUSTER_NODES_UPDATED,
task_id="update_cluster_task",
)
BigtableCreateTableOperator¶
在 Cloud Bigtable 執行個體中建立表格。
如果具有指定 ID 的表格已存在於 Cloud Bigtable 執行個體中,operator 會比較 Column Families。如果 Column Families 完全相同,operator 會成功。否則,operator 會失敗並顯示適當的錯誤訊息。
使用 operator¶
您可以建立包含或不含專案 ID 的 operator。如果缺少專案 ID,將會從使用的 Google Cloud 連線中檢索。兩種變體都顯示如下
create_table_task = BigtableCreateTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
table_id=CBT_TABLE_ID,
task_id="create_table",
)
進階¶
建立表格時,您可以指定選用的 initial_split_keys
和 column_families
。請參閱 Google Cloud Bigtable 的 Python 用戶端文件 以了解表格 和 以了解 Column Families。
BigtableDeleteTableOperator¶
使用 BigtableDeleteTableOperator
刪除 Google Cloud Bigtable 中的表格。
使用 operator¶
您可以建立包含或不含專案 ID 的 operator。如果缺少專案 ID,將會從使用的 Google Cloud 連線中檢索。兩種變體都顯示如下
delete_table_task = BigtableDeleteTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
table_id=CBT_TABLE_ID,
task_id="delete_table_task",
)
BigtableTableReplicationCompletedSensor¶
您可以建立包含或不含專案 ID 的 operator。如果缺少專案 ID,將會從使用的 Google Cloud 連線中檢索。兩種變體都顯示如下
使用 BigtableTableReplicationCompletedSensor
等待表格完全複製。
相同的引數適用於此 sensor,如同 BigtableCreateTableOperator。
注意: 如果表格或 Cloud Bigtable 執行個體不存在,此 sensor 會等待表格直到逾時,且不會引發任何例外狀況。
使用 operator¶
wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
instance_id=CBT_INSTANCE_ID_2,
table_id=CBT_TABLE_ID,
poke_interval=CBT_POKE_INTERVAL,
timeout=180,
task_id="wait_for_table_replication_task2",
)