@@ -22,6 +22,7 @@ import (
2222 "fmt"
2323 "os"
2424 "reflect"
25+ "sync/atomic"
2526 "time"
2627
2728 "cloud.google.com/go/bigtable/internal"
@@ -31,6 +32,7 @@ import (
3132 sdkmetric "go.opentelemetry.io/otel/sdk/metric"
3233 "google.golang.org/api/option"
3334 "google.golang.org/grpc/metadata"
35+ "google.golang.org/grpc/stats"
3436)
3537
3638const (
@@ -59,21 +61,28 @@ const (
5961 metricLabelKeyClientUID = "client_uid"
6062
6163 // Metric names
62- metricNameOperationLatencies = "operation_latencies"
63- metricNameAttemptLatencies = "attempt_latencies"
64- metricNameServerLatencies = "server_latencies"
65- metricNameAppBlockingLatencies = "application_latencies"
66- metricNameFirstRespLatencies = "first_response_latencies"
67- metricNameRetryCount = "retry_count"
68- metricNameDebugTags = "debug_tags"
69- metricNameConnErrCount = "connectivity_error_count"
64+ metricNameOperationLatencies = "operation_latencies"
65+ metricNameAttemptLatencies = "attempt_latencies"
66+ metricNameServerLatencies = "server_latencies"
67+ metricNameAppBlockingLatencies = "application_latencies"
68+ metricNameClientBlockingLatencies = "throttling_latencies"
69+ metricNameFirstRespLatencies = "first_response_latencies"
70+ metricNameRetryCount = "retry_count"
71+ metricNameDebugTags = "debug_tags"
72+ metricNameConnErrCount = "connectivity_error_count"
7073
7174 // Metric units
7275 metricUnitMS = "ms"
7376 metricUnitCount = "1"
7477 maxAttrsLen = 12 // Monitored resource labels + Metric labels
7578)
7679
80+ type contextKey string
81+
82+ const (
83+ statsContextKey contextKey = "bigtable/clientBlockingLatencyTracker"
84+ )
85+
7786// These are effectively constant, but for testing purposes they are mutable
7887var (
7988 // duration between two metric exports
@@ -119,6 +128,9 @@ var (
119128 recordedPerAttempt : false ,
120129 },
121130 metricNameAppBlockingLatencies : {},
131+ metricNameClientBlockingLatencies : {
132+ recordedPerAttempt : true ,
133+ },
122134 metricNameRetryCount : {
123135 additionalAttrs : []string {
124136 metricLabelKeyStatus ,
@@ -157,6 +169,8 @@ var (
157169 }
158170 return filteredOptions
159171 }
172+
173+ sharedLatencyStatsHandler = & latencyStatsHandler {}
160174)
161175
162176type metricInfo struct {
@@ -174,14 +188,15 @@ type builtinMetricsTracerFactory struct {
174188 // do not change across different function calls on client
175189 clientAttributes []attribute.KeyValue
176190
177- operationLatencies metric.Float64Histogram
178- serverLatencies metric.Float64Histogram
179- attemptLatencies metric.Float64Histogram
180- firstRespLatencies metric.Float64Histogram
181- appBlockingLatencies metric.Float64Histogram
182- retryCount metric.Int64Counter
183- connErrCount metric.Int64Counter
184- debugTags metric.Int64Counter
191+ operationLatencies metric.Float64Histogram
192+ serverLatencies metric.Float64Histogram
193+ attemptLatencies metric.Float64Histogram
194+ firstRespLatencies metric.Float64Histogram
195+ appBlockingLatencies metric.Float64Histogram
196+ clientBlockingLatencies metric.Float64Histogram
197+ retryCount metric.Int64Counter
198+ connErrCount metric.Int64Counter
199+ debugTags metric.Int64Counter
185200}
186201
187202func newBuiltinMetricsTracerFactory (ctx context.Context , project , instance , appProfile string , metricsProvider MetricsProvider , opts ... option.ClientOption ) (* builtinMetricsTracerFactory , error ) {
@@ -329,6 +344,17 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
329344 return err
330345 }
331346
347+ // Create client_blocking_latencies
348+ tf .clientBlockingLatencies , err = meter .Float64Histogram (
349+ metricNameClientBlockingLatencies ,
350+ metric .WithDescription ("The latencies of requests queued on gRPC channels." ),
351+ metric .WithUnit (metricUnitMS ),
352+ metric .WithExplicitBucketBoundaries (bucketBounds ... ),
353+ )
354+ if err != nil {
355+ return err
356+ }
357+
332358 // Create retry_count
333359 tf .retryCount , err = meter .Int64Counter (
334360 metricNameRetryCount ,
@@ -369,14 +395,15 @@ type builtinMetricsTracer struct {
369395 // do not change across different operations on client
370396 clientAttributes []attribute.KeyValue
371397
372- instrumentOperationLatencies metric.Float64Histogram
373- instrumentServerLatencies metric.Float64Histogram
374- instrumentAttemptLatencies metric.Float64Histogram
375- instrumentFirstRespLatencies metric.Float64Histogram
376- instrumentAppBlockingLatencies metric.Float64Histogram
377- instrumentRetryCount metric.Int64Counter
378- instrumentConnErrCount metric.Int64Counter
379- instrumentDebugTags metric.Int64Counter
398+ instrumentOperationLatencies metric.Float64Histogram
399+ instrumentServerLatencies metric.Float64Histogram
400+ instrumentAttemptLatencies metric.Float64Histogram
401+ instrumentFirstRespLatencies metric.Float64Histogram
402+ instrumentAppBlockingLatencies metric.Float64Histogram
403+ instrumentClientBlockingLatencies metric.Float64Histogram
404+ instrumentRetryCount metric.Int64Counter
405+ instrumentConnErrCount metric.Int64Counter
406+ instrumentDebugTags metric.Int64Counter
380407
381408 tableName string
382409 method string
@@ -432,6 +459,9 @@ type attemptTracer struct {
432459
433460 // Error seen while getting server latency from headers/trailers
434461 serverLatencyErr error
462+
463+ // Tracker for client blocking latency
464+ blockingLatencyTracker * blockingLatencyTracker
435465}
436466
437467func (a * attemptTracer ) setStartTime (t time.Time ) {
@@ -474,14 +504,15 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
474504 currOp : currOpTracer ,
475505 clientAttributes : tf .clientAttributes ,
476506
477- instrumentOperationLatencies : tf .operationLatencies ,
478- instrumentServerLatencies : tf .serverLatencies ,
479- instrumentAttemptLatencies : tf .attemptLatencies ,
480- instrumentFirstRespLatencies : tf .firstRespLatencies ,
481- instrumentAppBlockingLatencies : tf .appBlockingLatencies ,
482- instrumentRetryCount : tf .retryCount ,
483- instrumentConnErrCount : tf .connErrCount ,
484- instrumentDebugTags : tf .debugTags ,
507+ instrumentOperationLatencies : tf .operationLatencies ,
508+ instrumentServerLatencies : tf .serverLatencies ,
509+ instrumentAttemptLatencies : tf .attemptLatencies ,
510+ instrumentFirstRespLatencies : tf .firstRespLatencies ,
511+ instrumentAppBlockingLatencies : tf .appBlockingLatencies ,
512+ instrumentClientBlockingLatencies : tf .clientBlockingLatencies ,
513+ instrumentRetryCount : tf .retryCount ,
514+ instrumentConnErrCount : tf .connErrCount ,
515+ instrumentDebugTags : tf .debugTags ,
485516
486517 tableName : tableName ,
487518 isStreaming : isStreaming ,
@@ -585,6 +616,15 @@ func (mt *builtinMetricsTracer) recordAttemptCompletion(attemptHeaderMD, attempT
585616 attemptLatAttrs , _ := mt .toOtelMetricAttrs (metricNameAttemptLatencies )
586617 mt .instrumentAttemptLatencies .Record (mt .ctx , elapsedTime , metric .WithAttributeSet (attemptLatAttrs ))
587618
619+ // Record client_blocking_latencies
620+ var clientBlockingLatencyMs float64
621+ if mt .currOp .currAttempt .blockingLatencyTracker != nil {
622+ messageSentNanos := mt .currOp .currAttempt .blockingLatencyTracker .getMessageSentNanos ()
623+ clientBlockingLatencyMs = convertToMs (time .Unix (0 , int64 (messageSentNanos )).Sub (mt .currOp .currAttempt .startTime ))
624+ }
625+ clientBlockingLatAttrs , _ := mt .toOtelMetricAttrs (metricNameClientBlockingLatencies )
626+ mt .instrumentClientBlockingLatencies .Record (mt .ctx , clientBlockingLatencyMs , metric .WithAttributeSet (clientBlockingLatAttrs ))
627+
588628 // Record server_latencies
589629 serverLatAttrs , _ := mt .toOtelMetricAttrs (metricNameServerLatencies )
590630 if mt .currOp .currAttempt .serverLatencyErr == nil {
@@ -651,3 +691,45 @@ func (mt *builtinMetricsTracer) incrementAppBlockingLatency(latency float64) {
651691
652692 mt .currOp .incrementAppBlockingLatency (latency )
653693}
694+
695+ // blockingLatencyTracker is used to calculate the time between stream creation and the first message send.
696+ type blockingLatencyTracker struct {
697+ endNanos atomic.Int64
698+ }
699+
700+ func (t * blockingLatencyTracker ) recordLatency (end time.Time ) {
701+ endN := end .UnixNano ()
702+ // Ensure that only the time of the first OutPayload event is recorded.
703+ t .endNanos .CompareAndSwap (0 , endN )
704+ }
705+
706+ func (t * blockingLatencyTracker ) getMessageSentNanos () int64 {
707+ return t .endNanos .Load ()
708+ }
709+
710+ // latencyStatsHandler is a gRPC stats.Handler to measure client blocking latency.
711+ type latencyStatsHandler struct {}
712+
713+ var _ stats.Handler = (* latencyStatsHandler )(nil )
714+
715+ func (h * latencyStatsHandler ) TagRPC (ctx context.Context , info * stats.RPCTagInfo ) context.Context {
716+ // The tracker should already be in the context, added by gaxInvokeWithRecorder.
717+ return ctx
718+ }
719+
720+ func (h * latencyStatsHandler ) HandleRPC (ctx context.Context , s stats.RPCStats ) {
721+ tracker , ok := ctx .Value (statsContextKey ).(* blockingLatencyTracker )
722+ if ! ok {
723+ return
724+ }
725+
726+ if op , ok := s .(* stats.OutPayload ); ok {
727+ tracker .recordLatency (op .SentTime )
728+ }
729+ }
730+
731+ func (h * latencyStatsHandler ) TagConn (ctx context.Context , info * stats.ConnTagInfo ) context.Context {
732+ return ctx
733+ }
734+
735+ func (h * latencyStatsHandler ) HandleConn (context.Context , stats.ConnStats ) {}
0 commit comments