Skip to content

Commit 65c8808

Browse files
authored
fix: update grpc single-shot uploads to validate ack'd object size (#2567)
Follow up to #2527 This updated errors from grpc single-shot uploads to have messages that include useful debugging information. When a response is received but doesn't validate with the expected state: ``` com.google.cloud.storage.StorageException: Finalized upload, but object size less than expected. |> [ |> com.google.storage.v2.WriteObjectRequest{ |> write_offset: 524288 |> finish_write: true |> } |> ] | |< com.google.storage.v2.WriteObjectResponse{ |< resource { |< name: "obj" |< size: 262144 |< } |< } | ```
1 parent f3c28f6 commit 65c8808

5 files changed

Lines changed: 287 additions & 122 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java

Lines changed: 117 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import com.google.api.core.SettableApiFuture;
2222
import com.google.api.gax.grpc.GrpcCallContext;
23+
import com.google.api.gax.rpc.ApiException;
2324
import com.google.api.gax.rpc.ApiStreamObserver;
2425
import com.google.api.gax.rpc.ClientStreamingCallable;
2526
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
2627
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2728
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2829
import com.google.cloud.storage.WriteCtx.SimpleWriteObjectRequestBuilderFactory;
30+
import com.google.common.collect.ImmutableList;
2931
import com.google.protobuf.ByteString;
3032
import com.google.storage.v2.ChecksummedData;
3133
import com.google.storage.v2.ObjectChecksums;
@@ -34,11 +36,7 @@
3436
import java.io.IOException;
3537
import java.nio.ByteBuffer;
3638
import java.nio.channels.ClosedChannelException;
37-
import java.util.ArrayList;
38-
import java.util.List;
3939
import java.util.concurrent.ExecutionException;
40-
import java.util.function.Consumer;
41-
import java.util.function.LongConsumer;
4240
import org.checkerframework.checker.nullness.qual.NonNull;
4341

4442
final class GapicUnbufferedDirectWritableByteChannel implements UnbufferedWritableByteChannel {
@@ -55,34 +53,78 @@ final class GapicUnbufferedDirectWritableByteChannel implements UnbufferedWritab
5553
private boolean open = true;
5654
private boolean first = true;
5755
private boolean finished = false;
56+
private volatile WriteObjectRequest lastWrittenRequest;
5857

5958
GapicUnbufferedDirectWritableByteChannel(
6059
SettableApiFuture<WriteObjectResponse> resultFuture,
6160
ChunkSegmenter chunkSegmenter,
6261
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write,
63-
SimpleWriteObjectRequestBuilderFactory requestFactory) {
64-
String bucketName = requestFactory.bucketName();
62+
WriteCtx<SimpleWriteObjectRequestBuilderFactory> writeCtx) {
63+
String bucketName = writeCtx.getRequestFactory().bucketName();
6564
this.resultFuture = resultFuture;
6665
this.chunkSegmenter = chunkSegmenter;
6766

6867
GrpcCallContext internalContext =
6968
contextWithBucketName(bucketName, GrpcCallContext.createDefault());
7069
this.write = write.withDefaultCallContext(internalContext);
7170

72-
this.writeCtx = new WriteCtx<>(requestFactory);
73-
this.responseObserver = new Observer(writeCtx.getConfirmedBytes()::set, resultFuture::set);
71+
this.writeCtx = writeCtx;
72+
this.responseObserver = new Observer(internalContext);
7473
}
7574

7675
@Override
7776
public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
78-
return internalWrite(srcs, srcsOffset, srcsLength, false);
79-
}
77+
if (!open) {
78+
throw new ClosedChannelException();
79+
}
8080

81-
@Override
82-
public long writeAndClose(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
83-
long write = internalWrite(srcs, srcsOffset, srcsLength, true);
84-
close();
85-
return write;
81+
ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength);
82+
if (data.length == 0) {
83+
return 0;
84+
}
85+
86+
try {
87+
ApiStreamObserver<WriteObjectRequest> openedStream = openedStream();
88+
int bytesConsumed = 0;
89+
for (ChunkSegment datum : data) {
90+
Crc32cLengthKnown crc32c = datum.getCrc32c();
91+
ByteString b = datum.getB();
92+
int contentSize = b.size();
93+
long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize);
94+
Crc32cLengthKnown cumulative =
95+
writeCtx
96+
.getCumulativeCrc32c()
97+
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
98+
ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b);
99+
if (crc32c != null) {
100+
checksummedData.setCrc32C(crc32c.getValue());
101+
}
102+
WriteObjectRequest.Builder builder = writeCtx.newRequestBuilder();
103+
if (!first) {
104+
builder.clearWriteObjectSpec();
105+
builder.clearObjectChecksums();
106+
}
107+
builder.setWriteOffset(offset).setChecksummedData(checksummedData.build());
108+
if (!datum.isOnlyFullBlocks()) {
109+
builder.setFinishWrite(true);
110+
if (cumulative != null) {
111+
builder.setObjectChecksums(
112+
ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build());
113+
}
114+
finished = true;
115+
}
116+
117+
WriteObjectRequest build = builder.build();
118+
first = false;
119+
bytesConsumed += contentSize;
120+
lastWrittenRequest = build;
121+
openedStream.onNext(build);
122+
}
123+
return bytesConsumed;
124+
} catch (RuntimeException e) {
125+
resultFuture.setException(e);
126+
throw e;
127+
}
86128
}
87129

88130
@Override
@@ -95,6 +137,7 @@ public void close() throws IOException {
95137
ApiStreamObserver<WriteObjectRequest> openedStream = openedStream();
96138
if (!finished) {
97139
WriteObjectRequest message = finishMessage();
140+
lastWrittenRequest = message;
98141
try {
99142
openedStream.onNext(message);
100143
openedStream.onCompleted();
@@ -115,79 +158,22 @@ public void close() throws IOException {
115158
responseObserver.await();
116159
}
117160

118-
private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize)
119-
throws ClosedChannelException {
120-
if (!open) {
121-
throw new ClosedChannelException();
122-
}
123-
124-
ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength);
125-
126-
List<WriteObjectRequest> messages = new ArrayList<>();
127-
128-
ApiStreamObserver<WriteObjectRequest> openedStream = openedStream();
129-
int bytesConsumed = 0;
130-
for (ChunkSegment datum : data) {
131-
Crc32cLengthKnown crc32c = datum.getCrc32c();
132-
ByteString b = datum.getB();
133-
int contentSize = b.size();
134-
long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize);
135-
Crc32cLengthKnown cumulative =
136-
writeCtx
137-
.getCumulativeCrc32c()
138-
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
139-
ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b);
140-
if (crc32c != null) {
141-
checksummedData.setCrc32C(crc32c.getValue());
142-
}
143-
WriteObjectRequest.Builder builder =
144-
writeCtx
145-
.newRequestBuilder()
146-
.setWriteOffset(offset)
147-
.setChecksummedData(checksummedData.build());
148-
if (!datum.isOnlyFullBlocks()) {
149-
builder.setFinishWrite(true);
150-
if (cumulative != null) {
151-
builder.setObjectChecksums(
152-
ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build());
153-
}
154-
finished = true;
155-
}
156-
157-
WriteObjectRequest build = possiblyPairDownRequest(builder, first).build();
158-
first = false;
159-
messages.add(build);
160-
bytesConsumed += contentSize;
161-
}
162-
if (finalize && !finished) {
163-
messages.add(finishMessage());
164-
finished = true;
165-
}
166-
167-
try {
168-
for (WriteObjectRequest message : messages) {
169-
openedStream.onNext(message);
170-
}
171-
} catch (RuntimeException e) {
172-
resultFuture.setException(e);
173-
throw e;
174-
}
175-
176-
return bytesConsumed;
177-
}
178-
179161
@NonNull
180162
private WriteObjectRequest finishMessage() {
181163
long offset = writeCtx.getTotalSentBytes().get();
182164
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();
183165

184-
WriteObjectRequest.Builder b =
185-
writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset);
166+
WriteObjectRequest.Builder b = writeCtx.newRequestBuilder();
167+
if (!first) {
168+
b.clearWriteObjectSpec();
169+
b.clearObjectChecksums();
170+
first = false;
171+
}
172+
b.setFinishWrite(true).setWriteOffset(offset);
186173
if (crc32cValue != null) {
187174
b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
188175
}
189-
WriteObjectRequest message = possiblyPairDownRequest(b, first).build();
190-
return message;
176+
return b.build();
191177
}
192178

193179
private ApiStreamObserver<WriteObjectRequest> openedStream() {
@@ -201,48 +187,20 @@ private ApiStreamObserver<WriteObjectRequest> openedStream() {
201187
return stream;
202188
}
203189

204-
/**
205-
* Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs,
206-
* this utility method centralizes the logic necessary to clear those fields for use by subsequent
207-
* messages.
208-
*/
209-
private static WriteObjectRequest.Builder possiblyPairDownRequest(
210-
WriteObjectRequest.Builder b, boolean firstMessageOfStream) {
211-
if (firstMessageOfStream && b.getWriteOffset() == 0) {
212-
return b;
213-
}
214-
if (b.getWriteOffset() > 0) {
215-
b.clearWriteObjectSpec();
216-
}
217-
218-
if (b.getWriteOffset() > 0 && !b.getFinishWrite()) {
219-
b.clearObjectChecksums();
220-
}
221-
return b;
222-
}
223-
224-
static class Observer implements ApiStreamObserver<WriteObjectResponse> {
190+
class Observer implements ApiStreamObserver<WriteObjectResponse> {
225191

226-
private final LongConsumer sizeCallback;
227-
private final Consumer<WriteObjectResponse> completeCallback;
192+
private final GrpcCallContext context;
228193

229194
private final SettableApiFuture<Void> invocationHandle;
230195
private volatile WriteObjectResponse last;
231196

232-
Observer(LongConsumer sizeCallback, Consumer<WriteObjectResponse> completeCallback) {
233-
this.sizeCallback = sizeCallback;
234-
this.completeCallback = completeCallback;
197+
Observer(GrpcCallContext context) {
198+
this.context = context;
235199
this.invocationHandle = SettableApiFuture.create();
236200
}
237201

238202
@Override
239203
public void onNext(WriteObjectResponse value) {
240-
// incremental update
241-
if (value.hasPersistedSize()) {
242-
sizeCallback.accept(value.getPersistedSize());
243-
} else if (value.hasResource()) {
244-
sizeCallback.accept(value.getResource().getSize());
245-
}
246204
last = value;
247205
}
248206

@@ -257,15 +215,58 @@ public void onNext(WriteObjectResponse value) {
257215
*/
258216
@Override
259217
public void onError(Throwable t) {
260-
invocationHandle.setException(t);
218+
if (t instanceof ApiException) {
219+
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
220+
// things fall in line with our retry handlers.
221+
// This is suboptimal, as it will initialize a second exception, however this is the
222+
// unusual case, and it should not cause a significant overhead given its rarity.
223+
StorageException tmp = StorageException.asStorageException((ApiException) t);
224+
StorageException storageException =
225+
ResumableSessionFailureScenario.toStorageException(
226+
tmp.getCode(), tmp.getMessage(), tmp.getReason(), getRequests(), null, context, t);
227+
invocationHandle.setException(storageException);
228+
} else {
229+
invocationHandle.setException(t);
230+
}
261231
}
262232

263233
@Override
264234
public void onCompleted() {
265-
if (last != null && last.hasResource()) {
266-
completeCallback.accept(last);
235+
try {
236+
if (last == null) {
237+
throw new StorageException(
238+
0, "onComplete without preceding onNext, unable to determine success.");
239+
} else if (last.hasResource()) {
240+
long totalSentBytes = writeCtx.getTotalSentBytes().get();
241+
long finalSize = last.getResource().getSize();
242+
if (totalSentBytes == finalSize) {
243+
writeCtx.getConfirmedBytes().set(finalSize);
244+
resultFuture.set(last);
245+
} else if (finalSize < totalSentBytes) {
246+
throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(
247+
getRequests(), last, context, null);
248+
} else {
249+
throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException(
250+
getRequests(), last, context, null);
251+
}
252+
} else {
253+
throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException(
254+
getRequests(), last, context, null);
255+
}
256+
} catch (Throwable se) {
257+
open = false;
258+
invocationHandle.setException(se);
259+
} finally {
260+
invocationHandle.set(null);
261+
}
262+
}
263+
264+
private @NonNull ImmutableList<@NonNull WriteObjectRequest> getRequests() {
265+
if (lastWrittenRequest == null) {
266+
return ImmutableList.of();
267+
} else {
268+
return ImmutableList.of(lastWrittenRequest);
267269
}
268-
invocationHandle.set(null);
269270
}
270271

271272
void await() {

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
185185
resultFuture,
186186
getChunkSegmenter(),
187187
write,
188-
WriteObjectRequestBuilderFactory.simple(start)))
188+
new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
189189
.andThen(StorageByteChannels.writable()::createSynchronized));
190190
}
191191
}
@@ -213,7 +213,7 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
213213
resultFuture,
214214
getChunkSegmenter(),
215215
write,
216-
WriteObjectRequestBuilderFactory.simple(start)))
216+
new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
217217
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
218218
.andThen(StorageByteChannels.writable()::createSynchronized));
219219
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ enum ResumableSessionFailureScenario {
6464
SCENARIO_4_1(
6565
BaseServiceException.UNKNOWN_CODE,
6666
"dataLoss",
67-
"Finalized resumable session, but object size less than expected."),
67+
"Finalized upload, but object size less than expected."),
6868
SCENARIO_4_2(
6969
BaseServiceException.UNKNOWN_CODE,
7070
"dataLoss",
71-
"Finalized resumable session, but object size greater than expected."),
71+
"Finalized upload, but object size greater than expected."),
7272
SCENARIO_5(
7373
BaseServiceException.UNKNOWN_CODE,
7474
"dataLoss",

0 commit comments

Comments
 (0)