Skip to content

Commit 640be05

Browse files
authored
feat(bigtable): Add support for materialized views for admin client (#11793)
* feat(bigtable): Add support for logical views * feat(bigtable): Add support for Materialized Views to admin client * fix build * fix build * fixed according to PR comments * PR fixes * fix merge * fix PR * wait for create mv\lv operations to complete before returning * oops
1 parent 7eaae0f commit 640be05

2 files changed

Lines changed: 286 additions & 2 deletions

File tree

bigtable/admin.go

Lines changed: 163 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ func logicalViewPath(project, instance, logicalView string) string {
134134
return fmt.Sprintf("%s/logicalViews/%s", instancePrefix(project, instance), logicalView)
135135
}
136136

137+
func materializedlViewPath(project, instance, materializedView string) string {
138+
return fmt.Sprintf("%s/materializedViews/%s", instancePrefix(project, instance), materializedView)
139+
}
140+
137141
// EncryptionInfo represents the encryption info of a table.
138142
type EncryptionInfo struct {
139143
Status *Status
@@ -2788,8 +2792,13 @@ func (iac *InstanceAdminClient) CreateLogicalView(ctx context.Context, instanceI
27882792
Query: conf.Query,
27892793
},
27902794
}
2791-
_, err := iac.iClient.CreateLogicalView(ctx, req)
2792-
return err
2795+
2796+
op, err := iac.iClient.CreateLogicalView(ctx, req)
2797+
if err != nil {
2798+
return err
2799+
}
2800+
resp := btapb.LogicalView{}
2801+
return longrunning.InternalNewOperation(iac.lroClient, op).Wait(ctx, &resp)
27932802
}
27942803

27952804
// LogicalViewInfo contains logical view metadata. This struct is read-only.
@@ -2883,3 +2892,155 @@ func (iac *InstanceAdminClient) DeleteLogicalView(ctx context.Context, instanceI
28832892
_, err := iac.iClient.DeleteLogicalView(ctx, req)
28842893
return err
28852894
}
2895+
2896+
// Materialized Views
2897+
2898+
// CreateMaterializedView creates a new materialized view in an instance.
2899+
func (iac *InstanceAdminClient) CreateMaterializedView(ctx context.Context, instanceID string, conf *MaterializedViewInfo) error {
2900+
if conf.MaterializedViewID == "" {
2901+
return errors.New("MaterializedViewID is required")
2902+
}
2903+
2904+
ctx = mergeOutgoingMetadata(ctx, iac.md)
2905+
mv := &btapb.MaterializedView{
2906+
Query: conf.Query,
2907+
}
2908+
if conf.DeletionProtection != None {
2909+
switch dp := conf.DeletionProtection; dp {
2910+
case Protected:
2911+
mv.DeletionProtection = true
2912+
case Unprotected:
2913+
mv.DeletionProtection = false
2914+
default:
2915+
break
2916+
}
2917+
}
2918+
req := &btapb.CreateMaterializedViewRequest{
2919+
Parent: instancePrefix(iac.project, instanceID),
2920+
MaterializedViewId: conf.MaterializedViewID,
2921+
MaterializedView: mv,
2922+
}
2923+
op, err := iac.iClient.CreateMaterializedView(ctx, req)
2924+
if err != nil {
2925+
return err
2926+
}
2927+
resp := btapb.MaterializedView{}
2928+
return longrunning.InternalNewOperation(iac.lroClient, op).Wait(ctx, &resp)
2929+
}
2930+
2931+
// MaterializedViewInfo contains materialized view metadata. This struct is read-only.
2932+
type MaterializedViewInfo struct {
2933+
MaterializedViewID string
2934+
2935+
Query string
2936+
DeletionProtection DeletionProtection
2937+
}
2938+
2939+
// MaterializedViewInfo retrieves information about a materialized view.
2940+
func (iac *InstanceAdminClient) MaterializedViewInfo(ctx context.Context, instanceID, materializedViewID string) (*MaterializedViewInfo, error) {
2941+
ctx = mergeOutgoingMetadata(ctx, iac.md)
2942+
prefix := instancePrefix(iac.project, instanceID)
2943+
req := &btapb.GetMaterializedViewRequest{
2944+
Name: materializedlViewPath(iac.project, instanceID, materializedViewID),
2945+
}
2946+
var res *btapb.MaterializedView
2947+
2948+
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
2949+
var err error
2950+
res, err = iac.iClient.GetMaterializedView(ctx, req)
2951+
return err
2952+
}, retryOptions...)
2953+
2954+
if err != nil {
2955+
return nil, err
2956+
}
2957+
mv := &MaterializedViewInfo{MaterializedViewID: strings.TrimPrefix(res.Name, prefix+"/materializedViews/"), Query: res.Query}
2958+
if res.DeletionProtection {
2959+
mv.DeletionProtection = Protected
2960+
} else {
2961+
mv.DeletionProtection = Unprotected
2962+
}
2963+
return mv, nil
2964+
}
2965+
2966+
// MaterializedViews returns a list of the materialized views in the instance.
2967+
func (iac *InstanceAdminClient) MaterializedViews(ctx context.Context, instanceID string) ([]MaterializedViewInfo, error) {
2968+
views := []MaterializedViewInfo{}
2969+
prefix := instancePrefix(iac.project, instanceID)
2970+
req := &btapb.ListMaterializedViewsRequest{
2971+
Parent: prefix,
2972+
}
2973+
var res *btapb.ListMaterializedViewsResponse
2974+
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
2975+
var err error
2976+
res, err = iac.iClient.ListMaterializedViews(ctx, req)
2977+
return err
2978+
}, retryOptions...)
2979+
if err != nil {
2980+
return nil, err
2981+
}
2982+
2983+
for _, mView := range res.MaterializedViews {
2984+
mv := MaterializedViewInfo{MaterializedViewID: strings.TrimPrefix(mView.Name, prefix+"/materializedViews/"), Query: mView.Query}
2985+
if mView.DeletionProtection {
2986+
mv.DeletionProtection = Protected
2987+
} else {
2988+
mv.DeletionProtection = Unprotected
2989+
}
2990+
views = append(views, mv)
2991+
}
2992+
return views, nil
2993+
}
2994+
2995+
// UpdateMaterializedView updates a materialized view in an instance according to the given configuration.
2996+
func (iac *InstanceAdminClient) UpdateMaterializedView(ctx context.Context, instanceID string, conf MaterializedViewInfo) error {
2997+
ctx = mergeOutgoingMetadata(ctx, iac.md)
2998+
if conf.MaterializedViewID == "" {
2999+
return errors.New("MaterializedViewID is required")
3000+
}
3001+
mv := &btapb.MaterializedView{}
3002+
mv.Name = materializedlViewPath(iac.project, instanceID, conf.MaterializedViewID)
3003+
3004+
updateMask := &field_mask.FieldMask{
3005+
Paths: []string{},
3006+
}
3007+
if conf.Query != "" {
3008+
updateMask.Paths = append(updateMask.Paths, "query")
3009+
mv.Query = conf.Query
3010+
}
3011+
if conf.DeletionProtection != None {
3012+
updateMask.Paths = append(updateMask.Paths, "deletion_protection")
3013+
switch dp := conf.DeletionProtection; dp {
3014+
case Protected:
3015+
mv.DeletionProtection = true
3016+
case Unprotected:
3017+
mv.DeletionProtection = false
3018+
default:
3019+
break
3020+
}
3021+
}
3022+
req := &btapb.UpdateMaterializedViewRequest{
3023+
MaterializedView: mv,
3024+
UpdateMask: updateMask,
3025+
}
3026+
lro, err := iac.iClient.UpdateMaterializedView(ctx, req)
3027+
if err != nil {
3028+
return fmt.Errorf("error from update materialized view: %w", err)
3029+
}
3030+
var res btapb.MaterializedView
3031+
op := longrunning.InternalNewOperation(iac.lroClient, lro)
3032+
if err = op.Wait(ctx, &res); err != nil {
3033+
return fmt.Errorf("error from operation: %v", err)
3034+
}
3035+
return nil
3036+
}
3037+
3038+
// DeleteMaterializedView deletes a materialized view in an instance.
3039+
func (iac *InstanceAdminClient) DeleteMaterializedView(ctx context.Context, instanceID, materializedViewID string) error {
3040+
ctx = mergeOutgoingMetadata(ctx, iac.md)
3041+
req := &btapb.DeleteMaterializedViewRequest{
3042+
Name: materializedlViewPath(iac.project, instanceID, materializedViewID),
3043+
}
3044+
_, err := iac.iClient.DeleteMaterializedView(ctx, req)
3045+
return err
3046+
}

bigtable/integration_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4442,6 +4442,129 @@ func TestIntegration_AdminLogicalView(t *testing.T) {
44424442
}
44434443
}
44444444

4445+
func TestIntegration_AdminMaterializedView(t *testing.T) {
4446+
testEnv, err := NewIntegrationEnv()
4447+
if err != nil {
4448+
t.Fatalf("IntegrationEnv: %v", err)
4449+
}
4450+
defer testEnv.Close()
4451+
4452+
if !testEnv.Config().UseProd {
4453+
t.Skip("emulator doesn't support materializedViews")
4454+
}
4455+
4456+
timeout := 15 * time.Minute
4457+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
4458+
defer cancel()
4459+
4460+
adminClient, err := testEnv.NewAdminClient()
4461+
if err != nil {
4462+
t.Fatalf("NewAdminClient: %v", err)
4463+
}
4464+
defer adminClient.Close()
4465+
4466+
instanceAdminClient, err := testEnv.NewInstanceAdminClient()
4467+
if err != nil {
4468+
t.Fatalf("NewInstanceAdminClient: %v", err)
4469+
}
4470+
defer instanceAdminClient.Close()
4471+
4472+
tblConf := TableConf{
4473+
TableID: testEnv.Config().Table,
4474+
Families: map[string]GCPolicy{
4475+
"fam1": MaxVersionsPolicy(1),
4476+
"fam2": MaxVersionsPolicy(2),
4477+
},
4478+
}
4479+
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
4480+
t.Fatalf("Creating table from TableConf: %v", err)
4481+
}
4482+
// Delete the table at the end of the test. Schedule ahead of time
4483+
// in case the client fails
4484+
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
4485+
4486+
// Create materialized view
4487+
materializedViewUUID := uid.NewSpace("materializedView-", &uid.Options{})
4488+
materializedView := materializedViewUUID.New()
4489+
defer instanceAdminClient.DeleteMaterializedView(ctx, testEnv.Config().Instance, materializedView)
4490+
4491+
materializedViewInfo := MaterializedViewInfo{
4492+
MaterializedViewID: materializedView,
4493+
Query: fmt.Sprintf("SELECT _key, fam1[col1] as col FROM %s", tblConf.TableID),
4494+
DeletionProtection: Protected,
4495+
}
4496+
if err = instanceAdminClient.CreateMaterializedView(ctx, testEnv.Config().Instance, &materializedViewInfo); err != nil {
4497+
t.Fatalf("Creating materialized view: %v", err)
4498+
}
4499+
4500+
// List materialized views
4501+
materializedViews, err := instanceAdminClient.MaterializedViews(ctx, testEnv.Config().Instance)
4502+
if err != nil {
4503+
t.Fatalf("Listing materialized views: %v", err)
4504+
}
4505+
if got, want := len(materializedViews), 1; got != want {
4506+
t.Fatalf("Listing materialized views count: %d, want: != %d", got, want)
4507+
}
4508+
if got, want := materializedViews[0].MaterializedViewID, materializedView; got != want {
4509+
t.Errorf("MaterializedView Name: %s, want: %s", got, want)
4510+
}
4511+
if got, want := materializedViews[0].Query, materializedViewInfo.Query; got != want {
4512+
t.Errorf("MaterializedView Query: %q, want: %q", got, want)
4513+
}
4514+
4515+
// Get materialized view
4516+
mvInfo, err := instanceAdminClient.MaterializedViewInfo(ctx, testEnv.Config().Instance, materializedView)
4517+
if err != nil {
4518+
t.Fatalf("Getting materialized view: %v", err)
4519+
}
4520+
if got, want := mvInfo.Query, materializedViewInfo.Query; got != want {
4521+
t.Errorf("MaterializedView Query: %q, want: %q", got, want)
4522+
}
4523+
// Cannot delete the authorized view because it is deletion protected
4524+
if err = instanceAdminClient.DeleteMaterializedView(ctx, testEnv.Config().Instance, materializedView); err == nil {
4525+
t.Fatalf("DeleteMaterializedView: %v", err)
4526+
}
4527+
4528+
// Update authorized view
4529+
4530+
// Update materialized view
4531+
newMaterializedViewInfo := MaterializedViewInfo{
4532+
MaterializedViewID: materializedView,
4533+
DeletionProtection: Unprotected,
4534+
}
4535+
err = instanceAdminClient.UpdateMaterializedView(ctx, testEnv.Config().Instance, newMaterializedViewInfo)
4536+
if err != nil {
4537+
t.Fatalf("UpdateMaterializedView failed: %v", err)
4538+
}
4539+
4540+
// Check that updated materialized view has the correct deletion protection
4541+
mvInfo, err = instanceAdminClient.MaterializedViewInfo(ctx, testEnv.Config().Instance, materializedView)
4542+
if err != nil {
4543+
t.Fatalf("Getting materialized view: %v", err)
4544+
}
4545+
if got, want := mvInfo.DeletionProtection, Unprotected; got != want {
4546+
t.Errorf("MaterializedViewInfo deletion protection: %v, want: %v", got, want)
4547+
}
4548+
// Check that the subset_view field doesn't change
4549+
if got, want := mvInfo.Query, materializedViewInfo.Query; cmp.Equal(got, want) {
4550+
t.Errorf("Query: %v, want: %v", got, want)
4551+
}
4552+
4553+
// Delete materialized view
4554+
if err = instanceAdminClient.DeleteMaterializedView(ctx, testEnv.Config().Instance, materializedView); err != nil {
4555+
t.Fatalf("DeleteMaterializedView: %v", err)
4556+
}
4557+
4558+
// Verify the materialized view was deleted.
4559+
materializedViews, err = instanceAdminClient.MaterializedViews(ctx, testEnv.Config().Instance)
4560+
if err != nil {
4561+
t.Fatalf("Listing materialized views: %v", err)
4562+
}
4563+
if got, want := len(materializedViews), 0; got != want {
4564+
t.Fatalf("Listing materialized views count: %d, want: != %d", got, want)
4565+
}
4566+
}
4567+
44454568
// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
44464569
func TestIntegration_DirectPathFallback(t *testing.T) {
44474570
ctx := context.Background()

0 commit comments

Comments
 (0)