Skip to content

Commit a4d919e

Browse files
committed
chore: make BlobDescriptorStream re-open logic smarter
If #send(BidiReadObjectRequest) is called and the stream isn't open, automatically add the necessary BidiReadObjectSpec into the request.
1 parent f437c0a commit a4d919e

2 files changed

Lines changed: 36 additions & 10 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorState.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.concurrent.atomic.AtomicLong;
3030
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.ReentrantLock;
3132
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
3233
import org.checkerframework.checker.nullness.qual.Nullable;
3334

@@ -39,6 +40,7 @@ final class BlobDescriptorState {
3940
private final AtomicReference<@MonotonicNonNull Object> metadata;
4041
private final AtomicLong readIdSeq;
4142
private final Map<Long, BlobDescriptorStreamRead> outstandingReads;
43+
private final ReentrantLock lock;
4244

4345
BlobDescriptorState(BidiReadObjectRequest openRequest) {
4446
this.openRequest = openRequest;
@@ -47,9 +49,16 @@ final class BlobDescriptorState {
4749
this.metadata = new AtomicReference<>();
4850
this.readIdSeq = new AtomicLong(1);
4951
this.outstandingReads = new HashMap<>();
52+
this.lock = new ReentrantLock();
5053
}
5154

5255
BidiReadObjectRequest getOpenRequest() {
56+
Object obj = metadata.get();
57+
if (obj != null && obj.getGeneration() != openRequest.getReadObjectSpec().getGeneration()) {
58+
BidiReadObjectRequest.Builder b = openRequest.toBuilder();
59+
b.getReadObjectSpecBuilder().setGeneration(obj.getGeneration());
60+
return b.build();
61+
}
5362
return openRequest;
5463
}
5564

@@ -76,20 +85,29 @@ long newReadId() {
7685

7786
@Nullable
7887
BlobDescriptorStreamRead getOutstandingRead(long key) {
79-
synchronized (this) {
88+
lock.lock();
89+
try {
8090
return outstandingReads.get(key);
91+
} finally {
92+
lock.unlock();
8193
}
8294
}
8395

8496
void putOutstandingRead(long key, BlobDescriptorStreamRead value) {
85-
synchronized (this) {
97+
lock.lock();
98+
try {
8699
outstandingReads.put(key, value);
100+
} finally {
101+
lock.unlock();
87102
}
88103
}
89104

90105
void removeOutstandingRead(long key) {
91-
synchronized (this) {
106+
lock.lock();
107+
try {
92108
outstandingReads.remove(key);
109+
} finally {
110+
lock.unlock();
93111
}
94112
}
95113

@@ -103,22 +121,28 @@ String getRoutingToken() {
103121
}
104122

105123
BlobDescriptorStreamRead assignNewReadId(long oldReadId) {
106-
synchronized (this) {
124+
lock.lock();
125+
try {
107126
BlobDescriptorStreamRead remove = outstandingReads.remove(oldReadId);
108-
checkState(remove != null, "unable to locate old");
127+
checkState(remove != null, "unable to locate old read");
109128
long newReadId = newReadId();
110129
BlobDescriptorStreamRead withNewReadId = remove.withNewReadId(newReadId);
111130
outstandingReads.put(newReadId, withNewReadId);
112131
return withNewReadId;
132+
} finally {
133+
lock.unlock();
113134
}
114135
}
115136

116137
List<ReadRange> getOutstandingReads() {
117-
synchronized (this) {
138+
lock.lock();
139+
try {
118140
return outstandingReads.values().stream()
119141
.filter(BlobDescriptorStreamRead::readyToSend)
120142
.map(BlobDescriptorStreamRead::makeReadRange)
121143
.collect(ImmutableList.toImmutableList());
144+
} finally {
145+
lock.unlock();
122146
}
123147
}
124148
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,9 @@ public void close() throws IOException {
127127
public void send(BidiReadObjectRequest request) {
128128
checkOpen();
129129
if (requestStream == null) {
130-
restart();
130+
BidiReadObjectRequest merged =
131+
state.getOpenRequest().toBuilder().clearReadRanges().mergeFrom(request).build();
132+
getRequestStream().send(merged);
131133
} else {
132134
getRequestStream().send(request);
133135
}
@@ -369,7 +371,7 @@ private OnSuccess restartReadFromCurrentOffset(long id) {
369371
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
370372
BidiReadObjectRequest requestWithNewReadId =
371373
BidiReadObjectRequest.newBuilder().addReadRanges(readWithNewId.makeReadRange()).build();
372-
requestStream.send(requestWithNewReadId);
374+
BlobDescriptorStream.this.send(requestWithNewReadId);
373375
};
374376
}
375377

@@ -378,11 +380,11 @@ public void onComplete() {}
378380
}
379381

380382
private class MonitoringResponseObserver implements ResponseObserver<BidiReadObjectResponse> {
381-
private final BlobDescriptorStream.BidiReadObjectResponseObserver delegate;
383+
private final ResponseObserver<BidiReadObjectResponse> delegate;
382384
private final SettableApiFuture<Void> openSignal;
383385
private final SettableApiFuture<Void> closeSignal;
384386

385-
private MonitoringResponseObserver(BidiReadObjectResponseObserver delegate) {
387+
private MonitoringResponseObserver(ResponseObserver<BidiReadObjectResponse> delegate) {
386388
this.delegate = delegate;
387389
this.openSignal = SettableApiFuture.create();
388390
this.closeSignal = SettableApiFuture.create();

0 commit comments

Comments
 (0)