2828import java .util .Map ;
2929import java .util .concurrent .atomic .AtomicLong ;
3030import java .util .concurrent .atomic .AtomicReference ;
31+ import java .util .concurrent .locks .ReentrantLock ;
3132import org .checkerframework .checker .nullness .qual .MonotonicNonNull ;
3233import org .checkerframework .checker .nullness .qual .Nullable ;
3334
@@ -39,6 +40,7 @@ final class BlobDescriptorState {
3940 private final AtomicReference <@ MonotonicNonNull Object > metadata ;
4041 private final AtomicLong readIdSeq ;
4142 private final Map <Long , BlobDescriptorStreamRead > outstandingReads ;
43+ private final ReentrantLock lock ;
4244
4345 BlobDescriptorState (BidiReadObjectRequest openRequest ) {
4446 this .openRequest = openRequest ;
@@ -47,9 +49,16 @@ final class BlobDescriptorState {
4749 this .metadata = new AtomicReference <>();
4850 this .readIdSeq = new AtomicLong (1 );
4951 this .outstandingReads = new HashMap <>();
52+ this .lock = new ReentrantLock ();
5053 }
5154
5255 BidiReadObjectRequest getOpenRequest () {
56+ Object obj = metadata .get ();
57+ if (obj != null && obj .getGeneration () != openRequest .getReadObjectSpec ().getGeneration ()) {
58+ BidiReadObjectRequest .Builder b = openRequest .toBuilder ();
59+ b .getReadObjectSpecBuilder ().setGeneration (obj .getGeneration ());
60+ return b .build ();
61+ }
5362 return openRequest ;
5463 }
5564
@@ -76,20 +85,29 @@ long newReadId() {
7685
7786 @ Nullable
7887 BlobDescriptorStreamRead getOutstandingRead (long key ) {
79- synchronized (this ) {
88+ lock .lock ();
89+ try {
8090 return outstandingReads .get (key );
91+ } finally {
92+ lock .unlock ();
8193 }
8294 }
8395
8496 void putOutstandingRead (long key , BlobDescriptorStreamRead value ) {
85- synchronized (this ) {
97+ lock .lock ();
98+ try {
8699 outstandingReads .put (key , value );
100+ } finally {
101+ lock .unlock ();
87102 }
88103 }
89104
90105 void removeOutstandingRead (long key ) {
91- synchronized (this ) {
106+ lock .lock ();
107+ try {
92108 outstandingReads .remove (key );
109+ } finally {
110+ lock .unlock ();
93111 }
94112 }
95113
@@ -103,22 +121,28 @@ String getRoutingToken() {
103121 }
104122
105123 BlobDescriptorStreamRead assignNewReadId (long oldReadId ) {
106- synchronized (this ) {
124+ lock .lock ();
125+ try {
107126 BlobDescriptorStreamRead remove = outstandingReads .remove (oldReadId );
108- checkState (remove != null , "unable to locate old" );
127+ checkState (remove != null , "unable to locate old read " );
109128 long newReadId = newReadId ();
110129 BlobDescriptorStreamRead withNewReadId = remove .withNewReadId (newReadId );
111130 outstandingReads .put (newReadId , withNewReadId );
112131 return withNewReadId ;
132+ } finally {
133+ lock .unlock ();
113134 }
114135 }
115136
116137 List <ReadRange > getOutstandingReads () {
117- synchronized (this ) {
138+ lock .lock ();
139+ try {
118140 return outstandingReads .values ().stream ()
119141 .filter (BlobDescriptorStreamRead ::readyToSend )
120142 .map (BlobDescriptorStreamRead ::makeReadRange )
121143 .collect (ImmutableList .toImmutableList ());
144+ } finally {
145+ lock .unlock ();
122146 }
123147 }
124148}
0 commit comments