Skip to content

Commit 54bfc32

Browse files
authored
fix(storage): add mutex around uses of mrd variables (#11405)
1 parent 893d27a commit 54bfc32

1 file changed

Lines changed: 16 additions & 6 deletions

File tree

storage/grpc_client.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,7 +1190,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
11901190
for {
11911191
select {
11921192
case <-rr.ctx.Done():
1193+
rr.mu.Lock()
11931194
rr.done = true
1195+
rr.mu.Unlock()
11941196
return
11951197
case <-rr.managerRetry:
11961198
return
@@ -1349,7 +1351,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
13491351
rr.mu.Unlock()
13501352
}
13511353

1354+
rr.mu.Lock()
13521355
rr.objectSize = size
1356+
rr.mu.Unlock()
13531357

13541358
go streamManager()
13551359
go streamReceiver()
@@ -1424,8 +1428,12 @@ func (r *gRPCBidiReader) reopenStream(failSpec []rangeSpec) error {
14241428

14251429
// Add will add current range to stream.
14261430
func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback func(int64, int64, error)) {
1427-
if offset > mr.objectSize {
1428-
callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", mr.objectSize))
1431+
mr.mu.Lock()
1432+
objectSize := mr.objectSize
1433+
mr.mu.Unlock()
1434+
1435+
if offset > objectSize {
1436+
callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", objectSize))
14291437
return
14301438
}
14311439
if limit < 0 {
@@ -1463,8 +1471,10 @@ func (mr *gRPCBidiReader) close() error {
14631471
if mr.cancel != nil {
14641472
mr.cancel()
14651473
}
1474+
mr.mu.Lock()
14661475
mr.done = true
14671476
mr.activeTask = 0
1477+
mr.mu.Unlock()
14681478
mr.closeReceiver <- true
14691479
mr.closeManager <- true
14701480
return nil
@@ -1877,11 +1887,11 @@ type gRPCBidiReader struct {
18771887
closeManager chan bool
18781888
managerRetry chan bool
18791889
receiverRetry chan bool
1890+
mu sync.Mutex // protects all vars in gRPCBidiReader from concurrent access
18801891
mp map[int64]rangeSpec // always use the mutex when accessing the map
1881-
mu sync.Mutex // protects map from concurrent access.
1882-
done bool
1883-
activeTask int64
1884-
objectSize int64
1892+
done bool // always use the mutex when accessing this variable
1893+
activeTask int64 // always use the mutex when accessing this variable
1894+
objectSize int64 // always use the mutex when accessing this variable
18851895
retrier func(error, string)
18861896
streamRecreation bool // This helps us identify if stream recreation is in progress or not. If stream recreation gets called from two goroutine then this will stop second one.
18871897
}

0 commit comments

Comments
 (0)