Skip to content

Commit fac7839

Browse files
committed
chore: update BlobAppendableUploadConfig to validate crc32c by default
Remove sending of ObjectChecksums from GapicBidiUnbufferedAppendableWritableByteChannel. If a takeover is performed we would have to re-constitute the overall crc32c (while possible it is also slow in java today). Instead, we are relying on the ChecksummedData level crc32c validation.
1 parent 8332e1b commit fac7839

5 files changed

Lines changed: 145 additions & 26 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static java.util.Objects.requireNonNull;
2121

2222
import com.google.api.core.BetaApi;
23+
import com.google.api.core.InternalApi;
2324
import com.google.cloud.storage.Storage.BlobWriteOption;
2425
import com.google.cloud.storage.TransportCompatibility.Transport;
2526
import javax.annotation.concurrent.Immutable;
@@ -38,12 +39,14 @@
3839
public final class BlobAppendableUploadConfig {
3940

4041
private static final BlobAppendableUploadConfig INSTANCE =
41-
new BlobAppendableUploadConfig(FlushPolicy.minFlushSize(_256KiB));
42+
new BlobAppendableUploadConfig(FlushPolicy.minFlushSize(_256KiB), Hasher.enabled());
4243

4344
private final FlushPolicy flushPolicy;
45+
private final Hasher hasher;
4446

45-
private BlobAppendableUploadConfig(FlushPolicy flushPolicy) {
47+
private BlobAppendableUploadConfig(FlushPolicy flushPolicy, Hasher hasher) {
4648
this.flushPolicy = flushPolicy;
49+
this.hasher = hasher;
4750
}
4851

4952
/**
@@ -74,7 +77,44 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
7477
if (this.flushPolicy.equals(flushPolicy)) {
7578
return this;
7679
}
77-
return new BlobAppendableUploadConfig(flushPolicy);
80+
return new BlobAppendableUploadConfig(flushPolicy, hasher);
81+
}
82+
83+
/**
84+
* Whether crc32c validation will be performed for bytes returned by Google Cloud Storage
85+
*
86+
* <p><i>Default:</i> {@code true}
87+
*
88+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
89+
*/
90+
@BetaApi
91+
boolean getCrc32cValidationEnabled() {
92+
return Hasher.enabled().equals(hasher);
93+
}
94+
95+
/**
96+
* Return an instance with crc32c validation enabled based on {@code enabled}.
97+
*
98+
* <p><i>Default:</i> {@code true}
99+
*
100+
* @param enabled Whether crc32c validation will be performed for bytes returned by Google Cloud
101+
* Storage
102+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
103+
*/
104+
@BetaApi
105+
BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) {
106+
if (enabled && Hasher.enabled().equals(hasher)) {
107+
return this;
108+
} else if (!enabled && Hasher.noop().equals(hasher)) {
109+
return this;
110+
}
111+
return new BlobAppendableUploadConfig(flushPolicy, enabled ? Hasher.enabled() : Hasher.noop());
112+
}
113+
114+
/** Never to be made public until {@link Hasher} is public */
115+
@InternalApi
116+
Hasher getHasher() {
117+
return hasher;
78118
}
79119

80120
/**

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedAppendableWriteableByteChannel.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import com.google.storage.v2.ChecksummedData;
4545
import com.google.storage.v2.GetObjectRequest;
4646
import com.google.storage.v2.Object;
47-
import com.google.storage.v2.ObjectChecksums;
4847
import java.io.IOException;
4948
import java.nio.ByteBuffer;
5049
import java.nio.channels.ClosedChannelException;
@@ -222,10 +221,6 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength)
222221
ByteString b = datum.getB();
223222
int contentSize = b.size();
224223
long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize);
225-
Crc32cLengthKnown cumulative =
226-
writeCtx
227-
.getCumulativeCrc32c()
228-
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
229224
ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b);
230225
if (crc32c != null) {
231226
checksummedData.setCrc32C(crc32c.getValue());
@@ -241,13 +236,6 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength)
241236
}
242237
builder.setWriteOffset(offset).setChecksummedData(checksummedData.build());
243238

244-
if (!datum.isOnlyFullBlocks()) {
245-
if (cumulative != null) {
246-
builder.setObjectChecksums(
247-
ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build());
248-
}
249-
}
250-
251239
if (i == data.length - 1) {
252240
builder.setFlush(true).setStateLookup(true);
253241
}
@@ -274,16 +262,12 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength)
274262
@NonNull
275263
private BidiWriteObjectRequest finishMessage() {
276264
long offset = writeCtx.getTotalSentBytes().get();
277-
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();
278265

279266
BidiWriteObjectRequest.Builder b = writeCtx.newRequestBuilder();
280267

281268
b.clearUploadId().clearObjectChecksums().clearWriteObjectSpec().clearAppendObjectSpec();
282269

283270
b.setFinishWrite(true).setWriteOffset(offset);
284-
if (crc32cValue != null) {
285-
b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
286-
}
287271
BidiWriteObjectRequest message = b.build();
288272
return message;
289273
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1438,7 +1438,7 @@ public BlobAppendableUpload blobAppendableUpload(
14381438
ResumableMedia.gapic()
14391439
.write()
14401440
.bidiByteChannel(storageClient.bidiWriteObjectCallable())
1441-
.setHasher(Hasher.noop())
1441+
.setHasher(uploadConfig.getHasher())
14421442
.setByteStringStrategy(ByteStringStrategy.copy())
14431443
.appendable()
14441444
.withRetryConfig(

google-cloud-storage/src/test/java/com/google/cloud/storage/DataGenerator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -211,12 +211,13 @@ public void fill(ByteBuffer buf) {
211211
@Override
212212
public void fill(byte[] b, int offset, int length) {
213213
int curr = offset;
214-
int rem;
215-
while ((rem = length - curr) > 0) {
214+
int rem = length;
215+
do {
216216
int min = Math.min(rem, base64Characters.length);
217217
System.arraycopy(base64Characters, 0, b, curr, min);
218218
curr += min;
219-
}
219+
rem -= min;
220+
} while (rem > 0);
220221
}
221222
}
222223

@@ -240,13 +241,13 @@ public void fill(ByteBuffer b) {
240241

241242
@Override
242243
public void fill(byte[] b, int offset, int length) {
243-
int i = offset;
244+
int i = 0;
244245
while (i < length) {
245246
byte b1 = (byte) rand.nextInt(Byte.MAX_VALUE);
246247
if (b1 == 0x00) { // exclude nul sequence
247248
continue;
248249
}
249-
b[i] = b1;
250+
b[offset + i] = b1;
250251
i++;
251252
}
252253
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.google.storage.v2.ChecksummedData;
4040
import com.google.storage.v2.GetObjectRequest;
4141
import com.google.storage.v2.Object;
42+
import com.google.storage.v2.ObjectChecksums;
4243
import com.google.storage.v2.StorageClient;
4344
import com.google.storage.v2.StorageGrpc;
4445
import com.google.storage.v2.WriteObjectSpec;
@@ -87,7 +88,9 @@ public class ITAppendableUploadFakeTest {
8788
.build();
8889

8990
private static final BlobAppendableUploadConfig UPLOAD_CONFIG =
90-
BlobAppendableUploadConfig.of().withFlushPolicy(FlushPolicy.maxFlushSize(5));
91+
BlobAppendableUploadConfig.of()
92+
.withFlushPolicy(FlushPolicy.maxFlushSize(5))
93+
.withCrc32cValidationEnabled(false);
9194

9295
/**
9396
*
@@ -1491,6 +1494,97 @@ req1, retryableErrorOnce(req1, res, map, 2), req2, maxRetries(req2, res, map, 1)
14911494
}
14921495
}
14931496

1497+
@Test
1498+
public void crc32cWorks() throws Exception {
1499+
byte[] b = new byte[25];
1500+
DataGenerator.base64Characters().fill(b, 0, 20);
1501+
DataGenerator.base64Characters().fill(b, 20, 5);
1502+
ChecksummedTestContent abcde = ChecksummedTestContent.of(b, 0, 5);
1503+
ChecksummedTestContent fghij = ChecksummedTestContent.of(b, 5, 5);
1504+
ChecksummedTestContent klmno = ChecksummedTestContent.of(b, 10, 5);
1505+
ChecksummedTestContent pqrst = ChecksummedTestContent.of(b, 15, 5);
1506+
ChecksummedTestContent all = ChecksummedTestContent.of(b);
1507+
1508+
BidiWriteObjectRequest req1 =
1509+
BidiWriteObjectRequest.newBuilder()
1510+
.setWriteOffset(0)
1511+
.setWriteObjectSpec(REQ_OPEN.getWriteObjectSpec())
1512+
.setChecksummedData(abcde.asChecksummedData())
1513+
.setFlush(true)
1514+
.setStateLookup(true)
1515+
.build();
1516+
BidiWriteObjectResponse res1 = incrementalResponse(5);
1517+
1518+
BidiWriteObjectRequest req2 =
1519+
BidiWriteObjectRequest.newBuilder()
1520+
.setWriteOffset(5)
1521+
.setChecksummedData(fghij.asChecksummedData())
1522+
.setFlush(true)
1523+
.setStateLookup(true)
1524+
.build();
1525+
BidiWriteObjectResponse res2 = incrementalResponse(10);
1526+
BidiWriteObjectRequest req3 =
1527+
BidiWriteObjectRequest.newBuilder()
1528+
.setWriteOffset(10)
1529+
.setChecksummedData(klmno.asChecksummedData())
1530+
.setFlush(true)
1531+
.setStateLookup(true)
1532+
.build();
1533+
BidiWriteObjectResponse res3 = incrementalResponse(15);
1534+
BidiWriteObjectRequest req4 =
1535+
BidiWriteObjectRequest.newBuilder()
1536+
.setWriteOffset(15)
1537+
.setChecksummedData(pqrst.asChecksummedData())
1538+
.setFlush(true)
1539+
.setStateLookup(true)
1540+
.build();
1541+
BidiWriteObjectResponse res4 = incrementalResponse(20);
1542+
BidiWriteObjectRequest req5 =
1543+
BidiWriteObjectRequest.newBuilder()
1544+
.setWriteOffset(20)
1545+
.setChecksummedData(abcde.asChecksummedData())
1546+
.setFlush(true)
1547+
.setStateLookup(true)
1548+
.build();
1549+
BidiWriteObjectResponse res5 = incrementalResponse(25);
1550+
BidiWriteObjectRequest req6 =
1551+
BidiWriteObjectRequest.newBuilder().setWriteOffset(25).setFinishWrite(true).build();
1552+
BidiWriteObjectResponse res6 =
1553+
BidiWriteObjectResponse.newBuilder()
1554+
.setResource(
1555+
Object.newBuilder()
1556+
.setName(METADATA.getName())
1557+
.setBucket(METADATA.getBucket())
1558+
.setGeneration(METADATA.getGeneration())
1559+
.setSize(25)
1560+
.setChecksums(ObjectChecksums.newBuilder().setCrc32C(all.getCrc32c()).build())
1561+
// real object would have some extra fields like metageneration and storage
1562+
// class
1563+
.build())
1564+
.build();
1565+
1566+
FakeStorage fake =
1567+
FakeStorage.of(
1568+
ImmutableMap.of(
1569+
req1, respond -> respond.onNext(res1),
1570+
req2, respond -> respond.onNext(res2),
1571+
req3, respond -> respond.onNext(res3),
1572+
req4, respond -> respond.onNext(res4),
1573+
req5, respond -> respond.onNext(res5),
1574+
req6, respond -> respond.onNext(res6)));
1575+
try (FakeServer fakeServer = FakeServer.of(fake);
1576+
Storage storage = fakeServer.getGrpcStorageOptions().toBuilder().build().getService()) {
1577+
BlobId id = BlobId.of("b", "o");
1578+
1579+
BlobAppendableUploadConfig uploadConfig = UPLOAD_CONFIG.withCrc32cValidationEnabled(true);
1580+
try (BlobAppendableUpload upload =
1581+
storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), uploadConfig)) {
1582+
upload.write(ByteBuffer.wrap(b));
1583+
upload.finalizeUpload().get(5, TimeUnit.SECONDS);
1584+
}
1585+
}
1586+
}
1587+
14941588
private Consumer<StreamObserver<BidiWriteObjectResponse>> maxRetries(
14951589
BidiWriteObjectRequest req,
14961590
BidiWriteObjectResponse res,

0 commit comments

Comments
 (0)