2020
2121import com .google .api .core .SettableApiFuture ;
2222import com .google .api .gax .grpc .GrpcCallContext ;
23+ import com .google .api .gax .rpc .ApiException ;
2324import com .google .api .gax .rpc .ApiStreamObserver ;
2425import com .google .api .gax .rpc .ClientStreamingCallable ;
2526import com .google .cloud .storage .ChunkSegmenter .ChunkSegment ;
2627import com .google .cloud .storage .Crc32cValue .Crc32cLengthKnown ;
2728import com .google .cloud .storage .UnbufferedWritableByteChannelSession .UnbufferedWritableByteChannel ;
2829import com .google .cloud .storage .WriteCtx .SimpleWriteObjectRequestBuilderFactory ;
30+ import com .google .common .collect .ImmutableList ;
2931import com .google .protobuf .ByteString ;
3032import com .google .storage .v2 .ChecksummedData ;
3133import com .google .storage .v2 .ObjectChecksums ;
3436import java .io .IOException ;
3537import java .nio .ByteBuffer ;
3638import java .nio .channels .ClosedChannelException ;
37- import java .util .ArrayList ;
38- import java .util .List ;
3939import java .util .concurrent .ExecutionException ;
40- import java .util .function .Consumer ;
41- import java .util .function .LongConsumer ;
4240import org .checkerframework .checker .nullness .qual .NonNull ;
4341
4442final class GapicUnbufferedDirectWritableByteChannel implements UnbufferedWritableByteChannel {
@@ -55,34 +53,78 @@ final class GapicUnbufferedDirectWritableByteChannel implements UnbufferedWritab
5553 private boolean open = true ;
5654 private boolean first = true ;
5755 private boolean finished = false ;
56+ private volatile WriteObjectRequest lastWrittenRequest ;
5857
5958 GapicUnbufferedDirectWritableByteChannel (
6059 SettableApiFuture <WriteObjectResponse > resultFuture ,
6160 ChunkSegmenter chunkSegmenter ,
6261 ClientStreamingCallable <WriteObjectRequest , WriteObjectResponse > write ,
63- SimpleWriteObjectRequestBuilderFactory requestFactory ) {
64- String bucketName = requestFactory .bucketName ();
62+ WriteCtx < SimpleWriteObjectRequestBuilderFactory > writeCtx ) {
63+ String bucketName = writeCtx . getRequestFactory () .bucketName ();
6564 this .resultFuture = resultFuture ;
6665 this .chunkSegmenter = chunkSegmenter ;
6766
6867 GrpcCallContext internalContext =
6968 contextWithBucketName (bucketName , GrpcCallContext .createDefault ());
7069 this .write = write .withDefaultCallContext (internalContext );
7170
72- this .writeCtx = new WriteCtx <>( requestFactory ) ;
73- this .responseObserver = new Observer (writeCtx . getConfirmedBytes ():: set , resultFuture :: set );
71+ this .writeCtx = writeCtx ;
72+ this .responseObserver = new Observer (internalContext );
7473 }
7574
7675 @ Override
7776 public long write (ByteBuffer [] srcs , int srcsOffset , int srcsLength ) throws IOException {
78- return internalWrite (srcs , srcsOffset , srcsLength , false );
79- }
77+ if (!open ) {
78+ throw new ClosedChannelException ();
79+ }
8080
81- @ Override
82- public long writeAndClose (ByteBuffer [] srcs , int srcsOffset , int srcsLength ) throws IOException {
83- long write = internalWrite (srcs , srcsOffset , srcsLength , true );
84- close ();
85- return write ;
81+ ChunkSegment [] data = chunkSegmenter .segmentBuffers (srcs , srcsOffset , srcsLength );
82+ if (data .length == 0 ) {
83+ return 0 ;
84+ }
85+
86+ try {
87+ ApiStreamObserver <WriteObjectRequest > openedStream = openedStream ();
88+ int bytesConsumed = 0 ;
89+ for (ChunkSegment datum : data ) {
90+ Crc32cLengthKnown crc32c = datum .getCrc32c ();
91+ ByteString b = datum .getB ();
92+ int contentSize = b .size ();
93+ long offset = writeCtx .getTotalSentBytes ().getAndAdd (contentSize );
94+ Crc32cLengthKnown cumulative =
95+ writeCtx
96+ .getCumulativeCrc32c ()
97+ .accumulateAndGet (crc32c , chunkSegmenter .getHasher ()::nullSafeConcat );
98+ ChecksummedData .Builder checksummedData = ChecksummedData .newBuilder ().setContent (b );
99+ if (crc32c != null ) {
100+ checksummedData .setCrc32C (crc32c .getValue ());
101+ }
102+ WriteObjectRequest .Builder builder = writeCtx .newRequestBuilder ();
103+ if (!first ) {
104+ builder .clearWriteObjectSpec ();
105+ builder .clearObjectChecksums ();
106+ }
107+ builder .setWriteOffset (offset ).setChecksummedData (checksummedData .build ());
108+ if (!datum .isOnlyFullBlocks ()) {
109+ builder .setFinishWrite (true );
110+ if (cumulative != null ) {
111+ builder .setObjectChecksums (
112+ ObjectChecksums .newBuilder ().setCrc32C (cumulative .getValue ()).build ());
113+ }
114+ finished = true ;
115+ }
116+
117+ WriteObjectRequest build = builder .build ();
118+ first = false ;
119+ bytesConsumed += contentSize ;
120+ lastWrittenRequest = build ;
121+ openedStream .onNext (build );
122+ }
123+ return bytesConsumed ;
124+ } catch (RuntimeException e ) {
125+ resultFuture .setException (e );
126+ throw e ;
127+ }
86128 }
87129
88130 @ Override
@@ -95,6 +137,7 @@ public void close() throws IOException {
95137 ApiStreamObserver <WriteObjectRequest > openedStream = openedStream ();
96138 if (!finished ) {
97139 WriteObjectRequest message = finishMessage ();
140+ lastWrittenRequest = message ;
98141 try {
99142 openedStream .onNext (message );
100143 openedStream .onCompleted ();
@@ -115,79 +158,22 @@ public void close() throws IOException {
115158 responseObserver .await ();
116159 }
117160
118- private long internalWrite (ByteBuffer [] srcs , int srcsOffset , int srcsLength , boolean finalize )
119- throws ClosedChannelException {
120- if (!open ) {
121- throw new ClosedChannelException ();
122- }
123-
124- ChunkSegment [] data = chunkSegmenter .segmentBuffers (srcs , srcsOffset , srcsLength );
125-
126- List <WriteObjectRequest > messages = new ArrayList <>();
127-
128- ApiStreamObserver <WriteObjectRequest > openedStream = openedStream ();
129- int bytesConsumed = 0 ;
130- for (ChunkSegment datum : data ) {
131- Crc32cLengthKnown crc32c = datum .getCrc32c ();
132- ByteString b = datum .getB ();
133- int contentSize = b .size ();
134- long offset = writeCtx .getTotalSentBytes ().getAndAdd (contentSize );
135- Crc32cLengthKnown cumulative =
136- writeCtx
137- .getCumulativeCrc32c ()
138- .accumulateAndGet (crc32c , chunkSegmenter .getHasher ()::nullSafeConcat );
139- ChecksummedData .Builder checksummedData = ChecksummedData .newBuilder ().setContent (b );
140- if (crc32c != null ) {
141- checksummedData .setCrc32C (crc32c .getValue ());
142- }
143- WriteObjectRequest .Builder builder =
144- writeCtx
145- .newRequestBuilder ()
146- .setWriteOffset (offset )
147- .setChecksummedData (checksummedData .build ());
148- if (!datum .isOnlyFullBlocks ()) {
149- builder .setFinishWrite (true );
150- if (cumulative != null ) {
151- builder .setObjectChecksums (
152- ObjectChecksums .newBuilder ().setCrc32C (cumulative .getValue ()).build ());
153- }
154- finished = true ;
155- }
156-
157- WriteObjectRequest build = possiblyPairDownRequest (builder , first ).build ();
158- first = false ;
159- messages .add (build );
160- bytesConsumed += contentSize ;
161- }
162- if (finalize && !finished ) {
163- messages .add (finishMessage ());
164- finished = true ;
165- }
166-
167- try {
168- for (WriteObjectRequest message : messages ) {
169- openedStream .onNext (message );
170- }
171- } catch (RuntimeException e ) {
172- resultFuture .setException (e );
173- throw e ;
174- }
175-
176- return bytesConsumed ;
177- }
178-
179161 @ NonNull
180162 private WriteObjectRequest finishMessage () {
181163 long offset = writeCtx .getTotalSentBytes ().get ();
182164 Crc32cLengthKnown crc32cValue = writeCtx .getCumulativeCrc32c ().get ();
183165
184- WriteObjectRequest .Builder b =
185- writeCtx .newRequestBuilder ().setFinishWrite (true ).setWriteOffset (offset );
166+ WriteObjectRequest .Builder b = writeCtx .newRequestBuilder ();
167+ if (!first ) {
168+ b .clearWriteObjectSpec ();
169+ b .clearObjectChecksums ();
170+ first = false ;
171+ }
172+ b .setFinishWrite (true ).setWriteOffset (offset );
186173 if (crc32cValue != null ) {
187174 b .setObjectChecksums (ObjectChecksums .newBuilder ().setCrc32C (crc32cValue .getValue ()).build ());
188175 }
189- WriteObjectRequest message = possiblyPairDownRequest (b , first ).build ();
190- return message ;
176+ return b .build ();
191177 }
192178
193179 private ApiStreamObserver <WriteObjectRequest > openedStream () {
@@ -201,48 +187,20 @@ private ApiStreamObserver<WriteObjectRequest> openedStream() {
201187 return stream ;
202188 }
203189
204- /**
205- * Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs,
206- * this utility method centralizes the logic necessary to clear those fields for use by subsequent
207- * messages.
208- */
209- private static WriteObjectRequest .Builder possiblyPairDownRequest (
210- WriteObjectRequest .Builder b , boolean firstMessageOfStream ) {
211- if (firstMessageOfStream && b .getWriteOffset () == 0 ) {
212- return b ;
213- }
214- if (b .getWriteOffset () > 0 ) {
215- b .clearWriteObjectSpec ();
216- }
217-
218- if (b .getWriteOffset () > 0 && !b .getFinishWrite ()) {
219- b .clearObjectChecksums ();
220- }
221- return b ;
222- }
223-
224- static class Observer implements ApiStreamObserver <WriteObjectResponse > {
190+ class Observer implements ApiStreamObserver <WriteObjectResponse > {
225191
226- private final LongConsumer sizeCallback ;
227- private final Consumer <WriteObjectResponse > completeCallback ;
192+ private final GrpcCallContext context ;
228193
229194 private final SettableApiFuture <Void > invocationHandle ;
230195 private volatile WriteObjectResponse last ;
231196
232- Observer (LongConsumer sizeCallback , Consumer <WriteObjectResponse > completeCallback ) {
233- this .sizeCallback = sizeCallback ;
234- this .completeCallback = completeCallback ;
197+ Observer (GrpcCallContext context ) {
198+ this .context = context ;
235199 this .invocationHandle = SettableApiFuture .create ();
236200 }
237201
238202 @ Override
239203 public void onNext (WriteObjectResponse value ) {
240- // incremental update
241- if (value .hasPersistedSize ()) {
242- sizeCallback .accept (value .getPersistedSize ());
243- } else if (value .hasResource ()) {
244- sizeCallback .accept (value .getResource ().getSize ());
245- }
246204 last = value ;
247205 }
248206
@@ -257,15 +215,58 @@ public void onNext(WriteObjectResponse value) {
257215 */
258216 @ Override
259217 public void onError (Throwable t ) {
260- invocationHandle .setException (t );
218+ if (t instanceof ApiException ) {
219+ // use StorageExceptions logic to translate from ApiException to our status codes ensuring
220+ // things fall in line with our retry handlers.
221+ // This is suboptimal, as it will initialize a second exception, however this is the
222+ // unusual case, and it should not cause a significant overhead given its rarity.
223+ StorageException tmp = StorageException .asStorageException ((ApiException ) t );
224+ StorageException storageException =
225+ ResumableSessionFailureScenario .toStorageException (
226+ tmp .getCode (), tmp .getMessage (), tmp .getReason (), getRequests (), null , context , t );
227+ invocationHandle .setException (storageException );
228+ } else {
229+ invocationHandle .setException (t );
230+ }
261231 }
262232
263233 @ Override
264234 public void onCompleted () {
265- if (last != null && last .hasResource ()) {
266- completeCallback .accept (last );
235+ try {
236+ if (last == null ) {
237+ throw new StorageException (
238+ 0 , "onComplete without preceding onNext, unable to determine success." );
239+ } else if (last .hasResource ()) {
240+ long totalSentBytes = writeCtx .getTotalSentBytes ().get ();
241+ long finalSize = last .getResource ().getSize ();
242+ if (totalSentBytes == finalSize ) {
243+ writeCtx .getConfirmedBytes ().set (finalSize );
244+ resultFuture .set (last );
245+ } else if (finalSize < totalSentBytes ) {
246+ throw ResumableSessionFailureScenario .SCENARIO_4_1 .toStorageException (
247+ getRequests (), last , context , null );
248+ } else {
249+ throw ResumableSessionFailureScenario .SCENARIO_4_2 .toStorageException (
250+ getRequests (), last , context , null );
251+ }
252+ } else {
253+ throw ResumableSessionFailureScenario .SCENARIO_0 .toStorageException (
254+ getRequests (), last , context , null );
255+ }
256+ } catch (Throwable se ) {
257+ open = false ;
258+ invocationHandle .setException (se );
259+ } finally {
260+ invocationHandle .set (null );
261+ }
262+ }
263+
264+ private @ NonNull ImmutableList <@ NonNull WriteObjectRequest > getRequests () {
265+ if (lastWrittenRequest == null ) {
266+ return ImmutableList .of ();
267+ } else {
268+ return ImmutableList .of (lastWrittenRequest );
267269 }
268- invocationHandle .set (null );
269270 }
270271
271272 void await () {
0 commit comments