diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractPersistentStreamStore.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractPersistentStreamStore.java index 309da695b38..fb943b3f321 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractPersistentStreamStore.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractPersistentStreamStore.java @@ -42,6 +42,7 @@ import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; import com.palantir.common.compression.StreamCompression; +import com.palantir.common.streams.KeyedStream; import com.palantir.util.Pair; import com.palantir.util.crypto.Sha256Hash; @@ -134,17 +135,19 @@ public Map storeStreams(final Transaction tx, final Map idsToEmptyMetadata = Maps.transformValues(streams, - Functions.constant(getEmptyMetadata())); + Map idsToEmptyMetadata = KeyedStream.stream(streams) + .map($ -> getEmptyMetadata()) + .collectToMap(); putMetadataAndHashIndexTask(tx, idsToEmptyMetadata); - Map idsToMetadata = Maps.transformEntries(streams, - (id, stream) -> storeBlocksAndGetFinalMetadata(tx, id, stream)); + Map idsToMetadata = KeyedStream.stream(streams) + .map((id, stream) -> storeBlocksAndGetFinalMetadata(tx, id, stream)) + .collectToMap(); putMetadataAndHashIndexTask(tx, idsToMetadata); - Map hashes = Maps.transformValues(idsToMetadata, - metadata -> new Sha256Hash(metadata.getHash().toByteArray())); - return hashes; + return KeyedStream.stream(idsToMetadata) + .map(metadata -> new Sha256Hash(metadata.getHash().toByteArray())) + .collectToMap(); } // This method is overridden in generated code. Changes to this method may have unintended consequences. 
diff --git a/atlasdb-tests-shared/build.gradle b/atlasdb-tests-shared/build.gradle index 40d471f4064..6666117dd1e 100644 --- a/atlasdb-tests-shared/build.gradle +++ b/atlasdb-tests-shared/build.gradle @@ -9,6 +9,8 @@ dependencies { compile project(":atlasdb-config") testCompile project(":atlasdb-config") + testCompile project(":commons-api") + compile project(path: ":atlasdb-feign", configuration: "shadow") compile group: 'com.palantir.tracing', name: 'tracing' diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/StreamTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/StreamTest.java index 5431f717b0b..97282078e89 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/StreamTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/StreamTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -90,6 +91,7 @@ import com.palantir.atlasdb.transaction.api.TransactionConflictException; import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.common.concurrent.PTExecutors; +import com.palantir.common.io.ForwardingInputStream; import com.palantir.util.Pair; import com.palantir.util.crypto.Sha256Hash; @@ -581,16 +583,43 @@ id1, new ByteArrayInputStream(bytes1), assertNull(sha256HashLongMap.get(hash3)); } + @Test + public void readingStreamIdsByHashInTheSameTransactionIsPermitted() throws IOException { + long id = timestampService.getFreshTimestamp(); + + byte[] bytes = generateRandomTwoBlockStream(); + Sha256Hash hash = Sha256Hash.computeHash(bytes); + + Map streams = ImmutableMap.of(id, new ByteArrayInputStream(bytes)); + + storeStreamAndCheckHash(id, hash, streams); + byte[] bytesInKvs = 
readBytesForSingleStream(id); + assertArrayEquals(bytesInKvs, bytes); + } + + @Test + public void streamsAreNotReused() throws IOException { + long id = timestampService.getFreshTimestamp(); + + byte[] bytes = generateRandomTwoBlockStream(); + Sha256Hash hash = Sha256Hash.computeHash(bytes); + + Map streams = ImmutableMap.of(id, + new CloseEnforcingInputStream(new ByteArrayInputStream(bytes))); + + storeStreamAndCheckHash(id, hash, streams); + byte[] bytesInKvs = readBytesForSingleStream(id); + assertArrayEquals(bytesInKvs, bytes); + } + @Test public void testStoreCopy() { - final byte[] bytes = new byte[2 * StreamTestStreamStore.BLOCK_SIZE_IN_BYTES]; - Random rand = new Random(); - rand.nextBytes(bytes); + byte[] bytes = generateRandomTwoBlockStream(); long id1 = timestampService.getFreshTimestamp(); long id2 = timestampService.getFreshTimestamp(); - ImmutableMap streams = ImmutableMap.of( + Map streams = ImmutableMap.of( id1, new ByteArrayInputStream(bytes), id2, new ByteArrayInputStream(bytes)); @@ -661,6 +690,13 @@ public void testStreamCompression() throws IOException { assertEquals(expectedBlocksUsed, numBlocksUsed); } + private byte[] generateRandomTwoBlockStream() { + byte[] bytes = new byte[2 * StreamTestStreamStore.BLOCK_SIZE_IN_BYTES]; + Random rand = new Random(); + rand.nextBytes(bytes); + return bytes; + } + private StreamMetadata getStreamMetadata(long id) { return txManager.runTaskReadOnly(t -> { StreamTestWithHashStreamMetadataTable table = StreamTestTableFactory.of() @@ -682,6 +718,21 @@ private Optional getStream(long streamId) { }); } + private void storeStreamAndCheckHash(long id, Sha256Hash hash, Map streams) { + txManager.runTaskWithRetry(t -> { + Map hashes = defaultStore.storeStreams(t, streams); + assertEquals(hash, hashes.get(id)); + return null; + }); + } + + private byte[] readBytesForSingleStream(long id) throws IOException { + return txManager.runTaskWithRetry(t -> { + Optional inputStream = defaultStore.loadSingleStream(t, id); + 
return IOUtils.toByteArray(inputStream.get()); + }); + } + private void assertStreamDoesNotExist(final long streamId) { assertFalse("This element should have been deleted", getStream(streamId).isPresent()); } @@ -745,4 +796,27 @@ private byte[] getIncompressibleBytes(int size) { new Random(0).nextBytes(data); return data; } + + private static final class CloseEnforcingInputStream extends ForwardingInputStream { + private final InputStream delegate; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private CloseEnforcingInputStream(InputStream delegate) { + this.delegate = delegate; + } + + @Override + protected InputStream delegate() { + if (closed.get()) { + throw new UnsupportedOperationException("The underlying input stream has been closed"); + } + return delegate; + } + + @Override + public void close() throws IOException { + delegate.close(); + closed.set(true); + } + } } diff --git a/changelog/@unreleased/pr-4444.v2.yml b/changelog/@unreleased/pr-4444.v2.yml new file mode 100644 index 00000000000..8e58327abb8 --- /dev/null +++ b/changelog/@unreleased/pr-4444.v2.yml @@ -0,0 +1,13 @@ +type: fix +fix: + description: "We no longer read and physically store streams lazily in the AtlasDB + store when transactionally storing a stream (via `storeStreams()`). \n\nPreviously, + users who stored and marked streams in a single transaction and subsequently referenced + the returned `Sha256Hash` could encounter failures in the following ways:\n\n- if the provided + `InputStream`s throw when they are read after being closed, these exceptions would + be thrown out to the user.\n- if the provided `InputStream`s do not throw when + they are read after being closed, AtlasDB would erroneously write an empty block + as the first block of the stream, along with incorrect metadata describing the stream as an + empty stream." + links: + - https://github.com/palantir/atlasdb/pull/4444