Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[PDS-10{3841,4895}] Part 5: Don't Reuse Streams In The Stream Store #4444

Merged
merged 13 commits into from
Nov 26, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.palantir.atlasdb.transaction.impl.TxTask;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.StreamCompression;
import com.palantir.common.streams.KeyedStream;
import com.palantir.util.Pair;
import com.palantir.util.crypto.Sha256Hash;

Expand Down Expand Up @@ -134,17 +135,19 @@ public Map<Long, Sha256Hash> storeStreams(final Transaction tx, final Map<Long,
return ImmutableMap.of();
}

Map<Long, StreamMetadata> idsToEmptyMetadata = Maps.transformValues(streams,
Functions.constant(getEmptyMetadata()));
Map<Long, StreamMetadata> idsToEmptyMetadata = KeyedStream.stream(streams)
.map($ -> getEmptyMetadata())
.collectToMap();
putMetadataAndHashIndexTask(tx, idsToEmptyMetadata);

Map<Long, StreamMetadata> idsToMetadata = Maps.transformEntries(streams,
(id, stream) -> storeBlocksAndGetFinalMetadata(tx, id, stream));
Map<Long, StreamMetadata> idsToMetadata = KeyedStream.stream(streams)
.map((id, stream) -> storeBlocksAndGetFinalMetadata(tx, id, stream))
.collectToMap();
putMetadataAndHashIndexTask(tx, idsToMetadata);

Map<Long, Sha256Hash> hashes = Maps.transformValues(idsToMetadata,
metadata -> new Sha256Hash(metadata.getHash().toByteArray()));
return hashes;
return KeyedStream.stream(idsToMetadata)
.map(metadata -> new Sha256Hash(metadata.getHash().toByteArray()))
.collectToMap();
}

// This method is overridden in generated code. Changes to this method may have unintended consequences.
Expand Down
2 changes: 2 additions & 0 deletions atlasdb-tests-shared/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ dependencies {
compile project(":atlasdb-config")
testCompile project(":atlasdb-config")

testCompile project(":commons-api")

compile project(path: ":atlasdb-feign", configuration: "shadow")

compile group: 'com.palantir.tracing', name: 'tracing'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -90,6 +91,7 @@
import com.palantir.atlasdb.transaction.api.TransactionConflictException;
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.common.io.ForwardingInputStream;
import com.palantir.util.Pair;
import com.palantir.util.crypto.Sha256Hash;

Expand Down Expand Up @@ -581,16 +583,43 @@ id1, new ByteArrayInputStream(bytes1),
assertNull(sha256HashLongMap.get(hash3));
}

@Test
public void readingStreamIdsByHashInTheSameTransactionIsPermitted() throws IOException {
long id = timestampService.getFreshTimestamp();

byte[] bytes = generateRandomTwoBlockStream();
Sha256Hash hash = Sha256Hash.computeHash(bytes);

Map<Long, InputStream> streams = ImmutableMap.of(id, new ByteArrayInputStream(bytes));

storeStreamAndCheckHash(id, hash, streams);
byte[] bytesInKvs = readBytesForSingleStream(id);
assertArrayEquals(bytesInKvs, bytes);
OStevan marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void streamsAreNotReused() throws IOException {
long id = timestampService.getFreshTimestamp();

byte[] bytes = generateRandomTwoBlockStream();
Sha256Hash hash = Sha256Hash.computeHash(bytes);

Map<Long, InputStream> streams = ImmutableMap.of(id,
new CloseEnforcingInputStream(new ByteArrayInputStream(bytes)));
OStevan marked this conversation as resolved.
Show resolved Hide resolved

storeStreamAndCheckHash(id, hash, streams);
byte[] bytesInKvs = readBytesForSingleStream(id);
assertArrayEquals(bytesInKvs, bytes);
}

@Test
public void testStoreCopy() {
final byte[] bytes = new byte[2 * StreamTestStreamStore.BLOCK_SIZE_IN_BYTES];
Random rand = new Random();
rand.nextBytes(bytes);
byte[] bytes = generateRandomTwoBlockStream();

long id1 = timestampService.getFreshTimestamp();
long id2 = timestampService.getFreshTimestamp();

ImmutableMap<Long, InputStream> streams = ImmutableMap.of(
Map<Long, InputStream> streams = ImmutableMap.of(
id1, new ByteArrayInputStream(bytes),
id2, new ByteArrayInputStream(bytes));

Expand Down Expand Up @@ -661,6 +690,13 @@ public void testStreamCompression() throws IOException {
assertEquals(expectedBlocksUsed, numBlocksUsed);
}

/**
 * Creates a byte array of length {@code 2 * StreamTestStreamStore.BLOCK_SIZE_IN_BYTES} filled
 * with random data from a newly constructed {@link Random}.
 */
private byte[] generateRandomTwoBlockStream() {
    int length = 2 * StreamTestStreamStore.BLOCK_SIZE_IN_BYTES;
    byte[] randomBytes = new byte[length];
    new Random().nextBytes(randomBytes);
    return randomBytes;
}

private StreamMetadata getStreamMetadata(long id) {
return txManager.runTaskReadOnly(t -> {
StreamTestWithHashStreamMetadataTable table = StreamTestTableFactory.of()
Expand All @@ -682,6 +718,21 @@ private Optional<InputStream> getStream(long streamId) {
});
}

/**
 * Stores {@code streams} via {@code defaultStore} inside a retried transaction task and asserts
 * that the hash returned for {@code id} equals {@code hash}.
 */
private void storeStreamAndCheckHash(long id, Sha256Hash hash, Map<Long, InputStream> streams) {
    txManager.runTaskWithRetry(tx -> {
        Sha256Hash storedHash = defaultStore.storeStreams(tx, streams).get(id);
        assertEquals(hash, storedHash);
        return null;
    });
}

/**
 * Loads the stream stored under {@code id} inside a retried transaction task and returns its
 * full contents as a byte array.
 *
 * @param id identifier of the stream to load
 * @return every byte read from the loaded stream
 * @throws IOException if reading or closing the loaded stream fails
 */
private byte[] readBytesForSingleStream(long id) throws IOException {
    return txManager.runTaskWithRetry(t -> {
        Optional<InputStream> inputStream = defaultStore.loadSingleStream(t, id);
        // get() is intentionally unguarded: a missing stream should fail the calling test.
        // try-with-resources ensures the loaded stream is closed once it has been drained.
        try (InputStream stream = inputStream.get()) {
            return IOUtils.toByteArray(stream);
        }
    });
}

/** Asserts that looking up {@code streamId} via {@code getStream} yields no stream. */
private void assertStreamDoesNotExist(final long streamId) {
    Optional<InputStream> maybeStream = getStream(streamId);
    assertFalse("This element should have been deleted", maybeStream.isPresent());
}
Expand Down Expand Up @@ -745,4 +796,27 @@ private byte[] getIncompressibleBytes(int size) {
new Random(0).nextBytes(data);
return data;
}

/**
 * Test-only stream wrapper that detects use after close: once {@link #close()} has been called,
 * {@link #delegate()} throws instead of returning the wrapped stream.
 */
private static final class CloseEnforcingInputStream extends ForwardingInputStream {
    private final InputStream delegate;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private CloseEnforcingInputStream(InputStream delegate) {
        this.delegate = delegate;
    }

    /**
     * Returns the wrapped stream while this wrapper is still open.
     *
     * @throws UnsupportedOperationException if {@link #close()} has already been called
     */
    @Override
    protected InputStream delegate() {
        if (closed.get()) {
            throw new UnsupportedOperationException("The underlying input stream has been closed");
        }
        return delegate;
    }

    /**
     * Marks this wrapper as closed and closes the wrapped stream. The flag is set first so the
     * wrapper counts as closed even if closing the wrapped stream throws. The field is used
     * directly (not {@link #delegate()}) so that repeated close calls do not trip the guard.
     */
    @Override
    public void close() throws IOException {
        closed.set(true);
        delegate.close();
    }
}
}
9 changes: 9 additions & 0 deletions changelog/@unreleased/pr-4444.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: fix
fix:
description: We no longer read and physically store streams lazily in the Atlas
stream store when `storeStreams()` is called. This fixes a bug where users who
stored and marked streams in a single transaction and subsequently referenced
the returned `Sha256Hash` could encounter exceptions when storing streams, and/or
suffer from data corruption.
links:
- https://github.com/palantir/atlasdb/pull/4444