Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[Conjure Java Runtime] Part 17: 🚨🚚🚨 Optionals for Paxos Learners (#4361)
Browse files Browse the repository at this point in the history
* Revise Leaders to use Optional

* Add generated changelog entries

* fix

* CR comments, part 1

* CR

* Fix bug in asserts

* Remove more annotations
  • Loading branch information
jeremyk-91 authored Oct 31, 2019
1 parent 842cbc9 commit 47dacb2
Show file tree
Hide file tree
Showing 15 changed files with 74 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public class LeadersTest {
@Test
public void canCreateProxyAndLocalListOfPaxosLearners() {
PaxosLearner localLearner = mock(PaxosLearner.class);
PaxosValue value = mock(PaxosValue.class);
when(localLearner.getGreatestLearnedValue()).thenReturn(value);
Optional<PaxosValue> presentPaxosValue = Optional.of(mock(PaxosValue.class));
when(localLearner.getGreatestLearnedValue()).thenReturn(presentPaxosValue);

List<PaxosLearner> paxosLearners = Leaders.createProxyAndLocalList(
new MetricRegistry(),
Expand All @@ -59,7 +59,7 @@ public void canCreateProxyAndLocalListOfPaxosLearners() {

MatcherAssert.assertThat(paxosLearners.size(), is(REMOTE_SERVICE_ADDRESSES.size() + 1));
paxosLearners.forEach(object -> MatcherAssert.assertThat(object, not(nullValue())));
MatcherAssert.assertThat(Iterables.getLast(paxosLearners).getGreatestLearnedValue(), is(value));
MatcherAssert.assertThat(Iterables.getLast(paxosLearners).getGreatestLearnedValue(), is(presentPaxosValue));
verify(localLearner).getGreatestLearnedValue();
verifyNoMoreInteractions(localLearner);
}
Expand Down
9 changes: 9 additions & 0 deletions changelog/@unreleased/pr-4361.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: break
break:
description: The `PaxosLearner` interface now returns `Optional<PaxosValue>` instead
of a nullable `PaxosValue` for the `getLearnedValue(seq)` and `getGreatestLearnedValue()`
methods. The wire format remains unchanged (so, for example, rolling upgrades
of TimeLock, or servers in an embedded leader configuration, are safe). Users
who require the old behaviour can use `Optional.orElse(null)`.
links:
- https://github.com/palantir/atlasdb/pull/4361
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
package com.palantir.paxos;

import java.util.Collection;
import java.util.Optional;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand All @@ -44,30 +43,28 @@ public interface PaxosLearner {
void learn(@PathParam("seq") long seq, PaxosValue val);

/**
* Returns learned value or null if non-exists.
* Returns the learned value at the specified sequence number, or {@link Optional#empty()} if the learner
* does not know such a value.
*/
@Nullable
@GET
@Path("learned-value/{seq:.+}")
@Produces(MediaType.APPLICATION_JSON)
PaxosValue getLearnedValue(@PathParam("seq") long seq);
Optional<PaxosValue> getLearnedValue(@PathParam("seq") long seq);

/**
* Returns the learned value for the greatest known round or null if nothing has been learned.
* Returns the learned value for the greatest known round or {@link Optional#empty()} if nothing has been learned.
*/
@Nullable
@GET
@Path("greatest-learned-value")
@Produces(MediaType.APPLICATION_JSON)
PaxosValue getGreatestLearnedValue();
Optional<PaxosValue> getGreatestLearnedValue();

/**
* Returns some collection of learned values since the seq-th round (inclusive).
*
* @param seq lower round cutoff for returned values
* @return some set of learned values for rounds since the seq-th round
*/
@Nonnull
@GET
@Path("learned-values-since/{seq:.+}")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.leader;

import java.util.Optional;
import java.util.UUID;

import com.palantir.paxos.PaxosLearner;
Expand All @@ -34,7 +33,7 @@ public LocalPingableLeader(PaxosLearner knowledge, UUID localUuid) {

@Override
public boolean ping() {
return getGreatestLearnedPaxosValue()
return knowledge.getGreatestLearnedValue()
.map(this::isThisNodeTheLeaderFor)
.orElse(false);
}
Expand All @@ -44,10 +43,6 @@ public String getUUID() {
return localUuid.toString();
}

private Optional<PaxosValue> getGreatestLearnedPaxosValue() {
return Optional.ofNullable(knowledge.getGreatestLearnedValue());
}

private boolean isThisNodeTheLeaderFor(PaxosValue value) {
return value.getLeaderUUID().equals(localUuid.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Optional<LeadershipToken> getCurrentTokenIfLeading() {
}

private LeadershipState determineLeadershipState() {
Optional<PaxosValue> greatestLearnedValue = getGreatestLearnedPaxosValue();
Optional<PaxosValue> greatestLearnedValue = knowledge.getGreatestLearnedValue();
StillLeadingStatus leadingStatus = determineLeadershipStatus(greatestLearnedValue);

return LeadershipState.of(greatestLearnedValue, leadingStatus);
Expand Down Expand Up @@ -181,10 +181,6 @@ private void proposeLeadershipAfter(Optional<PaxosValue> value) {
}
}

private Optional<PaxosValue> getGreatestLearnedPaxosValue() {
return Optional.ofNullable(knowledge.getGreatestLearnedValue());
}

@Override
public StillLeadingStatus isStillLeading(LeadershipToken token) {
if (!(token instanceof PaxosLeadershipToken)) {
Expand Down Expand Up @@ -224,7 +220,7 @@ private boolean isLatestRound(PaxosValue value) {
}

private boolean isLatestRound(Optional<PaxosValue> valueIfAny) {
return valueIfAny.equals(getGreatestLearnedPaxosValue());
return valueIfAny.equals(knowledge.getGreatestLearnedValue());
}

private void recordLeadershipStatus(
Expand Down Expand Up @@ -256,7 +252,7 @@ private boolean updateLearnedStateFromPeers(Optional<PaxosValue> greatestLearned
for (PaxosUpdate update : updates.get()) {
ImmutableCollection<PaxosValue> values = update.getValues();
for (PaxosValue value : values) {
if (knowledge.getLearnedValue(value.getRound()) == null) {
if (!knowledge.getLearnedValue(value.getRound()).isPresent()) {
knowledge.learn(value.getRound(), value);
learned = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
package com.palantir.paxos;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.palantir.leader.PaxosKnowledgeEventRecorder;
import com.palantir.logsafe.SafeArg;

Expand Down Expand Up @@ -69,7 +72,7 @@ public void learn(long seq, PaxosValue val) {
}

@Override
public PaxosValue getLearnedValue(long seq) {
public Optional<PaxosValue> getLearnedValue(long seq) {
try {
if (!state.containsKey(seq)) {
byte[] bytes = log.readRound(seq);
Expand All @@ -78,38 +81,35 @@ public PaxosValue getLearnedValue(long seq) {
state.put(seq, value);
}
}
return state.get(seq);
return Optional.ofNullable(state.get(seq));
} catch (IOException e) {
logger.error("Unable to get corrupt learned value for sequence {}",
SafeArg.of("sequence", seq), e);
return null;
SafeArg.of("sequence", seq),
e);
return Optional.empty();
}
}

@Override
public Collection<PaxosValue> getLearnedValuesSince(long seq) {
PaxosValue greatestLearnedValue = getGreatestLearnedValue();
long greatestSeq = -1L;
if (greatestLearnedValue != null) {
greatestSeq = greatestLearnedValue.seq;
Optional<Long> greatestSeq = getGreatestLearnedValue().map(PaxosValue::getRound);
if (!greatestSeq.isPresent()) {
return ImmutableList.of();
}

Collection<PaxosValue> values = new ArrayList<>();
for (long i = seq; i <= greatestSeq; i++) {
PaxosValue value;
value = getLearnedValue(i);
if (value != null) {
values.add(value);
}
}
return values;
return LongStream.rangeClosed(seq, greatestSeq.get())
.boxed()
.map(this::getLearnedValue)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

@Override
public PaxosValue getGreatestLearnedValue() {
if (!state.isEmpty()) {
return state.get(state.lastKey());
public Optional<PaxosValue> getGreatestLearnedValue() {
if (state.isEmpty()) {
return Optional.empty();
}
return null;
return Optional.ofNullable(state.get(state.lastKey()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public <T extends PaxosResponse> PaxosResponses<T> getLearnedValue(
Function<Optional<PaxosValue>, T> mapper) {
return PaxosQuorumChecker.collectQuorumResponses(
allLearners,
learner -> mapper.apply(Optional.ofNullable(learner.getLearnedValue(seq))),
learner -> mapper.apply(learner.getLearnedValue(seq)),
quorumSize,
executors,
PaxosQuorumChecker.DEFAULT_REMOTE_REQUESTS_TIMEOUT).withoutRemotes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import com.palantir.atlasdb.timelock.paxos.Client;
import com.palantir.atlasdb.timelock.paxos.PaxosRemoteClients;
import com.palantir.atlasdb.timelock.paxos.PaxosUseCase;
Expand All @@ -50,19 +48,16 @@ public void learn(long seq, PaxosValue val) {
clientAwarePaxosLearner.learn(paxosUseCase, client, seq, val);
}

@Nullable
@Override
public PaxosValue getLearnedValue(long seq) {
public Optional<PaxosValue> getLearnedValue(long seq) {
return clientAwarePaxosLearner.getLearnedValue(paxosUseCase, client, seq);
}

@Nullable
@Override
public PaxosValue getGreatestLearnedValue() {
public Optional<PaxosValue> getGreatestLearnedValue() {
return clientAwarePaxosLearner.getGreatestLearnedValue(paxosUseCase, client);
}

@Nonnull
@Override
public Collection<PaxosValue> getLearnedValuesSince(long seq) {
return clientAwarePaxosLearner.getLearnedValuesSince(paxosUseCase, client, seq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
package com.palantir.timelock.paxos;

import java.util.Collection;
import java.util.Optional;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -54,24 +53,21 @@ void learn(
@PathParam("seq") long seq,
PaxosValue val);

@Nullable
@GET
@Path("learned-value/{seq}")
@Produces(MediaType.APPLICATION_JSON)
PaxosValue getLearnedValue(
Optional<PaxosValue> getLearnedValue(
@PathParam("useCase") PaxosUseCase paxosUseCase,
@PathParam("client") String client,
@PathParam("seq") long seq);

@Nullable
@GET
@Path("greatest-learned-value")
@Produces(MediaType.APPLICATION_JSON)
PaxosValue getGreatestLearnedValue(
Optional<PaxosValue> getGreatestLearnedValue(
@PathParam("useCase") PaxosUseCase paxosUseCase,
@PathParam("client") String client);

@Nonnull
@GET
@Path("learned-values-since/{seq}")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.palantir.atlasdb.timelock.paxos;

import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
Expand All @@ -40,7 +40,8 @@ public Set<Client> ping(Set<Client> clients) {
return KeyedStream.of(clients)
.map(components::learner)
.map(PaxosLearner::getGreatestLearnedValue)
.filter(Objects::nonNull)
.filter(Optional::isPresent)
.map(Optional::get)
.filter(this::isThisNodeTheLeaderFor)
.keys()
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import com.google.common.collect.SetMultimap;
Expand All @@ -45,7 +45,8 @@ public SetMultimap<Client, PaxosValue> getLearnedValues(Set<WithSeq<Client>> cli
.mapKeys(WithSeq::value)
.map(WithSeq::seq)
.map((client, seq) -> paxosComponents.learner(client).getLearnedValue(seq))
.filter(Objects::nonNull)
.filter(Optional::isPresent)
.map(Optional::get)
.collectToSetMultimap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.palantir.common.remoting.ServiceNotAvailableException;
import com.palantir.leader.NotCurrentLeaderException;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.paxos.ImmutablePaxosLong;
import com.palantir.paxos.PaxosAcceptor;
import com.palantir.paxos.PaxosAcceptorNetworkClient;
Expand Down Expand Up @@ -232,7 +233,10 @@ public synchronized void storeUpperLimit(long limit) throws MultipleRunningTimes
while (true) {
try {
proposer.propose(newSeq, PtBytes.toBytes(limit));
PaxosValue value = knowledge.getLearnedValue(newSeq);
PaxosValue value = knowledge.getLearnedValue(newSeq)
.orElseThrow(() -> new SafeIllegalStateException("Timestamp bound store: Paxos proposal"
+ " returned without learning a value. This is unexpected and would suggest a bug in"
+ " AtlasDB code. Please contact support."));
checkAgreedBoundIsOurs(limit, newSeq, value);
long newLimit = PtBytes.toLong(value.getData());
agreedState = ImmutableSequenceAndBound.of(newSeq, newLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

import java.util.Optional;
import java.util.Random;
import java.util.UUID;

Expand Down Expand Up @@ -80,13 +81,13 @@ public void filtersOutClientsWhereNothingHasBeenLearnt() {
when(components.learner(CLIENT_1).getGreatestLearnedValue())
.thenReturn(paxosValue(LEADER_UUID));
when(components.learner(clientWhereNothingHasBeenLearnt).getGreatestLearnedValue())
.thenReturn(null);
.thenReturn(Optional.empty());

assertThat(resource.ping(ImmutableSet.of(CLIENT_1, clientWhereNothingHasBeenLearnt)))
.containsOnly(CLIENT_1);
}

private static PaxosValue paxosValue(UUID uuid) {
return new PaxosValue(uuid.toString(), new Random().nextLong(), null);
private static Optional<PaxosValue> paxosValue(UUID uuid) {
return Optional.of(new PaxosValue(uuid.toString(), new Random().nextLong(), null));
}
}
Loading

0 comments on commit 47dacb2

Please sign in to comment.