@@ -43,13 +43,18 @@ import (
4343 "cloud.google.com/go/storage/internal"
4444 "cloud.google.com/go/storage/internal/apiv2/storagepb"
4545 "github.com/googleapis/gax-go/v2"
46+ "go.opentelemetry.io/otel/attribute"
47+ "go.opentelemetry.io/otel/sdk/metric"
48+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
4649 "golang.org/x/oauth2/google"
4750 "google.golang.org/api/googleapi"
4851 "google.golang.org/api/option"
4952 "google.golang.org/api/option/internaloption"
5053 raw "google.golang.org/api/storage/v1"
5154 "google.golang.org/api/transport"
5255 htransport "google.golang.org/api/transport/http"
56+ "google.golang.org/grpc/experimental/stats"
57+ "google.golang.org/grpc/stats/opentelemetry"
5358 "google.golang.org/protobuf/proto"
5459 "google.golang.org/protobuf/reflect/protoreflect"
5560 "google.golang.org/protobuf/types/known/fieldmaskpb"
@@ -233,6 +238,60 @@ func NewGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e
233238 return & Client {tc : tc }, nil
234239}
235240
241+ // CheckDirectConnectivitySupported checks if gRPC direct connectivity
242+ // is available for a specific bucket from the environment where the client
243+ // is running. A `nil` error represents Direct Connectivity was detected.
244+ // Direct connectivity is expected to be available when running from inside
245+ // GCP and connecting to a bucket in the same region.
246+ //
247+ // You can pass in [option.ClientOption] you plan on passing to [NewGRPCClient]
248+ func CheckDirectConnectivitySupported (ctx context.Context , bucket string , opts ... option.ClientOption ) error {
249+ view := metric .NewView (
250+ metric.Instrument {
251+ Name : "grpc.client.attempt.duration" ,
252+ Kind : metric .InstrumentKindHistogram ,
253+ },
254+ metric.Stream {AttributeFilter : attribute .NewAllowKeysFilter ("grpc.lb.locality" )},
255+ )
256+ mr := metric .NewManualReader ()
257+ provider := metric .NewMeterProvider (metric .WithReader (mr ), metric .WithView (view ))
258+ // Provider handles shutting down ManualReader
259+ defer provider .Shutdown (ctx )
260+ mo := opentelemetry.MetricsOptions {
261+ MeterProvider : provider ,
262+ Metrics : stats .NewMetrics ("grpc.client.attempt.duration" ),
263+ OptionalLabels : []string {"grpc.lb.locality" },
264+ }
265+ combinedOpts := append (opts , WithDisabledClientMetrics (), option .WithGRPCDialOption (opentelemetry .DialOption (opentelemetry.Options {MetricsOptions : mo })))
266+ client , err := NewGRPCClient (ctx , combinedOpts ... )
267+ if err != nil {
268+ return fmt .Errorf ("storage.NewGRPCClient: %w" , err )
269+ }
270+ defer client .Close ()
271+ if _ , err = client .Bucket (bucket ).Attrs (ctx ); err != nil {
272+ return fmt .Errorf ("Bucket.Attrs: %w" , err )
273+ }
274+ // Call manual reader to collect metric
275+ rm := metricdata.ResourceMetrics {}
276+ if err = mr .Collect (context .Background (), & rm ); err != nil {
277+ return fmt .Errorf ("ManualReader.Collect: %w" , err )
278+ }
279+ for _ , sm := range rm .ScopeMetrics {
280+ for _ , m := range sm .Metrics {
281+ if m .Name == "grpc.client.attempt.duration" {
282+ hist := m .Data .(metricdata.Histogram [float64 ])
283+ for _ , d := range hist .DataPoints {
284+ v , present := d .Attributes .Value ("grpc.lb.locality" )
285+ if present && v .AsString () != "" {
286+ return nil
287+ }
288+ }
289+ }
290+ }
291+ }
292+ return errors .New ("storage: direct connectivity not detected" )
293+ }
294+
236295// Close closes the Client.
237296//
238297// Close need not be called at program exit.
0 commit comments