Skip to content

Commit 60c2323

Browse files
authored
feat(storage): introduce dp detector based on grpc metrics (#11100)
* feat: introduce dp detector based on grpc metrics * revert changes to go.work.sum * add context on having to still use an exporter * update func doc * rename and fix vet * adjust where exporter is initialized * address feedback * inspect ResourceMetrics instead of using stdoutmetric exporter * shutdown MeterProvider and ManualReader * only include provider.Shutdown() * update docs on return value and xs refactor * address feedback * address feedback on error wrapping and bucket prefix * revert error sentinel
1 parent c2bcbf2 commit 60c2323

2 files changed

Lines changed: 101 additions & 0 deletions

File tree

storage/integration_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ import (
5353
"github.com/google/go-cmp/cmp"
5454
"github.com/google/go-cmp/cmp/cmpopts"
5555
"github.com/googleapis/gax-go/v2/apierror"
56+
"go.opentelemetry.io/contrib/detectors/gcp"
57+
"go.opentelemetry.io/otel/sdk/resource"
5658
"golang.org/x/oauth2/google"
5759
"google.golang.org/api/googleapi"
5860
"google.golang.org/api/iterator"
@@ -324,6 +326,46 @@ var readCases = []readCase{
324326
},
325327
}
326328

329+
func TestIntegration_DetectDirectConnectivity(t *testing.T) {
330+
ctx := skipHTTP("direct connectivity isn't available for json")
331+
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) {
332+
h := testHelper{t}
333+
// Using Resoource Detector to detect if test is being ran inside GCE
334+
// if so, the test expects Direct Connectivity to be detected.
335+
// Otherwise, it will only validate that Direct Connectivity was not
336+
// detected.
337+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/resourcedetectionprocessor/README.md
338+
detectedAttrs, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector()))
339+
if err != nil {
340+
t.Fatalf("resource.New: %v", err)
341+
}
342+
attrs := detectedAttrs.Set()
343+
if v, exists := attrs.Value("cloud.platform"); exists && v.AsString() == "gcp_compute_engine" {
344+
v, exists = attrs.Value("cloud.region")
345+
if !exists {
346+
t.Fatalf("CheckDirectConnectivitySupported: region not detected")
347+
}
348+
region := v.AsString()
349+
newBucketName := prefix + uidSpace.New()
350+
newBucket := client.Bucket(newBucketName)
351+
h.mustCreate(newBucket, testutil.ProjID(), &BucketAttrs{Location: region, LocationType: "region"})
352+
defer h.mustDeleteBucket(newBucket)
353+
err := CheckDirectConnectivitySupported(ctx, newBucketName)
354+
if err != nil {
355+
t.Fatalf("CheckDirectConnectivitySupported: %v", err)
356+
}
357+
} else {
358+
err = CheckDirectConnectivitySupported(ctx, bucket)
359+
if err == nil {
360+
t.Fatal("CheckDirectConnectivitySupported: expected error but none returned")
361+
}
362+
if err != nil && !strings.Contains(err.Error(), "direct connectivity not detected") {
363+
t.Fatalf("CheckDirectConnectivitySupported: failed on a different error %v", err)
364+
}
365+
}
366+
})
367+
}
368+
327369
func TestIntegration_BucketCreateDelete(t *testing.T) {
328370
ctx := skipJSONReads(context.Background(), "no reads in test")
329371
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) {

storage/storage.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,18 @@ import (
4343
"cloud.google.com/go/storage/internal"
4444
"cloud.google.com/go/storage/internal/apiv2/storagepb"
4545
"github.com/googleapis/gax-go/v2"
46+
"go.opentelemetry.io/otel/attribute"
47+
"go.opentelemetry.io/otel/sdk/metric"
48+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
4649
"golang.org/x/oauth2/google"
4750
"google.golang.org/api/googleapi"
4851
"google.golang.org/api/option"
4952
"google.golang.org/api/option/internaloption"
5053
raw "google.golang.org/api/storage/v1"
5154
"google.golang.org/api/transport"
5255
htransport "google.golang.org/api/transport/http"
56+
"google.golang.org/grpc/experimental/stats"
57+
"google.golang.org/grpc/stats/opentelemetry"
5358
"google.golang.org/protobuf/proto"
5459
"google.golang.org/protobuf/reflect/protoreflect"
5560
"google.golang.org/protobuf/types/known/fieldmaskpb"
@@ -233,6 +238,60 @@ func NewGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e
233238
return &Client{tc: tc}, nil
234239
}
235240

241+
// CheckDirectConnectivitySupported checks if gRPC direct connectivity
242+
// is available for a specific bucket from the environment where the client
243+
// is running. A `nil` error represents Direct Connectivity was detected.
244+
// Direct connectivity is expected to be available when running from inside
245+
// GCP and connecting to a bucket in the same region.
246+
//
247+
// You can pass in [option.ClientOption] you plan on passing to [NewGRPCClient]
248+
func CheckDirectConnectivitySupported(ctx context.Context, bucket string, opts ...option.ClientOption) error {
249+
view := metric.NewView(
250+
metric.Instrument{
251+
Name: "grpc.client.attempt.duration",
252+
Kind: metric.InstrumentKindHistogram,
253+
},
254+
metric.Stream{AttributeFilter: attribute.NewAllowKeysFilter("grpc.lb.locality")},
255+
)
256+
mr := metric.NewManualReader()
257+
provider := metric.NewMeterProvider(metric.WithReader(mr), metric.WithView(view))
258+
// Provider handles shutting down ManualReader
259+
defer provider.Shutdown(ctx)
260+
mo := opentelemetry.MetricsOptions{
261+
MeterProvider: provider,
262+
Metrics: stats.NewMetrics("grpc.client.attempt.duration"),
263+
OptionalLabels: []string{"grpc.lb.locality"},
264+
}
265+
combinedOpts := append(opts, WithDisabledClientMetrics(), option.WithGRPCDialOption(opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})))
266+
client, err := NewGRPCClient(ctx, combinedOpts...)
267+
if err != nil {
268+
return fmt.Errorf("storage.NewGRPCClient: %w", err)
269+
}
270+
defer client.Close()
271+
if _, err = client.Bucket(bucket).Attrs(ctx); err != nil {
272+
return fmt.Errorf("Bucket.Attrs: %w", err)
273+
}
274+
// Call manual reader to collect metric
275+
rm := metricdata.ResourceMetrics{}
276+
if err = mr.Collect(context.Background(), &rm); err != nil {
277+
return fmt.Errorf("ManualReader.Collect: %w", err)
278+
}
279+
for _, sm := range rm.ScopeMetrics {
280+
for _, m := range sm.Metrics {
281+
if m.Name == "grpc.client.attempt.duration" {
282+
hist := m.Data.(metricdata.Histogram[float64])
283+
for _, d := range hist.DataPoints {
284+
v, present := d.Attributes.Value("grpc.lb.locality")
285+
if present && v.AsString() != "" {
286+
return nil
287+
}
288+
}
289+
}
290+
}
291+
}
292+
return errors.New("storage: direct connectivity not detected")
293+
}
294+
236295
// Close closes the Client.
237296
//
238297
// Close need not be called at program exit.

0 commit comments

Comments
 (0)