Skip to content

Commit ea00b17

Browse files
authored
feat(bigtable): Add PrepareStatement to run GoogleSQL queries (#11829)
* feat(bigtable): Add PrepareStatement to run GoogleSQL queries * undo rename * feat(bigtable): Add protoformat
1 parent aa91367 commit ea00b17

3 files changed

Lines changed: 194 additions & 26 deletions

File tree

bigtable/bigtable.go

Lines changed: 104 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"google.golang.org/grpc/metadata"
4343
"google.golang.org/grpc/status"
4444
"google.golang.org/protobuf/proto"
45+
"google.golang.org/protobuf/types/known/timestamppb"
4546

4647
// Install google-c2p resolver, which is required for direct path.
4748
_ "google.golang.org/grpc/xds/googledirectpath"
@@ -240,6 +241,10 @@ func convertToGrpcStatusErr(err error) (codes.Code, error) {
240241
return codes.Unknown, err
241242
}
242243

244+
func (c *Client) fullInstanceName() string {
245+
return fmt.Sprintf("projects/%s/instances/%s", c.project, c.instance)
246+
}
247+
243248
func (c *Client) fullTableName(table string) string {
244249
return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table)
245250
}
@@ -252,10 +257,14 @@ func (c *Client) fullMaterializedViewName(materializedView string) string {
252257
return fmt.Sprintf("projects/%s/instances/%s/materializedViews/%s", c.project, c.instance, materializedView)
253258
}
254259

255-
func (c *Client) requestParamsHeaderValue(table string) string {
260+
func (c *Client) reqParamsHeaderValTable(table string) string {
256261
return fmt.Sprintf("table_name=%s&app_profile_id=%s", url.QueryEscape(c.fullTableName(table)), url.QueryEscape(c.appProfile))
257262
}
258263

264+
func (c *Client) reqParamsHeaderValInstance() string {
265+
return fmt.Sprintf("name=%s&app_profile_id=%s", url.QueryEscape(c.fullInstanceName()), url.QueryEscape(c.appProfile))
266+
}
267+
259268
// mergeOutgoingMetadata returns a context populated by the existing outgoing
260269
// metadata merged with the provided mds.
261270
func mergeOutgoingMetadata(ctx context.Context, mds ...metadata.MD) context.Context {
@@ -319,7 +328,7 @@ func (c *Client) Open(table string) *Table {
319328
table: table,
320329
md: metadata.Join(metadata.Pairs(
321330
resourcePrefixHeader, c.fullTableName(table),
322-
requestParamsHeader, c.requestParamsHeaderValue(table),
331+
requestParamsHeader, c.reqParamsHeaderValTable(table),
323332
), c.newFeatureFlags()),
324333
}
325334
}
@@ -331,7 +340,7 @@ func (c *Client) OpenTable(table string) TableAPI {
331340
table: table,
332341
md: metadata.Join(metadata.Pairs(
333342
resourcePrefixHeader, c.fullTableName(table),
334-
requestParamsHeader, c.requestParamsHeaderValue(table),
343+
requestParamsHeader, c.reqParamsHeaderValTable(table),
335344
), c.newFeatureFlags()),
336345
}}
337346
}
@@ -343,7 +352,7 @@ func (c *Client) OpenAuthorizedView(table, authorizedView string) TableAPI {
343352
table: table,
344353
md: metadata.Join(metadata.Pairs(
345354
resourcePrefixHeader, c.fullAuthorizedViewName(table, authorizedView),
346-
requestParamsHeader, c.requestParamsHeaderValue(table),
355+
requestParamsHeader, c.reqParamsHeaderValTable(table),
347356
), c.newFeatureFlags()),
348357
authorizedView: authorizedView,
349358
}}
@@ -355,12 +364,97 @@ func (c *Client) OpenMaterializedView(materializedView string) TableAPI {
355364
c: c,
356365
md: metadata.Join(metadata.Pairs(
357366
resourcePrefixHeader, c.fullMaterializedViewName(materializedView),
358-
requestParamsHeader, c.requestParamsHeaderValue(materializedView),
367+
requestParamsHeader, c.reqParamsHeaderValTable(materializedView),
359368
), c.newFeatureFlags()),
360369
materializedView: materializedView,
361370
}}
362371
}
363372

373+
// PreparedStatement stores the results of query preparation that can be used to
374+
// create [BoundStatements]s to execute queries.
375+
//
376+
// Whenever possible this should be shared across different instances of the same query,
377+
// in order to amortize query preparation costs.
378+
type PreparedStatement struct {
379+
c *Client
380+
paramTypes map[string]*btpb.Type
381+
// Structure of rows in the response stream of `ExecuteQueryResponse` for the
382+
// returned `prepared_query`.
383+
metadata *btpb.ResultSetMetadata
384+
// A serialized prepared query. It is an opaque
385+
// blob of bytes to send in `ExecuteQueryRequest`.
386+
preparedQuery []byte
387+
// The time at which the prepared query token becomes invalid.
388+
// A token may become invalid early due to changes in the data being read, but
389+
// it provides a guideline to refresh query plans asynchronously.
390+
validUntil *timestamppb.Timestamp
391+
}
392+
393+
// PrepareOption can be passed while preparing a query statement.
394+
type PrepareOption interface {
395+
}
396+
397+
// PrepareStatement prepares a query for execution. If possible, this should be called once and
398+
// reused across requests. This will amortize the cost of query preparation.
399+
func (c *Client) PrepareStatement(ctx context.Context, query string, paramTypes map[string]SQLType, opts ...PrepareOption) (preparedStatement *PreparedStatement, err error) {
400+
md := metadata.Join(metadata.Pairs(
401+
resourcePrefixHeader, c.fullInstanceName(),
402+
requestParamsHeader, c.reqParamsHeaderValInstance(),
403+
), c.newFeatureFlags())
404+
405+
ctx = mergeOutgoingMetadata(ctx, md)
406+
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.PrepareQuery")
407+
defer func() { trace.EndSpan(ctx, err) }()
408+
409+
mt := c.newBuiltinMetricsTracer(ctx, "", false)
410+
defer recordOperationCompletion(mt)
411+
412+
preparedStatement, err = c.prepareStatement(ctx, mt, query, paramTypes, opts...)
413+
statusCode, statusErr := convertToGrpcStatusErr(err)
414+
mt.currOp.setStatus(statusCode.String())
415+
return preparedStatement, statusErr
416+
}
417+
418+
func (c *Client) prepareStatement(ctx context.Context, mt *builtinMetricsTracer, query string, paramTypes map[string]SQLType, opts ...PrepareOption) (*PreparedStatement, error) {
419+
reqParamTypes := map[string]*btpb.Type{}
420+
for k, v := range paramTypes {
421+
if v == nil {
422+
return nil, errors.New("bigtable: invalid SQLType: nil")
423+
}
424+
tpb, err := v.typeProto()
425+
if err != nil {
426+
return nil, err
427+
}
428+
reqParamTypes[k] = tpb
429+
}
430+
req := &btpb.PrepareQueryRequest{
431+
InstanceName: c.fullInstanceName(),
432+
AppProfileId: c.appProfile,
433+
Query: query,
434+
DataFormat: &btpb.PrepareQueryRequest_ProtoFormat{
435+
ProtoFormat: &btpb.ProtoFormat{},
436+
},
437+
ParamTypes: reqParamTypes,
438+
}
439+
var res *btpb.PrepareQueryResponse
440+
err := gaxInvokeWithRecorder(ctx, mt, "PrepareQuery", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
441+
var err error
442+
res, err = c.client.PrepareQuery(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
443+
return err
444+
}, retryOptions...)
445+
if err != nil {
446+
return nil, err
447+
}
448+
449+
return &PreparedStatement{
450+
c: c,
451+
metadata: res.Metadata,
452+
preparedQuery: res.PreparedQuery,
453+
validUntil: res.ValidUntil,
454+
paramTypes: reqParamTypes,
455+
}, err
456+
}
457+
364458
func (ti *tableImpl) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error {
365459
return ti.Table.ReadRows(ctx, arg, f, opts...)
366460
}
@@ -1650,7 +1744,11 @@ func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]
16501744
}
16511745

16521746
func (t *Table) newBuiltinMetricsTracer(ctx context.Context, isStreaming bool) *builtinMetricsTracer {
1653-
mt := t.c.metricsTracerFactory.createBuiltinMetricsTracer(ctx, t.table, isStreaming)
1747+
return t.c.newBuiltinMetricsTracer(ctx, t.table, isStreaming)
1748+
}
1749+
1750+
func (c *Client) newBuiltinMetricsTracer(ctx context.Context, table string, isStreaming bool) *builtinMetricsTracer {
1751+
mt := c.metricsTracerFactory.createBuiltinMetricsTracer(ctx, table, isStreaming)
16541752
return &mt
16551753
}
16561754

bigtable/integration_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4736,6 +4736,65 @@ func TestIntegration_DirectPathFallback(t *testing.T) {
47364736
}
47374737
}
47384738

4739+
func TestIntegration_PrepareStatement(t *testing.T) {
4740+
ctx := context.Background()
4741+
testEnv, client, _, _, _, cleanup, err := setupIntegration(ctx, t)
4742+
if err != nil {
4743+
t.Fatal(err)
4744+
}
4745+
defer cleanup()
4746+
4747+
if !testEnv.Config().UseProd {
4748+
t.Skip("emulator doesn't support PrepareQuery")
4749+
}
4750+
4751+
if _, err = client.PrepareStatement(ctx,
4752+
"SELECT @bytesParam as bytesCol, @stringParam AS strCol, @int64Param AS int64Col, "+
4753+
"@float32Param AS float32Col, @float64Param AS float64Col, @boolParam AS boolCol, "+
4754+
"@tsParam AS tsCol, @dateParam AS dateCol, @bytesArrayParam AS bytesArrayCol, "+
4755+
"@stringArrayParam AS stringArrayCol, @int64ArrayParam AS int64ArrayCol, "+
4756+
"@float32ArrayParam AS float32ArrayCol, @float64ArrayParam AS float64ArrayCol, "+
4757+
"@boolArrayParam AS boolArrayCol, @tsArrayParam AS tsArrayCol, "+
4758+
"@dateArrayParam AS dateArrayCol",
4759+
map[string]SQLType{
4760+
"bytesParam": BytesSQLType{},
4761+
"stringParam": StringSQLType{},
4762+
"int64Param": Int64SQLType{},
4763+
"float32Param": Float32SQLType{},
4764+
"float64Param": Float64SQLType{},
4765+
"boolParam": BoolSQLType{},
4766+
"tsParam": TimestampSQLType{},
4767+
"dateParam": DateSQLType{},
4768+
"bytesArrayParam": ArraySQLType{
4769+
ElemType: BytesSQLType{},
4770+
},
4771+
"stringArrayParam": ArraySQLType{
4772+
ElemType: StringSQLType{},
4773+
},
4774+
"int64ArrayParam": ArraySQLType{
4775+
ElemType: Int64SQLType{},
4776+
},
4777+
"float32ArrayParam": ArraySQLType{
4778+
ElemType: Float32SQLType{},
4779+
},
4780+
"float64ArrayParam": ArraySQLType{
4781+
ElemType: Float64SQLType{},
4782+
},
4783+
"boolArrayParam": ArraySQLType{
4784+
ElemType: BoolSQLType{},
4785+
},
4786+
"tsArrayParam": ArraySQLType{
4787+
ElemType: TimestampSQLType{},
4788+
},
4789+
"dateArrayParam": ArraySQLType{
4790+
ElemType: DateSQLType{},
4791+
},
4792+
},
4793+
); err != nil {
4794+
t.Fatal("PrepareStatement: " + err.Error())
4795+
}
4796+
}
4797+
47394798
// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).
47404799
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
47414800
numCount := 0

bigtable/sql_types.go

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,116 +17,127 @@ limitations under the License.
1717
package bigtable
1818

1919
import (
20+
"errors"
21+
2022
btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
2123
)
2224

2325
// SQLType represents the type of data that can be used to query Cloud Bigtable.
2426
// It is based on the GoogleSQL standard.
2527
type SQLType interface {
2628
// Used while preparing the query
27-
typeProto() *btpb.Type
29+
typeProto() (*btpb.Type, error)
2830
}
2931

3032
// BytesSQLType represents a slice of bytes.
3133
type BytesSQLType struct{}
3234

33-
func (s BytesSQLType) typeProto() *btpb.Type {
35+
func (s BytesSQLType) typeProto() (*btpb.Type, error) {
3436
return &btpb.Type{
3537
Kind: &btpb.Type_BytesType{
3638
BytesType: &btpb.Type_Bytes{},
3739
},
38-
}
40+
}, nil
3941
}
4042

4143
// StringSQLType represents a string.
4244
type StringSQLType struct {
4345
}
4446

45-
func (s StringSQLType) typeProto() *btpb.Type {
47+
func (s StringSQLType) typeProto() (*btpb.Type, error) {
4648
return &btpb.Type{
4749
Kind: &btpb.Type_StringType{
4850
StringType: &btpb.Type_String{},
4951
},
50-
}
52+
}, nil
5153
}
5254

5355
// Int64SQLType represents an 8-byte integer.
5456
type Int64SQLType struct{}
5557

56-
func (s Int64SQLType) typeProto() *btpb.Type {
58+
func (s Int64SQLType) typeProto() (*btpb.Type, error) {
5759
return &btpb.Type{
5860
Kind: &btpb.Type_Int64Type{
5961
Int64Type: &btpb.Type_Int64{},
6062
},
61-
}
63+
}, nil
6264
}
6365

6466
// Float32SQLType represents a 32-bit floating-point number.
6567
type Float32SQLType struct{}
6668

67-
func (s Float32SQLType) typeProto() *btpb.Type {
69+
func (s Float32SQLType) typeProto() (*btpb.Type, error) {
6870
return &btpb.Type{
6971
Kind: &btpb.Type_Float32Type{
7072
Float32Type: &btpb.Type_Float32{},
7173
},
72-
}
74+
}, nil
7375
}
7476

7577
// Float64SQLType represents a 64-bit floating-point number.
7678
type Float64SQLType struct{}
7779

78-
func (s Float64SQLType) typeProto() *btpb.Type {
80+
func (s Float64SQLType) typeProto() (*btpb.Type, error) {
7981
return &btpb.Type{
8082
Kind: &btpb.Type_Float64Type{
8183
Float64Type: &btpb.Type_Float64{},
8284
},
83-
}
85+
}, nil
8486
}
8587

8688
// BoolSQLType represents a boolean.
8789
type BoolSQLType struct{}
8890

89-
func (s BoolSQLType) typeProto() *btpb.Type {
91+
func (s BoolSQLType) typeProto() (*btpb.Type, error) {
9092
return &btpb.Type{
9193
Kind: &btpb.Type_BoolType{
9294
BoolType: &btpb.Type_Bool{},
9395
},
94-
}
96+
}, nil
9597
}
9698

9799
// TimestampSQLType represents a point in time.
98100
type TimestampSQLType struct{}
99101

100-
func (s TimestampSQLType) typeProto() *btpb.Type {
102+
func (s TimestampSQLType) typeProto() (*btpb.Type, error) {
101103
return &btpb.Type{
102104
Kind: &btpb.Type_TimestampType{
103105
TimestampType: &btpb.Type_Timestamp{},
104106
},
105-
}
107+
}, nil
106108
}
107109

108110
// DateSQLType represents a calendar date.
109111
type DateSQLType struct{}
110112

111-
func (s DateSQLType) typeProto() *btpb.Type {
113+
func (s DateSQLType) typeProto() (*btpb.Type, error) {
112114
return &btpb.Type{
113115
Kind: &btpb.Type_DateType{
114116
DateType: &btpb.Type_Date{},
115117
},
116-
}
118+
}, nil
117119
}
118120

119121
// ArraySQLType represents an ordered list of elements of a given type.
120122
type ArraySQLType struct {
121123
ElemType SQLType
122124
}
123125

124-
func (s ArraySQLType) typeProto() *btpb.Type {
126+
func (s ArraySQLType) typeProto() (*btpb.Type, error) {
127+
if s.ElemType == nil {
128+
return nil, errors.New("must specify an explicit element type")
129+
}
130+
131+
tp, err := s.ElemType.typeProto()
132+
if err != nil {
133+
return nil, err
134+
}
135+
125136
return &btpb.Type{
126137
Kind: &btpb.Type_ArrayType{
127138
ArrayType: &btpb.Type_Array{
128-
ElementType: s.ElemType.typeProto(),
139+
ElementType: tp,
129140
},
130141
},
131-
}
142+
}, nil
132143
}

0 commit comments

Comments
 (0)