Skip to content

Commit 8264a96

Browse files
authored
feat(storage): Implement io.WriterTo in Reader (#9659)
* feat(storage): Implement io.WriterTo in Reader. This allows the gRPC Reader to write directly into the application write buffer, saving a data copy. Users can get the benefit of this directly by explicitly calling Reader.WriteTo, but they also benefit implicitly when calling io.Copy. A bunch of checksum logic had to be moved from the parent Reader into the transport Readers to make this work, since the checksum must be updated for every message read in WriteTo.
* fix conf test object vars
* fix review comments
* fix EOF case
1 parent 77a5dfc commit 8264a96

4 files changed

Lines changed: 189 additions & 34 deletions

File tree

storage/grpc_client.go

Lines changed: 107 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/base64"
2020
"errors"
2121
"fmt"
22+
"hash/crc32"
2223
"io"
2324
"net/url"
2425
"os"
@@ -1042,6 +1043,16 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
10421043
// This is the size of the entire object, even if only a range was requested.
10431044
size := obj.GetSize()
10441045

1046+
// Only support checksums when reading an entire object, not a range.
1047+
var (
1048+
wantCRC uint32
1049+
checkCRC bool
1050+
)
1051+
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
1052+
wantCRC = checksums.GetCrc32C()
1053+
checkCRC = true
1054+
}
1055+
10451056
r = &Reader{
10461057
Attrs: ReaderObjectAttrs{
10471058
Size: size,
@@ -1063,7 +1074,10 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
10631074
settings: s,
10641075
zeroRange: params.length == 0,
10651076
databuf: databuf,
1077+
wantCRC: wantCRC,
1078+
checkCRC: checkCRC,
10661079
},
1080+
checkCRC: checkCRC,
10671081
}
10681082

10691083
cr := msg.GetContentRange()
@@ -1081,12 +1095,6 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
10811095
r.reader.Close()
10821096
}
10831097

1084-
// Only support checksums when reading an entire object, not a range.
1085-
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
1086-
r.wantCRC = checksums.GetCrc32C()
1087-
r.checkCRC = true
1088-
}
1089-
10901098
return r, nil
10911099
}
10921100

@@ -1464,12 +1472,34 @@ type gRPCReader struct {
14641472
databuf []byte
14651473
cancel context.CancelFunc
14661474
settings *settings
1475+
checkCRC bool // should we check the CRC?
1476+
wantCRC uint32 // the CRC32c value the server sent in the header
1477+
gotCRC uint32 // running crc
1478+
}
1479+
1480+
// updateCRC folds the bytes in b into the running checksum r.gotCRC using crc32cTable. It is a no-op when CRC checking is disabled (r.checkCRC is false).
1481+
func (r *gRPCReader) updateCRC(b []byte) {
1482+
if r.checkCRC {
1483+
r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, b)
1484+
}
1485+
}
1486+
1487+
// runCRCCheck compares the running checksum (r.gotCRC) with the expected value (r.wantCRC) at the conclusion of a read. It returns a "bad CRC on read" error on mismatch and nil otherwise, including whenever CRC checking is disabled.
1488+
func (r *gRPCReader) runCRCCheck() error {
1489+
if r.checkCRC && r.gotCRC != r.wantCRC {
1490+
return fmt.Errorf("storage: bad CRC on read: got %d, want %d", r.gotCRC, r.wantCRC)
1491+
}
1492+
return nil
14671493
}
14681494

14691495
// Read reads bytes into the user's buffer from an open gRPC stream.
14701496
func (r *gRPCReader) Read(p []byte) (int, error) {
1471-
// The entire object has been read by this reader, return EOF.
1497+
// The entire object has been read by this reader, check the checksum if
1498+
// necessary and return EOF.
14721499
if r.size == r.seen || r.zeroRange {
1500+
if err := r.runCRCCheck(); err != nil {
1501+
return 0, err
1502+
}
14731503
return 0, io.EOF
14741504
}
14751505

@@ -1478,7 +1508,7 @@ func (r *gRPCReader) Read(p []byte) (int, error) {
14781508
// using the same reader. One encounters an error and the stream is closed
14791509
// and then reopened while the other routine attempts to read from it.
14801510
if r.stream == nil {
1481-
return 0, fmt.Errorf("reader has been closed")
1511+
return 0, fmt.Errorf("storage: reader has been closed")
14821512
}
14831513

14841514
var n int
@@ -1487,6 +1517,7 @@ func (r *gRPCReader) Read(p []byte) (int, error) {
14871517
if len(r.leftovers) > 0 {
14881518
n = copy(p, r.leftovers)
14891519
r.seen += int64(n)
1520+
r.updateCRC(p[:n])
14901521
r.leftovers = r.leftovers[n:]
14911522
return n, nil
14921523
}
@@ -1512,10 +1543,78 @@ func (r *gRPCReader) Read(p []byte) (int, error) {
15121543
r.leftovers = content[n:]
15131544
}
15141545
r.seen += int64(n)
1546+
r.updateCRC(p[:n])
15151547

15161548
return n, nil
15171549
}
15181550

1551+
// WriteTo writes all the data requested by the Reader into w, implementing
1552+
// io.WriterTo. It returns the number of bytes written to w during this call. When CRC checking is enabled, a checksum mismatch is returned as the error once the stream has been fully consumed.
1553+
func (r *gRPCReader) WriteTo(w io.Writer) (int64, error) {
1554+
// The entire object has been read by this reader, check the checksum if
1555+
// necessary and return nil.
1556+
if r.size == r.seen || r.zeroRange {
1557+
if err := r.runCRCCheck(); err != nil {
1558+
return 0, err
1559+
}
1560+
return 0, nil
1561+
}
1562+
// No stream to read from, either never initialized or Close was called.
1563+
// Note: There is a potential concurrency issue if multiple routines are
1564+
// using the same reader. One encounters an error and the stream is closed
1565+
// and then reopened while the other routine attempts to read from it.
1566+
if r.stream == nil {
1567+
return 0, fmt.Errorf("storage: reader has been closed")
1568+
}
1569+
1570+
// Record the bytes already seen before this call so that the return value counts only bytes written by this call.
1571+
var alreadySeen = r.seen
1572+
1573+
// Write any leftovers to w first. There will be some leftovers from the
1574+
// original NewRangeReader call.
1575+
if len(r.leftovers) > 0 {
1576+
// Write() will write the entire leftovers slice unless there is an error. NOTE(review): on a short write the CRC is still updated with the whole slice and leftovers is dropped before the error is returned.
1577+
written, err := w.Write(r.leftovers)
1578+
r.seen += int64(written)
1579+
r.updateCRC(r.leftovers)
1580+
r.leftovers = nil
1581+
if err != nil {
1582+
return r.seen - alreadySeen, err
1583+
}
1584+
}
1585+
1586+
// Loop and receive additional messages until the entire data is written.
1587+
for {
1588+
// Attempt to receive the next message on the stream.
1589+
// Will terminate with io.EOF once data has all come through.
1590+
// recv() handles stream reopening and retry logic so no need for retries here.
1591+
msg, err := r.recv()
1592+
if err != nil {
1593+
if err == io.EOF {
1594+
// We are done; check the checksum if necessary and return.
1595+
err = r.runCRCCheck()
1596+
}
1597+
return r.seen - alreadySeen, err
1598+
}
1599+
1600+
// TODO: Determine if we need to capture incremental CRC32C for this
1601+
// chunk. The Object CRC32C checksum is captured when directed to read
1602+
// the entire Object. If directed to read a range, we may need to
1603+
// calculate the range's checksum for verification if the checksum is
1604+
// present in the response here.
1605+
// TODO: Figure out if we need to support decompressive transcoding
1606+
// https://cloud.google.com/storage/docs/transcoding.
1607+
written, err := w.Write(msg)
1608+
r.seen += int64(written)
1609+
r.updateCRC(msg)
1610+
if err != nil {
1611+
return r.seen - alreadySeen, err
1612+
}
1613+
}
1614+
}
1615+
1616+
}
1617+
15191618
// Close cancels the read stream's context in order for it to be closed and
15201619
// collected.
15211620
func (r *gRPCReader) Close() error {

storage/http_client.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/base64"
2020
"errors"
2121
"fmt"
22+
"hash/crc32"
2223
"io"
2324
"io/ioutil"
2425
"net/http"
@@ -1218,9 +1219,12 @@ func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket strin
12181219
}
12191220

12201221
type httpReader struct {
1221-
body io.ReadCloser
1222-
seen int64
1223-
reopen func(seen int64) (*http.Response, error)
1222+
body io.ReadCloser
1223+
seen int64
1224+
reopen func(seen int64) (*http.Response, error)
1225+
checkCRC bool // should we check the CRC?
1226+
wantCRC uint32 // the CRC32c value the server sent in the header
1227+
gotCRC uint32 // running crc
12241228
}
12251229

12261230
func (r *httpReader) Read(p []byte) (int, error) {
@@ -1229,7 +1233,22 @@ func (r *httpReader) Read(p []byte) (int, error) {
12291233
m, err := r.body.Read(p[n:])
12301234
n += m
12311235
r.seen += int64(m)
1232-
if err == nil || err == io.EOF {
1236+
if r.checkCRC {
1237+
r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
1238+
}
1239+
if err == nil {
1240+
return n, nil
1241+
}
1242+
if err == io.EOF {
1243+
// Check CRC here. It would be natural to check it in Close, but
1244+
// everybody defers Close on the assumption that it doesn't return
1245+
// anything worth looking at.
1246+
if r.checkCRC {
1247+
if r.gotCRC != r.wantCRC {
1248+
return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
1249+
r.gotCRC, r.wantCRC)
1250+
}
1251+
}
12331252
return n, err
12341253
}
12351254
// Read failed (likely due to connection issues), but we will try to reopen
@@ -1435,11 +1454,12 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
14351454
Attrs: attrs,
14361455
size: size,
14371456
remain: remain,
1438-
wantCRC: crc,
14391457
checkCRC: checkCRC,
14401458
reader: &httpReader{
1441-
reopen: reopen,
1442-
body: body,
1459+
reopen: reopen,
1460+
body: body,
1461+
wantCRC: crc,
1462+
checkCRC: checkCRC,
14431463
},
14441464
}, nil
14451465
}

storage/reader.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,7 @@ var emptyBody = ioutil.NopCloser(strings.NewReader(""))
198198
type Reader struct {
199199
Attrs ReaderObjectAttrs
200200
seen, remain, size int64
201-
checkCRC bool // should we check the CRC?
202-
wantCRC uint32 // the CRC32c value the server sent in the header
203-
gotCRC uint32 // running crc
201+
checkCRC bool // Did we check the CRC? This is now only used by tests.
204202

205203
reader io.ReadCloser
206204
ctx context.Context
@@ -218,17 +216,17 @@ func (r *Reader) Read(p []byte) (int, error) {
218216
if r.remain != -1 {
219217
r.remain -= int64(n)
220218
}
221-
if r.checkCRC {
222-
r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
223-
// Check CRC here. It would be natural to check it in Close, but
224-
// everybody defers Close on the assumption that it doesn't return
225-
// anything worth looking at.
226-
if err == io.EOF {
227-
if r.gotCRC != r.wantCRC {
228-
return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
229-
r.gotCRC, r.wantCRC)
230-
}
231-
}
219+
return n, err
220+
}
221+
222+
// WriteTo writes all the data from the Reader to w. Fulfills the io.WriterTo interface.
223+
// This is called implicitly when calling io.Copy on a Reader. It returns the byte count and error reported by the underlying copy.
224+
func (r *Reader) WriteTo(w io.Writer) (int64, error) {
225+
// This implicitly calls r.reader.WriteTo for gRPC only. JSON and XML don't have an
226+
// implementation of WriteTo. NOTE(review): only r.remain is adjusted below; r.seen is not touched in this method. Confirm that is intended.
227+
n, err := io.Copy(w, r.reader)
228+
if r.remain != -1 {
229+
r.remain -= int64(n)
232230
}
233231
return n, err
234232
}

storage/retry_conformance_test.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"encoding/json"
2222
"fmt"
2323
"io"
24-
"io/ioutil"
2524
"net/http"
2625
"net/url"
2726
"os"
@@ -211,12 +210,25 @@ var methods = map[string][]retryFunc{
211210
if err != nil {
212211
return err
213212
}
214-
wr, err := io.Copy(ioutil.Discard, r)
213+
wr, err := r.WriteTo(io.Discard)
215214
if got, want := wr, len(randomBytesToWrite); got != int64(want) {
216215
return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
217216
}
218217
return err
219218
},
219+
func(ctx context.Context, c *Client, fs *resources, _ bool) error {
220+
// This tests downloads by calling Reader.Read rather than Reader.WriteTo.
221+
r, err := c.Bucket(fs.bucket.Name).Object(fs.object.Name).NewReader(ctx)
222+
if err != nil {
223+
return err
224+
}
225+
// Use ReadAll because it calls Read implicitly, not WriteTo.
226+
b, err := io.ReadAll(r)
227+
if got, want := len(b), len(randomBytesToWrite); got != want {
228+
return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
229+
}
230+
return err
231+
},
220232
func(ctx context.Context, c *Client, fs *resources, _ bool) error {
221233
// Test JSON reads.
222234
client, ok := c.tc.(*httpStorageClient)
@@ -233,7 +245,7 @@ var methods = map[string][]retryFunc{
233245
if err != nil {
234246
return err
235247
}
236-
wr, err := io.Copy(ioutil.Discard, r)
248+
wr, err := io.Copy(io.Discard, r)
237249
if got, want := wr, len(randomBytesToWrite); got != int64(want) {
238250
return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
239251
}
@@ -253,7 +265,7 @@ var methods = map[string][]retryFunc{
253265
return err
254266
}
255267
defer r.Close()
256-
data, err := ioutil.ReadAll(r)
268+
data, err := io.ReadAll(r)
257269
if err != nil {
258270
return fmt.Errorf("failed to ReadAll, err: %v", err)
259271
}
@@ -265,6 +277,32 @@ var methods = map[string][]retryFunc{
265277
}
266278
return nil
267279
},
280+
func(ctx context.Context, c *Client, fs *resources, _ bool) error {
281+
// Test download via Reader.WriteTo.
282+
// Before running the test method, populate a large test object of 3 MiB.
283+
objName := objectIDs.New()
284+
if err := uploadTestObject(fs.bucket.Name, objName, randomBytes3MiB); err != nil {
285+
return fmt.Errorf("failed to create 9 MiB large object pre test, err: %v", err)
286+
}
287+
// Download the large test object for the S8 download method group.
288+
r, err := c.Bucket(fs.bucket.Name).Object(objName).NewReader(ctx)
289+
if err != nil {
290+
return err
291+
}
292+
defer r.Close()
293+
var data bytes.Buffer
294+
_, err = r.WriteTo(&data)
295+
if err != nil {
296+
return fmt.Errorf("failed to ReadAll, err: %v", err)
297+
}
298+
if got, want := data.Len(), 3*MiB; got != want {
299+
return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
300+
}
301+
if got, want := data.Bytes(), randomBytes3MiB; !bytes.Equal(got, want) {
302+
return fmt.Errorf("body mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
303+
}
304+
return nil
305+
},
268306
func(ctx context.Context, c *Client, fs *resources, _ bool) error {
269307
// Test JSON reads.
270308
// Before running the test method, populate a large test object.
@@ -289,7 +327,7 @@ var methods = map[string][]retryFunc{
289327
return err
290328
}
291329
defer r.Close()
292-
data, err := ioutil.ReadAll(r)
330+
data, err := io.ReadAll(r)
293331
if err != nil {
294332
return fmt.Errorf("failed to ReadAll, err: %v", err)
295333
}

0 commit comments

Comments
 (0)