Skip to content

Commit 498fd0b

Browse files
authored
fix: reduce Java 21 Virtual Thread Pinning in IO operations (#2553)
1 parent a7b5fa2 commit 498fd0b

9 files changed

Lines changed: 374 additions & 185 deletions

google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncAppendingQueue.java

Lines changed: 58 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.Executor;
3939
import java.util.concurrent.atomic.AtomicInteger;
4040
import java.util.concurrent.atomic.AtomicReference;
41+
import java.util.concurrent.locks.ReentrantLock;
4142
import java.util.stream.Collectors;
4243
import org.checkerframework.checker.nullness.qual.NonNull;
4344

@@ -66,6 +67,7 @@ boolean isOpen() {
6667
private final AtomicReference<Throwable> shortCircuitFailure;
6768
private final ApiFutureCallback<T> shortCircuitRegistrationCallback;
6869

70+
private final ReentrantLock lock;
6971
private volatile State state;
7072

7173
private AsyncAppendingQueue(
@@ -85,26 +87,32 @@ private AsyncAppendingQueue(
8587
shortCircuitFailure.compareAndSet(null, throwable);
8688
}
8789
};
90+
lock = new ReentrantLock();
8891
}
8992

90-
synchronized AsyncAppendingQueue<T> append(ApiFuture<T> value) throws ShortCircuitException {
91-
checkState(state.isOpen(), "already closed");
92-
Throwable throwable = shortCircuitFailure.get();
93-
if (throwable != null) {
94-
ShortCircuitException shortCircuitException = new ShortCircuitException(throwable);
95-
finalResult.cancel(true);
96-
throw shortCircuitException;
93+
AsyncAppendingQueue<T> append(ApiFuture<T> value) throws ShortCircuitException {
94+
lock.lock();
95+
try {
96+
checkState(state.isOpen(), "already closed");
97+
Throwable throwable = shortCircuitFailure.get();
98+
if (throwable != null) {
99+
ShortCircuitException shortCircuitException = new ShortCircuitException(throwable);
100+
finalResult.cancel(true);
101+
throw shortCircuitException;
102+
}
103+
checkNotNull(value, "value must not be null");
104+
105+
Element<T> newElement = newElement(value);
106+
queue.offer(newElement);
107+
boolean isFull = queue.size() == maxElementsPerCompact;
108+
if (isFull) {
109+
Element<T> compact = compact(exec);
110+
queue.offer(compact);
111+
}
112+
return this;
113+
} finally {
114+
lock.unlock();
97115
}
98-
checkNotNull(value, "value must not be null");
99-
100-
Element<T> newElement = newElement(value);
101-
queue.offer(newElement);
102-
boolean isFull = queue.size() == maxElementsPerCompact;
103-
if (isFull) {
104-
Element<T> compact = compact(exec);
105-
queue.offer(compact);
106-
}
107-
return this;
108116
}
109117

110118
ApiFuture<T> getResult() {
@@ -116,35 +124,40 @@ T await() {
116124
}
117125

118126
@Override
119-
public synchronized void close() {
120-
if (!state.isOpen()) {
121-
return;
122-
}
123-
state = State.CLOSING;
124-
125-
if (queue.isEmpty()) {
126-
NoSuchElementException neverAppendedTo = new NoSuchElementException("Never appended to");
127-
finalResult.setException(neverAppendedTo);
128-
throw neverAppendedTo;
129-
} else {
130-
Element<T> transform = compact(exec);
131-
132-
ApiFutures.addCallback(
133-
transform.getValue(),
134-
new ApiFutureCallback<T>() {
135-
@Override
136-
public void onFailure(Throwable err) {
137-
finalResult.setException(err);
138-
}
139-
140-
@Override
141-
public void onSuccess(T t) {
142-
finalResult.set(t);
143-
}
144-
},
145-
exec);
127+
public void close() {
128+
lock.lock();
129+
try {
130+
if (!state.isOpen()) {
131+
return;
132+
}
133+
state = State.CLOSING;
134+
135+
if (queue.isEmpty()) {
136+
NoSuchElementException neverAppendedTo = new NoSuchElementException("Never appended to");
137+
finalResult.setException(neverAppendedTo);
138+
throw neverAppendedTo;
139+
} else {
140+
Element<T> transform = compact(exec);
141+
142+
ApiFutures.addCallback(
143+
transform.getValue(),
144+
new ApiFutureCallback<T>() {
145+
@Override
146+
public void onFailure(Throwable err) {
147+
finalResult.setException(err);
148+
}
149+
150+
@Override
151+
public void onSuccess(T t) {
152+
finalResult.set(t);
153+
}
154+
},
155+
exec);
156+
}
157+
state = State.CLOSED;
158+
} finally {
159+
lock.unlock();
146160
}
147-
state = State.CLOSED;
148161
}
149162

150163
@NonNull

google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java

Lines changed: 75 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.nio.ByteBuffer;
3030
import java.nio.channels.ClosedChannelException;
3131
import java.nio.channels.ReadableByteChannel;
32+
import java.util.concurrent.locks.ReentrantLock;
3233
import org.checkerframework.checker.nullness.qual.Nullable;
3334

3435
abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
@@ -41,42 +42,64 @@ abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
4142
private int chunkSize = _2MiB;
4243
private BufferHandle bufferHandle;
4344
private LazyReadChannel<?, T> lazyReadChannel;
45+
protected final ReentrantLock lock;
4446

4547
protected BaseStorageReadChannel(Decoder<T, BlobInfo> objectDecoder) {
4648
this.objectDecoder = objectDecoder;
4749
this.result = SettableApiFuture.create();
4850
this.open = true;
4951
this.byteRangeSpec = ByteRangeSpec.nullRange();
52+
this.lock = new ReentrantLock();
5053
}
5154

5255
@Override
53-
public final synchronized void setChunkSize(int chunkSize) {
54-
StorageException.wrapIOException(() -> maybeResetChannel(true));
55-
this.chunkSize = chunkSize;
56+
public final void setChunkSize(int chunkSize) {
57+
lock.lock();
58+
try {
59+
StorageException.wrapIOException(() -> maybeResetChannel(true));
60+
this.chunkSize = chunkSize;
61+
} finally {
62+
lock.unlock();
63+
}
5664
}
5765

5866
@Override
59-
public final synchronized boolean isOpen() {
60-
return open;
67+
public final boolean isOpen() {
68+
lock.lock();
69+
try {
70+
return open;
71+
} finally {
72+
lock.unlock();
73+
}
6174
}
6275

6376
@Override
64-
public final synchronized void close() {
65-
open = false;
66-
if (internalGetLazyChannel().isOpen()) {
67-
ReadableByteChannel channel = internalGetLazyChannel().getChannel();
68-
StorageException.wrapIOException(channel::close);
77+
public final void close() {
78+
lock.lock();
79+
try {
80+
open = false;
81+
if (internalGetLazyChannel().isOpen()) {
82+
ReadableByteChannel channel = internalGetLazyChannel().getChannel();
83+
StorageException.wrapIOException(channel::close);
84+
}
85+
} finally {
86+
lock.unlock();
6987
}
7088
}
7189

7290
@Override
73-
public final synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) {
91+
public final StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) {
7492
requireNonNull(byteRangeSpec, "byteRangeSpec must be non null");
75-
if (!this.byteRangeSpec.equals(byteRangeSpec)) {
76-
StorageException.wrapIOException(() -> maybeResetChannel(false));
77-
this.byteRangeSpec = byteRangeSpec;
93+
lock.lock();
94+
try {
95+
if (!this.byteRangeSpec.equals(byteRangeSpec)) {
96+
StorageException.wrapIOException(() -> maybeResetChannel(false));
97+
this.byteRangeSpec = byteRangeSpec;
98+
}
99+
return this;
100+
} finally {
101+
lock.unlock();
78102
}
79-
return this;
80103
}
81104

82105
@Override
@@ -85,42 +108,47 @@ public final ByteRangeSpec getByteRangeSpec() {
85108
}
86109

87110
@Override
88-
public final synchronized int read(ByteBuffer dst) throws IOException {
89-
// BlobReadChannel only considered itself closed if close had been called on it.
90-
if (!open) {
91-
throw new ClosedChannelException();
92-
}
93-
long diff = byteRangeSpec.length();
94-
// the check on beginOffset >= 0 used to be a precondition on seek(long)
95-
// move it here to preserve existing behavior while allowing new negative offsets
96-
if (diff <= 0 && byteRangeSpec.beginOffset() >= 0) {
97-
return -1;
98-
}
111+
public final int read(ByteBuffer dst) throws IOException {
112+
lock.lock();
99113
try {
100-
// trap if the fact that tmp is already closed, and instead return -1
101-
ReadableByteChannel tmp = internalGetLazyChannel().getChannel();
102-
if (!tmp.isOpen()) {
103-
return -1;
104-
}
105-
int read = tmp.read(dst);
106-
if (read != -1) {
107-
byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read);
114+
// BlobReadChannel only considered itself closed if close had been called on it.
115+
if (!open) {
116+
throw new ClosedChannelException();
108117
}
109-
return read;
110-
} catch (StorageException e) {
111-
if (e.getCode() == 416) {
112-
// HttpStorageRpc turns 416 into a null etag with an empty byte array, leading
113-
// BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open.
114-
// Emulate that same behavior here to preserve behavior compatibility, though this should
115-
// be removed in the next major version.
118+
long diff = byteRangeSpec.length();
119+
// the check on beginOffset >= 0 used to be a precondition on seek(long)
120+
// move it here to preserve existing behavior while allowing new negative offsets
121+
if (diff <= 0 && byteRangeSpec.beginOffset() >= 0) {
116122
return -1;
117-
} else {
118-
throw new IOException(e);
119123
}
120-
} catch (IOException e) {
121-
throw e;
122-
} catch (Exception e) {
123-
throw new IOException(StorageException.coalesce(e));
124+
try {
125+
// trap if the fact that tmp is already closed, and instead return -1
126+
ReadableByteChannel tmp = internalGetLazyChannel().getChannel();
127+
if (!tmp.isOpen()) {
128+
return -1;
129+
}
130+
int read = tmp.read(dst);
131+
if (read != -1) {
132+
byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read);
133+
}
134+
return read;
135+
} catch (StorageException e) {
136+
if (e.getCode() == 416) {
137+
// HttpStorageRpc turns 416 into a null etag with an empty byte array, leading
138+
// BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open.
139+
// Emulate that same behavior here to preserve behavior compatibility, though this should
140+
// be removed in the next major version.
141+
return -1;
142+
} else {
143+
throw new IOException(e);
144+
}
145+
} catch (IOException e) {
146+
throw e;
147+
} catch (Exception e) {
148+
throw new IOException(StorageException.coalesce(e));
149+
}
150+
} finally {
151+
lock.unlock();
124152
}
125153
}
126154

0 commit comments

Comments
 (0)