feat: skip large rows #2482
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
mutianf left a comment
I haven't looked at RowSetUtil or any of the tests yet, but I left some comments you can work on first.
```java
/**
 * This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
 */
public final class LargeRowReadCallable<RequestT, ResponseT, RowT>
```
Since you're modifying the ServerStreamingAttemptCallable directly, we can remove this class.
```java
@Override
protected void onErrorImpl(Throwable t) {
  // this has no impact
  // if (resumptionStrategy instanceof LargeReadRowsResumptionStrategy) {
```
I think this has no effect because we create a new strategy here: https://github.com/googleapis/sdk-platform-java/blob/main/gax-java/gax/src/main/java/com/google/api/gax/rpc/RetryingServerStreamingCallable.java#L80 (which I didn't notice previously).
Correct! I believe we create a new resumption strategy per request so that parallel requests each get their own state tracking.
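The behavior discussed in this thread is a prototype pattern: the retrying callable holds one strategy as a template and asks it for a fresh copy on every call, so state set on one copy (for example inside `onErrorImpl`) never reaches other calls. The sketch below is a self-contained illustration under that assumption; `ResumptionStrategy`, `createNew()`, `LargeRowStrategy`, and `RetryingCallable` are simplified, hypothetical stand-ins, not the gax or java-bigtable classes.

```java
import java.util.ArrayList;
import java.util.List;

public class PerCallStrategyDemo {
  // Hypothetical stand-in for a stream resumption strategy (not the gax interface).
  interface ResumptionStrategy {
    // Returns a fresh copy with no state carried over from this instance.
    ResumptionStrategy createNew();

    void recordLargeRow(String rowKey);

    List<String> largeRows();
  }

  static final class LargeRowStrategy implements ResumptionStrategy {
    private final List<String> largeRows = new ArrayList<>();

    @Override
    public ResumptionStrategy createNew() {
      return new LargeRowStrategy();
    }

    @Override
    public void recordLargeRow(String rowKey) {
      largeRows.add(rowKey);
    }

    @Override
    public List<String> largeRows() {
      return largeRows;
    }
  }

  // Hypothetical retrying callable: keeps a prototype and copies it for every call.
  static final class RetryingCallable {
    private final ResumptionStrategy prototype;

    RetryingCallable(ResumptionStrategy prototype) {
      this.prototype = prototype;
    }

    ResumptionStrategy call(String largeRowHit) {
      ResumptionStrategy perCall = prototype.createNew();
      perCall.recordLargeRow(largeRowHit); // mutates only this call's copy
      return perCall;
    }
  }

  public static void main(String[] args) {
    LargeRowStrategy prototype = new LargeRowStrategy();
    RetryingCallable callable = new RetryingCallable(prototype);

    System.out.println(callable.call("r2").largeRows()); // [r2]
    System.out.println(callable.call("r7").largeRows()); // [r7]
    System.out.println(prototype.largeRows()); // []
  }
}
```

In this sketch, anything recorded on one call's copy is invisible to the prototype and to other calls, which is consistent with the observation that changing the strategy from the callable had no effect.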
sarthakbhutani left a comment
I have resolved your comments; the fixes are implemented in the code.
The unresolved comments are not done yet because they require discussion. Can we discuss them offline?
```java
public void dumpLargeRowKeys() {
```
This method can be implemented later. I have added Javadoc for it.
```java
    new RowMergingCallable<>(convertException, rowAdapter);

LargeReadRowsResumptionStrategy<RowT> largeRowResumptionStrategy;
largeRowResumptionStrategy = new LargeReadRowsResumptionStrategy<RowT>(rowAdapter);
```
testing this
```
@@ -0,0 +1,125 @@
/*
```
[duplicate comment]
[can discuss offline]
It is required.
The exception goes to the convertableExceptionCallable layer, which converts the FailedPreCondition exception to an ApiException and marks it as `retryable: true`. The exception is then thrown to the ResumptionStrategy layer. Hence, this is required.
I have confirmed this in testing/debugging as well.
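As a rough illustration of the flow described in this comment, the sketch below rewraps a non-retryable large-row error as an exception flagged retryable, so that a downstream resumption layer would attempt a resume instead of failing the stream. `StatusCode`, `ApiLikeException`, `convert`, and `LARGE_ROW_MARKER` are hypothetical stand-ins; the real layer works with gax's `ApiException`, and how it actually recognizes a large-row error is not shown in this thread.

```java
public class ConvertLargeRowErrorDemo {
  enum StatusCode { FAILED_PRECONDITION, UNAVAILABLE, INTERNAL }

  // Hypothetical stand-in for an API exception that carries a retryable flag.
  static final class ApiLikeException extends RuntimeException {
    final StatusCode code;
    final boolean retryable;

    ApiLikeException(String message, StatusCode code, boolean retryable, Throwable cause) {
      super(message, cause);
      this.code = code;
      this.retryable = retryable;
    }
  }

  // Hypothetical marker text; the real server error message is not specified here.
  static final String LARGE_ROW_MARKER = "large row";

  // Marks FAILED_PRECONDITION large-row errors as retryable; passes everything else through.
  static ApiLikeException convert(ApiLikeException e) {
    boolean isLargeRow =
        e.code == StatusCode.FAILED_PRECONDITION
            && e.getMessage() != null
            && e.getMessage().contains(LARGE_ROW_MARKER);
    if (isLargeRow && !e.retryable) {
      return new ApiLikeException(e.getMessage(), e.code, true, e);
    }
    return e;
  }

  public static void main(String[] args) {
    ApiLikeException largeRow =
        new ApiLikeException("read hit a large row", StatusCode.FAILED_PRECONDITION, false, null);
    ApiLikeException other =
        new ApiLikeException("backend error", StatusCode.INTERNAL, false, null);

    System.out.println(convert(largeRow).retryable); // true
    System.out.println(convert(other).retryable); // false
  }
}
```

Keeping the original error as the `cause` preserves its details for callers that inspect the exception after the resume logic has run.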
sarthakbhutani left a comment
added comments
```java
        .withRetrySettings(settings.readRowsSettings().getRetrySettings()));
  }

public <RowT> ServerStreamingCallable<Query, RowT> createSkipLargeRowsBaseCallable(
```
I don't think we need to create a base callable for this, so you can probably just extract everything from the other base callable and put it in this method. Otherwise they're all called base callables and it gets a bit confusing.
```java
createSkipLargeRowsBaseCallable(
    settings.readRowsSettings(),
    rowAdapter,
    new LargeReadRowsResumptionStrategy<RowT>(rowAdapter));
```
I don't think you need to pass in the resumption strategy here, because it's already overridden in the settings in the other callable (line 580). So this variable is not doing anything.
```java
@Override
public void onErrorImpl(Throwable t) {
  if (resumptionStrategy.getClass() == LargeReadRowsResumptionStrategy.class) {
```
```diff
- if (resumptionStrategy.getClass() == LargeReadRowsResumptionStrategy.class) {
+ if (resumptionStrategy instanceof LargeReadRowsResumptionStrategy) {
```
This still needs to be updated.
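The suggested `instanceof` check differs from `getClass() ==` in two ways: `getClass() ==` matches only the exact class (so a subclass, such as a test double or a specialized variant, is missed) and throws a `NullPointerException` on a null reference, while `instanceof` matches subtypes and returns `false` for null. The classes below are hypothetical stand-ins used only to show the difference.

```java
public class InstanceofVsGetClassDemo {
  static class ResumptionStrategy {}

  static class LargeRowStrategy extends ResumptionStrategy {}

  // Hypothetical subclass, e.g. a test double or a specialized variant.
  static class InstrumentedLargeRowStrategy extends LargeRowStrategy {}

  static boolean exactClassCheck(ResumptionStrategy s) {
    return s.getClass() == LargeRowStrategy.class; // throws NullPointerException if s is null
  }

  static boolean instanceofCheck(ResumptionStrategy s) {
    return s instanceof LargeRowStrategy; // false if s is null
  }

  public static void main(String[] args) {
    ResumptionStrategy plain = new LargeRowStrategy();
    ResumptionStrategy sub = new InstrumentedLargeRowStrategy();

    System.out.println(exactClassCheck(plain)); // true
    System.out.println(exactClassCheck(sub)); // false: the subclass is missed
    System.out.println(instanceofCheck(sub)); // true
    System.out.println(instanceofCheck(null)); // false
  }
}
```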
sarthakbhutani left a comment
I have made the changes.
```java
remaining =
    RowSetUtil.erase(originalRequest.getRows(), lastSuccessKey, !originalRequest.getReversed());
if (!largeRowKeys.isEmpty()) {
  for (ByteString largeRowKey : largeRowKeys) {
```
I have made the changes as you suggested.
Open question: is there a reason why we didn't do this earlier? I'm thinking of any edge cases that may fail.
81dc408 to 18d453c
Warning: This pull request is touching the following templated files:
19293fa to 3397e51
… tests & wip integration tests
…haviour which returns error details,metadata on encountering large rows error
3397e51 to 76e8775
d80c953 to 70a9329
```java
 * @param fromStart
 * @return
 */
public static List<RowRange> eraseKeyFromRange(RowRange range, ByteString split, boolean fromStart) {
```
You can simplify this logic:
```java
private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString largeRowKey) {
  List<RowRange> rowRanges = new ArrayList<>();
  ByteString startKey = StartPoint.extract(range).value;
  ByteString endKey = EndPoint.extract(range).value;
  // an empty end key (e.g. an unset end) means the range is unbounded on the right,
  // so it must not be compared as the smallest key
  boolean endUnbounded = endKey.isEmpty();

  // if end key is on the left of large row key, don't split
  if (!endUnbounded && ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) < 0) {
    rowRanges.add(range);
    return rowRanges;
  }
  // if start key is on the right of the large row key, don't split
  if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) > 0) {
    rowRanges.add(range);
    return rowRanges;
  }
  // if start key is on the left of the large row key, set the end key to be large row key open
  if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) < 0) {
    RowRange beforeSplit = range.toBuilder().setEndKeyOpen(largeRowKey).build();
    rowRanges.add(beforeSplit);
  }
  // if the end key is on the right of the large row key (or unbounded),
  // set the start key to be large row key open
  if (endUnbounded || ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) > 0) {
    RowRange afterSplit = range.toBuilder().setStartKeyOpen(largeRowKey).build();
    rowRanges.add(afterSplit);
  }
  return rowRanges;
}
```
This should cover all edge cases.
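To sanity-check the edge cases of the `splitOnLargeRowKey` idea in isolation, here is a self-contained version using a simplified, hypothetical `Range` record with `String` keys instead of `RowRange`/`ByteString`. A `null` key means unbounded on that side, and `String` ordering stands in for Bigtable's byte ordering (the two agree for ASCII keys). Requires Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOnLargeRowDemo {
  // Hypothetical simplified range: a null start or end means unbounded on that side.
  record Range(String start, boolean startClosed, String end, boolean endClosed) {}

  static List<Range> splitOnLargeRowKey(Range range, String largeRowKey) {
    List<Range> out = new ArrayList<>();
    boolean endBeforeKey = range.end() != null && range.end().compareTo(largeRowKey) < 0;
    boolean startAfterKey = range.start() != null && range.start().compareTo(largeRowKey) > 0;

    // Range lies entirely on one side of the large row: keep it as is.
    if (endBeforeKey || startAfterKey) {
      out.add(range);
      return out;
    }
    // Keep the part before the large row, ending open at the large row key.
    if (range.start() == null || range.start().compareTo(largeRowKey) < 0) {
      out.add(new Range(range.start(), range.startClosed(), largeRowKey, false));
    }
    // Keep the part after the large row, starting open at the large row key.
    if (range.end() == null || range.end().compareTo(largeRowKey) > 0) {
      out.add(new Range(largeRowKey, false, range.end(), range.endClosed()));
    }
    return out;
  }

  public static void main(String[] args) {
    // Bounded range around "m": split into a left part and a right part.
    System.out.println(splitOnLargeRowKey(new Range("a", true, "z", true), "m"));
    // Unbounded end: the right part keeps the unbounded end.
    System.out.println(splitOnLargeRowKey(new Range("a", true, null, false), "m"));
    // Range that is exactly the large row: nothing remains.
    System.out.println(splitOnLargeRowKey(new Range("m", true, "m", true), "m"));
  }
}
```

A single closed point `[m, m]` produces an empty list, so the caller can simply skip adding any range for it.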
Done. Testing now.
```java
public final class RowSetUtil {
  private RowSetUtil() {}

  public static RowSet createSplitRanges(
```
I think you should also resume from the last seen row key.
I think you want this:
```java
public static RowSet eraseLargeRow(
    RowSet rowSet, ByteString lastSeenRowKey, ByteString largeRowKey, boolean fromStart) {
  // first, remove everything we've already read from the RowSet
  RowSet remaining = erase(rowSet, lastSeenRowKey, fromStart);
  // return null if we've read everything
  if (remaining == null) {
    return null;
  }
  // second, remove the large row key from the remaining RowSet
  RowSet.Builder newRowSet = RowSet.newBuilder();
  // remove large row key from point reads
  remaining.getRowKeysList().stream()
      .filter(k -> !k.equals(largeRowKey))
      .forEach(newRowSet::addRowKeys);
  // remove large row key from row ranges
  for (RowRange range : remaining.getRowRangesList()) {
    List<RowRange> afterSplit = splitOnLargeRowKey(range, largeRowKey);
    if (!afterSplit.isEmpty()) {
      afterSplit.forEach(newRowSet::addRowRanges);
    }
  }
  if (newRowSet.getRowKeysList().isEmpty() && newRowSet.getRowRangesList().isEmpty()) {
    return null;
  }
  return newRowSet.build();
}
```
splitOnLargeRowKey is in my other comment.
And in your resumption strategy, you wouldn't need to keep the previous request anymore, because that is handled by `RowSet remaining = erase(rowSet, lastSeenRowKey, fromStart);`.
We need to keep the previous request to handle multiple large rows close together.
If we don't keep it, and both the previous request and the next request fail on large rows, the next request removes only the second large row key while the first one is back in the range, so the read keeps failing.
Example:
request [r1,r4]
r1 - success key
r2 - large-row key
r3 - large-row key
r4 - large-row key
original request -> [r1,r4]
r1 read, r2 fails -> request becomes (r1,r2), (r2,r4]
r3 fails -> request becomes (r1,r3), (r3,r4] -> which now fails on r2 again (if the previous request or the previously failed row keys are not cached)
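The failure mode in this example can be reproduced with a small simulation. The sketch models the requested rows as a sorted list of keys rather than `RowSet` ranges and, as in the example, rebuilds each resume request from the original request. `readUntilLargeRow`, `resumeRequest`, and `run` are hypothetical helpers: remembering only the most recent large row key makes r2 and r3 alternate forever, while accumulating every large row key seen so far lets the read finish.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class MultiLargeRowResumeDemo {
  // One attempt: keys read successfully, plus the large row that failed it (or null).
  record Attempt(List<String> read, String failedOn) {}

  // Hypothetical read: walks the requested keys in order and stops at the first large row.
  static Attempt readUntilLargeRow(List<String> request, Set<String> largeRows) {
    List<String> read = new ArrayList<>();
    for (String key : request) {
      if (largeRows.contains(key)) {
        return new Attempt(read, key);
      }
      read.add(key);
    }
    return new Attempt(read, null);
  }

  // Rebuilds a request from the ORIGINAL keys: drops keys <= lastReadKey and excluded keys.
  static List<String> resumeRequest(List<String> original, String lastReadKey, Set<String> excluded) {
    List<String> next = new ArrayList<>();
    for (String key : original) {
      boolean alreadyRead = lastReadKey != null && key.compareTo(lastReadKey) <= 0;
      if (!alreadyRead && !excluded.contains(key)) {
        next.add(key);
      }
    }
    return next;
  }

  // Returns the skipped large rows, or null if the read did not finish within maxAttempts.
  static Set<String> run(List<String> original, Set<String> largeRows, boolean cacheSkipped, int maxAttempts) {
    Set<String> skipped = new LinkedHashSet<>();
    String lastRead = null;
    List<String> request = original;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Attempt a = readUntilLargeRow(request, largeRows);
      if (!a.read().isEmpty()) {
        lastRead = a.read().get(a.read().size() - 1);
      }
      if (a.failedOn() == null) {
        return skipped; // the read completed
      }
      if (!cacheSkipped) {
        skipped.clear(); // remember only the most recent large row
      }
      skipped.add(a.failedOn());
      request = resumeRequest(original, lastRead, skipped);
    }
    return null;
  }

  public static void main(String[] args) {
    List<String> original = List.of("r1", "r2", "r3", "r4");
    Set<String> largeRows = Set.of("r2", "r3", "r4");

    System.out.println(run(original, largeRows, true, 10)); // [r2, r3, r4]
    System.out.println(run(original, largeRows, false, 10)); // null: r2 and r3 keep alternating
  }
}
```

Deriving each resume request from the previous (already trimmed) request instead of the original one, as suggested in the review, is another way to avoid reintroducing r2.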
```
@@ -0,0 +1,125 @@
/*
 * Copyright 2021 Google LLC
```
sarthakbhutani left a comment
Added comments.
/gcbrun
🤖 I have created a release *beep* *boop*

---

## [2.53.0](https://togithub.com/googleapis/java-bigtable/compare/v2.52.0...v2.53.0) (2025-02-21)

### Features

* Skip large rows ([#2482](https://togithub.com/googleapis/java-bigtable/issues/2482)) ([cd7f82e](https://togithub.com/googleapis/java-bigtable/commit/cd7f82e4b66dc3c34262c73b26afc2fdfd1deed7))

---

This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
Tasks remaining:
- [ ] make changes in the read request API so that it skips large rows (internally calls readLargeRowsCallable())
- [ ] expose the row keys of large rows via a side channel, DLQ, or some other method that can be exposed to the client

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)
- [ ] Rollback plan is reviewed and LGTMed
- [ ] All new data plane features have a completed end to end testing plan

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).