2929import java .nio .ByteBuffer ;
3030import java .nio .channels .ClosedChannelException ;
3131import java .nio .channels .ReadableByteChannel ;
32+ import java .util .concurrent .locks .ReentrantLock ;
3233import org .checkerframework .checker .nullness .qual .Nullable ;
3334
3435abstract class BaseStorageReadChannel <T > implements StorageReadChannel {
@@ -41,42 +42,64 @@ abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
4142 private int chunkSize = _2MiB ;
4243 private BufferHandle bufferHandle ;
4344 private LazyReadChannel <?, T > lazyReadChannel ;
45+ protected final ReentrantLock lock ;
4446
4547 protected BaseStorageReadChannel (Decoder <T , BlobInfo > objectDecoder ) {
4648 this .objectDecoder = objectDecoder ;
4749 this .result = SettableApiFuture .create ();
4850 this .open = true ;
4951 this .byteRangeSpec = ByteRangeSpec .nullRange ();
52+ this .lock = new ReentrantLock ();
5053 }
5154
5255 @ Override
53- public final synchronized void setChunkSize (int chunkSize ) {
54- StorageException .wrapIOException (() -> maybeResetChannel (true ));
55- this .chunkSize = chunkSize ;
56+ public final void setChunkSize (int chunkSize ) {
57+ lock .lock ();
58+ try {
59+ StorageException .wrapIOException (() -> maybeResetChannel (true ));
60+ this .chunkSize = chunkSize ;
61+ } finally {
62+ lock .unlock ();
63+ }
5664 }
5765
5866 @ Override
59- public final synchronized boolean isOpen () {
60- return open ;
67+ public final boolean isOpen () {
68+ lock .lock ();
69+ try {
70+ return open ;
71+ } finally {
72+ lock .unlock ();
73+ }
6174 }
6275
6376 @ Override
64- public final synchronized void close () {
65- open = false ;
66- if (internalGetLazyChannel ().isOpen ()) {
67- ReadableByteChannel channel = internalGetLazyChannel ().getChannel ();
68- StorageException .wrapIOException (channel ::close );
77+ public final void close () {
78+ lock .lock ();
79+ try {
80+ open = false ;
81+ if (internalGetLazyChannel ().isOpen ()) {
82+ ReadableByteChannel channel = internalGetLazyChannel ().getChannel ();
83+ StorageException .wrapIOException (channel ::close );
84+ }
85+ } finally {
86+ lock .unlock ();
6987 }
7088 }
7189
7290 @ Override
73- public final synchronized StorageReadChannel setByteRangeSpec (ByteRangeSpec byteRangeSpec ) {
91+ public final StorageReadChannel setByteRangeSpec (ByteRangeSpec byteRangeSpec ) {
7492 requireNonNull (byteRangeSpec , "byteRangeSpec must be non null" );
75- if (!this .byteRangeSpec .equals (byteRangeSpec )) {
76- StorageException .wrapIOException (() -> maybeResetChannel (false ));
77- this .byteRangeSpec = byteRangeSpec ;
93+ lock .lock ();
94+ try {
95+ if (!this .byteRangeSpec .equals (byteRangeSpec )) {
96+ StorageException .wrapIOException (() -> maybeResetChannel (false ));
97+ this .byteRangeSpec = byteRangeSpec ;
98+ }
99+ return this ;
100+ } finally {
101+ lock .unlock ();
78102 }
79- return this ;
80103 }
81104
82105 @ Override
@@ -85,42 +108,47 @@ public final ByteRangeSpec getByteRangeSpec() {
85108 }
86109
87110 @ Override
88- public final synchronized int read (ByteBuffer dst ) throws IOException {
89- // BlobReadChannel only considered itself closed if close had been called on it.
90- if (!open ) {
91- throw new ClosedChannelException ();
92- }
93- long diff = byteRangeSpec .length ();
94- // the check on beginOffset >= 0 used to be a precondition on seek(long)
95- // move it here to preserve existing behavior while allowing new negative offsets
96- if (diff <= 0 && byteRangeSpec .beginOffset () >= 0 ) {
97- return -1 ;
98- }
111+ public final int read (ByteBuffer dst ) throws IOException {
112+ lock .lock ();
99113 try {
100- // trap if the fact that tmp is already closed, and instead return -1
101- ReadableByteChannel tmp = internalGetLazyChannel ().getChannel ();
102- if (!tmp .isOpen ()) {
103- return -1 ;
104- }
105- int read = tmp .read (dst );
106- if (read != -1 ) {
107- byteRangeSpec = byteRangeSpec .withShiftBeginOffset (read );
114+ // BlobReadChannel only considered itself closed if close had been called on it.
115+ if (!open ) {
116+ throw new ClosedChannelException ();
108117 }
109- return read ;
110- } catch (StorageException e ) {
111- if (e .getCode () == 416 ) {
112- // HttpStorageRpc turns 416 into a null etag with an empty byte array, leading
113- // BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open.
114- // Emulate that same behavior here to preserve behavior compatibility, though this should
115- // be removed in the next major version.
118+ long diff = byteRangeSpec .length ();
119+ // the check on beginOffset >= 0 used to be a precondition on seek(long)
120+ // move it here to preserve existing behavior while allowing new negative offsets
121+ if (diff <= 0 && byteRangeSpec .beginOffset () >= 0 ) {
116122 return -1 ;
117- } else {
118- throw new IOException (e );
119123 }
120- } catch (IOException e ) {
121- throw e ;
122- } catch (Exception e ) {
123- throw new IOException (StorageException .coalesce (e ));
124+ try {
125+ // trap if the fact that tmp is already closed, and instead return -1
126+ ReadableByteChannel tmp = internalGetLazyChannel ().getChannel ();
127+ if (!tmp .isOpen ()) {
128+ return -1 ;
129+ }
130+ int read = tmp .read (dst );
131+ if (read != -1 ) {
132+ byteRangeSpec = byteRangeSpec .withShiftBeginOffset (read );
133+ }
134+ return read ;
135+ } catch (StorageException e ) {
136+ if (e .getCode () == 416 ) {
137+ // HttpStorageRpc turns 416 into a null etag with an empty byte array, leading
138+ // BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open.
139+ // Emulate that same behavior here to preserve behavior compatibility, though this should
140+ // be removed in the next major version.
141+ return -1 ;
142+ } else {
143+ throw new IOException (e );
144+ }
145+ } catch (IOException e ) {
146+ throw e ;
147+ } catch (Exception e ) {
148+ throw new IOException (StorageException .coalesce (e ));
149+ }
150+ } finally {
151+ lock .unlock ();
124152 }
125153 }
126154
0 commit comments