Skip to content

Commit ff8fd8f

Browse files
authored
chore: add the ability to disable crc32c validation for RangeProjectionConfigs
All existing operation perform no checksum validation, and for java 8 in particular crc32c calculation can have some ~1-2ms overhead, update things, so we can turn off crc32c for our benchmarks to validate if we want to expose a public flag.
1 parent 944e0bc commit ff8fd8f

9 files changed

Lines changed: 301 additions & 51 deletions

google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,21 +137,25 @@ abstract static class AccumulatingRead<Result>
137137
protected final List<ChildRef> childRefs;
138138
protected final SettableApiFuture<Result> complete;
139139
protected final long readId;
140+
protected final Hasher hasher;
140141

141142
private AccumulatingRead(
142143
long readId,
143144
RangeSpec rangeSpec,
145+
Hasher hasher,
144146
RetryContext retryContext,
145147
IOAutoCloseable onCloseCallback) {
146148
super(rangeSpec, retryContext, onCloseCallback);
147149
this.readId = readId;
150+
this.hasher = hasher;
148151
this.complete = SettableApiFuture.create();
149152
this.childRefs = Collections.synchronizedList(new ArrayList<>());
150153
}
151154

152155
private AccumulatingRead(
153156
long readId,
154157
RangeSpec rangeSpec,
158+
Hasher hasher,
155159
List<ChildRef> childRefs,
156160
AtomicLong readOffset,
157161
RetryContext retryContext,
@@ -162,6 +166,7 @@ private AccumulatingRead(
162166
this.readId = readId;
163167
this.childRefs = childRefs;
164168
this.complete = complete;
169+
this.hasher = hasher;
165170
}
166171

167172
@Override
@@ -195,6 +200,11 @@ public ApiFuture<?> fail(Throwable t) {
195200
return complete;
196201
}
197202

203+
@Override
204+
public Hasher hasher() {
205+
return hasher;
206+
}
207+
198208
@Override
199209
public void internalClose() throws IOException {
200210
if (!closed) {
@@ -251,6 +261,7 @@ public boolean canShareStreamWith(ObjectReadSessionStreamRead<?> other) {
251261
static class StreamingRead extends BaseObjectReadSessionStreamRead<ScatteringByteChannel>
252262
implements UnbufferedReadableByteChannel {
253263

264+
private final Hasher hasher;
254265
private final SettableApiFuture<Void> failFuture;
255266
private final BlockingQueue<Closeable> queue;
256267

@@ -261,10 +272,12 @@ static class StreamingRead extends BaseObjectReadSessionStreamRead<ScatteringByt
261272
StreamingRead(
262273
long readId,
263274
RangeSpec rangeSpec,
275+
Hasher hasher,
264276
RetryContext retryContext,
265277
IOAutoCloseable onCloseCallback) {
266278
super(rangeSpec, retryContext, onCloseCallback);
267279
this.readId = new AtomicLong(readId);
280+
this.hasher = hasher;
268281
this.closed = false;
269282
this.failFuture = SettableApiFuture.create();
270283
this.queue = new ArrayBlockingQueue<>(2);
@@ -277,6 +290,11 @@ long readId() {
277290
return readId.get();
278291
}
279292

293+
@Override
294+
public Hasher hasher() {
295+
return hasher;
296+
}
297+
280298
@Override
281299
public boolean acceptingBytes() {
282300
return !closed && !tombstoned;
@@ -477,14 +495,16 @@ static final class ByteArrayAccumulatingRead extends AccumulatingRead<byte[]> {
477495
ByteArrayAccumulatingRead(
478496
long readId,
479497
RangeSpec rangeSpec,
498+
Hasher hasher,
480499
RetryContext retryContext,
481500
IOAutoCloseable onCloseCallback) {
482-
super(readId, rangeSpec, retryContext, onCloseCallback);
501+
super(readId, rangeSpec, hasher, retryContext, onCloseCallback);
483502
}
484503

485504
private ByteArrayAccumulatingRead(
486505
long readId,
487506
RangeSpec rangeSpec,
507+
Hasher hasher,
488508
List<ChildRef> childRefs,
489509
RetryContext retryContext,
490510
AtomicLong readOffset,
@@ -494,6 +514,7 @@ private ByteArrayAccumulatingRead(
494514
super(
495515
readId,
496516
rangeSpec,
517+
hasher,
497518
childRefs,
498519
readOffset,
499520
retryContext,
@@ -527,6 +548,7 @@ public ByteArrayAccumulatingRead withNewReadId(long newReadId) {
527548
return new ByteArrayAccumulatingRead(
528549
newReadId,
529550
rangeSpec,
551+
hasher,
530552
childRefs,
531553
retryContext,
532554
readOffset,
@@ -544,14 +566,16 @@ static final class ZeroCopyByteStringAccumulatingRead
544566
ZeroCopyByteStringAccumulatingRead(
545567
long readId,
546568
RangeSpec rangeSpec,
569+
Hasher hasher,
547570
RetryContext retryContext,
548571
IOAutoCloseable onCloseCallback) {
549-
super(readId, rangeSpec, retryContext, onCloseCallback);
572+
super(readId, rangeSpec, hasher, retryContext, onCloseCallback);
550573
}
551574

552575
public ZeroCopyByteStringAccumulatingRead(
553576
long readId,
554577
RangeSpec rangeSpec,
578+
Hasher hasher,
555579
List<ChildRef> childRefs,
556580
AtomicLong readOffset,
557581
RetryContext retryContext,
@@ -562,6 +586,7 @@ public ZeroCopyByteStringAccumulatingRead(
562586
super(
563587
readId,
564588
rangeSpec,
589+
hasher,
565590
childRefs,
566591
readOffset,
567592
retryContext,
@@ -598,6 +623,7 @@ public ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) {
598623
return new ZeroCopyByteStringAccumulatingRead(
599624
newReadId,
600625
rangeSpec,
626+
hasher,
601627
childRefs,
602628
readOffset,
603629
retryContext,

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ public void onResponse(BidiReadObjectResponse response) {
294294
// java11+ calculating the crc32c of a 2MiB segment is ~70us
295295
// java8 the same calculation is ~1600us
296296
// not something to worry about offloading to another thread at this time.
297-
Hasher.enabled().validateUnchecked(Crc32cValue.of(crc32C), content);
297+
read.hasher().validateUnchecked(Crc32cValue.of(crc32C), content);
298298
} catch (UncheckedChecksumMismatchException e) {
299299
read.recordError(
300300
e,

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStreamRead.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,24 +51,28 @@ interface ObjectReadSessionStreamRead<Projection> extends IOAutoCloseable {
5151

5252
boolean readyToSend();
5353

54+
Hasher hasher();
55+
5456
boolean canShareStreamWith(ObjectReadSessionStreamRead<?> other);
5557

5658
void setOnCloseCallback(IOAutoCloseable onCloseCallback);
5759

5860
void internalClose() throws IOException;
5961

6062
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
61-
long readId, RangeSpec rangeSpec, RetryContext retryContext) {
62-
return new ByteArrayAccumulatingRead(readId, rangeSpec, retryContext, IOAutoCloseable.noOp());
63+
long readId, RangeSpec rangeSpec, Hasher hasher, RetryContext retryContext) {
64+
return new ByteArrayAccumulatingRead(
65+
readId, rangeSpec, hasher, retryContext, IOAutoCloseable.noOp());
6366
}
6467

6568
static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRead(
66-
long readId, RangeSpec rangeSpec, RetryContext retryContext) {
69+
long readId, RangeSpec rangeSpec, Hasher hasher, RetryContext retryContext) {
6770
return new ZeroCopyByteStringAccumulatingRead(
68-
readId, rangeSpec, retryContext, IOAutoCloseable.noOp());
71+
readId, rangeSpec, hasher, retryContext, IOAutoCloseable.noOp());
6972
}
7073

71-
static StreamingRead streamingRead(long readId, RangeSpec rangeSpec, RetryContext retryContext) {
72-
return new StreamingRead(readId, rangeSpec, retryContext, IOAutoCloseable.noOp());
74+
static StreamingRead streamingRead(
75+
long readId, RangeSpec rangeSpec, Hasher hasher, RetryContext retryContext) {
76+
return new StreamingRead(readId, rangeSpec, hasher, retryContext, IOAutoCloseable.noOp());
7377
}
7478
}

google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2020,6 +2020,11 @@ public ApiFuture<?> fail(Throwable t) {
20202020
return fail;
20212021
}
20222022

2023+
@Override
2024+
public Hasher hasher() {
2025+
return delegate.hasher();
2026+
}
2027+
20232028
@Override
20242029
public ObjectReadSessionStreamRead<Projection> withNewReadId(long newReadId) {
20252030
return new OtelDecoratingObjectReadSessionStreamRead<>(

google-cloud-storage/src/main/java/com/google/cloud/storage/RangeProjectionConfigs.java

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static RangeAsChannel asChannel() {
5050
return RangeAsChannel.INSTANCE;
5151
}
5252

53-
public static RangeAsFutureBytes asFutureBytes() {
53+
static RangeAsFutureBytes asFutureBytes() {
5454
return RangeAsFutureBytes.INSTANCE;
5555
}
5656

@@ -66,11 +66,13 @@ public static final class SeekableChannelConfig
6666
extends RangeProjectionConfig<SeekableByteChannel> {
6767

6868
private static final SeekableChannelConfig INSTANCE =
69-
new SeekableChannelConfig(LinearExponentialRangeSpecFunction.INSTANCE);
69+
new SeekableChannelConfig(Hasher.enabled(), LinearExponentialRangeSpecFunction.INSTANCE);
7070

71+
private final Hasher hasher;
7172
private final RangeSpecFunction rangeSpecFunction;
7273

73-
private SeekableChannelConfig(RangeSpecFunction rangeSpecFunction) {
74+
private SeekableChannelConfig(Hasher hasher, RangeSpecFunction rangeSpecFunction) {
75+
this.hasher = hasher;
7476
this.rangeSpecFunction = rangeSpecFunction;
7577
}
7678

@@ -80,7 +82,21 @@ public RangeSpecFunction getRangeSpecFunction() {
8082

8183
public SeekableChannelConfig withRangeSpecFunction(RangeSpecFunction rangeSpecFunction) {
8284
requireNonNull(rangeSpecFunction, "rangeSpecFunction must be non null");
83-
return new SeekableChannelConfig(rangeSpecFunction);
85+
return new SeekableChannelConfig(hasher, rangeSpecFunction);
86+
}
87+
88+
boolean getCrc32cValidationEnabled() {
89+
return Hasher.enabled().equals(hasher);
90+
}
91+
92+
SeekableChannelConfig withCrc32cValidationEnabled(boolean enabled) {
93+
if (enabled && Hasher.enabled().equals(hasher)) {
94+
return this;
95+
} else if (!enabled && Hasher.noop().equals(hasher)) {
96+
return this;
97+
}
98+
return new SeekableChannelConfig(
99+
enabled ? Hasher.enabled() : Hasher.noop(), rangeSpecFunction);
84100
}
85101

86102
@Override
@@ -122,10 +138,26 @@ public String toString() {
122138

123139
public static final class RangeAsChannel
124140
extends BaseConfig<ScatteringByteChannel, StreamingRead> {
125-
private static final RangeAsChannel INSTANCE = new RangeAsChannel();
141+
private static final RangeAsChannel INSTANCE = new RangeAsChannel(Hasher.enabled());
142+
143+
private final Hasher hasher;
126144

127-
private RangeAsChannel() {
145+
private RangeAsChannel(Hasher hasher) {
128146
super();
147+
this.hasher = hasher;
148+
}
149+
150+
boolean getCrc32cValidationEnabled() {
151+
return Hasher.enabled().equals(hasher);
152+
}
153+
154+
RangeAsChannel withCrc32cValidationEnabled(boolean enabled) {
155+
if (enabled && Hasher.enabled().equals(hasher)) {
156+
return this;
157+
} else if (!enabled && Hasher.noop().equals(hasher)) {
158+
return this;
159+
}
160+
return new RangeAsChannel(enabled ? Hasher.enabled() : Hasher.noop());
129161
}
130162

131163
@Override
@@ -135,16 +167,32 @@ private RangeAsChannel() {
135167

136168
@Override
137169
StreamingRead newRead(long readId, RangeSpec range, RetryContext retryContext) {
138-
return ObjectReadSessionStreamRead.streamingRead(readId, range, retryContext);
170+
return ObjectReadSessionStreamRead.streamingRead(readId, range, hasher, retryContext);
139171
}
140172
}
141173

142174
public static final class RangeAsFutureBytes
143175
extends BaseConfig<ApiFuture<byte[]>, AccumulatingRead<byte[]>> {
144-
private static final RangeAsFutureBytes INSTANCE = new RangeAsFutureBytes();
176+
private static final RangeAsFutureBytes INSTANCE = new RangeAsFutureBytes(Hasher.enabled());
177+
178+
private final Hasher hasher;
145179

146-
private RangeAsFutureBytes() {
180+
private RangeAsFutureBytes(Hasher hasher) {
147181
super();
182+
this.hasher = hasher;
183+
}
184+
185+
boolean getCrc32cValidationEnabled() {
186+
return Hasher.enabled().equals(hasher);
187+
}
188+
189+
RangeAsFutureBytes withCrc32cValidationEnabled(boolean enabled) {
190+
if (enabled && Hasher.enabled().equals(hasher)) {
191+
return this;
192+
} else if (!enabled && Hasher.noop().equals(hasher)) {
193+
return this;
194+
}
195+
return new RangeAsFutureBytes(enabled ? Hasher.enabled() : Hasher.noop());
148196
}
149197

150198
@Override
@@ -155,16 +203,33 @@ private RangeAsFutureBytes() {
155203
@Override
156204
AccumulatingRead<byte[]> newRead(long readId, RangeSpec range, RetryContext retryContext) {
157205
return ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
158-
readId, range, retryContext);
206+
readId, range, hasher, retryContext);
159207
}
160208
}
161209

162210
static final class RangeAsFutureByteString
163211
extends BaseConfig<ApiFuture<DisposableByteString>, AccumulatingRead<DisposableByteString>> {
164-
private static final RangeAsFutureByteString INSTANCE = new RangeAsFutureByteString();
212+
private static final RangeAsFutureByteString INSTANCE =
213+
new RangeAsFutureByteString(Hasher.enabled());
214+
215+
private final Hasher hasher;
165216

166-
private RangeAsFutureByteString() {
217+
private RangeAsFutureByteString(Hasher hasher) {
167218
super();
219+
this.hasher = hasher;
220+
}
221+
222+
boolean getCrc32cValidationEnabled() {
223+
return Hasher.enabled().equals(hasher);
224+
}
225+
226+
RangeAsFutureByteString withCrc32cValidationEnabled(boolean enabled) {
227+
if (enabled && Hasher.enabled().equals(hasher)) {
228+
return this;
229+
} else if (!enabled && Hasher.noop().equals(hasher)) {
230+
return this;
231+
}
232+
return new RangeAsFutureByteString(enabled ? Hasher.enabled() : Hasher.noop());
168233
}
169234

170235
@Override
@@ -176,7 +241,7 @@ private RangeAsFutureByteString() {
176241
AccumulatingRead<DisposableByteString> newRead(
177242
long readId, RangeSpec range, RetryContext retryContext) {
178243
return ObjectReadSessionStreamRead.createZeroCopyByteStringAccumulatingRead(
179-
readId, range, retryContext);
244+
readId, range, hasher, retryContext);
180245
}
181246
}
182247
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStateTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ public void getOpenArguments_includesAllRelevantModifications() throws Exception
8080

8181
AccumulatingRead<byte[]> r1 =
8282
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
83-
1, RangeSpec.of(3, 4), neverRetry);
83+
1, RangeSpec.of(3, 4), Hasher.enabled(), neverRetry);
8484
AccumulatingRead<byte[]> r2 =
8585
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
86-
2, RangeSpec.of(19, 14), neverRetry);
86+
2, RangeSpec.of(19, 14), Hasher.enabled(), neverRetry);
8787

8888
state.putOutstandingRead(1, r1);
8989
state.putOutstandingRead(2, r2);
@@ -219,14 +219,15 @@ public void canHandleNewRead() throws Exception {
219219
state1.putOutstandingRead(1, TestObjectReadSessionStreamRead.of());
220220
state2.putOutstandingRead(
221221
3,
222-
ObjectReadSessionStreamRead.streamingRead(3, RangeSpec.all(), RetryContext.neverRetry()));
222+
ObjectReadSessionStreamRead.streamingRead(
223+
3, RangeSpec.all(), Hasher.enabled(), RetryContext.neverRetry()));
223224

224225
try (AccumulatingRead<byte[]> bytes =
225226
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
226-
2, RangeSpec.all(), RetryContext.neverRetry());
227+
2, RangeSpec.all(), Hasher.enabled(), RetryContext.neverRetry());
227228
StreamingRead streaming2 =
228229
ObjectReadSessionStreamRead.streamingRead(
229-
4, RangeSpec.all(), RetryContext.neverRetry())) {
230+
4, RangeSpec.all(), Hasher.enabled(), RetryContext.neverRetry())) {
230231
assertAll(
231232
() -> assertThat(state1.canHandleNewRead(TestObjectReadSessionStreamRead.of())).isTrue(),
232233
() -> assertThat(state1.canHandleNewRead(bytes)).isFalse(),

0 commit comments

Comments
 (0)