Skip to content

Commit abb615e

Browse files
authored
fix(bigtable): Track number of readrows to set rowsLimit in subsequent requests (#10213)
* test(bigtable): Use rowslimit in proxy
* test(bigtable): Limit rows only if specified by client
* refactor(bigtable): Refactoring code
* track rows read in the library instead of proxy
1 parent 793e7d0 commit abb615e

4 files changed

Lines changed: 200 additions & 5 deletions

File tree

bigtable/bigtable.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ const prodAddr = "bigtable.googleapis.com:443"
5353
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"
5454
const featureFlagsHeaderKey = "bigtable-features"
5555

56+
var errNegativeRowLimit = errors.New("bigtable: row limit cannot be negative")
57+
5658
// Client is a client for reading and writing data to tables in an instance.
5759
//
5860
// A Client is safe to use concurrently, except for its Close method.
@@ -391,7 +393,25 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
391393
func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
392394
var prevRowKey string
393395
attrMap := make(map[string]interface{})
396+
397+
numRowsRead := int64(0)
398+
rowLimitSet := false
399+
intialRowLimit := int64(0)
400+
for _, opt := range opts {
401+
if l, ok := opt.(limitRows); ok {
402+
rowLimitSet = true
403+
intialRowLimit = l.limit
404+
}
405+
}
406+
if intialRowLimit < 0 {
407+
return errNegativeRowLimit
408+
}
409+
394410
err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
411+
if rowLimitSet && numRowsRead >= intialRowLimit {
412+
return nil
413+
}
414+
395415
req := &btpb.ReadRowsRequest{
396416
AppProfileId: t.c.appProfile,
397417
}
@@ -410,7 +430,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
410430
}
411431
req.Rows = arg.proto()
412432
}
413-
settings := makeReadSettings(req)
433+
settings := makeReadSettings(req, numRowsRead)
414434
for _, opt := range opts {
415435
opt.set(&settings)
416436
}
@@ -473,7 +493,9 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
473493
continue
474494
}
475495
prevRowKey = row.Key()
476-
if !f(row) {
496+
continueReading := f(row)
497+
numRowsRead++
498+
if !continueReading {
477499
// Cancel and drain stream.
478500
cancel()
479501
for {
@@ -939,14 +961,16 @@ type FullReadStatsFunc func(*FullReadStats)
939961
type readSettings struct {
940962
req *btpb.ReadRowsRequest
941963
fullReadStatsFunc FullReadStatsFunc
964+
numRowsRead int64
942965
}
943966

944-
func makeReadSettings(req *btpb.ReadRowsRequest) readSettings {
945-
return readSettings{req, nil}
967+
func makeReadSettings(req *btpb.ReadRowsRequest, numRowsRead int64) readSettings {
968+
return readSettings{req, nil, numRowsRead}
946969
}
947970

948971
// A ReadOption is an optional argument to ReadRows.
949972
type ReadOption interface {
973+
// set modifies the request stored in the settings.
950974
set(settings *readSettings)
951975
}
952976

@@ -965,7 +989,11 @@ func LimitRows(limit int64) ReadOption { return limitRows{limit} }
965989

966990
type limitRows struct{ limit int64 }
967991

968-
func (lr limitRows) set(settings *readSettings) { settings.req.RowsLimit = lr.limit }
992+
func (lr limitRows) set(settings *readSettings) {
993+
// Since 'numRowsRead' out of 'limit' requested rows have already been read,
994+
// the subsequent requests should fetch only the remaining rows.
995+
settings.req.RowsLimit = lr.limit - settings.numRowsRead
996+
}
969997

970998
// WithFullReadStats returns a ReadOption that will request FullReadStats
971999
// and invoke the given callback on the resulting FullReadStats.

bigtable/bigtable_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
"encoding/binary"
23+
"errors"
2324
"reflect"
2425
"testing"
2526
"time"
@@ -715,6 +716,110 @@ func TestReadRowsRequestStats(t *testing.T) {
715716
}
716717
}
717718

719+
func TestReadRowsLimit(t *testing.T) {
720+
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
721+
if err != nil {
722+
t.Fatalf("NewEmulatedEnv failed: %v", err)
723+
}
724+
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
725+
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
726+
)
727+
if err != nil {
728+
t.Fatalf("grpc.Dial failed: %v", err)
729+
}
730+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
731+
defer cancel()
732+
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
733+
if err != nil {
734+
t.Fatalf("NewClient failed: %v", err)
735+
}
736+
defer adminClient.Close()
737+
tableConf := &TableConf{
738+
TableID: testEnv.config.Table,
739+
Families: map[string]GCPolicy{
740+
"f": NoGcPolicy(),
741+
},
742+
}
743+
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
744+
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
745+
}
746+
747+
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
748+
if err != nil {
749+
t.Fatalf("NewClientWithConfig failed: %v", err)
750+
}
751+
defer client.Close()
752+
table := client.Open(testEnv.config.Table)
753+
754+
m := NewMutation()
755+
m.Set("f", "q", ServerTime, []byte("value"))
756+
if err = table.Apply(ctx, "row1", m); err != nil {
757+
t.Fatalf("Apply failed: %v", err)
758+
}
759+
760+
m = NewMutation()
761+
m.Set("f", "q", ServerTime, []byte("value"))
762+
m.Set("f", "q2", ServerTime, []byte("value2"))
763+
if err = table.Apply(ctx, "row2", m); err != nil {
764+
t.Fatalf("Apply failed: %v", err)
765+
}
766+
767+
m = NewMutation()
768+
m.Set("f", "excluded", ServerTime, []byte("value"))
769+
if err = table.Apply(ctx, "row3", m); err != nil {
770+
t.Fatalf("Apply failed: %v", err)
771+
}
772+
773+
for _, test := range []struct {
774+
desc string
775+
limit *int64
776+
wantRowCount int64
777+
wantErr error
778+
}{
779+
{
780+
desc: "No limit",
781+
wantRowCount: 3,
782+
},
783+
{
784+
desc: "Limit less than number of rows in table",
785+
limit: ptr(int64(2)),
786+
wantRowCount: 2,
787+
},
788+
{
789+
desc: "Limit greater than number of rows in table",
790+
limit: ptr(int64(5)),
791+
wantRowCount: 3,
792+
},
793+
{
794+
desc: "Negative row limit",
795+
limit: ptr(int64(-1)),
796+
wantErr: errNegativeRowLimit,
797+
},
798+
} {
799+
gotRowCount := int64(0)
800+
t.Run(test.desc, func(t *testing.T) {
801+
opts := []ReadOption{}
802+
if test.limit != nil {
803+
opts = append(opts, LimitRows(*test.limit))
804+
}
805+
if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
806+
gotRowCount++
807+
return true
808+
}, opts...); !errors.Is(err, test.wantErr) {
809+
t.Errorf("ReadRows err got: %v, want: %v", err, test.wantErr)
810+
}
811+
812+
if gotRowCount != test.wantRowCount {
813+
t.Errorf("ReadRows returned %d rows, want %d", gotRowCount, test.wantRowCount)
814+
}
815+
})
816+
}
817+
}
818+
819+
// ptr returns a pointer to its argument.
820+
// It can be used to initialize pointer fields, e.g. limit: ptr(int64(2)).
821+
func ptr[T any](t T) *T { return &t }
822+
718823
// TestHeaderPopulatedWithAppProfile verifies that request params header is populated with table name and app profile
719824
func TestHeaderPopulatedWithAppProfile(t *testing.T) {
720825
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})

bigtable/internal/testproxy/proxy.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,13 +572,19 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques
572572

573573
var c int32
574574
var rowsPb []*btpb.Row
575+
575576
lim := req.GetCancelAfterRows()
576577

577578
reversed := req.GetRequest().GetReversed()
578579
opts := []bigtable.ReadOption{}
579580
if reversed {
580581
opts = append(opts, bigtable.ReverseScan())
581582
}
583+
584+
rowsLimit := req.GetRequest().GetRowsLimit()
585+
if rowsLimit > 0 {
586+
opts = append(opts, bigtable.LimitRows(rowsLimit))
587+
}
582588
err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool {
583589

584590
c++

bigtable/retry_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,62 @@ func TestRetryReadRows(t *testing.T) {
460460
}
461461
}
462462

463+
func TestRetryReadRowsLimit(t *testing.T) {
464+
ctx := context.Background()
465+
466+
// Intercept requests and delegate to an interceptor defined by the test case
467+
errCount := 0
468+
var f func(grpc.ServerStream) error
469+
errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
470+
if strings.HasSuffix(info.FullMethod, "ReadRows") {
471+
return f(ss)
472+
}
473+
return handler(ctx, ss)
474+
}
475+
476+
tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector))
477+
defer cleanup()
478+
if err != nil {
479+
t.Fatalf("fake server setup: %v", err)
480+
}
481+
482+
initialRowLimit := int64(3)
483+
484+
errCount = 0
485+
// Test overall request failure and retries
486+
f = func(ss grpc.ServerStream) error {
487+
var err error
488+
req := new(btpb.ReadRowsRequest)
489+
must(ss.RecvMsg(req))
490+
switch errCount {
491+
case 0:
492+
if want, got := initialRowLimit, req.RowsLimit; want != got {
493+
t.Errorf("RowsLimit: got %v, want %v", got, want)
494+
}
495+
must(writeReadRowsResponse(ss, "a", "b"))
496+
err = status.Errorf(codes.Unavailable, "")
497+
case 1:
498+
if want, got := initialRowLimit-2, req.RowsLimit; want != got {
499+
t.Errorf("RowsLimit: got %v, want %v", got, want)
500+
}
501+
must(writeReadRowsResponse(ss, "c"))
502+
err = nil
503+
}
504+
errCount++
505+
return err
506+
}
507+
508+
var got []string
509+
must(tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool {
510+
got = append(got, r.Key())
511+
return true
512+
}, LimitRows(initialRowLimit)))
513+
want := []string{"a", "b", "c"}
514+
if !testutil.Equal(got, want) {
515+
t.Errorf("retry range integration: got %v, want %v", got, want)
516+
}
517+
}
518+
463519
func TestRetryReverseReadRows(t *testing.T) {
464520
ctx := context.Background()
465521

0 commit comments

Comments
 (0)