Skip to content

Commit a75c8b0

Browse files
authored
fix(storage): correct direct connectivity check (#11152)
* fix(storage): correct direct connectivity check * refactor direct connectivity tests; experimental doc comment * fix gce check condition * update dual-region check; and remove duplicate * use asia-east1 as outside region * address feedback * add todo for us-west1 dual-region
1 parent 771aa46 commit a75c8b0

2 files changed

Lines changed: 74 additions & 25 deletions

File tree

storage/integration_test.go

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
"google.golang.org/api/iterator"
6161
itesting "google.golang.org/api/iterator/testing"
6262
"google.golang.org/api/option"
63+
"google.golang.org/api/option/internaloption"
6364
"google.golang.org/api/transport"
6465
"google.golang.org/grpc/codes"
6566
"google.golang.org/grpc/status"
@@ -326,8 +327,18 @@ var readCases = []readCase{
326327
},
327328
}
328329

329-
func TestIntegration_DetectDirectConnectivity(t *testing.T) {
330-
ctx := skipHTTP("direct connectivity isn't available for json")
330+
// Test in a GCE environment expected to be located in one of:
331+
// - us-west1-a, us-west1-b, us-west-c
332+
//
333+
// The test skips when ran outside of a GCE instance and us-west-*.
334+
//
335+
// Test cases for direct connectivity (DC) check:
336+
// 1. DC detected with co-located GCS bucket in us-west1
337+
// 2. DC not detected with multi region bucket in EU
338+
// 3. DC not detected with dual region bucket in EUR4
339+
// 4. DC not detected with regional bucket in EUROPE-WEST1
340+
func TestIntegration_DetectDirectConnectivityInGCE(t *testing.T) {
341+
ctx := skipHTTP("grpc only test")
331342
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) {
332343
h := testHelper{t}
333344
// Using Resoource Detector to detect if test is being ran inside GCE
@@ -340,32 +351,68 @@ func TestIntegration_DetectDirectConnectivity(t *testing.T) {
340351
t.Fatalf("resource.New: %v", err)
341352
}
342353
attrs := detectedAttrs.Set()
343-
if v, exists := attrs.Value("cloud.platform"); exists && v.AsString() == "gcp_compute_engine" {
344-
v, exists = attrs.Value("cloud.region")
345-
if !exists {
346-
t.Fatalf("CheckDirectConnectivitySupported: region not detected")
347-
}
348-
region := v.AsString()
349-
newBucketName := prefix + uidSpace.New()
350-
newBucket := client.Bucket(newBucketName)
351-
h.mustCreate(newBucket, testutil.ProjID(), &BucketAttrs{Location: region, LocationType: "region"})
352-
defer h.mustDeleteBucket(newBucket)
353-
err := CheckDirectConnectivitySupported(ctx, newBucketName)
354-
if err != nil {
355-
t.Fatalf("CheckDirectConnectivitySupported: %v", err)
356-
}
357-
} else {
358-
err = CheckDirectConnectivitySupported(ctx, bucket)
359-
if err == nil {
360-
t.Fatal("CheckDirectConnectivitySupported: expected error but none returned")
361-
}
362-
if err != nil && !strings.Contains(err.Error(), "direct connectivity not detected") {
363-
t.Fatalf("CheckDirectConnectivitySupported: failed on a different error %v", err)
364-
}
354+
if v, exists := attrs.Value("cloud.platform"); !exists || v.AsString() != "gcp_compute_engine" {
355+
t.Skip("only testable in a GCE instance")
356+
}
357+
if v, exists := attrs.Value("cloud.region"); !exists || !strings.Contains(strings.ToLower(v.AsString()), "us-west1") {
358+
t.Skip("inside a GCE instance but region is not us-west1")
359+
}
360+
for _, test := range []struct {
361+
name string
362+
attrs *BucketAttrs
363+
expectDirectConnectivity bool
364+
}{
365+
{
366+
name: "co-located-bucket",
367+
attrs: &BucketAttrs{Location: "us-west1"},
368+
expectDirectConnectivity: true,
369+
},
370+
{
371+
name: "not-colocated-multi-region-bucket",
372+
attrs: &BucketAttrs{Location: "EU"},
373+
},
374+
{
375+
name: "not-colocated-dual-region-bucket",
376+
attrs: &BucketAttrs{Location: "EUR4"},
377+
},
378+
{
379+
name: "not-colocated-region-bucket",
380+
attrs: &BucketAttrs{Location: "EUROPE-WEST1"},
381+
},
382+
// TODO: Add a test for DC dual-region with us-west1 when supported
383+
} {
384+
t.Run(test.name, func(t *testing.T) {
385+
newBucketName := prefix + uidSpace.New()
386+
newBucket := client.Bucket(newBucketName)
387+
h.mustCreate(newBucket, testutil.ProjID(), test.attrs)
388+
defer h.mustDeleteBucket(newBucket)
389+
err := CheckDirectConnectivitySupported(ctx, newBucketName)
390+
if err != nil && test.expectDirectConnectivity {
391+
t.Fatalf("CheckDirectConnectivitySupported: expected direct connectivity but failed: %v", err)
392+
}
393+
if err == nil && !test.expectDirectConnectivity {
394+
t.Fatalf("CheckDirectConnectivitySupported: detected direct connectivity when not expected")
395+
}
396+
})
365397
}
366398
})
367399
}
368400

401+
// Test handles the case when Direct Connectivity is disabled which causes
402+
// "grpc.lb.locality" to return ""
403+
func TestIntegration_DoNotDetectDirectConnectivityWhenDisabled(t *testing.T) {
404+
ctx := skipHTTP("grpc only test")
405+
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) {
406+
err := CheckDirectConnectivitySupported(ctx, bucket)
407+
if err == nil {
408+
t.Fatal("CheckDirectConnectivitySupported: expected error but none returned")
409+
}
410+
if err != nil && !strings.Contains(err.Error(), "direct connectivity not detected") {
411+
t.Fatalf("CheckDirectConnectivitySupported: failed on a different error %v", err)
412+
}
413+
}, internaloption.EnableDirectPath(false))
414+
}
415+
369416
func TestIntegration_BucketCreateDelete(t *testing.T) {
370417
ctx := skipJSONReads(context.Background(), "no reads in test")
371418
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) {

storage/storage.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ func NewGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e
244244
// Direct connectivity is expected to be available when running from inside
245245
// GCP and connecting to a bucket in the same region.
246246
//
247+
// Experimental helper that's subject to change.
248+
//
247249
// You can pass in [option.ClientOption] you plan on passing to [NewGRPCClient]
248250
func CheckDirectConnectivitySupported(ctx context.Context, bucket string, opts ...option.ClientOption) error {
249251
view := metric.NewView(
@@ -282,7 +284,7 @@ func CheckDirectConnectivitySupported(ctx context.Context, bucket string, opts .
282284
hist := m.Data.(metricdata.Histogram[float64])
283285
for _, d := range hist.DataPoints {
284286
v, present := d.Attributes.Value("grpc.lb.locality")
285-
if present && v.AsString() != "" {
287+
if present && v.AsString() != "" && v.AsString() != "{}" {
286288
return nil
287289
}
288290
}

0 commit comments

Comments
 (0)