Skip to content

Commit 35c2fbf

Browse files
committed
chore: update BlobDescriptor implementation to handle redirect when opening
1 parent e5e1107 commit 35c2fbf

3 files changed

Lines changed: 202 additions & 60 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 17 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,17 @@
1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.SettableApiFuture;
2121
import com.google.api.gax.grpc.GrpcCallContext;
22-
import com.google.api.gax.rpc.ApiException;
2322
import com.google.api.gax.rpc.BidiStreamingCallable;
2423
import com.google.api.gax.rpc.ClientStream;
25-
import com.google.api.gax.rpc.ErrorDetails;
2624
import com.google.api.gax.rpc.StateCheckingResponseObserver;
2725
import com.google.api.gax.rpc.StreamController;
2826
import com.google.common.base.Preconditions;
29-
import com.google.protobuf.Any;
30-
import com.google.protobuf.InvalidProtocolBufferException;
31-
import com.google.rpc.Status;
3227
import com.google.storage.v2.BidiReadHandle;
3328
import com.google.storage.v2.BidiReadObjectRedirectedError;
3429
import com.google.storage.v2.BidiReadObjectRequest;
3530
import com.google.storage.v2.BidiReadObjectResponse;
3631
import com.google.storage.v2.Object;
3732
import com.google.storage.v2.ObjectRangeData;
38-
import io.grpc.Metadata;
39-
import io.grpc.StatusRuntimeException;
40-
import io.grpc.protobuf.ProtoUtils;
4133
import java.io.IOException;
4234
import java.util.List;
4335
import java.util.concurrent.ExecutionException;
@@ -48,14 +40,6 @@
4840
final class BlobDescriptorStream
4941
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void>, AutoCloseable {
5042

51-
private static final Metadata.Key<BidiReadObjectRedirectedError> REDIRECT_KEY =
52-
Metadata.Key.of(
53-
"redirect-bin",
54-
ProtoUtils.metadataMarshaller(BidiReadObjectRedirectedError.getDefaultInstance()));
55-
static final Metadata.Key<Status> GRPC_STATUS_DETAILS_KEY =
56-
Metadata.Key.of(
57-
"grpc-status-details-bin", ProtoUtils.metadataMarshaller(Status.getDefaultInstance()));
58-
5943
private final SettableApiFuture<Void> blobDescriptorResolveFuture;
6044

6145
private final BlobDescriptorState state;
@@ -273,42 +257,16 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
273257

274258
@Override
275259
protected void onErrorImpl(Throwable t) {
276-
if (t instanceof ApiException) {
277-
ApiException apiE = (ApiException) t;
278-
// https://cloud.google.com/apis/design/errors
279-
ErrorDetails errorDetails = apiE.getErrorDetails();
280-
281-
t = t.getCause();
282-
}
283-
if (t instanceof StatusRuntimeException) {
284-
StatusRuntimeException sre = (StatusRuntimeException) t;
285-
Metadata trailers = sre.getTrailers();
286-
if (trailers != null) {
287-
Status status = trailers.get(GRPC_STATUS_DETAILS_KEY);
288-
if (status != null) {
289-
290-
List<Any> detailsList = status.getDetailsList();
291-
for (Any any : detailsList) {
292-
if (any.is(BidiReadObjectRedirectedError.class)) {
293-
try {
294-
BidiReadObjectRedirectedError bidiReadObjectRedirectedError =
295-
any.unpack(BidiReadObjectRedirectedError.class);
296-
if (bidiReadObjectRedirectedError.hasReadHandle()) {
297-
state.setBidiReadHandle(bidiReadObjectRedirectedError.getReadHandle());
298-
}
299-
if (bidiReadObjectRedirectedError.hasRoutingToken()) {
300-
state.setRoutingToken(bidiReadObjectRedirectedError.getRoutingToken());
301-
}
302-
303-
executor.execute(BlobDescriptorStream.this::restart);
304-
break;
305-
} catch (InvalidProtocolBufferException e) {
306-
// ignore it, falling back to regular retry behavior
307-
}
308-
}
309-
}
310-
}
260+
BidiReadObjectRedirectedError bidiReadObjectRedirectedError =
261+
GrpcUtils.getBidiReadObjectRedirectedError(t);
262+
if (bidiReadObjectRedirectedError != null) {
263+
if (bidiReadObjectRedirectedError.hasReadHandle()) {
264+
state.setBidiReadHandle(bidiReadObjectRedirectedError.getReadHandle());
311265
}
266+
if (bidiReadObjectRedirectedError.hasRoutingToken()) {
267+
state.setRoutingToken(bidiReadObjectRedirectedError.getRoutingToken());
268+
}
269+
executor.execute(BlobDescriptorStream.this::restart);
312270
}
313271
}
314272

@@ -342,10 +300,14 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
342300

343301
@Override
344302
protected void onErrorImpl(Throwable t) {
345-
delegate.onError(t);
346-
blobDescriptorResolveFuture.setException(t);
347-
openSignal.setException(t);
348-
closeSignal.setException(t);
303+
if (GrpcUtils.isBidiReadObjectRedirect(t)) {
304+
delegate.onError(t);
305+
} else {
306+
delegate.onError(t);
307+
blobDescriptorResolveFuture.setException(t);
308+
openSignal.setException(t);
309+
closeSignal.setException(t);
310+
}
349311
}
350312

351313
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,30 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.gax.grpc.GrpcCallContext;
20+
import com.google.api.gax.rpc.ApiException;
2021
import com.google.common.collect.ImmutableList;
2122
import com.google.common.collect.ImmutableMap;
23+
import com.google.protobuf.Any;
24+
import com.google.protobuf.InvalidProtocolBufferException;
25+
import com.google.rpc.Status;
26+
import com.google.storage.v2.BidiReadObjectRedirectedError;
27+
import io.grpc.Metadata;
28+
import io.grpc.StatusRuntimeException;
29+
import io.grpc.protobuf.ProtoUtils;
2230
import java.io.Closeable;
2331
import java.io.IOException;
2432
import java.util.Collection;
33+
import java.util.List;
2534
import java.util.Locale;
2635
import java.util.Objects;
36+
import org.checkerframework.checker.nullness.qual.Nullable;
2737

2838
final class GrpcUtils {
2939

40+
static final Metadata.Key<Status> GRPC_STATUS_DETAILS_KEY =
41+
Metadata.Key.of(
42+
"grpc-status-details-bin", ProtoUtils.metadataMarshaller(Status.getDefaultInstance()));
43+
3044
private GrpcUtils() {}
3145

3246
static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext baseContext) {
@@ -72,4 +86,67 @@ static <C extends Closeable> void closeAll(Collection<C> closeables) throws IOEx
7286
throw ioException;
7387
}
7488
}
89+
90+
/**
91+
* Returns {@code true} if the throwable is or is caused by a {@link StatusRuntimeException} that
92+
* contains trailers, the trailers contain an entry {@code grpc-status-details-bin}, which
93+
* contains a valid {@link Status}, and the status contains an entry in its details that is a
94+
* {@link BidiReadObjectRedirectedError} (evaluated from index 0 to length). {@code false}
95+
* otherwise.
96+
*/
97+
static boolean isBidiReadObjectRedirect(Throwable t) {
98+
if (t instanceof ApiException) {
99+
t = t.getCause();
100+
}
101+
if (t instanceof StatusRuntimeException) {
102+
StatusRuntimeException sre = (StatusRuntimeException) t;
103+
Metadata trailers = sre.getTrailers();
104+
if (trailers != null) {
105+
Status status = trailers.get(GRPC_STATUS_DETAILS_KEY);
106+
if (status != null) {
107+
List<Any> detailsList = status.getDetailsList();
108+
for (Any any : detailsList) {
109+
if (any.is(BidiReadObjectRedirectedError.class)) {
110+
return true;
111+
}
112+
}
113+
}
114+
}
115+
}
116+
return false;
117+
}
118+
119+
/**
120+
* Returns the first occurrence of a {@link BidiReadObjectRedirectedError} if the throwable is or
121+
* is caused by a {@link StatusRuntimeException} that contains trailers, the trailers contain an
122+
* entry {@code grpc-status-details-bin}, which contains a valid {@link Status}, and the status
123+
* contains an entry in its details that is a {@link BidiReadObjectRedirectedError} (evaluated
124+
* from index 0 to length). {@code null} otherwise.
125+
*/
126+
@Nullable
127+
static BidiReadObjectRedirectedError getBidiReadObjectRedirectedError(Throwable t) {
128+
if (isBidiReadObjectRedirect(t)) {
129+
if (t instanceof ApiException) {
130+
t = t.getCause();
131+
}
132+
if (t instanceof StatusRuntimeException) {
133+
StatusRuntimeException sre = (StatusRuntimeException) t;
134+
Metadata trailers = sre.getTrailers();
135+
//noinspection DataFlowIssue guarded by isBidiReadObjectRedirect
136+
Status status = trailers.get(GRPC_STATUS_DETAILS_KEY);
137+
//noinspection DataFlowIssue guarded by isBidiReadObjectRedirect
138+
List<Any> detailsList = status.getDetailsList();
139+
for (Any any : detailsList) {
140+
if (any.is(BidiReadObjectRedirectedError.class)) {
141+
try {
142+
return any.unpack(BidiReadObjectRedirectedError.class);
143+
} catch (InvalidProtocolBufferException e) {
144+
// ignore it, falling back to regular retry behavior
145+
}
146+
}
147+
}
148+
}
149+
}
150+
return null;
151+
}
75152
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITBlobDescriptorFakeTest.java

Lines changed: 108 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@
4545

4646
public final class ITBlobDescriptorFakeTest {
4747

48-
static final Metadata.Key<byte[]> GRPC_STATUS_DETAILS_KEY =
49-
Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER);
50-
5148
/**
5249
*
5350
*
@@ -134,7 +131,7 @@ public void onNext(BidiReadObjectRequest value) {
134131
.build();
135132

136133
Metadata trailers = new Metadata();
137-
trailers.put(GRPC_STATUS_DETAILS_KEY, grpcStatusDetails.toByteArray());
134+
trailers.put(GrpcUtils.GRPC_STATUS_DETAILS_KEY, grpcStatusDetails);
138135
StatusRuntimeException statusRuntimeException =
139136
Status.UNAVAILABLE.withDescription("redirect").asRuntimeException(trailers);
140137
respond.onError(statusRuntimeException);
@@ -171,7 +168,7 @@ public void onCompleted() {
171168
BlobId id = BlobId.of("b", "o");
172169
ApiFuture<BlobDescriptor> futureBlobDescriptor = storage.getBlobDescriptor(id);
173170

174-
try (BlobDescriptor bd = futureBlobDescriptor.get(300, TimeUnit.SECONDS)) {
171+
try (BlobDescriptor bd = futureBlobDescriptor.get(5, TimeUnit.SECONDS)) {
175172
byte[] actual =
176173
bd.readRangeAsBytes(ByteRangeSpec.relativeLength(10L, 10L)).get(1, TimeUnit.SECONDS);
177174

@@ -180,6 +177,112 @@ public void onCompleted() {
180177
}
181178
}
182179

180+
/**
181+
*
182+
*
183+
* <ol>
184+
* <li>Attempt to open blob descriptor
185+
* <li>server responds with a redirect
186+
* <li>expect a new stream open with the specified redirect token
187+
* </ol>
188+
*/
189+
@Test
190+
public void bidiReadObjectRedirectedError_onOpen() throws Exception {
191+
String routingToken = UUID.randomUUID().toString();
192+
BidiReadHandle readHandle =
193+
BidiReadHandle.newBuilder()
194+
.setHandle(ByteString.copyFromUtf8(UUID.randomUUID().toString()))
195+
.build();
196+
BidiReadObjectRequest req1 =
197+
BidiReadObjectRequest.newBuilder()
198+
.setReadObjectSpec(
199+
BidiReadObjectSpec.newBuilder()
200+
.setBucket("projects/_/buckets/b")
201+
.setObject("o")
202+
.build())
203+
.build();
204+
BidiReadObjectRequest req2 =
205+
BidiReadObjectRequest.newBuilder()
206+
.setReadObjectSpec(
207+
BidiReadObjectSpec.newBuilder()
208+
.setBucket("projects/_/buckets/b")
209+
.setObject("o")
210+
.setReadHandle(readHandle)
211+
.setRoutingToken(routingToken)
212+
.build())
213+
.build();
214+
215+
BidiReadObjectResponse res1 =
216+
BidiReadObjectResponse.newBuilder()
217+
.setMetadata(Object.newBuilder().setBucket("b").setName("o").setGeneration(1).build())
218+
.build();
219+
220+
StorageImplBase fake =
221+
new StorageImplBase() {
222+
@Override
223+
public StreamObserver<BidiReadObjectRequest> bidiReadObject(
224+
StreamObserver<BidiReadObjectResponse> respond) {
225+
return new StreamObserver<BidiReadObjectRequest>() {
226+
@Override
227+
public void onNext(BidiReadObjectRequest value) {
228+
if (req1.equals(value)) {
229+
BidiReadObjectRedirectedError redirect =
230+
BidiReadObjectRedirectedError.newBuilder()
231+
.setReadHandle(readHandle)
232+
.setRoutingToken(routingToken)
233+
.build();
234+
235+
com.google.rpc.Status grpcStatusDetails =
236+
com.google.rpc.Status.newBuilder()
237+
.setCode(com.google.rpc.Code.UNAVAILABLE_VALUE)
238+
.setMessage("redirect")
239+
.addDetails(Any.pack(redirect))
240+
.build();
241+
242+
Metadata trailers = new Metadata();
243+
trailers.put(GrpcUtils.GRPC_STATUS_DETAILS_KEY, grpcStatusDetails);
244+
StatusRuntimeException statusRuntimeException =
245+
Status.UNAVAILABLE.withDescription("redirect").asRuntimeException(trailers);
246+
respond.onError(statusRuntimeException);
247+
} else if (req2.equals(value)) {
248+
respond.onNext(res1);
249+
} else {
250+
respond.onError(TestUtils.apiException(Code.UNIMPLEMENTED, "Unexpected request"));
251+
}
252+
}
253+
254+
@Override
255+
public void onError(Throwable t) {
256+
respond.onError(t);
257+
}
258+
259+
@Override
260+
public void onCompleted() {
261+
respond.onCompleted();
262+
}
263+
};
264+
}
265+
};
266+
267+
try (FakeServer fakeServer = FakeServer.of(fake);
268+
Storage storage =
269+
fakeServer
270+
.getGrpcStorageOptions()
271+
.toBuilder()
272+
.setGrpcInterceptorProvider(
273+
GrpcPlainRequestLoggingInterceptor.getInterceptorProvider())
274+
.build()
275+
.getService()) {
276+
277+
BlobId id = BlobId.of("b", "o");
278+
ApiFuture<BlobDescriptor> futureBlobDescriptor = storage.getBlobDescriptor(id);
279+
280+
try (BlobDescriptor bd = futureBlobDescriptor.get(5, TimeUnit.SECONDS)) {
281+
assertThat(bd).isNotNull();
282+
}
283+
}
284+
}
285+
183286
private static ReadRange getReadRange(int readId, int readOffset, int readLimit) {
184287
return ReadRange.newBuilder()
185288
.setReadId(readId)

0 commit comments

Comments
 (0)