Skip to content

Commit 85049b9

Browse files
committed
chore: update read session api to move to a parameterized config type carrying it's projection rather than having N methods
* chore: update type parameters of ObjectReadSessionStreamRead to make it responsible for projecting it's publicly visible type * chore: refactor ReadSession api to a config parameter to choose which projection type rather than having N methods for each projection type.
1 parent 2a92e35 commit 85049b9

13 files changed

Lines changed: 285 additions & 112 deletions

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSession.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,19 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import com.google.api.core.ApiFuture;
19+
import com.google.api.core.BetaApi;
2020
import java.io.IOException;
21-
import java.nio.channels.ScatteringByteChannel;
2221

22+
@BetaApi
2323
public interface BlobReadSession extends IOAutoCloseable {
2424

25+
@BetaApi
2526
BlobInfo getBlobInfo();
2627

27-
ApiFuture<byte[]> readRangeAsBytes(RangeSpec range);
28-
29-
/**
30-
* Read the provided range as a non-blocking Channel.
31-
*
32-
* <p>The returned channel will be non-blocking for all read calls. If bytes have not yet
33-
* asynchronously been delivered from gcs the method will return rather than waiting for the bytes
34-
* to arrive.
35-
*/
36-
ScatteringByteChannel readRangeAsChannel(RangeSpec range);
28+
@BetaApi
29+
<Projection> Projection readRange(RangeSpec range, RangeProjectionConfig<Projection> config);
3730

3831
@Override
32+
@BetaApi
3933
void close() throws IOException;
4034
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSessionAdapter.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.common.annotations.VisibleForTesting;
2222
import com.google.common.util.concurrent.MoreExecutors;
2323
import java.io.IOException;
24-
import java.nio.channels.ScatteringByteChannel;
2524

2625
final class BlobReadSessionAdapter implements BlobReadSession {
2726

@@ -36,14 +35,18 @@ public BlobInfo getBlobInfo() {
3635
return Conversions.grpc().blobInfo().decode(session.getResource());
3736
}
3837

38+
// ApiFutures type is erased, but that's okay. We're only decorating the errors. not changing
39+
// the return type.
40+
@SuppressWarnings({"rawtypes", "unchecked"})
3941
@Override
40-
public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
41-
return StorageException.coalesceAsync(session.readRangeAsBytes(range));
42-
}
43-
44-
@Override
45-
public ScatteringByteChannel readRangeAsChannel(RangeSpec range) {
46-
return session.readRangeAsChannel(range);
42+
public <Projection> Projection readRange(
43+
RangeSpec range, RangeProjectionConfig<Projection> config) {
44+
Projection projection = session.readRange(range, config);
45+
if (projection instanceof ApiFuture) {
46+
ApiFuture apiFuture = (ApiFuture) projection;
47+
return (Projection) StorageException.coalesceAsync(apiFuture);
48+
}
49+
return projection;
4750
}
4851

4952
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -855,13 +855,14 @@ public Storage create(StorageOptions options) {
855855
stub.bidiReadObjectCallable(), stub.bidiReadObjectResponseMarshaller),
856856
retryContextProvider,
857857
IOAutoCloseable.noOp());
858-
GrpcStorageImpl grpcStorage = new GrpcStorageImpl(
859-
grpcStorageOptions,
860-
client,
861-
dataClient,
862-
stub.readObjectResponseMarshaller,
863-
grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()),
864-
defaultOpts);
858+
GrpcStorageImpl grpcStorage =
859+
new GrpcStorageImpl(
860+
grpcStorageOptions,
861+
client,
862+
dataClient,
863+
stub.readObjectResponseMarshaller,
864+
grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()),
865+
defaultOpts);
865866
return OtelStorageDecorator.decorate(
866867
grpcStorage, options.getOpenTelemetry(), Transport.GRPC);
867868
} else {

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSession.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,11 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import com.google.api.core.ApiFuture;
2019
import com.google.storage.v2.Object;
21-
import java.nio.channels.ScatteringByteChannel;
2220

2321
public interface ObjectReadSession extends IOAutoCloseable {
2422

2523
Object getResource();
2624

27-
ApiFuture<byte[]> readRangeAsBytes(RangeSpec range);
28-
29-
/**
30-
* Read the provided range as a non-blocking Channel.
31-
*
32-
* <p>The returned channel will be non-blocking for all read calls. If bytes have not yet
33-
* asynchronously been delivered from gcs the method will return rather than waiting for the bytes
34-
* to arrive.
35-
*/
36-
ScatteringByteChannel readRangeAsChannel(RangeSpec range);
25+
<Projection> Projection readRange(RangeSpec range, RangeProjectionConfig<Projection> config);
3726
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,13 @@
2222
import com.google.api.core.ApiFutures;
2323
import com.google.api.gax.grpc.GrpcCallContext;
2424
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
25-
import com.google.cloud.storage.ObjectReadSessionStreamRead.AccumulatingRead;
26-
import com.google.cloud.storage.ObjectReadSessionStreamRead.StreamingRead;
2725
import com.google.cloud.storage.RetryContext.RetryContextProvider;
28-
import com.google.cloud.storage.ZeroCopySupport.DisposableByteString;
2926
import com.google.common.annotations.VisibleForTesting;
3027
import com.google.storage.v2.BidiReadObjectRequest;
3128
import com.google.storage.v2.BidiReadObjectResponse;
3229
import com.google.storage.v2.Object;
3330
import java.io.IOException;
3431
import java.io.InterruptedIOException;
35-
import java.nio.channels.ScatteringByteChannel;
3632
import java.util.ArrayList;
3733
import java.util.IdentityHashMap;
3834
import java.util.Iterator;
@@ -83,46 +79,16 @@ public Object getResource() {
8379
}
8480

8581
@Override
86-
public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
82+
public <Projection> Projection readRange(
83+
RangeSpec range, RangeProjectionConfig<Projection> config) {
8784
lock.lock();
8885
try {
8986
checkState(open, "stream already closed");
9087
long readId = state.newReadId();
91-
AccumulatingRead<byte[]> read =
92-
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
93-
readId, range, retryContextProvider.create());
88+
ObjectReadSessionStreamRead<Projection, ?> read =
89+
config.cast().newRead(readId, range, retryContextProvider.create());
9490
registerReadInState(readId, read);
95-
return read;
96-
} finally {
97-
lock.unlock();
98-
}
99-
}
100-
101-
@Override
102-
public ScatteringByteChannel readRangeAsChannel(RangeSpec range) {
103-
lock.lock();
104-
try {
105-
checkState(open, "stream already closed");
106-
long readId = state.newReadId();
107-
StreamingRead read =
108-
ObjectReadSessionStreamRead.streamingRead(readId, range, retryContextProvider.create());
109-
registerReadInState(readId, read);
110-
return read;
111-
} finally {
112-
lock.unlock();
113-
}
114-
}
115-
116-
public ApiFuture<DisposableByteString> readRangeAsByteString(RangeSpec range) {
117-
lock.lock();
118-
try {
119-
checkState(open, "stream already closed");
120-
long readId = state.newReadId();
121-
AccumulatingRead<DisposableByteString> read =
122-
ObjectReadSessionStreamRead.createZeroCopyByteStringAccumulatingRead(
123-
readId, range, retryContextProvider.create());
124-
registerReadInState(readId, read);
125-
return read;
91+
return read.project();
12692
} finally {
12793
lock.unlock();
12894
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStreamRead.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.nio.ByteBuffer;
3434
import java.nio.channels.ClosedChannelException;
3535
import java.nio.channels.ReadableByteChannel;
36+
import java.nio.channels.ScatteringByteChannel;
3637
import java.util.ArrayList;
3738
import java.util.Collections;
3839
import java.util.List;
@@ -46,7 +47,9 @@
4647
import java.util.concurrent.atomic.AtomicLong;
4748
import org.checkerframework.checker.nullness.qual.Nullable;
4849

49-
abstract class ObjectReadSessionStreamRead implements IOAutoCloseable {
50+
abstract class ObjectReadSessionStreamRead<
51+
Projection, Self extends ObjectReadSessionStreamRead<Projection, Self>>
52+
implements IOAutoCloseable {
5053

5154
protected final long readId;
5255
protected final RangeSpec rangeSpec;
@@ -81,6 +84,8 @@ abstract class ObjectReadSessionStreamRead implements IOAutoCloseable {
8184
this.onCloseCallback = onCloseCallback;
8285
}
8386

87+
abstract Projection project();
88+
8489
long readOffset() {
8590
return readOffset.get();
8691
}
@@ -97,7 +102,7 @@ final void preFail() {
97102

98103
abstract ApiFuture<?> fail(Throwable t);
99104

100-
abstract ObjectReadSessionStreamRead withNewReadId(long newReadId);
105+
abstract Self withNewReadId(long newReadId);
101106

102107
final ReadRange makeReadRange() {
103108
long currentOffset = readOffset.get();
@@ -120,10 +125,11 @@ boolean readyToSend() {
120125
return !tombstoned && !retryContext.inBackoff();
121126
}
122127

123-
boolean canShareStreamWith(ObjectReadSessionStreamRead other) {
128+
boolean canShareStreamWith(ObjectReadSessionStreamRead<?, ?> other) {
124129
return canShareStreamWith(other.getClass());
125130
}
126131

132+
@SuppressWarnings("rawtypes")
127133
protected boolean canShareStreamWith(Class<? extends ObjectReadSessionStreamRead> clazz) {
128134
return clazz == this.getClass();
129135
}
@@ -159,7 +165,8 @@ static StreamingRead streamingRead(long readId, RangeSpec rangeSpec, RetryContex
159165
}
160166

161167
/** Base class of a read that will accumulate before completing by resolving a future */
162-
abstract static class AccumulatingRead<Result> extends ObjectReadSessionStreamRead
168+
abstract static class AccumulatingRead<Result>
169+
extends ObjectReadSessionStreamRead<ApiFuture<Result>, AccumulatingRead<Result>>
163170
implements ApiFuture<Result> {
164171
protected final List<ChildRef> childRefs;
165172
protected final SettableApiFuture<Result> complete;
@@ -257,6 +264,7 @@ public boolean isDone() {
257264
return complete.isDone();
258265
}
259266

267+
@SuppressWarnings("rawtypes")
260268
@Override
261269
protected boolean canShareStreamWith(Class<? extends ObjectReadSessionStreamRead> clazz) {
262270
return AccumulatingRead.class.isAssignableFrom(clazz);
@@ -267,7 +275,8 @@ protected boolean canShareStreamWith(Class<? extends ObjectReadSessionStreamRead
267275
* Base class of a read that will be processed in a streaming manner (e.g. {@link
268276
* ReadableByteChannel})
269277
*/
270-
static class StreamingRead extends ObjectReadSessionStreamRead
278+
static class StreamingRead
279+
extends ObjectReadSessionStreamRead<ScatteringByteChannel, StreamingRead>
271280
implements UnbufferedReadableByteChannel {
272281
private final SettableApiFuture<Void> failFuture;
273282
private final BlockingQueue<Closeable> queue;
@@ -360,7 +369,7 @@ StreamingRead withNewReadId(long newReadId) {
360369
}
361370

362371
@Override
363-
boolean canShareStreamWith(ObjectReadSessionStreamRead other) {
372+
boolean canShareStreamWith(ObjectReadSessionStreamRead<?, ?> other) {
364373
return false;
365374
}
366375

@@ -381,6 +390,21 @@ public boolean isOpen() {
381390
return !closed;
382391
}
383392

393+
@Override
394+
UnbufferedReadableByteChannel project() {
395+
return this;
396+
}
397+
398+
@Override
399+
public int read(ByteBuffer dst) throws IOException {
400+
return Math.toIntExact(read(new ByteBuffer[] {dst}, 0, 1));
401+
}
402+
403+
@Override
404+
public long read(ByteBuffer[] dsts) throws IOException {
405+
return read(dsts, 0, dsts.length);
406+
}
407+
384408
@Override
385409
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
386410
if (closed) {
@@ -532,6 +556,11 @@ private ByteArrayAccumulatingRead(
532556
onCloseCallback);
533557
}
534558

559+
@Override
560+
ApiFuture<byte[]> project() {
561+
return this;
562+
}
563+
535564
@Override
536565
void eof() throws IOException {
537566
retryContext.reset();
@@ -596,6 +625,11 @@ public ZeroCopyByteStringAccumulatingRead(
596625
this.byteString = byteString;
597626
}
598627

628+
@Override
629+
ApiFuture<DisposableByteString> project() {
630+
return this;
631+
}
632+
599633
@Override
600634
public ByteString byteString() {
601635
return byteString;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.cloud.storage.RangeProjectionConfigs.BaseConfig;
20+
21+
public abstract class RangeProjectionConfig<Projection> {
22+
23+
RangeProjectionConfig() {}
24+
25+
abstract BaseConfig<Projection, ?> cast();
26+
}

0 commit comments

Comments
 (0)