@@ -42,6 +42,7 @@ import (
4242 "google.golang.org/grpc/metadata"
4343 "google.golang.org/grpc/status"
4444 "google.golang.org/protobuf/proto"
45+ "google.golang.org/protobuf/types/known/timestamppb"
4546
4647 // Install google-c2p resolver, which is required for direct path.
4748 _ "google.golang.org/grpc/xds/googledirectpath"
@@ -240,6 +241,10 @@ func convertToGrpcStatusErr(err error) (codes.Code, error) {
240241 return codes .Unknown , err
241242}
242243
244+ func (c * Client ) fullInstanceName () string {
245+ return fmt .Sprintf ("projects/%s/instances/%s" , c .project , c .instance )
246+ }
247+
243248func (c * Client ) fullTableName (table string ) string {
244249 return fmt .Sprintf ("projects/%s/instances/%s/tables/%s" , c .project , c .instance , table )
245250}
@@ -252,10 +257,14 @@ func (c *Client) fullMaterializedViewName(materializedView string) string {
252257 return fmt .Sprintf ("projects/%s/instances/%s/materializedViews/%s" , c .project , c .instance , materializedView )
253258}
254259
255- func (c * Client ) requestParamsHeaderValue (table string ) string {
260+ func (c * Client ) reqParamsHeaderValTable (table string ) string {
256261 return fmt .Sprintf ("table_name=%s&app_profile_id=%s" , url .QueryEscape (c .fullTableName (table )), url .QueryEscape (c .appProfile ))
257262}
258263
264+ func (c * Client ) reqParamsHeaderValInstance () string {
265+ return fmt .Sprintf ("name=%s&app_profile_id=%s" , url .QueryEscape (c .fullInstanceName ()), url .QueryEscape (c .appProfile ))
266+ }
267+
259268// mergeOutgoingMetadata returns a context populated by the existing outgoing
260269// metadata merged with the provided mds.
261270func mergeOutgoingMetadata (ctx context.Context , mds ... metadata.MD ) context.Context {
@@ -319,7 +328,7 @@ func (c *Client) Open(table string) *Table {
319328 table : table ,
320329 md : metadata .Join (metadata .Pairs (
321330 resourcePrefixHeader , c .fullTableName (table ),
322- requestParamsHeader , c .requestParamsHeaderValue (table ),
331+ requestParamsHeader , c .reqParamsHeaderValTable (table ),
323332 ), c .newFeatureFlags ()),
324333 }
325334}
@@ -331,7 +340,7 @@ func (c *Client) OpenTable(table string) TableAPI {
331340 table : table ,
332341 md : metadata .Join (metadata .Pairs (
333342 resourcePrefixHeader , c .fullTableName (table ),
334- requestParamsHeader , c .requestParamsHeaderValue (table ),
343+ requestParamsHeader , c .reqParamsHeaderValTable (table ),
335344 ), c .newFeatureFlags ()),
336345 }}
337346}
@@ -343,7 +352,7 @@ func (c *Client) OpenAuthorizedView(table, authorizedView string) TableAPI {
343352 table : table ,
344353 md : metadata .Join (metadata .Pairs (
345354 resourcePrefixHeader , c .fullAuthorizedViewName (table , authorizedView ),
346- requestParamsHeader , c .requestParamsHeaderValue (table ),
355+ requestParamsHeader , c .reqParamsHeaderValTable (table ),
347356 ), c .newFeatureFlags ()),
348357 authorizedView : authorizedView ,
349358 }}
@@ -355,12 +364,97 @@ func (c *Client) OpenMaterializedView(materializedView string) TableAPI {
355364 c : c ,
356365 md : metadata .Join (metadata .Pairs (
357366 resourcePrefixHeader , c .fullMaterializedViewName (materializedView ),
358- requestParamsHeader , c .requestParamsHeaderValue (materializedView ),
367+ requestParamsHeader , c .reqParamsHeaderValTable (materializedView ),
359368 ), c .newFeatureFlags ()),
360369 materializedView : materializedView ,
361370 }}
362371}
363372
373+ // PreparedStatement stores the results of query preparation that can be used to
374+ // create [BoundStatements]s to execute queries.
375+ //
376+ // Whenever possible this should be shared across different instances of the same query,
377+ // in order to amortize query preparation costs.
378+ type PreparedStatement struct {
379+ c * Client
380+ paramTypes map [string ]* btpb.Type
381+ // Structure of rows in the response stream of `ExecuteQueryResponse` for the
382+ // returned `prepared_query`.
383+ metadata * btpb.ResultSetMetadata
384+ // A serialized prepared query. It is an opaque
385+ // blob of bytes to send in `ExecuteQueryRequest`.
386+ preparedQuery []byte
387+ // The time at which the prepared query token becomes invalid.
388+ // A token may become invalid early due to changes in the data being read, but
389+ // it provides a guideline to refresh query plans asynchronously.
390+ validUntil * timestamppb.Timestamp
391+ }
392+
393+ // PrepareOption can be passed while preparing a query statement.
394+ type PrepareOption interface {
395+ }
396+
397+ // PrepareStatement prepares a query for execution. If possible, this should be called once and
398+ // reused across requests. This will amortize the cost of query preparation.
399+ func (c * Client ) PrepareStatement (ctx context.Context , query string , paramTypes map [string ]SQLType , opts ... PrepareOption ) (preparedStatement * PreparedStatement , err error ) {
400+ md := metadata .Join (metadata .Pairs (
401+ resourcePrefixHeader , c .fullInstanceName (),
402+ requestParamsHeader , c .reqParamsHeaderValInstance (),
403+ ), c .newFeatureFlags ())
404+
405+ ctx = mergeOutgoingMetadata (ctx , md )
406+ ctx = trace .StartSpan (ctx , "cloud.google.com/go/bigtable.PrepareQuery" )
407+ defer func () { trace .EndSpan (ctx , err ) }()
408+
409+ mt := c .newBuiltinMetricsTracer (ctx , "" , false )
410+ defer recordOperationCompletion (mt )
411+
412+ preparedStatement , err = c .prepareStatement (ctx , mt , query , paramTypes , opts ... )
413+ statusCode , statusErr := convertToGrpcStatusErr (err )
414+ mt .currOp .setStatus (statusCode .String ())
415+ return preparedStatement , statusErr
416+ }
417+
418+ func (c * Client ) prepareStatement (ctx context.Context , mt * builtinMetricsTracer , query string , paramTypes map [string ]SQLType , opts ... PrepareOption ) (* PreparedStatement , error ) {
419+ reqParamTypes := map [string ]* btpb.Type {}
420+ for k , v := range paramTypes {
421+ if v == nil {
422+ return nil , errors .New ("bigtable: invalid SQLType: nil" )
423+ }
424+ tpb , err := v .typeProto ()
425+ if err != nil {
426+ return nil , err
427+ }
428+ reqParamTypes [k ] = tpb
429+ }
430+ req := & btpb.PrepareQueryRequest {
431+ InstanceName : c .fullInstanceName (),
432+ AppProfileId : c .appProfile ,
433+ Query : query ,
434+ DataFormat : & btpb.PrepareQueryRequest_ProtoFormat {
435+ ProtoFormat : & btpb.ProtoFormat {},
436+ },
437+ ParamTypes : reqParamTypes ,
438+ }
439+ var res * btpb.PrepareQueryResponse
440+ err := gaxInvokeWithRecorder (ctx , mt , "PrepareQuery" , func (ctx context.Context , headerMD , trailerMD * metadata.MD , _ gax.CallSettings ) error {
441+ var err error
442+ res , err = c .client .PrepareQuery (ctx , req , grpc .Header (headerMD ), grpc .Trailer (trailerMD ))
443+ return err
444+ }, retryOptions ... )
445+ if err != nil {
446+ return nil , err
447+ }
448+
449+ return & PreparedStatement {
450+ c : c ,
451+ metadata : res .Metadata ,
452+ preparedQuery : res .PreparedQuery ,
453+ validUntil : res .ValidUntil ,
454+ paramTypes : reqParamTypes ,
455+ }, err
456+ }
457+
364458func (ti * tableImpl ) ReadRows (ctx context.Context , arg RowSet , f func (Row ) bool , opts ... ReadOption ) error {
365459 return ti .Table .ReadRows (ctx , arg , f , opts ... )
366460}
@@ -1650,7 +1744,11 @@ func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]
16501744}
16511745
16521746func (t * Table ) newBuiltinMetricsTracer (ctx context.Context , isStreaming bool ) * builtinMetricsTracer {
1653- mt := t .c .metricsTracerFactory .createBuiltinMetricsTracer (ctx , t .table , isStreaming )
1747+ return t .c .newBuiltinMetricsTracer (ctx , t .table , isStreaming )
1748+ }
1749+
1750+ func (c * Client ) newBuiltinMetricsTracer (ctx context.Context , table string , isStreaming bool ) * builtinMetricsTracer {
1751+ mt := c .metricsTracerFactory .createBuiltinMetricsTracer (ctx , table , isStreaming )
16541752 return & mt
16551753}
16561754
0 commit comments