Skip to content

Commit d540a83

Browse files
committed
chore: validate range received before forwarding for an outstanding read
1 parent 8e516e6 commit d540a83

4 files changed

Lines changed: 252 additions & 9 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.api.gax.rpc.ClientStream;
2424
import com.google.api.gax.rpc.ResponseObserver;
2525
import com.google.api.gax.rpc.StreamController;
26+
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
2627
import com.google.common.base.Preconditions;
2728
import com.google.protobuf.ByteString;
2829
import com.google.rpc.Status;
@@ -36,6 +37,7 @@
3637
import com.google.storage.v2.ObjectRangeData;
3738
import com.google.storage.v2.ReadRange;
3839
import com.google.storage.v2.ReadRangeError;
40+
import io.grpc.Status.Code;
3941
import java.io.IOException;
4042
import java.util.List;
4143
import java.util.concurrent.ExecutionException;
@@ -226,6 +228,7 @@ public void onStart(StreamController controller) {
226228
controller.request(2);
227229
}
228230

231+
@SuppressWarnings("rawtypes")
229232
@Override
230233
public void onResponse(BidiReadObjectResponse response) {
231234
controller.request(1);
@@ -246,11 +249,9 @@ public void onResponse(BidiReadObjectResponse response) {
246249
ReadRange readRange = d.getReadRange();
247250
long id = readRange.getReadId();
248251
BlobDescriptorStreamRead read = state.getOutstandingRead(id);
249-
if (read == null) {
252+
if (read == null || !read.acceptingBytes()) {
250253
continue;
251254
}
252-
// TODO: validate read is still open
253-
// todo: check that offsets match up
254255
ChecksummedData checksummedData = d.getChecksummedData();
255256
ByteString content = checksummedData.getContent();
256257
int crc32C = checksummedData.getCrc32C();
@@ -270,18 +271,56 @@ public void onResponse(BidiReadObjectResponse response) {
270271
requestStream.send(requestWithNewReadId);
271272
continue;
272273
}
274+
273275
final int idx = i;
274-
//noinspection rawtypes
275-
ResponseContentLifecycleHandle.ChildRef childRef =
276-
handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent());
277-
read.accept(childRef);
278-
if (d.getRangeEnd()) {
276+
long begin = readRange.getReadOffset();
277+
long position = read.getReadCursor().position();
278+
if (begin == position) {
279+
ChildRef childRef =
280+
handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent());
281+
read.accept(childRef);
282+
} else if (begin < position) {
283+
int skip = Math.toIntExact(position - begin);
284+
ChildRef childRef =
285+
handle.borrow(
286+
r ->
287+
r.getObjectDataRanges(idx)
288+
.getChecksummedData()
289+
.getContent()
290+
.substring(skip));
291+
read.accept(childRef);
292+
//noinspection resource
293+
read = state.assignNewReadId(id);
294+
if (read.getReadCursor().hasRemaining()) {
295+
BidiReadObjectRequest requestWithNewReadId =
296+
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
297+
requestStream.send(requestWithNewReadId);
298+
}
299+
} else {
300+
Status status =
301+
Status.newBuilder()
302+
.setCode(Code.OUT_OF_RANGE.value())
303+
.setMessage(
304+
String.format("position = %d, readRange.read_offset = %d", position, begin))
305+
.build();
306+
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
307+
// todo: record failure for read
308+
BidiReadObjectRequest requestWithNewReadId =
309+
BidiReadObjectRequest.newBuilder()
310+
.addReadRanges(readWithNewId.makeReadRange())
311+
.build();
312+
requestStream.send(requestWithNewReadId);
313+
// todo
314+
continue;
315+
}
316+
if (d.getRangeEnd() && !read.getReadCursor().hasRemaining()) {
317+
final BlobDescriptorStreamRead finalRead = read;
279318
// invoke eof on exec, the resolving future could have a downstream callback
280319
// that we don't want to block this grpc thread
281320
executor.execute(
282321
StorageException.liftToRunnable(
283322
() -> {
284-
read.eof();
323+
finalRead.eof();
285324
// don't remove the outstanding read until the future has been resolved
286325
state.removeOutstandingRead(id);
287326
}));

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ private BlobDescriptorStreamRead(
5151
this.closed = closed;
5252
}
5353

54+
ReadCursor getReadCursor() {
55+
return readCursor;
56+
}
57+
58+
abstract boolean acceptingBytes();
59+
5460
abstract void accept(ChildRef childRef) throws IOException;
5561

5662
abstract void eof() throws IOException;
@@ -111,9 +117,17 @@ private AccumulatingRead(
111117
this.complete = complete;
112118
}
113119

120+
@Override
121+
boolean acceptingBytes() {
122+
return !complete.isDone() && readCursor.hasRemaining();
123+
}
124+
114125
@Override
115126
void fail(Status status) throws IOException {
116127
io.grpc.Status grpcStatus = io.grpc.Status.fromCodeValue(status.getCode());
128+
if (!status.getMessage().isEmpty()) {
129+
grpcStatus = grpcStatus.withDescription(status.getMessage());
130+
}
117131
StatusRuntimeException cause = grpcStatus.asRuntimeException();
118132
ApiException apiException =
119133
ApiExceptionFactory.createException(

google-cloud-storage/src/test/java/com/google/cloud/storage/ITBlobDescriptorFakeTest.java

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2828
import com.google.cloud.storage.it.ChecksummedTestContent;
2929
import com.google.cloud.storage.it.GrpcPlainRequestLoggingInterceptor;
30+
import com.google.common.collect.ImmutableMap;
3031
import com.google.protobuf.Any;
3132
import com.google.protobuf.ByteString;
3233
import com.google.storage.v2.BidiReadHandle;
@@ -638,6 +639,186 @@ public void onCompleted() {
638639
}
639640
}
640641

642+
@Test
643+
public void objectRangeData_offset_notAligned_lt() throws Exception {
644+
645+
Object metadata =
646+
Object.newBuilder()
647+
.setBucket(BucketName.format("_", "b"))
648+
.setName("o")
649+
.setGeneration(1)
650+
.build();
651+
byte[] b64bytes = DataGenerator.base64Characters().genBytes(64);
652+
ChecksummedTestContent expected = ChecksummedTestContent.of(b64bytes, 10, 20);
653+
654+
BidiReadObjectRequest req1 =
655+
BidiReadObjectRequest.newBuilder()
656+
.setReadObjectSpec(
657+
BidiReadObjectSpec.newBuilder()
658+
.setBucket(metadata.getBucket())
659+
.setObject(metadata.getName())
660+
.build())
661+
.build();
662+
BidiReadObjectResponse res1 = BidiReadObjectResponse.newBuilder().setMetadata(metadata).build();
663+
664+
ChecksummedTestContent content2 = ChecksummedTestContent.of(b64bytes, 9, 20);
665+
BidiReadObjectRequest req2 =
666+
BidiReadObjectRequest.newBuilder().addReadRanges(getReadRange(1, 10, 20)).build();
667+
BidiReadObjectResponse res2 =
668+
BidiReadObjectResponse.newBuilder()
669+
.addObjectDataRanges(
670+
ObjectRangeData.newBuilder()
671+
.setChecksummedData(content2.asChecksummedData())
672+
.setReadRange(getReadRange(1, 9, content2))
673+
.setRangeEnd(true)
674+
.build())
675+
.build();
676+
677+
ChecksummedTestContent content3 = ChecksummedTestContent.of(b64bytes, 29, 1);
678+
BidiReadObjectRequest req3 =
679+
BidiReadObjectRequest.newBuilder().addReadRanges(getReadRange(2, 29, 1)).build();
680+
BidiReadObjectResponse res3 =
681+
BidiReadObjectResponse.newBuilder()
682+
.setMetadata(metadata)
683+
.addObjectDataRanges(
684+
ObjectRangeData.newBuilder()
685+
.setChecksummedData(content3.asChecksummedData())
686+
.setReadRange(getReadRange(2, 29, content3))
687+
.setRangeEnd(true)
688+
.build())
689+
.build();
690+
691+
ImmutableMap<BidiReadObjectRequest, BidiReadObjectResponse> db =
692+
ImmutableMap.<BidiReadObjectRequest, BidiReadObjectResponse>builder()
693+
.put(req1, res1)
694+
.put(req2, res2)
695+
.put(req3, res3)
696+
.buildOrThrow();
697+
698+
runTestAgainstFakeServer(expected, db, ByteRangeSpec.relativeLength(10L, 20L));
699+
}
700+
701+
@Test
702+
public void objectRangeData_offset_notAligned_gt() throws Exception {
703+
704+
Object metadata =
705+
Object.newBuilder()
706+
.setBucket(BucketName.format("_", "b"))
707+
.setName("o")
708+
.setGeneration(1)
709+
.build();
710+
byte[] b64bytes = DataGenerator.base64Characters().genBytes(64);
711+
ChecksummedTestContent expected = ChecksummedTestContent.of(b64bytes, 10, 20);
712+
713+
BidiReadObjectRequest req1 =
714+
BidiReadObjectRequest.newBuilder()
715+
.setReadObjectSpec(
716+
BidiReadObjectSpec.newBuilder()
717+
.setBucket(metadata.getBucket())
718+
.setObject(metadata.getName())
719+
.build())
720+
.build();
721+
BidiReadObjectResponse res1 = BidiReadObjectResponse.newBuilder().setMetadata(metadata).build();
722+
723+
ChecksummedTestContent content2 = ChecksummedTestContent.of(b64bytes, 11, 20);
724+
BidiReadObjectRequest req2 =
725+
BidiReadObjectRequest.newBuilder().addReadRanges(getReadRange(1, 10, 20)).build();
726+
BidiReadObjectResponse res2 =
727+
BidiReadObjectResponse.newBuilder()
728+
.addObjectDataRanges(
729+
ObjectRangeData.newBuilder()
730+
.setChecksummedData(content2.asChecksummedData())
731+
.setReadRange(getReadRange(1, 11, content2))
732+
.setRangeEnd(true)
733+
.build())
734+
.build();
735+
736+
ChecksummedTestContent content3 = ChecksummedTestContent.of(b64bytes, 10, 20);
737+
BidiReadObjectRequest req3 =
738+
BidiReadObjectRequest.newBuilder().addReadRanges(getReadRange(2, 10, 20)).build();
739+
BidiReadObjectResponse res3 =
740+
BidiReadObjectResponse.newBuilder()
741+
.setMetadata(metadata)
742+
.addObjectDataRanges(
743+
ObjectRangeData.newBuilder()
744+
.setChecksummedData(content3.asChecksummedData())
745+
.setReadRange(getReadRange(2, 10, content3))
746+
.setRangeEnd(true)
747+
.build())
748+
.build();
749+
750+
ImmutableMap<BidiReadObjectRequest, BidiReadObjectResponse> db =
751+
ImmutableMap.<BidiReadObjectRequest, BidiReadObjectResponse>builder()
752+
.put(req1, res1)
753+
.put(req2, res2)
754+
.put(req3, res3)
755+
.buildOrThrow();
756+
757+
runTestAgainstFakeServer(expected, db, ByteRangeSpec.relativeLength(10L, 20L));
758+
}
759+
760+
private void runTestAgainstFakeServer(
761+
ChecksummedTestContent expected,
762+
ImmutableMap<BidiReadObjectRequest, BidiReadObjectResponse> db,
763+
ByteRangeSpec range)
764+
throws Exception {
765+
766+
StorageImplBase fake =
767+
new StorageImplBase() {
768+
@Override
769+
public StreamObserver<BidiReadObjectRequest> bidiReadObject(
770+
StreamObserver<BidiReadObjectResponse> respond) {
771+
return new StreamObserver<BidiReadObjectRequest>() {
772+
@Override
773+
public void onNext(BidiReadObjectRequest req) {
774+
if (db.containsKey(req)) {
775+
respond.onNext(db.get(req));
776+
} else {
777+
respond.onError(TestUtils.apiException(Code.UNIMPLEMENTED, "Unexpected request"));
778+
}
779+
}
780+
781+
@Override
782+
public void onError(Throwable t) {
783+
respond.onError(t);
784+
}
785+
786+
@Override
787+
public void onCompleted() {
788+
respond.onCompleted();
789+
}
790+
};
791+
}
792+
};
793+
794+
try (FakeServer fakeServer = FakeServer.of(fake);
795+
Storage storage = fakeServer.getGrpcStorageOptions().getService()) {
796+
797+
BlobId id = BlobId.of("b", "o");
798+
ApiFuture<BlobDescriptor> futureObjectDescriptor = storage.getBlobDescriptor(id);
799+
800+
try (BlobDescriptor bd = futureObjectDescriptor.get(5, TimeUnit.SECONDS)) {
801+
ApiFuture<byte[]> future = bd.readRangeAsBytes(range);
802+
803+
byte[] actual = future.get(5, TimeUnit.SECONDS);
804+
Crc32cLengthKnown actualCrc32c = Hasher.enabled().hash(ByteBuffer.wrap(actual));
805+
806+
byte[] expectedBytes = expected.getBytes();
807+
Crc32cLengthKnown expectedCrc32c =
808+
Crc32cValue.of(expected.getCrc32c(), expectedBytes.length);
809+
810+
assertAll(
811+
() -> assertThat(actual).hasLength(expectedBytes.length),
812+
() -> assertThat(xxd(actual)).isEqualTo(xxd(expectedBytes)),
813+
() -> assertThat(actualCrc32c).isEqualTo(expectedCrc32c));
814+
}
815+
}
816+
}
817+
818+
private ReadRange getReadRange(int readId, int readOffset, ChecksummedTestContent content) {
819+
return getReadRange(readId, readOffset, content.asChecksummedData().getContent().size());
820+
}
821+
641822
private static ReadRange getReadRange(int readId, int readOffset, int readLimit) {
642823
return ReadRange.newBuilder()
643824
.setReadId(readId)

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ChecksummedTestContent.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package com.google.cloud.storage.it;
1818

19+
import static com.google.common.base.Preconditions.checkArgument;
20+
import static com.google.common.base.Preconditions.checkPositionIndexes;
21+
1922
import com.google.common.base.MoreObjects;
2023
import com.google.common.hash.Hashing;
2124
import com.google.common.io.BaseEncoding;
@@ -112,4 +115,10 @@ public static ChecksummedTestContent of(byte[] bytes) {
112115
String md5Base64 = Base64.getEncoder().encodeToString(Hashing.md5().hashBytes(bytes).asBytes());
113116
return new ChecksummedTestContent(bytes, crc32c, md5Base64);
114117
}
118+
119+
public static ChecksummedTestContent of(byte[] bytes, int from, int length) {
120+
checkArgument(length >= 0, "length >= 0 (%s >= 0)", length);
121+
checkPositionIndexes(from, from + length, bytes.length);
122+
return of(Arrays.copyOfRange(bytes, from, from + length));
123+
}
115124
}

0 commit comments

Comments
 (0)