Skip to content

Commit 5f03fb3

Browse files
authored
feat(bigtable): Record client_blocking_latencies (#12698)
* feat(bigtable): Record client_blocking_latencies * remove mutex * Use attempt start time to calculate client blocking latency * use SentTime of OutPayload
1 parent 7fa61a2 commit 5f03fb3

3 files changed

Lines changed: 124 additions & 34 deletions

File tree

bigtable/bigtable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
125125
}
126126
// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
127127
o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
128-
128+
o = append(o, option.WithGRPCDialOption(grpc.WithStatsHandler(sharedLatencyStatsHandler)))
129129
// Default to a small connection pool that can be overridden.
130130
o = append(o,
131131
option.WithGRPCConnectionPool(4),

bigtable/metrics.go

Lines changed: 114 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"os"
2424
"reflect"
25+
"sync/atomic"
2526
"time"
2627

2728
"cloud.google.com/go/bigtable/internal"
@@ -31,6 +32,7 @@ import (
3132
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
3233
"google.golang.org/api/option"
3334
"google.golang.org/grpc/metadata"
35+
"google.golang.org/grpc/stats"
3436
)
3537

3638
const (
@@ -59,21 +61,28 @@ const (
5961
metricLabelKeyClientUID = "client_uid"
6062

6163
// Metric names
62-
metricNameOperationLatencies = "operation_latencies"
63-
metricNameAttemptLatencies = "attempt_latencies"
64-
metricNameServerLatencies = "server_latencies"
65-
metricNameAppBlockingLatencies = "application_latencies"
66-
metricNameFirstRespLatencies = "first_response_latencies"
67-
metricNameRetryCount = "retry_count"
68-
metricNameDebugTags = "debug_tags"
69-
metricNameConnErrCount = "connectivity_error_count"
64+
metricNameOperationLatencies = "operation_latencies"
65+
metricNameAttemptLatencies = "attempt_latencies"
66+
metricNameServerLatencies = "server_latencies"
67+
metricNameAppBlockingLatencies = "application_latencies"
68+
metricNameClientBlockingLatencies = "throttling_latencies"
69+
metricNameFirstRespLatencies = "first_response_latencies"
70+
metricNameRetryCount = "retry_count"
71+
metricNameDebugTags = "debug_tags"
72+
metricNameConnErrCount = "connectivity_error_count"
7073

7174
// Metric units
7275
metricUnitMS = "ms"
7376
metricUnitCount = "1"
7477
maxAttrsLen = 12 // Monitored resource labels + Metric labels
7578
)
7679

80+
type contextKey string
81+
82+
const (
83+
statsContextKey contextKey = "bigtable/clientBlockingLatencyTracker"
84+
)
85+
7786
// These are effectively constant, but for testing purposes they are mutable
7887
var (
7988
// duration between two metric exports
@@ -119,6 +128,9 @@ var (
119128
recordedPerAttempt: false,
120129
},
121130
metricNameAppBlockingLatencies: {},
131+
metricNameClientBlockingLatencies: {
132+
recordedPerAttempt: true,
133+
},
122134
metricNameRetryCount: {
123135
additionalAttrs: []string{
124136
metricLabelKeyStatus,
@@ -157,6 +169,8 @@ var (
157169
}
158170
return filteredOptions
159171
}
172+
173+
sharedLatencyStatsHandler = &latencyStatsHandler{}
160174
)
161175

162176
type metricInfo struct {
@@ -174,14 +188,15 @@ type builtinMetricsTracerFactory struct {
174188
// do not change across different function calls on client
175189
clientAttributes []attribute.KeyValue
176190

177-
operationLatencies metric.Float64Histogram
178-
serverLatencies metric.Float64Histogram
179-
attemptLatencies metric.Float64Histogram
180-
firstRespLatencies metric.Float64Histogram
181-
appBlockingLatencies metric.Float64Histogram
182-
retryCount metric.Int64Counter
183-
connErrCount metric.Int64Counter
184-
debugTags metric.Int64Counter
191+
operationLatencies metric.Float64Histogram
192+
serverLatencies metric.Float64Histogram
193+
attemptLatencies metric.Float64Histogram
194+
firstRespLatencies metric.Float64Histogram
195+
appBlockingLatencies metric.Float64Histogram
196+
clientBlockingLatencies metric.Float64Histogram
197+
retryCount metric.Int64Counter
198+
connErrCount metric.Int64Counter
199+
debugTags metric.Int64Counter
185200
}
186201

187202
func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) {
@@ -329,6 +344,17 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
329344
return err
330345
}
331346

347+
// Create client_blocking_latencies
348+
tf.clientBlockingLatencies, err = meter.Float64Histogram(
349+
metricNameClientBlockingLatencies,
350+
metric.WithDescription("The latencies of requests queued on gRPC channels."),
351+
metric.WithUnit(metricUnitMS),
352+
metric.WithExplicitBucketBoundaries(bucketBounds...),
353+
)
354+
if err != nil {
355+
return err
356+
}
357+
332358
// Create retry_count
333359
tf.retryCount, err = meter.Int64Counter(
334360
metricNameRetryCount,
@@ -369,14 +395,15 @@ type builtinMetricsTracer struct {
369395
// do not change across different operations on client
370396
clientAttributes []attribute.KeyValue
371397

372-
instrumentOperationLatencies metric.Float64Histogram
373-
instrumentServerLatencies metric.Float64Histogram
374-
instrumentAttemptLatencies metric.Float64Histogram
375-
instrumentFirstRespLatencies metric.Float64Histogram
376-
instrumentAppBlockingLatencies metric.Float64Histogram
377-
instrumentRetryCount metric.Int64Counter
378-
instrumentConnErrCount metric.Int64Counter
379-
instrumentDebugTags metric.Int64Counter
398+
instrumentOperationLatencies metric.Float64Histogram
399+
instrumentServerLatencies metric.Float64Histogram
400+
instrumentAttemptLatencies metric.Float64Histogram
401+
instrumentFirstRespLatencies metric.Float64Histogram
402+
instrumentAppBlockingLatencies metric.Float64Histogram
403+
instrumentClientBlockingLatencies metric.Float64Histogram
404+
instrumentRetryCount metric.Int64Counter
405+
instrumentConnErrCount metric.Int64Counter
406+
instrumentDebugTags metric.Int64Counter
380407

381408
tableName string
382409
method string
@@ -432,6 +459,9 @@ type attemptTracer struct {
432459

433460
// Error seen while getting server latency from headers/trailers
434461
serverLatencyErr error
462+
463+
// Tracker for client blocking latency
464+
blockingLatencyTracker *blockingLatencyTracker
435465
}
436466

437467
func (a *attemptTracer) setStartTime(t time.Time) {
@@ -474,14 +504,15 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
474504
currOp: currOpTracer,
475505
clientAttributes: tf.clientAttributes,
476506

477-
instrumentOperationLatencies: tf.operationLatencies,
478-
instrumentServerLatencies: tf.serverLatencies,
479-
instrumentAttemptLatencies: tf.attemptLatencies,
480-
instrumentFirstRespLatencies: tf.firstRespLatencies,
481-
instrumentAppBlockingLatencies: tf.appBlockingLatencies,
482-
instrumentRetryCount: tf.retryCount,
483-
instrumentConnErrCount: tf.connErrCount,
484-
instrumentDebugTags: tf.debugTags,
507+
instrumentOperationLatencies: tf.operationLatencies,
508+
instrumentServerLatencies: tf.serverLatencies,
509+
instrumentAttemptLatencies: tf.attemptLatencies,
510+
instrumentFirstRespLatencies: tf.firstRespLatencies,
511+
instrumentAppBlockingLatencies: tf.appBlockingLatencies,
512+
instrumentClientBlockingLatencies: tf.clientBlockingLatencies,
513+
instrumentRetryCount: tf.retryCount,
514+
instrumentConnErrCount: tf.connErrCount,
515+
instrumentDebugTags: tf.debugTags,
485516

486517
tableName: tableName,
487518
isStreaming: isStreaming,
@@ -585,6 +616,15 @@ func (mt *builtinMetricsTracer) recordAttemptCompletion(attemptHeaderMD, attempT
585616
attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies)
586617
mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributeSet(attemptLatAttrs))
587618

619+
// Record client_blocking_latencies
620+
var clientBlockingLatencyMs float64
621+
if mt.currOp.currAttempt.blockingLatencyTracker != nil {
622+
messageSentNanos := mt.currOp.currAttempt.blockingLatencyTracker.getMessageSentNanos()
623+
clientBlockingLatencyMs = convertToMs(time.Unix(0, int64(messageSentNanos)).Sub(mt.currOp.currAttempt.startTime))
624+
}
625+
clientBlockingLatAttrs, _ := mt.toOtelMetricAttrs(metricNameClientBlockingLatencies)
626+
mt.instrumentClientBlockingLatencies.Record(mt.ctx, clientBlockingLatencyMs, metric.WithAttributeSet(clientBlockingLatAttrs))
627+
588628
// Record server_latencies
589629
serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies)
590630
if mt.currOp.currAttempt.serverLatencyErr == nil {
@@ -651,3 +691,45 @@ func (mt *builtinMetricsTracer) incrementAppBlockingLatency(latency float64) {
651691

652692
mt.currOp.incrementAppBlockingLatency(latency)
653693
}
694+
695+
// blockingLatencyTracker is used to calculate the time between stream creation and the first message send.
696+
type blockingLatencyTracker struct {
697+
endNanos atomic.Int64
698+
}
699+
700+
func (t *blockingLatencyTracker) recordLatency(end time.Time) {
701+
endN := end.UnixNano()
702+
// Ensure that only the time of the first OutPayload event is recorded.
703+
t.endNanos.CompareAndSwap(0, endN)
704+
}
705+
706+
func (t *blockingLatencyTracker) getMessageSentNanos() int64 {
707+
return t.endNanos.Load()
708+
}
709+
710+
// latencyStatsHandler is a gRPC stats.Handler to measure client blocking latency.
711+
type latencyStatsHandler struct{}
712+
713+
var _ stats.Handler = (*latencyStatsHandler)(nil)
714+
715+
func (h *latencyStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
716+
// The tracker should already be in the context, added by gaxInvokeWithRecorder.
717+
return ctx
718+
}
719+
720+
func (h *latencyStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
721+
tracker, ok := ctx.Value(statsContextKey).(*blockingLatencyTracker)
722+
if !ok {
723+
return
724+
}
725+
726+
if op, ok := s.(*stats.OutPayload); ok {
727+
tracker.recordLatency(op.SentTime)
728+
}
729+
}
730+
731+
func (h *latencyStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
732+
return ctx
733+
}
734+
735+
func (h *latencyStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

bigtable/metrics_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,15 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
178178
appProfile := "test-app-profile"
179179
clientUID := "test-uid"
180180

181-
wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameConnErrCount, metricNameConnErrCount, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies, metricNameAppBlockingLatencies}
181+
wantMetricNamesStdout := []string{
182+
metricNameAttemptLatencies, metricNameAttemptLatencies,
183+
metricNameConnErrCount, metricNameConnErrCount,
184+
metricNameOperationLatencies,
185+
metricNameRetryCount,
186+
metricNameServerLatencies,
187+
metricNameClientBlockingLatencies, metricNameClientBlockingLatencies,
188+
metricNameAppBlockingLatencies,
189+
}
182190
wantMetricTypesGCM := []string{}
183191
for _, wantMetricName := range wantMetricNamesStdout {
184192
wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName)

0 commit comments

Comments
 (0)