diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/table/description/render/StreamStoreRenderer.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/table/description/render/StreamStoreRenderer.java index 25fbd31dcca..d21fee98306 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/table/description/render/StreamStoreRenderer.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/table/description/render/StreamStoreRenderer.java @@ -694,25 +694,10 @@ private void cellsCleanedUp() { line("for (Cell cell : cells) {"); { line("rows.add(", StreamMetadataRow, ".BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName()));"); } line("}"); - line(StreamIndexTable, " indexTable = tables.get", StreamIndexTable, "(t);"); - line("Set<", StreamIndexRow, "> indexRows = rows.stream()"); - line(" .map(", StreamMetadataRow, "::getId)"); - line(" .map(", StreamIndexRow, "::of)"); - line(" .collect(Collectors.toSet());"); - line("Map<", StreamIndexRow, ", Iterator<", StreamIndexColumnValue, ">> referenceIteratorByStream"); - line(" = indexTable.getRowsColumnRangeIterator(indexRows,"); - line(" BatchColumnRangeSelection.create(PtBytes.EMPTY_BYTE_ARRAY, PtBytes.EMPTY_BYTE_ARRAY, 1));"); - line("Set<", StreamMetadataRow, "> streamsWithNoReferences"); - line(" = KeyedStream.stream(referenceIteratorByStream)"); - line(" .filter(valueIterator -> !valueIterator.hasNext())"); - line(" .keys() // (authorized)"); // required for large internal product - line(" .map(", StreamIndexRow, "::getId)"); - line(" .map(", StreamMetadataRow, "::of)"); - line(" .collect(Collectors.toSet());"); line("Map<", StreamMetadataRow, ", StreamMetadata> currentMetadata = metaTable.getMetadatas(rows);"); line("Set<", StreamId, "> toDelete = Sets.newHashSet();"); line("for (Map.Entry<", StreamMetadataRow, ", StreamMetadata> e : currentMetadata.entrySet()) {"); { - line("if (e.getValue().getStatus() != Status.STORED || streamsWithNoReferences.contains(e.getKey())) {"); { + line("if (e.getValue().getStatus() != Status.STORED) {"); { line("toDelete.add(e.getKey().getId());"); } line("}"); } line("}"); diff --git a/atlasdb-ete-test-utils/src/main/java/com/palantir/atlasdb/todo/generated/SnapshotsMetadataCleanupTask.java b/atlasdb-ete-test-utils/src/main/java/com/palantir/atlasdb/todo/generated/SnapshotsMetadataCleanupTask.java index bd5d095d6b4..8be52334edd 100644 --- a/atlasdb-ete-test-utils/src/main/java/com/palantir/atlasdb/todo/generated/SnapshotsMetadataCleanupTask.java +++ b/atlasdb-ete-test-utils/src/main/java/com/palantir/atlasdb/todo/generated/SnapshotsMetadataCleanupTask.java @@ -32,25 +32,10 @@ public boolean cellsCleanedUp(Transaction t, Set cells) { for (Cell cell : cells) { rows.add(SnapshotsStreamMetadataTable.SnapshotsStreamMetadataRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName())); } - SnapshotsStreamIdxTable indexTable = tables.getSnapshotsStreamIdxTable(t); - Set indexRows = rows.stream() - .map(SnapshotsStreamMetadataTable.SnapshotsStreamMetadataRow::getId) - .map(SnapshotsStreamIdxTable.SnapshotsStreamIdxRow::of) - .collect(Collectors.toSet()); - Map> referenceIteratorByStream - = indexTable.getRowsColumnRangeIterator(indexRows, - BatchColumnRangeSelection.create(PtBytes.EMPTY_BYTE_ARRAY, PtBytes.EMPTY_BYTE_ARRAY, 1)); - Set streamsWithNoReferences - = KeyedStream.stream(referenceIteratorByStream) - .filter(valueIterator -> !valueIterator.hasNext()) - .keys() // (authorized) - .map(SnapshotsStreamIdxTable.SnapshotsStreamIdxRow::getId) - .map(SnapshotsStreamMetadataTable.SnapshotsStreamMetadataRow::of) - .collect(Collectors.toSet()); Map currentMetadata = metaTable.getMetadatas(rows); Set toDelete = Sets.newHashSet(); for (Map.Entry e : currentMetadata.entrySet()) { - if (e.getValue().getStatus() != Status.STORED || streamsWithNoReferences.contains(e.getKey())) { + if (e.getValue().getStatus() != Status.STORED) { toDelete.add(e.getKey().getId()); } } diff --git a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/DataMetadataCleanupTask.java b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/DataMetadataCleanupTask.java index 729a5da27eb..0a59103d2a6 100644 --- a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/DataMetadataCleanupTask.java +++ b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/DataMetadataCleanupTask.java @@ -32,25 +32,10 @@ public boolean cellsCleanedUp(Transaction t, Set cells) { for (Cell cell : cells) { rows.add(DataStreamMetadataTable.DataStreamMetadataRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName())); } - DataStreamIdxTable indexTable = tables.getDataStreamIdxTable(t); - Set indexRows = rows.stream() - .map(DataStreamMetadataTable.DataStreamMetadataRow::getId) - .map(DataStreamIdxTable.DataStreamIdxRow::of) - .collect(Collectors.toSet()); - Map> referenceIteratorByStream - = indexTable.getRowsColumnRangeIterator(indexRows, - BatchColumnRangeSelection.create(PtBytes.EMPTY_BYTE_ARRAY, PtBytes.EMPTY_BYTE_ARRAY, 1)); - Set streamsWithNoReferences - = KeyedStream.stream(referenceIteratorByStream) - .filter(valueIterator -> !valueIterator.hasNext()) - .keys() // (authorized) - .map(DataStreamIdxTable.DataStreamIdxRow::getId) - .map(DataStreamMetadataTable.DataStreamMetadataRow::of) - .collect(Collectors.toSet()); Map currentMetadata = metaTable.getMetadatas(rows); Set toDelete = Sets.newHashSet(); for (Map.Entry e : currentMetadata.entrySet()) { - if (e.getValue().getStatus() != Status.STORED || streamsWithNoReferences.contains(e.getKey())) { + if (e.getValue().getStatus() != Status.STORED) { toDelete.add(e.getKey().getId()); } } diff --git a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/HotspottyDataMetadataCleanupTask.java b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/HotspottyDataMetadataCleanupTask.java index bb975e93334..3ee1d1b4aa6 100644 --- a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/HotspottyDataMetadataCleanupTask.java +++ b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/HotspottyDataMetadataCleanupTask.java @@ -32,25 +32,10 @@ public boolean cellsCleanedUp(Transaction t, Set cells) { for (Cell cell : cells) { rows.add(HotspottyDataStreamMetadataTable.HotspottyDataStreamMetadataRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName())); } - HotspottyDataStreamIdxTable indexTable = tables.getHotspottyDataStreamIdxTable(t); - Set indexRows = rows.stream() - .map(HotspottyDataStreamMetadataTable.HotspottyDataStreamMetadataRow::getId) - .map(HotspottyDataStreamIdxTable.HotspottyDataStreamIdxRow::of) - .collect(Collectors.toSet()); - Map> referenceIteratorByStream - = indexTable.getRowsColumnRangeIterator(indexRows, - BatchColumnRangeSelection.create(PtBytes.EMPTY_BYTE_ARRAY, PtBytes.EMPTY_BYTE_ARRAY, 1)); - Set streamsWithNoReferences - = KeyedStream.stream(referenceIteratorByStream) - .filter(valueIterator -> !valueIterator.hasNext()) - .keys() // (authorized) - .map(HotspottyDataStreamIdxTable.HotspottyDataStreamIdxRow::getId) - .map(HotspottyDataStreamMetadataTable.HotspottyDataStreamMetadataRow::of) - .collect(Collectors.toSet()); Map currentMetadata = metaTable.getMetadatas(rows); Set toDelete = Sets.newHashSet(); for (Map.Entry e : currentMetadata.entrySet()) { - if (e.getValue().getStatus() != Status.STORED || streamsWithNoReferences.contains(e.getKey())) { + if (e.getValue().getStatus() != Status.STORED) { toDelete.add(e.getKey().getId()); } } diff --git a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/TargetedSweepEteTest.java b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/TargetedSweepEteTest.java index 3908c113590..484987a75b1 100644 --- a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/TargetedSweepEteTest.java +++ b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/TargetedSweepEteTest.java @@ -23,6 +23,7 @@ import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import com.palantir.atlasdb.keyvalue.api.Namespace; @@ -94,17 +95,26 @@ public void targetedSweepLargeStreamsTest() { } @Test - public void targetedSweepCleanupUnmarkedStreamsTest() { + @Ignore // TODO (jkong): This is obviously not the desired behaviour, but we are doing this for safety. + public void targetedSweepCleansUpUnmarkedStreamsTest() { todoClient.storeUnmarkedSnapshot("snap"); todoClient.storeUnmarkedSnapshot("crackle"); todoClient.storeUnmarkedSnapshot("pop"); todoClient.runIterationOfTargetedSweep(); - // Nothing can be deleted from Index because it wasn't written. There should be 3 entries in the other tables - // (hash, metadata and value), one per stream, all of which should be cleaned up. assertDeleted(0, 3, 3, 3); } + @Test + public void targetedSweepCurrentlyDoesNotCleanupUnmarkedStreamsTest() { + todoClient.storeUnmarkedSnapshot("snap"); + todoClient.storeUnmarkedSnapshot("crackle"); + todoClient.storeUnmarkedSnapshot("pop"); + todoClient.runIterationOfTargetedSweep(); + + assertDeleted(0, 0, 0, 0); + } + private void assertDeleted(long idx, long hash, long meta, long val) { Assert.assertThat(todoClient.numberOfCellsDeleted(INDEX_TABLE), equalTo(idx)); Assert.assertThat(todoClient.numberOfCellsDeleted(HASH_TABLE), equalTo(hash)); diff --git a/changelog/@unreleased/pr-4434.v2.yml b/changelog/@unreleased/pr-4434.v2.yml new file mode 100644 index 00000000000..53b7787a7fa --- /dev/null +++ b/changelog/@unreleased/pr-4434.v2.yml @@ -0,0 +1,8 @@ +type: manualTask +manualTask: + description: |- + Stream stores now no longer automatically clean up streams that were stored but never marked as used. We have noticed an increasing incidence of said cleaning being more aggressive than we anticipated. + + Users that use stream stores *must* regenerate their schemas as part of upgrading to this version. + links: + - https://github.com/palantir/atlasdb/pull/4434 diff --git a/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosMetadataCleanupTask.java b/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosMetadataCleanupTask.java index 8f324b8a4dc..6ee527365f7 100644 --- a/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosMetadataCleanupTask.java +++ b/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosMetadataCleanupTask.java @@ -32,25 +32,10 @@ public boolean cellsCleanedUp(Transaction t, Set cells) { for (Cell cell : cells) { rows.add(UserPhotosStreamMetadataTable.UserPhotosStreamMetadataRow.BYTES_HYDRATOR.hydrateFromBytes(cell.getRowName())); } - UserPhotosStreamIdxTable indexTable = tables.getUserPhotosStreamIdxTable(t); - Set indexRows = rows.stream() - .map(UserPhotosStreamMetadataTable.UserPhotosStreamMetadataRow::getId) - .map(UserPhotosStreamIdxTable.UserPhotosStreamIdxRow::of) - .collect(Collectors.toSet()); - Map> referenceIteratorByStream - = indexTable.getRowsColumnRangeIterator(indexRows, - BatchColumnRangeSelection.create(PtBytes.EMPTY_BYTE_ARRAY, PtBytes.EMPTY_BYTE_ARRAY, 1)); - Set streamsWithNoReferences - = KeyedStream.stream(referenceIteratorByStream) - .filter(valueIterator -> !valueIterator.hasNext()) - .keys() // (authorized) - .map(UserPhotosStreamIdxTable.UserPhotosStreamIdxRow::getId) - .map(UserPhotosStreamMetadataTable.UserPhotosStreamMetadataRow::of) - .collect(Collectors.toSet()); Map currentMetadata = metaTable.getMetadatas(rows); Set toDelete = Sets.newHashSet(); for (Map.Entry e : currentMetadata.entrySet()) { - if (e.getValue().getStatus() != Status.STORED || streamsWithNoReferences.contains(e.getKey())) { + if (e.getValue().getStatus() != Status.STORED) { toDelete.add(e.getKey().getId()); } } diff --git a/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosStreamStore.java b/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosStreamStore.java index 8561b6b58aa..9c8331f6885 100644 --- a/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosStreamStore.java +++ b/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosStreamStore.java @@ -60,7 +60,6 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; @@ -70,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class UserPhotosStreamStore extends AbstractPersistentStreamStore { @@ -412,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -434,6 +429,7 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier}