@@ -29,6 +29,7 @@ import (
2929 "slices"
3030 "strconv"
3131 "strings"
32+ "sync"
3233 "testing"
3334 "time"
3435
@@ -43,6 +44,7 @@ import (
4344 "google.golang.org/api/option"
4445 "google.golang.org/grpc"
4546 "google.golang.org/grpc/codes"
47+ expgrpc "google.golang.org/grpc/experimental"
4648 "google.golang.org/grpc/mem"
4749 "google.golang.org/grpc/status"
4850 "google.golang.org/protobuf/proto"
@@ -3004,6 +3006,165 @@ func TestWriterChunkRetryDeadlineEmulated(t *testing.T) {
30043006 })
30053007}
30063008
3009+ // Used to test gRPC buffer pool allocs and frees.
3010+ // See https://pkg.go.dev/google.golang.org/grpc/mem
3011+ type testBufferPool struct {
3012+ allocs int64
3013+ frees int64
3014+ sync.Mutex // mutex needed becuase Get/Put can be called in parallel.
3015+ }
3016+
3017+ func (bp * testBufferPool ) Get (length int ) * []byte {
3018+ bp .Lock ()
3019+ bp .allocs += int64 (length )
3020+ bp .Unlock ()
3021+ return mem .DefaultBufferPool ().Get (length )
3022+ }
3023+
3024+ func (bp * testBufferPool ) Put (b * []byte ) {
3025+ if b != nil {
3026+ bp .Lock ()
3027+ bp .frees += int64 (len (* b ))
3028+ bp .Unlock ()
3029+ }
3030+ mem .DefaultBufferPool ().Put (b )
3031+ }
3032+
3033+ func (bp * testBufferPool ) getAllocsAndFrees () (int64 , int64 ) {
3034+ bp .Lock ()
3035+ defer bp .Unlock ()
3036+ return bp .allocs , bp .frees
3037+ }
3038+
3039+ // Test that successful downloads using Reader and MultiRangeDownloader free
3040+ // all of their allocated buffers.
3041+ func TestReadCodecLeaksEmulated (t * testing.T ) {
3042+ checkEmulatorEnvironment (t )
3043+ ctx := context .Background ()
3044+ var bp testBufferPool
3045+ client , err := NewGRPCClient (ctx , option .WithGRPCDialOption (expgrpc .WithBufferPool (& bp )), experimental .WithZonalBucketAPIs ())
3046+ if err != nil {
3047+ t .Fatalf ("NewGRPCClient: %v" , err )
3048+ }
3049+ var (
3050+ contents = randomBytes9MiB
3051+ prefix = time .Now ().Nanosecond ()
3052+ bucketName = fmt .Sprintf ("bucket-%d" , prefix )
3053+ objName = fmt .Sprintf ("%d-object" , prefix )
3054+ bkt = client .Bucket (bucketName )
3055+ obj = bkt .Object (objName )
3056+ )
3057+
3058+ // Upload object.
3059+ if err := bkt .Create (ctx , "project" , nil ); err != nil {
3060+ t .Fatalf ("creating bucket: %v" , err )
3061+ }
3062+ w := obj .NewWriter (ctx )
3063+ if _ , err := io .Copy (w , bytes .NewReader (contents )); err != nil {
3064+ t .Fatalf ("uploading object: %v" , err )
3065+ }
3066+ if err := w .Close (); err != nil {
3067+ t .Fatalf ("closing writer: %v" , err )
3068+ }
3069+ if bp .allocs != bp .frees {
3070+ t .Errorf ("upload: alloc'd bytes %v not equal to freed bytes %v" , bp .allocs , bp .frees )
3071+ }
3072+
3073+ // Test multiple download paths.
3074+ testCases := []struct {
3075+ name string
3076+ downloadFunc func (obj * ObjectHandle ) ([]byte , error )
3077+ }{
3078+ {
3079+ name : "Reader.Read" ,
3080+ downloadFunc : func (obj * ObjectHandle ) ([]byte , error ) {
3081+ r , err := obj .NewReader (ctx )
3082+ defer r .Close ()
3083+ if err != nil {
3084+ return nil , err
3085+ }
3086+ gotContents , err := io .ReadAll (r )
3087+ return gotContents , err
3088+ },
3089+ },
3090+ {
3091+ name : "Reader.WriteTo" ,
3092+ downloadFunc : func (obj * ObjectHandle ) ([]byte , error ) {
3093+ r , err := obj .NewReader (ctx )
3094+ defer r .Close ()
3095+ if err != nil {
3096+ return nil , err
3097+ }
3098+ buf := bytes .NewBuffer ([]byte {})
3099+ _ , err = r .WriteTo (buf )
3100+ return buf .Bytes (), err
3101+ },
3102+ },
3103+ {
3104+ name : "MultiRangeDownloader 3MiB ranges" ,
3105+ downloadFunc : func (obj * ObjectHandle ) ([]byte , error ) {
3106+ mrd , err := obj .NewMultiRangeDownloader (ctx )
3107+ var bufs []* bytes.Buffer
3108+ var currOff int64
3109+ var increment int64 = 3 * MiB
3110+ for range 3 {
3111+ buf := bytes .NewBuffer ([]byte {})
3112+ mrd .Add (buf , currOff , increment , func (int64 , int64 , error ) {})
3113+ bufs = append (bufs , buf )
3114+ currOff += increment
3115+ }
3116+ mrd .Wait ()
3117+ if err := mrd .Close (); err != nil {
3118+ return nil , err
3119+ }
3120+ var b []byte
3121+ for _ , buf := range bufs {
3122+ b = append (b , buf .Bytes ()... )
3123+ }
3124+ return b , err
3125+ }},
3126+ {
3127+ name : "MultiRangeDownloader 256k ranges" ,
3128+ downloadFunc : func (obj * ObjectHandle ) ([]byte , error ) {
3129+ mrd , err := obj .NewMultiRangeDownloader (ctx )
3130+ var bufs []* bytes.Buffer
3131+ var currOff int64
3132+ var increment int64 = 256 * 1024
3133+ for range 36 {
3134+ buf := bytes .NewBuffer ([]byte {})
3135+ mrd .Add (buf , currOff , increment , func (int64 , int64 , error ) {})
3136+ bufs = append (bufs , buf )
3137+ currOff += increment
3138+ }
3139+ mrd .Wait ()
3140+ if err := mrd .Close (); err != nil {
3141+ return nil , err
3142+ }
3143+ var b []byte
3144+ for _ , buf := range bufs {
3145+ b = append (b , buf .Bytes ()... )
3146+ }
3147+ return b , err
3148+ }},
3149+ }
3150+
3151+ for _ , tc := range testCases {
3152+ t .Run (tc .name , func (t * testing.T ) {
3153+ gotContents , err := tc .downloadFunc (obj )
3154+ if err != nil {
3155+ t .Fatalf ("downloading content: %v" , err )
3156+ }
3157+ if ! bytes .Equal (gotContents , contents ) {
3158+ t .Errorf ("downloaded bytes did not match; got %v bytes, want %v" , len (gotContents ), len (contents ))
3159+ }
3160+ allocs , frees := bp .getAllocsAndFrees ()
3161+ if allocs != frees {
3162+ t .Errorf ("download: alloc'd bytes %v not equal to freed bytes %v" , allocs , frees )
3163+ }
3164+ })
3165+ }
3166+ }
3167+
30073168// createRetryTest creates a bucket in the emulator and sets up a test using the
30083169// Retry Test API for the given instructions. This is intended for emulator tests
30093170// of retry behavior that are not covered by conformance tests.
0 commit comments