The big sweep rewrite, part 4: Cassandra impl #2231
Conversation
Iterate over cells in a table without ever loading more than a given number at once regardless of how wide the rows are, but still perform efficiently on narrow tables. This should finally allow us to run sweep safely on Cassandra with decent performance.
Overall looks good; this is a massive improvement on the existing paging.
I am a little concerned that hotspotted wide rows will make paging performance unhappy (too many tiny batches and round trips, with batch hints derived from neighbouring rows that aren't hotspotted and are tiny), but the safety of paging through these that this patchset provides is way better.
} else {
    return CassandraKeyValueServices.makeCompositeBuffer(
            colNameAndTs.getLhSide(),
            colNameAndTs.getRhSide() - 1);
this probably deserves a comment
Will do
Done. Actually realized I missed an edge case while writing the comments
Iterator<List<CassandraRawCellValue>> rawIter = cellPager.createCellIterator(
        tableRef,
        request.startRowInclusive(),
        request.batchSizeHint().orElse(100),
This could be broken out into a named constant
I'm also scared that almost nothing will set this batch hint, though I haven't traced all of the codepaths that create CandidateCellForSweepingRequests.
The ones I did trace weren't promising, and I imagine 100 will be pretty safe but unnecessarily slow for wide dynamic-column rows with small cells.
The only real caller of getCandidateCellsForSweeping should be SweepTaskRunner, which explicitly sets batchSizeHint to a value from the config, so this default number shouldn't really matter (maybe I should have made it a non-optional field in CandidateCellForSweepingRequest). That said, I'll just replace "100" with AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT
Codecov Report
@@ Coverage Diff @@
## feature/sweep-rewrite #2231 +/- ##
===========================================================
- Coverage 60.4% 60.29% -0.12%
- Complexity 4056 4684 +628
===========================================================
Files 832 839 +7
Lines 39281 39531 +250
Branches 4046 4079 +33
===========================================================
+ Hits 23727 23834 +107
- Misses 14075 14218 +143
Partials 1479 1479
The overall change looks sound, and the integration test is welcome. However, there are parts of the algorithm, particularly CellPager.PageIterator.splitFetchedRowsIntoTasks, that are sufficiently complex to deserve their own set of tests.
I'd also be interested to know what improvements we've obtained in practice - is this something our benchmarks will tell us?
@Before
public void setUp() {
    if (kvs == null) {
Would it make sense to do this in a @BeforeClass method instead?
Done
* Here, the height of the rectangle "h" is KeyRange.count and the width "w" is SliceRange.count.
* We want the area of our rectangle to be on the order of "cellBatchHint" supplied by the user, i.e.
*
*     w * h ~ cellBatchHint     (*)
Probably best to number these equations in decimal rather than unary.
Done. Initially I only had (*) and (**) but then it got a bit out of hand.
// separately (unless we use a Taylor approximation to x*ln(x) instead, which we don't want),
// so we handle the case of small "s" separately to avoid the logarithm blowing up at 0.
double extraColumnsToFetch = logArg < 1e-12 ? 0.0 : sigma * Math.sqrt(-2.0 * Math.log(logArg));
return negotiateExactPageSize(columnsPerRowMean + 1.0 + extraColumnsToFetch, cellBatchHint);
negotiate?
Yeah probably not the best verb. Will replace with a plain "get"
(We do some sort of "negotiation" process - get a first estimate for the width, then compute the height, then compute the width again, given the height)
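The "negotiation" described above can be sketched roughly as follows. This is a hypothetical standalone sketch, not the actual AtlasDB code: `PageSizeSketch` and `computePageSizes` are illustrative names, and only the clamp to `[2, cellBatchHint]` is taken from the PR itself.

```java
// Sketch of the process: take a first estimate of the width (column-per-row
// limit), derive the height (row batch size) from the cell budget, then
// recompute the width given the height, so that width * height ~ cellBatchHint.
final class PageSizeSketch {
    // Returns {columnPerRowLimit, rowBatchSize}.
    static int[] computePageSizes(double desiredColumnPerRowLimit, int cellBatchHint) {
        // First estimate of the width, clamped to [2, cellBatchHint] as in the PR.
        int width = Math.max(2, Math.min(cellBatchHint, (int) Math.round(desiredColumnPerRowLimit)));
        // The height follows from the cell budget.
        int height = Math.max(1, cellBatchHint / width);
        // Recompute the width given the height.
        int finalWidth = Math.max(2, Math.min(cellBatchHint, cellBatchHint / height));
        return new int[] {finalWidth, height};
    }

    public static void main(String[] args) {
        int[] sizes = computePageSizes(10.0, 100);
        System.out.println(sizes[0] + " columns x " + sizes[1] + " rows");
    }
}
```

With a budget of 100 cells and a desired width of 10 columns per row, this settles on a 10 x 10 page; a very narrow desired width is clamped up to 2 so that wide-row detection still works.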
}

private static PageSizes negotiateExactPageSize(double desiredColumnPerRowLimit, int cellBatchHint) {
    int columnPerRowLimit1 = Math.max(2, Math.min(cellBatchHint, (int) Math.round(desiredColumnPerRowLimit)));
Could columnPerRowLimit1 and columnPerRowLimit2 be given better names?
Done
    }
}

/* Given the batch size in cells supplied by the user, we want to figure out the optimal (or at least decent)
This looks like a great foundation for a maths paper. Do we know empirically what benefits we get from doing all of this vs. something simpler?
I just don't know what "something simpler" could be. We need to somehow choose the proportions of the rectangle given its area, and I'd rather have some semi-sound theoretical foundation than try to come up with an "empirical" formula. And the actual implementation is very simple, I think.
One naive way to handle this would be to simply estimate the mean row width and use that. However, the way probability works, a very significant number of rows will be wider than the mean if some variance is present, and we'll end up fetching those rows one-by-one, impacting the performance severely.
Another naive idea would be to use the maximum row width. But that means that a single very wide row would set our "vertical" batch size to 1 and we'd end up processing rows one-by-one again, even if the rest of the rows are narrow. So the maximum is not robust.
Then we could try something a bit less naive, for example take the mean and multiply it by a constant greater than one, or add a constant number to it. But then we'd waste a lot of time on clean fixed-column tables.
So in order to be efficient for both clean fixed-column and dirty tables, we need to take the variance into account somehow. I couldn't find a simpler way to achieve this.
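The argument above can be illustrated with a tiny standalone sketch using made-up row widths (this is not AtlasDB code; class and method names are hypothetical): with a skewed distribution, a noticeable fraction of rows is wider than the mean, so a mean-only column limit would re-fetch all of them, while a variance-based margin covers most rows without inflating the limit for clean fixed-column tables (where sigma is ~0).

```java
import java.util.Arrays;

// Compare "rows wider than the mean" vs "rows wider than mean + sigma"
// for a hypothetical skewed row-width distribution.
final class RowWidthStats {
    static double mean(int[] widths) {
        return Arrays.stream(widths).average().orElse(0.0);
    }

    static double stddev(int[] widths) {
        double mu = mean(widths);
        double variance = Arrays.stream(widths)
                .mapToDouble(w -> (w - mu) * (w - mu))
                .average()
                .orElse(0.0);
        return Math.sqrt(variance);
    }

    static long countWiderThan(int[] widths, double limit) {
        return Arrays.stream(widths).filter(w -> w > limit).count();
    }

    public static void main(String[] args) {
        int[] widths = {1, 1, 1, 2, 2, 3, 5, 8, 20, 57}; // hypothetical table
        double mu = mean(widths);      // 10.0 for this sample
        double sigma = stddev(widths);
        // Rows wider than the limit must be re-fetched one by one.
        System.out.println(countWiderThan(widths, mu) + " rows wider than the mean");
        System.out.println(countWiderThan(widths, mu + sigma) + " rows wider than mean + sigma");
    }
}
```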
is there an inherent need to fetch the wider_than_w rows one by one?
First, this is a Cassandra API deficiency: for multiget_slice, you can only specify a single slice predicate for all rows you are requesting (i.e., start with the same column for all rows). If you have several partially fetched rows, each of them has a different column you have to restart with. So you can't move the "rectangle" horizontally, only vertically.
Second, even if you could do that, you probably still wouldn't because of memory concerns: we need to return results in order, so you can't return the second row until you've returned the first one. So you would need to keep everything you fetched in memory until the first row is done, which will result in OOMs.
private final TLongList currentTimestamps = new TLongArrayList();
private boolean currentLatestValEmpty;
private long numCellTsPairsExamined = 0;
private boolean end = false;
nit: reachedEnd (as in a previous file)
done
while (candidates.isEmpty() && rawIter.hasNext()) {
    List<CassandraRawCellValue> cols = rawIter.next();
    for (CassandraRawCellValue col : cols) {
        processColumn(col, candidates);
I don't like the @Output variable there. I'm sure there's something in 'Effective Java' or similar tomes warning against such things. It's also easy to sidestep in this case, since candidates is only added to.
Let's change this line to candidates.addAll(collectCandidatesForSweeping(col)), and create + return a fresh list in collectCandidatesForSweeping (also shining a light on the intent of processColumn while we're at it).
I now see that collectCandidatesForSweeping returns 0 or 1 candidates - so we could have that method return an Optional instead of a List, and maybe the method would be better named something like createCandidateForSweepingIfNecessary.
Yeah, we are on the same page that argument modification is not great in general; however, in this case the method is modifying the state of our object in a non-trivial way, so naming it something like createCandidateForSweepingIfNecessary would make it more confusing (that kind of name gives the impression that the method is "pure", which is not the case).
This is a trade-off I made consciously and I think I'll leave the code as is, given that this is a localized helper method in an already (very) stateful class.
if (candidates.isEmpty()) {
    end = true;
    if (!currentTimestamps.isEmpty()) {
        return ImmutableList.of(createCandidate());
Could use a comment (or more explicit name) showing that we're fashioning a candidate out of the left-over current state.
Why wouldn't this candidate have been added by processColumn/collectCandidatesForSweeping?
Done.
To answer your question - processColumn only adds a candidate when it encounters a cell boundary, because it needs to group all timestamps that belong to each cell into a single candidate object. So we have to handle the last cell separately.
Makes sense, thanks.
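The boundary handling described above can be sketched in isolation. This is an illustrative standalone example, not the actual AtlasDB code: `CellGroupingSketch`, `groupTimestampsByCell`, and the string cell names are all hypothetical simplifications of the real (cell, timestamp) pair stream.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// (cell, timestamp) pairs for the same cell arrive consecutively; a candidate
// is emitted only when the cell name changes, so the final cell's timestamps
// must be flushed separately after the loop.
final class CellGroupingSketch {
    static Map<String, List<Long>> groupTimestampsByCell(List<Map.Entry<String, Long>> cellTsPairs) {
        Map<String, List<Long>> candidates = new LinkedHashMap<>();
        String currentCell = null;
        List<Long> currentTimestamps = new ArrayList<>();
        for (Map.Entry<String, Long> pair : cellTsPairs) {
            if (currentCell != null && !currentCell.equals(pair.getKey())) {
                // Cell boundary: all timestamps for the previous cell are known.
                candidates.put(currentCell, currentTimestamps);
                currentTimestamps = new ArrayList<>();
            }
            currentCell = pair.getKey();
            currentTimestamps.add(pair.getValue());
        }
        if (currentCell != null) {
            // The loop never sees a boundary after the last cell, so flush it here.
            candidates.put(currentCell, currentTimestamps);
        }
        return candidates;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> pairs = List.of(
                new AbstractMap.SimpleEntry<>("a", 10L),
                new AbstractMap.SimpleEntry<>("a", 20L),
                new AbstractMap.SimpleEntry<>("b", 30L));
        System.out.println(groupTimestampsByCell(pairs));
    }
}
```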
    }
}

private static final long[] EMPTY_LONG_ARRAY = new long[0];
We tend to put constants up at the top of the class, rather than hidden away at the bottom.
Done.
(It's not like a constant you need to see, though - just trying to avoid allocating empty arrays every time)
@@ -96,6 +96,11 @@ v0.53.0
    - IteratorUtils.forEach removed; it's not needed in a Java 8 codebase.
      (`Pull Request <https://github.com/palantir/atlasdb/pull/2207>`__)

    * - |improved|
      - New efficient and OOM-proof implementation of sweep for Cassandra KVS.
If we have figures on how many round-trips this is likely to save, it would be good to mention it here.
I don't have figures for the number of round-trips, but the KvsGetCandidateCellsForSweeping.fullTableScanCleanConservative benchmark should get a ~60x improvement.
Yes, we should see a ~60x improvement for KvsGetCandidateCellsForSweepingBenchmarks.fullTableScanCleanConservative
public class SingleRowColumnPagerTest {

    @Test
    public void testGetStartColumn() {
These should be four separate unit tests (fixed myself)
We aren't feeling any particular pain with sweep perf on develop at the moment, so we would like to wait for the Oracle implementation before merging this and #2021. Due to other priorities and team members being OOTO, we're unlikely to pick the Oracle impl up ourselves in the next few weeks.
…ure/cassandra-sweep Note: this doesn't compile; I need to unpick the various changes made to AbstractGetCandidateCellsForSweepingTest.
- Fix a small bug in CassandraGetCandidateCellsForSweepingImpl
- Remove an irrelevant test (should_not_oom_when_...) and make CassKVSeSweepTaskRunnerIntegrationTest non-parameterized. (getRangeOfTimestamps() is not used in the sweep code path anymore)
- Relax assertions in testSweepBatchesUpToDeleteBatchSize
* The big sweep rewrite, part 3: Postgres impl (#2021)
  - Implement KeyValueService.getCandidateCellsForSweeping() for Postgres
  - Add AgnosticResultRow.getBoolean() and LightAgnosticResultRow.getArray()
* Address CR comments. Also change the stopping condition in computeNext()
* Add a release note
* Add the ORDER BY that I forgot
* Rewrite a fragile test
* Join pages based on the number of (cell, ts) pairs
* Move release note to develop
* The big sweep rewrite, part 4: Cassandra impl (#2231)
  Iterate over cells in a table without ever loading more than a given number at once regardless of how wide the rows are, but still perform efficiently on narrow tables. This should finally allow us to run sweep safely on Cassandra with decent performance.
* Address clockfort's comments + checkstyle + add release notes
* Address code review comments
* Split up test cases in SingleRowColumnPagerTest
* Tests for KeyRanges
* Update locks
* Return Optional instead of passing in @Output list
* Update dependency locks
* Fix TestDataBuilder after merge
* Make tests green
  - Fix a small bug in CassandraGetCandidateCellsForSweepingImpl
  - Remove an irrelevant test (should_not_oom_when_...) and make CassKVSeSweepTaskRunnerIntegrationTest non-parameterized. (getRangeOfTimestamps() is not used in the sweep code path anymore)
  - Relax assertions in testSweepBatchesUpToDeleteBatchSize
* Bring back the old variable name to make checkstyle happy
* Run ./gradlew generateLock saveLock
* Reapply changes
* Fix compile errors
* Simplify getCandidateCellsForSweeping() (#2439)
  Initially I hoped to take advantage of in-database filtering if we get to implement the "transaction table sweeping" feature. However, it seems that wasn't a great decision on my part - it's unclear when (and if) we get that feature done, and also how much of an improvement we would actually get. The extra logic makes the code significantly more complex, so I think we need to back off and give up the idea. This doesn't invalidate the sweep rewrite project; the new impls still bring significant performance improvements.
* The big sweep rewrite, part 5: Oracle impl
  - Extract the paging logic from the Postgres impl into a separate class named CandidatePagingState so that we can reuse it in the Oracle impl (and also test the logic with unit tests)
  - Add a new benchmark fullTableScanOneWideRowThorough that simulates sweep of a very wide row. This is important for Oracle - a naive implementation would perform extremely poorly in that scenario.
* A bunch of refactors
  - Make a builder for FullQuery
  - Rename RangeBoundPredicates -> RangePredicateHelper and make it append to a FullQuery.Builder instead of returning a string + args
  - Make DbKvsGetCandidateCellsForSweeping a concrete class with common logic. Now, database-specific logic is behind the CellTsPairLoader interface
  - Add full test coverage for CandidatePagingState
* Tautological tests for RangePredicateHelper
* Import the "correct" assertThat
* Static classes
* Refactors
* CandidatePagingState: extract state-updating methods. Also, reorganised class
* Expand the comment in OracleCellTsPageLoaderFactory. Also filter out empty pages
* Fix release notes
* Fix release notes for real
Iterate over cells in a table without ever loading more than a given
number at once regardless of how wide the rows are, but still perform
efficiently on narrow tables.
This should finally allow us to run sweep safely on
Cassandra with decent performance.