Skip to content

Commit b95e51d

Browse files
committed
chore: add x-goog-gcs-idempotency-token to BlobReadSession calls
1 parent 03aa3e7 commit b95e51d

2 files changed

Lines changed: 27 additions & 8 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageDataClient.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,16 @@ ApiFuture<ObjectReadSession> readSession(BidiReadObjectRequest req, GrpcCallCont
5454
"ranges included in the initial request are not supported");
5555
ObjectReadSessionState state = new ObjectReadSessionState(ctx, req);
5656

57+
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable =
58+
getCallable();
5759
ObjectReadSessionStream stream =
58-
ObjectReadSessionStream.create(
59-
executor, bidiReadObject, state, retryContextProvider.create());
60+
ObjectReadSessionStream.create(executor, callable, state, retryContextProvider.create());
6061

6162
ApiFuture<ObjectReadSession> objectReadSessionFuture =
6263
ApiFutures.transform(
6364
stream,
6465
nowOpen ->
65-
new ObjectReadSessionImpl(
66-
executor, bidiReadObject, stream, state, retryContextProvider),
66+
new ObjectReadSessionImpl(executor, callable, stream, state, retryContextProvider),
6767
executor);
6868
stream.send(req);
6969
return objectReadSessionFuture;
@@ -79,9 +79,10 @@ <Projection> ApiFuture<FastOpenObjectReadSession<Projection>> fastOpenReadSessio
7979
"ranges included in the initial request are not supported");
8080
ObjectReadSessionState state = new ObjectReadSessionState(ctx, openRequest);
8181

82+
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable =
83+
getCallable();
8284
ObjectReadSessionStream stream =
83-
ObjectReadSessionStream.create(
84-
executor, bidiReadObject, state, retryContextProvider.create());
85+
ObjectReadSessionStream.create(executor, callable, state, retryContextProvider.create());
8586

8687
long readId = state.newReadId();
8788
ObjectReadSessionStreamRead<Projection> read =
@@ -94,7 +95,7 @@ <Projection> ApiFuture<FastOpenObjectReadSession<Projection>> fastOpenReadSessio
9495
nowOpen ->
9596
new FastOpenObjectReadSession<>(
9697
new ObjectReadSessionImpl(
97-
executor, bidiReadObject, stream, state, retryContextProvider),
98+
executor, callable, stream, state, retryContextProvider),
9899
read,
99100
stream),
100101
executor);
@@ -113,6 +114,11 @@ public void close() throws IOException {
113114
}
114115
}
115116

117+
private ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse>
118+
getCallable() {
119+
return bidiReadObject.withDefaultCallContext(Retrying.newCallContext());
120+
}
121+
116122
static StorageDataClient create(
117123
ScheduledExecutorService executor,
118124
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> read,

google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.google.common.hash.Hashing;
5656
import com.google.common.io.BaseEncoding;
5757
import com.google.common.io.ByteStreams;
58+
import com.google.common.truth.Correspondence;
5859
import com.google.protobuf.Any;
5960
import com.google.protobuf.ByteString;
6061
import com.google.protobuf.TextFormat;
@@ -132,8 +133,12 @@ public final class ITObjectReadSessionFakeTest {
132133
private static final byte[] ALL_OBJECT_BYTES = DataGenerator.base64Characters().genBytes(64);
133134
private static final Metadata.Key<String> X_GOOG_REQUEST_PARAMS =
134135
Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER);
136+
private static final Metadata.Key<String> X_GOOG_GCS_IDEMPOTENCY_TOKEN =
137+
Metadata.Key.of("x-goog-gcs-idempotency-token", Metadata.ASCII_STRING_MARSHALLER);
135138
private static final Metadata.Key<String> X_GOOG_USER_PROJECT =
136139
Metadata.Key.of("x-goog-user-project", Metadata.ASCII_STRING_MARSHALLER);
140+
private static final Correspondence<String, UUID> IS_UUID =
141+
Correspondence.transforming(UUID::fromString, "is a UUID");
137142

138143
/**
139144
*
@@ -796,7 +801,7 @@ public void validateReadRemovedFromStateWhenFailed() throws Exception {
796801
.hasCauseThat()
797802
.isInstanceOf(UncheckedChecksumMismatchException.class);
798803

799-
ObjectReadSessionStreamRead outstandingRead = orsi.state.getOutstandingRead(1L);
804+
ObjectReadSessionStreamRead<?> outstandingRead = orsi.state.getOutstandingRead(1L);
800805
assertThat(outstandingRead).isNull();
801806
}
802807
}
@@ -862,6 +867,14 @@ public void requestOptionsShouldBePresentInRequest() throws Exception {
862867
requestAuditing
863868
.assertRequestHeader(X_GOOG_REQUEST_PARAMS)
864869
.contains("bucket=" + METADATA.getBucket()),
870+
() -> requestAuditing.assertRequestHeader(X_GOOG_GCS_IDEMPOTENCY_TOKEN).hasSize(1),
871+
() -> {
872+
// make sure we get a UUID in our header
873+
requestAuditing
874+
.assertRequestHeader(X_GOOG_GCS_IDEMPOTENCY_TOKEN)
875+
.comparingElementsUsing(IS_UUID)
876+
.doesNotContain(UUID.randomUUID());
877+
},
865878
() ->
866879
requestAuditing
867880
.assertRequestHeader(X_GOOG_USER_PROJECT)

0 commit comments

Comments
 (0)