Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[PaxosStateLog] Parallel reads and processing #4758

Merged
merged 6 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog/@unreleased/pr-4758.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
type: improvement
improvement:
description: '`PaxosStateLogImpl` now uses a reentrant read write lock for synchronisation
instead of a reentrant lock. This slightly improves performance and allows for
higher read throughput by parallelising reads.'
links:
- https://github.com/palantir/atlasdb/pull/4758
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ public abstract class PaxosRound<V extends Persistable & Versionable> {
public byte[] valueBytes() {
return value().persistToBytes();
}

public static <V extends Persistable & Versionable> PaxosRound<V> of(long sequence, V value) {
return ImmutablePaxosRound.<V>builder().sequence(sequence).value(value).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.paxos;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.atlasdb.futures.AtlasFutures;
import com.palantir.common.base.Throwables;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.common.persist.Persistable;

public class PaxosStateLogBatchReader<V extends Persistable & Versionable> implements AutoCloseable {
private final PaxosStateLog<V> delegate;
private final Persistable.Hydrator<V> hydrator;
private final ListeningExecutorService executor;

public PaxosStateLogBatchReader(PaxosStateLog<V> delegate, Persistable.Hydrator<V> hydrator, int numThreads) {
this.delegate = delegate;
this.hydrator = hydrator;
this.executor = MoreExecutors.listeningDecorator(
PTExecutors.newFixedThreadPool(numThreads, new NamedThreadFactory("psl-reader", true)));
}

/**
* Reads entries from startSequence (inclusive) to startSequence + numEntries (exclusive) from the delegate log.
*
* @param startSequence first sequence to read
* @param numEntries number of entries to read
* @return a list of paxos rounds for all the present entries in the delegate log
*/
public List<PaxosRound<V>> readBatch(long startSequence, int numEntries) {
return AtlasFutures.getUnchecked(
Futures.allAsList(
LongStream.range(startSequence, startSequence + numEntries)
.mapToObj(sequence -> executor.submit(() -> singleRead(sequence)))
.collect(Collectors.toList())))
.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

private Optional<PaxosRound<V>> singleRead(long sequence) {
try {
return Optional.ofNullable(delegate.readRound(sequence))
Copy link
Contributor

@jkozlowski jkozlowski May 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't #readRound have a exclusive lock in PaxosStateLogImpl? How are you going to rewire this stuff, will there be a different impl just for reading the files that doesn't log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's something we have to discuss, but it's a solvable problem just outside the scope of this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think we should be good to change that to a R/W lock (though be careful when reading that class to make sure it's safe!)

.map(bytes -> PaxosRound.of(sequence, hydrator.hydrateFromBytes(bytes)));
} catch (IOException e) {
throw Throwables.rewrapAndThrowUncheckedException(e);
}
}

@Override
public void close() {
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
Expand All @@ -50,8 +51,8 @@

public class PaxosStateLogImpl<V extends Persistable & Versionable> implements PaxosStateLog<V> {

private final ReentrantLock lock = new ReentrantLock();
private final Map<Long, Long> seqToVersionMap = Maps.newHashMap();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<Long, Long> seqToVersionMap = Maps.newConcurrentMap();

private static final String TMP_FILE_SUFFIX = ".tmp";
private static final Logger log = LoggerFactory.getLogger(PaxosStateLogImpl.class);
Expand Down Expand Up @@ -99,7 +100,7 @@ public PaxosStateLogImpl(String path) {

@Override
public void writeRound(long seq, V round) {
lock.lock();
lock.writeLock().lock();
try {
// reject old state
Long latestVersion = seqToVersionMap.get(seq);
Expand All @@ -110,7 +111,7 @@ public void writeRound(long seq, V round) {
// do write
writeRoundInternal(seq, round);
} finally {
lock.unlock();
lock.writeLock().unlock();
}
}

Expand Down Expand Up @@ -150,12 +151,12 @@ private void writeRoundInternal(long seq, V round) {

@Override
public byte[] readRound(long seq) throws IOException {
lock.lock();
lock.readLock().lock();
try {
File file = new File(path, getFilenameFromSeq(seq));
return getBytesAndCheckChecksum(file);
} finally {
lock.unlock();
lock.readLock().unlock();
}
}

Expand All @@ -178,7 +179,7 @@ public long getGreatestLogEntry() {
}

public long getExtremeLogEntry(Extreme extreme) {
lock.lock();
lock.readLock().lock();
try {
File dir = new File(path);
List<File> files = getLogEntries(dir);
Expand All @@ -196,14 +197,14 @@ public long getExtremeLogEntry(Extreme extreme) {
return PaxosAcceptor.NO_LOG_ENTRY;
}
} finally {
lock.unlock();
lock.readLock().unlock();
}
}

@SuppressWarnings("ParameterAssignment")
@Override
public void truncate(long toDeleteInclusive) {
lock.lock();
lock.writeLock().lock();
try {
long greatestLogEntry = getGreatestLogEntry();
if (greatestLogEntry >= 0) {
Expand All @@ -224,7 +225,7 @@ public void truncate(long toDeleteInclusive) {
}
}
} finally {
lock.unlock();
lock.writeLock().unlock();
}
}

Expand All @@ -244,7 +245,7 @@ private List<File> getLogEntries(File dir) {
* @throws IOException when the data checksum fails or there is another problem reading from disk
*/
private byte[] getBytesAndCheckChecksum(File file) throws IOException {
lock.lock();
lock.readLock().lock();
try {
InputStream fileIn = null;
PaxosPersistence.PaxosHeader.Builder headerBuilder =
Expand Down Expand Up @@ -273,7 +274,7 @@ private byte[] getBytesAndCheckChecksum(File file) throws IOException {
IOUtils.closeQuietly(fileIn);
}
} finally {
lock.unlock();
lock.readLock().unlock();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.paxos;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.junit.Test;

import com.google.common.util.concurrent.Uninterruptibles;

public class PaxosStateLogBatchReaderTest {
private static final int START_SEQUENCE = 123;
private static final int BATCH_SIZE = 250;
private static final List<PaxosRound<PaxosValue>> EXPECTED_ROUNDS = LongStream
.range(START_SEQUENCE, START_SEQUENCE + BATCH_SIZE)
.mapToObj(PaxosStateLogBatchReaderTest::valueForRound)
.map(value -> PaxosRound.of(value.seq, value))
.collect(Collectors.toList());

private PaxosStateLog<PaxosValue> mockLog = mock(PaxosStateLog.class);

@Test
public void readConsecutiveBatch() throws IOException {
when(mockLog.readRound(anyLong()))
.thenAnswer(invocation -> valueForRound((long) invocation.getArguments()[0]).persistToBytes());

try (PaxosStateLogBatchReader<PaxosValue> reader = createReader()) {
assertThat(reader.readBatch(START_SEQUENCE, BATCH_SIZE)).isEqualTo(EXPECTED_ROUNDS);
}
}

@Test
public void exceptionsArePropagated() throws IOException {
IOException ioException = new IOException("test");
when(mockLog.readRound(anyLong()))
.thenAnswer(invocation -> {
long sequence = (long) invocation.getArguments()[0];
if (sequence == 200) {
throw ioException;
}
return valueForRound(sequence).persistToBytes();
});

try (PaxosStateLogBatchReader<PaxosValue> reader = createReader()) {
assertThatThrownBy(() -> reader.readBatch(START_SEQUENCE, BATCH_SIZE)).isInstanceOf(RuntimeException.class);
}
}

@Test
public void readBatchFiltersOutNulls() throws IOException {
Predicate<Long> isOdd = num -> num % 2 != 0;
when(mockLog.readRound(anyLong()))
.thenAnswer(invocation -> {
long sequence = (long) invocation.getArguments()[0];
if (!isOdd.test(sequence)) {
return null;
}
return valueForRound(sequence).persistToBytes();
});

try (PaxosStateLogBatchReader<PaxosValue> reader = createReader()) {
assertThat(reader.readBatch(START_SEQUENCE, BATCH_SIZE))
.isEqualTo(EXPECTED_ROUNDS.stream()
.filter(round -> isOdd.test(round.sequence()))
.collect(Collectors.toList()));
}
}

@Test
public void noResultsReturnsEmptyList() throws IOException {
when(mockLog.readRound(anyLong())).thenReturn(null);

try (PaxosStateLogBatchReader<PaxosValue> reader = createReader()) {
assertThat(reader.readBatch(START_SEQUENCE, BATCH_SIZE)).isEmpty();
}
}

@Test
public void executionsGetBatched() throws IOException {
when(mockLog.readRound(anyLong())).thenAnswer(invocation -> {
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
return valueForRound((long) invocation.getArguments()[0]).persistToBytes();
});

try (PaxosStateLogBatchReader<PaxosValue> reader = createReader()) {
Instant startInstant = Instant.now();
reader.readBatch(START_SEQUENCE, BATCH_SIZE);
assertThat(Duration.between(Instant.now(), startInstant)).isLessThan(Duration.ofSeconds(1));
}
}

private PaxosStateLogBatchReader<PaxosValue> createReader() {
return new PaxosStateLogBatchReader<>(mockLog, PaxosValue.BYTES_HYDRATOR, 100);
}

private static PaxosValue valueForRound(long round) {
byte[] bytes = new byte[] { 1, 2, 3 };
return new PaxosValue("someLeader", round, bytes);
}
}