diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index eb0baf385d6..7ae06920e70 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -15,10 +15,12 @@ */ package com.palantir.atlasdb.transaction.impl; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -479,15 +481,18 @@ private Iterator> getRowColumnRangePostFiltered( ImmutableMap.Builder rawBuilder = ImmutableMap.builder(); batch.forEach(rawBuilder::put); Map raw = rawBuilder.build(); + validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp()); if (raw.isEmpty()) { return Collections.emptyIterator(); } + SortedMap postFiltered = ImmutableSortedMap.copyOf( getWithPostFilteringSync( tableRef, raw, - x -> x)); + x -> x), + preserveInputRowOrder(batch)); return postFiltered.entrySet().iterator(); })); } @@ -527,6 +532,22 @@ protected Iterator> computeNext() { return Iterators.concat(postFilteredBatches); } + private Comparator preserveInputRowOrder(List> inputEntries) { + // N.B. This batch could be spread across multiple rows, and those rows might extend into other + // batches. We are given cells for a row grouped together, so the easiest way to ensure they stay together + // is to preserve the original row order. 
+ return Comparator + .comparing( + (Cell cell) -> ByteBuffer.wrap(cell.getRowName()), + Ordering.explicit(inputEntries.stream() + .map(Map.Entry::getKey) + .map(Cell::getRowName) + .map(ByteBuffer::wrap) + .distinct() + .collect(ImmutableList.toImmutableList()))) + .thenComparing(Cell::getColumnName, PtBytes.BYTES_COMPARATOR); + } + /** * Partitions a {@link RowColumnRangeIterator} into contiguous blocks that share the same row name. * {@link KeyValueService#getRowsColumnRange(TableReference, Iterable, ColumnRangeSelection, int, long)} guarantees diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java index 819c51536e2..76ffc801c61 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java @@ -37,10 +37,12 @@ import static org.mockito.Mockito.when; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -87,6 +89,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimaps; +import com.google.common.collect.Ordering; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -1173,6 +1176,45 @@ public void cleanup() {} assertEquals(ImmutableList.of(firstCell, secondCell), cells); } + @Test + public void getOtherRowsColumnRangesReturnsInOrderInCaseOfAbortedTxns() { + byte[] row = "foo".getBytes(); + Cell firstCell = Cell.create(row, "a".getBytes()); + Cell 
secondCell = Cell.create(row, "b".getBytes()); + byte[] value = new byte[1]; + + serializableTxManager.runTaskWithRetry(tx -> { + tx.put(TABLE, ImmutableMap.of(firstCell, value, secondCell, value)); + return null; + }); + + // this will write into the DB, because the protocol demands we write before we get a commit timestamp + RuntimeException conditionFailure = new RuntimeException(); + assertThatThrownBy(() -> serializableTxManager.runTaskWithConditionWithRetry(() -> + new PreCommitCondition() { + @Override + public void throwIfConditionInvalid(long timestamp) { + throw conditionFailure; + } + + @Override + public void cleanup() {} + }, (tx, condition) -> { + tx.put(TABLE, ImmutableMap.of(firstCell, value)); + return null; + })).isSameAs(conditionFailure); + + List cells = serializableTxManager.runTaskReadOnly(tx -> + Lists.transform( + Lists.newArrayList(tx.getRowsColumnRange( + TABLE, + ImmutableList.of(row), + new ColumnRangeSelection(null, null), + 10)), + Map.Entry::getKey)); + assertEquals(ImmutableList.of(firstCell, secondCell), cells); + } + @Test public void testRowsColumnRangesSingleIteratorVersion() { runTestForGetRowsColumnRangeSingleIteratorVersion(1, 1, 0); @@ -1184,6 +1226,9 @@ public void testRowsColumnRangesSingleIteratorVersion() { runTestForGetRowsColumnRangeSingleIteratorVersion(10, 10, 5); runTestForGetRowsColumnRangeSingleIteratorVersion(10, 10, 10); runTestForGetRowsColumnRangeSingleIteratorVersion(100, 100, 99); + // Add a test where neither numCellsPerRow nor the batch size (10) is divisible by the other. + // This tests what happens when a row's cells are spread across multiple batches. 
+ runTestForGetRowsColumnRangeSingleIteratorVersion(101, 11, 0); } private void runTestForGetRowsColumnRangeSingleIteratorVersion( @@ -1213,14 +1258,21 @@ private void runTestForGetRowsColumnRangeSingleIteratorVersion( }); keyValueService.addGarbageCollectionSentinelValues(TABLE, expectedDeletedCells); + List shuffledRows = expectedRows; + Collections.shuffle(shuffledRows); List cells = serializableTxManager.runTaskReadOnly(tx -> ImmutableList.copyOf(Iterators.transform( tx.getRowsColumnRange( TABLE, - expectedRows, + shuffledRows, new ColumnRangeSelection(null, null), 10), Map.Entry::getKey))); + expectedCells.sort(Comparator + .comparing( + (Cell cell) -> ByteBuffer.wrap(cell.getRowName()), + Ordering.explicit(Lists.transform(shuffledRows, ByteBuffer::wrap))) + .thenComparing(Cell::getColumnName, PtBytes.BYTES_COMPARATOR)); Assertions.assertThat(cells).isEqualTo(expectedCells); keyValueService.truncateTable(TABLE); diff --git a/changelog/@unreleased/pr-4687.v2.yml b/changelog/@unreleased/pr-4687.v2.yml new file mode 100644 index 00000000000..65b4415b601 --- /dev/null +++ b/changelog/@unreleased/pr-4687.v2.yml @@ -0,0 +1,7 @@ +type: fix +fix: + description: Fixes a bug in 0.198.9. Transaction#getRowsColumnRange (the 4 parameter + version) will now properly return cells for a row all grouped together in one + batch (no cells from other rows mixed in). + links: + - https://github.com/palantir/atlasdb/pull/4687