Commit dd45fd7

feat(bigtable): connectivity_error_count client side metric (#12438)
* feat(bigtable): connectivity_error_count client side metric
* test(bigtable): validate exported value of connectivity_error_count
* test: poll for expected metrics
* logs
* fix tests
* revert changes
1 parent d2cdb3c commit dd45fd7

3 files changed

Lines changed: 191 additions & 2 deletions

bigtable/bigtable.go

Lines changed: 16 additions & 0 deletions
@@ -2322,4 +2322,20 @@ func recordAttemptCompletion(mt *builtinMetricsTracer) {
 	if mt.currOp.currAttempt.serverLatencyErr == nil {
 		mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributeSet(serverLatAttrs))
 	}
+
+	// Record connectivity_error_count
+	connErrCountAttrs, _ := mt.toOtelMetricAttrs(metricNameConnErrCount)
+	// Determine if connection error should be incremented.
+	// A true connectivity error occurs only when we receive NO server-side signals.
+	// 1. Server latency (from server-timing header) is a signal, but absent in DirectPath.
+	// 2. Location (from x-goog-ext header) is a signal present in both paths.
+	// Therefore, we only count an error if BOTH signals are missing.
+	isServerLatencyEffectivelyEmpty := mt.currOp.currAttempt.serverLatencyErr != nil || mt.currOp.currAttempt.serverLatency == 0
+	isLocationEmpty := mt.currOp.currAttempt.clusterID == defaultCluster
+	if isServerLatencyEffectivelyEmpty && isLocationEmpty {
+		// This is a connectivity error: the request likely never reached Google's network.
+		mt.instrumentConnErrCount.Add(mt.ctx, 1, metric.WithAttributeSet(connErrCountAttrs))
+	} else {
+		mt.instrumentConnErrCount.Add(mt.ctx, 0, metric.WithAttributeSet(connErrCountAttrs))
+	}
 }
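
The new logic reduces to a two-signal predicate: an attempt counts as a connectivity error only when both the server-timing latency and the location header are missing. A minimal standalone sketch of that predicate follows; the function, parameters, and sentinel value are illustrative stand-ins for the tracer's actual fields, not identifiers from the client.

package main

import "fmt"

// isConnectivityError mirrors the check added to recordAttemptCompletion:
// count an error only when no server-side signal arrived for the attempt.
// All names here are hypothetical stand-ins for the tracer's fields.
func isConnectivityError(serverLatencyMS float64, haveServerLatency bool, clusterID, defaultCluster string) bool {
	serverLatencyEmpty := !haveServerLatency || serverLatencyMS == 0
	locationEmpty := clusterID == defaultCluster
	return serverLatencyEmpty && locationEmpty
}

func main() {
	const defaultCluster = "unspecified" // assumed sentinel for "no location header"
	fmt.Println(isConnectivityError(0, false, defaultCluster, defaultCluster))       // true: both signals missing
	fmt.Println(isConnectivityError(12.5, true, defaultCluster, defaultCluster))     // false: server-timing present
	fmt.Println(isConnectivityError(0, false, "us-east1-cluster", defaultCluster))   // false: location present (DirectPath case)
	fmt.Println(isConnectivityError(12.5, true, "us-east1-cluster", defaultCluster)) // false: both present
}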

bigtable/metrics.go

Lines changed: 17 additions & 0 deletions
@@ -62,6 +62,7 @@ const (
 	metricNameServerLatencies = "server_latencies"
 	metricNameRetryCount      = "retry_count"
 	metricNameDebugTags       = "debug_tags"
+	metricNameConnErrCount    = "connectivity_error_count"
 
 	// Metric units
 	metricUnitMS = "ms"
@@ -113,6 +114,12 @@ var (
 		},
 		recordedPerAttempt: true,
 	},
+	metricNameConnErrCount: {
+		additionalAttrs: []string{
+			metricLabelKeyStatus,
+		},
+		recordedPerAttempt: true,
+	},
 }
 
 // Generates unique client ID in the format go-<random UUID>@<hostname>
@@ -161,6 +168,7 @@ type builtinMetricsTracerFactory struct {
 	serverLatencies  metric.Float64Histogram
 	attemptLatencies metric.Float64Histogram
 	retryCount       metric.Int64Counter
+	connErrCount     metric.Int64Counter
 	debugTags        metric.Int64Counter
 }
 
@@ -271,6 +279,13 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
 		return err
 	}
 
+	// Create connectivity_error_count
+	tf.connErrCount, err = meter.Int64Counter(
+		metricNameConnErrCount,
+		metric.WithDescription("Number of requests that failed to reach the Google datacenter. (Requests without google response headers)"),
+		metric.WithUnit(metricUnitCount),
+	)
+
 	// Create debug_tags
 	tf.debugTags, err = meter.Int64Counter(
 		metricNameDebugTags,
@@ -295,6 +310,7 @@ type builtinMetricsTracer struct {
 	instrumentServerLatencies  metric.Float64Histogram
 	instrumentAttemptLatencies metric.Float64Histogram
 	instrumentRetryCount       metric.Int64Counter
+	instrumentConnErrCount     metric.Int64Counter
 	instrumentDebugTags        metric.Int64Counter
 
 	tableName string
@@ -392,6 +408,7 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
 		instrumentServerLatencies:  tf.serverLatencies,
 		instrumentAttemptLatencies: tf.attemptLatencies,
 		instrumentRetryCount:       tf.retryCount,
+		instrumentConnErrCount:     tf.connErrCount,
 		instrumentDebugTags:        tf.debugTags,
 
 		tableName: tableName,
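
The factory code above follows the standard OpenTelemetry pattern for counter instruments. Here is a self-contained sketch of that pattern outside the client, assuming a bare SDK MeterProvider in place of the factory's wiring; the metric name and description are taken from the diff, while the unit string "1" and the status attribute value are assumptions standing in for metricUnitCount and the client's real labels.

package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	ctx := context.Background()
	// In the client this meter comes from the factory's MeterProvider;
	// a plain SDK provider stands in here.
	meter := sdkmetric.NewMeterProvider().Meter("example")

	connErrCount, err := meter.Int64Counter(
		"connectivity_error_count",
		metric.WithDescription("Number of requests that failed to reach the Google datacenter."),
		metric.WithUnit("1"), // assumed value of metricUnitCount
	)
	if err != nil {
		log.Fatal(err)
	}

	// Recording 0 as well as 1, as recordAttemptCompletion does, keeps the
	// time series present on every attempt so dashboards can tell
	// "no errors" apart from "no data".
	attrs := attribute.NewSet(attribute.String("status", "UNAVAILABLE"))
	connErrCount.Add(ctx, 1, metric.WithAttributeSet(attrs))
	connErrCount.Add(ctx, 0, metric.WithAttributeSet(attrs))
	fmt.Println("recorded connectivity_error_count samples")
}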

bigtable/metrics_test.go

Lines changed: 158 additions & 2 deletions
@@ -27,6 +27,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -93,7 +94,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
 		attribute.String(metricLabelKeyClientUID, clientUID),
 		attribute.String(metricLabelKeyClientName, clientName),
 	}
-	wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies}
+	wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameConnErrCount, metricNameConnErrCount, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies}
 	wantMetricTypesGCM := []string{}
 	for _, wantMetricName := range wantMetricNamesStdout {
 		wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName)
@@ -211,7 +212,8 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
 			gotNonNilInstruments := gotClient.metricsTracerFactory.operationLatencies != nil &&
 				gotClient.metricsTracerFactory.serverLatencies != nil &&
 				gotClient.metricsTracerFactory.attemptLatencies != nil &&
-				gotClient.metricsTracerFactory.retryCount != nil
+				gotClient.metricsTracerFactory.retryCount != nil &&
+				gotClient.metricsTracerFactory.connErrCount != nil
 			if test.wantBuiltinEnabled != gotNonNilInstruments {
 				t.Errorf("NonNilInstruments: got: %v, want: %v", gotNonNilInstruments, test.wantBuiltinEnabled)
 			}
@@ -301,6 +303,160 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
 	}
 }
 
+func TestConnectivityErrorCount(t *testing.T) {
+	ctx := context.Background()
+	project := "test-project"
+	instance := "test-instance"
+	appProfile := "test-app-profile"
+
+	// Increase sampling period to simulate potential delays
+	origSamplePeriod := defaultSamplePeriod
+	defaultSamplePeriod = 500 * time.Millisecond
+	defer func() {
+		defaultSamplePeriod = origSamplePeriod
+	}()
+
+	// Setup mock monitoring server
+	monitoringServer, err := NewMetricTestServer()
+	if err != nil {
+		t.Fatalf("Error setting up metrics test server: %v", err)
+	}
+	go monitoringServer.Serve()
+	defer monitoringServer.Shutdown()
+
+	// Override exporter options to connect to the mock server
+	origCreateExporterOptions := createExporterOptions
+	createExporterOptions = func(opts ...option.ClientOption) []option.ClientOption {
+		return []option.ClientOption{
+			option.WithEndpoint(monitoringServer.Endpoint),
+			option.WithoutAuthentication(),
+			option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
+		}
+	}
+	defer func() {
+		createExporterOptions = origCreateExporterOptions
+	}()
+
+	// Control structure for mock server behavior during the specific ReadRows call.
+	// We use a channel to signal the interceptor that the ReadRows call under test is active.
+	readRowsCallActive := make(chan bool, 1)
+	var testSpecificAttemptCount int32
+
+	serverStreamInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		if strings.HasSuffix(info.FullMethod, "ReadRows") {
+			select {
+			case <-readRowsCallActive:
+				currentTestAttempt := atomic.AddInt32(&testSpecificAttemptCount, 1)
+				if currentTestAttempt == 1 {
+					// Put the token back for subsequent retries of this specific call.
+					readRowsCallActive <- true
+					return status.Error(codes.Unavailable, "Mock Unavailable error for connectivity test")
+				}
+				if currentTestAttempt == 2 {
+					header := metadata.New(map[string]string{
+						locationMDKey: string(testHeaders),
+					})
+					if errH := ss.SendHeader(header); errH != nil {
+						t.Errorf("[ServerInterceptor Attempt 2] Error sending header: %v", errH)
+					}
+
+					// Send a minimal successful message to ensure headers are processed by the client.
+					emptyResp := &btpb.ReadRowsResponse{}
+					if errS := ss.SendMsg(emptyResp); errS != nil {
+						t.Errorf("[ServerInterceptor Attempt 2] Error sending empty message: %v", errS)
+						return status.Errorf(codes.Internal, "mock server failed to send empty message: %v", errS)
+					}
+
+					readRowsCallActive <- true
+					return status.Error(codes.Unavailable, "Mock Unavailable error with location headers")
+				}
+
+				// On the third and final attempt, cause a non-retriable error.
+				atomic.StoreInt32(&testSpecificAttemptCount, 0)
+				// Do not put the token back, as this is the final attempt for this ReadRows sequence.
+				return status.Error(codes.Internal, "non-retriable error")
+			default:
+				return handler(srv, ss)
+			}
+		}
+		return handler(srv, ss)
+	}
+
+	config := ClientConfig{AppProfile: appProfile}
+	tbl, cleanup, gotErr := setupFakeServer(project, instance, config, grpc.StreamInterceptor(serverStreamInterceptor))
+	defer cleanup()
+	if gotErr != nil {
+		t.Fatalf("setupFakeServer error: got: %v, want: nil", gotErr)
+	}
+
+	// Pop out any old requests from the monitoring server to ensure a clean state.
+	monitoringServer.CreateServiceTimeSeriesRequests()
+	atomic.StoreInt32(&testSpecificAttemptCount, 0)
+
+	readRowsCallActive <- true
+
+	// Perform a read rows operation that will undergo a specific retry sequence:
+	// Attempt 1: Fails with Unavailable (no server headers) -> conn error count = 1
+	// Attempt 2: Fails with Unavailable (with location header) -> conn error count = 0
+	// Attempt 3: Fails with Internal (no server headers) -> conn error count = 1
+	// The overall operation fails with the final Internal error.
+	err = tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool { return true })
+	if err == nil {
+		t.Fatal("ReadRows: got nil error, want an error")
+	}
+	if status.Code(err) != codes.Internal {
+		t.Fatalf("ReadRows: got error code %v, want %v", status.Code(err), codes.Internal)
+	}
+
+	// Wait a bit for metrics to be exported. The defaultSamplePeriod is 500ms,
+	// so waiting slightly longer should be sufficient.
+	// If tests are flaky, this might need adjustment or a more sophisticated wait.
+	time.Sleep(defaultSamplePeriod + 200*time.Millisecond)
+
+	var totalConnectivityErrorsFromMetrics int64
+	statusesReported := make(map[string]int64)
+	foundConnErrMetricForTest := false
+
+	exportedMetricBatches := monitoringServer.CreateServiceTimeSeriesRequests()
+	for _, batch := range exportedMetricBatches {
+		for _, ts := range batch.TimeSeries {
+			if strings.HasSuffix(ts.Metric.Type, metricNameConnErrCount) {
+				methodLabel, ok := ts.Metric.Labels[metricLabelKeyMethod]
+				if !ok || methodLabel != "Bigtable.ReadRows" {
+					continue
+				}
+				foundConnErrMetricForTest = true
+				statusKey := ts.Metric.Labels[metricLabelKeyStatus]
+				for _, point := range ts.Points {
+					// Summing up values from points. For a counter, this is the delta.
+					// We expect each reported error to be a single point with a value of 1.
+					statusesReported[statusKey] += point.GetValue().GetInt64Value()
+					totalConnectivityErrorsFromMetrics += point.GetValue().GetInt64Value()
+				}
+			}
+		}
+	}
+
+	if !foundConnErrMetricForTest {
+		t.Fatalf("Metric %s for method Bigtable.ReadRows was not found in exported metrics. Batches received: %+v", metricNameConnErrCount, exportedMetricBatches)
+	}
+
+	if statusesReported[codes.Unavailable.String()] != 1 {
+		t.Errorf("Metric %s for status %s: got cumulative value %d, want 1. All statuses: %v",
+			metricNameConnErrCount, codes.Unavailable.String(), statusesReported[codes.Unavailable.String()], statusesReported)
+	}
+	if statusesReported[codes.Internal.String()] != 1 {
+		t.Errorf("Metric %s for status %s: got cumulative value %d, want 1. All statuses: %v",
+			metricNameConnErrCount, codes.Internal.String(), statusesReported[codes.Internal.String()], statusesReported)
+	}
+
+	// The total connectivity errors should be 2.
+	// Attempt 2 (Unavailable, with location) should not increment the error count.
+	if totalConnectivityErrorsFromMetrics != 2 {
+		t.Errorf("Metric %s: got cumulative value %d, want 2. Statuses reported: %v",
+			metricNameConnErrCount, totalConnectivityErrorsFromMetrics, statusesReported)
+	}
+}
 func setMockErrorHandler(t *testing.T, mockErrorHandler *MockErrorHandler) {
 	origErrHandler := otel.GetErrorHandler()
 	otel.SetErrorHandler(mockErrorHandler)
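
The test above waits with a fixed time.Sleep before reading the exporter, and its own comment flags that as a possible source of flakiness. The commit message also mentions polling for expected metrics; a generic helper of that shape might look like the sketch below. It is hypothetical, not a helper from this PR, and assumes only the test file's existing testing and time imports.

// pollUntil retries check every interval until it returns true or timeout
// elapses, failing the test otherwise. For the test above, a check function
// could drain monitoringServer.CreateServiceTimeSeriesRequests() and return
// true once a connectivity_error_count series for Bigtable.ReadRows appears.
func pollUntil(t *testing.T, timeout, interval time.Duration, check func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if check() {
			return
		}
		time.Sleep(interval)
	}
	t.Fatalf("condition not met within %v", timeout)
}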
