@@ -27,6 +27,7 @@ import (
2727 "sort"
2828 "strings"
2929 "sync"
30+ "sync/atomic"
3031 "testing"
3132 "time"
3233
@@ -93,7 +94,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
9394 attribute .String (metricLabelKeyClientUID , clientUID ),
9495 attribute .String (metricLabelKeyClientName , clientName ),
9596 }
96- wantMetricNamesStdout := []string {metricNameAttemptLatencies , metricNameAttemptLatencies , metricNameOperationLatencies , metricNameRetryCount , metricNameServerLatencies }
97+ wantMetricNamesStdout := []string {metricNameAttemptLatencies , metricNameAttemptLatencies , metricNameConnErrCount , metricNameConnErrCount , metricNameOperationLatencies , metricNameRetryCount , metricNameServerLatencies }
9798 wantMetricTypesGCM := []string {}
9899 for _ , wantMetricName := range wantMetricNamesStdout {
99100 wantMetricTypesGCM = append (wantMetricTypesGCM , builtInMetricsMeterName + wantMetricName )
@@ -211,7 +212,8 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
211212 gotNonNilInstruments := gotClient .metricsTracerFactory .operationLatencies != nil &&
212213 gotClient .metricsTracerFactory .serverLatencies != nil &&
213214 gotClient .metricsTracerFactory .attemptLatencies != nil &&
214- gotClient .metricsTracerFactory .retryCount != nil
215+ gotClient .metricsTracerFactory .retryCount != nil &&
216+ gotClient .metricsTracerFactory .connErrCount != nil
215217 if test .wantBuiltinEnabled != gotNonNilInstruments {
216218 t .Errorf ("NonNilInstruments: got: %v, want: %v" , gotNonNilInstruments , test .wantBuiltinEnabled )
217219 }
@@ -301,6 +303,160 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
301303 }
302304}
303305
306+ func TestConnectivityErrorCount (t * testing.T ) {
307+ ctx := context .Background ()
308+ project := "test-project"
309+ instance := "test-instance"
310+ appProfile := "test-app-profile"
311+
312+ // Increase sampling period to simulate potential delays
313+ origSamplePeriod := defaultSamplePeriod
314+ defaultSamplePeriod = 500 * time .Millisecond
315+ defer func () {
316+ defaultSamplePeriod = origSamplePeriod
317+ }()
318+
319+ // Setup mock monitoring server
320+ monitoringServer , err := NewMetricTestServer ()
321+ if err != nil {
322+ t .Fatalf ("Error setting up metrics test server: %v" , err )
323+ }
324+ go monitoringServer .Serve ()
325+ defer monitoringServer .Shutdown ()
326+
327+ // Override exporter options to connect to the mock server
328+ origCreateExporterOptions := createExporterOptions
329+ createExporterOptions = func (opts ... option.ClientOption ) []option.ClientOption {
330+ return []option.ClientOption {
331+ option .WithEndpoint (monitoringServer .Endpoint ),
332+ option .WithoutAuthentication (),
333+ option .WithGRPCDialOption (grpc .WithTransportCredentials (insecure .NewCredentials ())),
334+ }
335+ }
336+ defer func () {
337+ createExporterOptions = origCreateExporterOptions
338+ }()
339+
340+ // Control structure for mock server behavior during the specific ReadRows call.
341+ // We use a channel to signal the interceptor that the ReadRows call under test is active.
342+ readRowsCallActive := make (chan bool , 1 )
343+ var testSpecificAttemptCount int32
344+
345+ serverStreamInterceptor := func (srv interface {}, ss grpc.ServerStream , info * grpc.StreamServerInfo , handler grpc.StreamHandler ) error {
346+ if strings .HasSuffix (info .FullMethod , "ReadRows" ) {
347+ select {
348+ case <- readRowsCallActive :
349+ currentTestAttempt := atomic .AddInt32 (& testSpecificAttemptCount , 1 )
350+ if currentTestAttempt == 1 {
351+ // Put the token back for subsequent retries of this specific call.
352+ readRowsCallActive <- true
353+ return status .Error (codes .Unavailable , "Mock Unavailable error for connectivity test" )
354+ }
355+ if currentTestAttempt == 2 {
356+ header := metadata .New (map [string ]string {
357+ locationMDKey : string (testHeaders ),
358+ })
359+ if errH := ss .SendHeader (header ); errH != nil {
360+ t .Errorf ("[ServerInterceptor Attempt 2] Error sending header: %v" , errH )
361+ }
362+
363+ // Send a minimal successful message to ensure headers are processed by the client.
364+ emptyResp := & btpb.ReadRowsResponse {}
365+ if errS := ss .SendMsg (emptyResp ); errS != nil {
366+ t .Errorf ("[ServerInterceptor Attempt 2] Error sending empty message: %v" , errS )
367+ return status .Errorf (codes .Internal , "mock server failed to send empty message: %v" , errS )
368+ }
369+
370+ readRowsCallActive <- true
371+ return status .Error (codes .Unavailable , "Mock Unavailable error with location headers" )
372+ }
373+
374+ // On the third and final attempt, cause a non-retriable error.
375+ atomic .StoreInt32 (& testSpecificAttemptCount , 0 )
376+ // Do not put the token back, as this is the final attempt for this ReadRows sequence.
377+ return status .Error (codes .Internal , "non-retriable error" )
378+ default :
379+ return handler (srv , ss )
380+ }
381+ }
382+ return handler (srv , ss )
383+ }
384+
385+ config := ClientConfig {AppProfile : appProfile }
386+ tbl , cleanup , gotErr := setupFakeServer (project , instance , config , grpc .StreamInterceptor (serverStreamInterceptor ))
387+ defer cleanup ()
388+ if gotErr != nil {
389+ t .Fatalf ("setupFakeServer error: got: %v, want: nil" , gotErr )
390+ }
391+
392+ // Pop out any old requests from the monitoring server to ensure a clean state.
393+ monitoringServer .CreateServiceTimeSeriesRequests ()
394+ atomic .StoreInt32 (& testSpecificAttemptCount , 0 )
395+
396+ readRowsCallActive <- true
397+
398+ // Perform a read rows operation that will undergo a specific retry sequence:
399+ // Attempt 1: Fails with Unavailable (no server headers) -> conn error count = 1
400+ // Attempt 2: Fails with Unavailable (with location header) -> conn error count = 0
401+ // Attempt 3: Fails with Internal (no server headers) -> conn error count = 1
402+ // The overall operation fails with the final Internal error.
403+ err = tbl .ReadRows (ctx , NewRange ("a" , "z" ), func (r Row ) bool { return true })
404+ if err == nil {
405+ t .Fatal ("ReadRows: got nil error, want an error" )
406+ }
407+ if status .Code (err ) != codes .Internal {
408+ t .Fatalf ("ReadRows: got error code %v, want %v" , status .Code (err ), codes .Internal )
409+ }
410+
411+ // Wait a bit for metrics to be exported. The defaultSamplePeriod is 500ms,
412+ // so waiting slightly longer should be sufficient.
413+ // If tests are flaky, this might need adjustment or a more sophisticated wait.
414+ time .Sleep (defaultSamplePeriod + 200 * time .Millisecond )
415+
416+ var totalConnectivityErrorsFromMetrics int64
417+ statusesReported := make (map [string ]int64 )
418+ foundConnErrMetricForTest := false
419+
420+ exportedMetricBatches := monitoringServer .CreateServiceTimeSeriesRequests ()
421+ for _ , batch := range exportedMetricBatches {
422+ for _ , ts := range batch .TimeSeries {
423+ if strings .HasSuffix (ts .Metric .Type , metricNameConnErrCount ) {
424+ methodLabel , ok := ts .Metric .Labels [metricLabelKeyMethod ]
425+ if ! ok || methodLabel != "Bigtable.ReadRows" {
426+ continue
427+ }
428+ foundConnErrMetricForTest = true
429+ statusKey := ts .Metric .Labels [metricLabelKeyStatus ]
430+ for _ , point := range ts .Points {
431+ // Summing up values from points. For a counter, this is the delta.
432+ // We expect each reported error to be a single point with a value of 1.
433+ statusesReported [statusKey ] += point .GetValue ().GetInt64Value ()
434+ totalConnectivityErrorsFromMetrics += point .GetValue ().GetInt64Value ()
435+ }
436+ }
437+ }
438+ }
439+
440+ if ! foundConnErrMetricForTest {
441+ t .Fatalf ("Metric %s for method Bigtable.ReadRows was not found in exported metrics. Batches received: %+v" , metricNameConnErrCount , exportedMetricBatches )
442+ }
443+
444+ if statusesReported [codes .Unavailable .String ()] != 1 {
445+ t .Errorf ("Metric %s for status %s: got cumulative value %d, want 1. All statuses: %v" ,
446+ metricNameConnErrCount , codes .Unavailable .String (), statusesReported [codes .Unavailable .String ()], statusesReported )
447+ }
448+ if statusesReported [codes .Internal .String ()] != 1 {
449+ t .Errorf ("Metric %s for status %s: got cumulative value %d, want 1. All statuses: %v" ,
450+ metricNameConnErrCount , codes .Internal .String (), statusesReported [codes .Internal .String ()], statusesReported )
451+ }
452+
453+ // The total connectivity errors should be 2.
454+ // Attempt 2 (Unavailable, with location) should not increment the error count.
455+ if totalConnectivityErrorsFromMetrics != 2 {
456+ t .Errorf ("Metric %s: got cumulative value %d, want 2. Statuses reported: %v" ,
457+ metricNameConnErrCount , totalConnectivityErrorsFromMetrics , statusesReported )
458+ }
459+ }
304460func setMockErrorHandler (t * testing.T , mockErrorHandler * MockErrorHandler ) {
305461 origErrHandler := otel .GetErrorHandler ()
306462 otel .SetErrorHandler (mockErrorHandler )
0 commit comments