Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cache to track liveness of validators #4489

Merged
merged 10 commits into from
Oct 18, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorChannel;
import tech.pegasys.teku.storage.api.StorageQueryChannel;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand Down Expand Up @@ -89,6 +90,7 @@ public abstract class AbstractBeaconRestAPIIntegrationTest {
protected DataProvider dataProvider;
protected BeaconRestApi beaconRestApi;
protected OkHttpClient client;
private final ActiveValidatorChannel activeValidatorChannel = mock(ActiveValidatorChannel.class);

@BeforeEach
public void setup() {
Expand All @@ -103,6 +105,8 @@ public void setup() {
attestationPool,
blockManager,
attestationManager,
false,
activeValidatorChannel,
attesterSlashingPool,
proposerSlashingPool,
voluntaryExitPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorCache;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorChannel;
import tech.pegasys.teku.storage.client.ChainUpdater;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand All @@ -78,6 +80,9 @@ public abstract class AbstractDataBackedRestAPIIntegrationTest {
protected Spec spec = TestSpecFactory.createMinimalPhase0();
protected SpecConfig specConfig = spec.getGenesisSpecConfig();

protected final ActiveValidatorChannel activeValidatorChannel =
new ActiveValidatorCache(spec, 10);

private static final okhttp3.MediaType JSON =
okhttp3.MediaType.parse("application/json; charset=utf-8");
private static final BeaconRestApiConfig CONFIG =
Expand All @@ -87,6 +92,7 @@ public abstract class AbstractDataBackedRestAPIIntegrationTest {
.restApiDocsEnabled(true)
.restApiHostAllowlist(List.of("127.0.0.1", "localhost"))
.restApiCorsAllowedOrigins(new ArrayList<>())
.beaconLivenessTrackingEnabled(true)
.eth1DepositContractAddress(Eth1Address.ZERO)
.build();

Expand All @@ -104,7 +110,7 @@ public abstract class AbstractDataBackedRestAPIIntegrationTest {
protected final AggregatingAttestationPool attestationPool =
mock(AggregatingAttestationPool.class);
protected final BlockManager blockManager = mock(BlockManager.class);
private final AttestationManager attestationManager = mock(AttestationManager.class);
protected final AttestationManager attestationManager = mock(AttestationManager.class);
protected final OperationPool<AttesterSlashing> attesterSlashingPool = mock(OperationPool.class);
protected final OperationPool<ProposerSlashing> proposerSlashingPool = mock(OperationPool.class);
protected final OperationPool<SignedVoluntaryExit> voluntaryExitPool = mock(OperationPool.class);
Expand Down Expand Up @@ -171,6 +177,8 @@ private void setupAndStartRestAPI(BeaconRestApiConfig config) {
attestationPool,
blockManager,
attestationManager,
true,
activeValidatorChannel,
attesterSlashingPool,
proposerSlashingPool,
voluntaryExitPool,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2021 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beaconrestapi.v1.validator;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.http.HttpStatusCodes.SC_BAD_REQUEST;
import static tech.pegasys.teku.infrastructure.http.HttpStatusCodes.SC_OK;
import static tech.pegasys.teku.infrastructure.http.HttpStatusCodes.SC_SERVICE_UNAVAILABLE;

import java.io.IOException;
import java.util.List;
import okhttp3.Response;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.api.request.v1.validator.ValidatorLivenessRequest;
import tech.pegasys.teku.api.response.v1.validator.PostValidatorLivenessResponse;
import tech.pegasys.teku.api.response.v1.validator.ValidatorLivenessAtEpoch;
import tech.pegasys.teku.beaconrestapi.AbstractDataBackedRestAPIIntegrationTest;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostValidatorLiveness;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.sync.events.SyncState;

public class PostValidatorLivenessIntegrationTest extends AbstractDataBackedRestAPIIntegrationTest {

@Test
public void shouldReturnBadRequestWhenRequestBodyIsEmpty() throws Exception {
startRestAPIAtGenesis(SpecMilestone.ALTAIR);
when(syncService.getCurrentSyncState()).thenReturn(SyncState.IN_SYNC);
Response response = post(PostValidatorLiveness.ROUTE, jsonProvider.objectToJSON(""));
assertThat(response.code()).isEqualTo(SC_BAD_REQUEST);
}

@Test
public void shouldReturnUnavailableWhenChainDataNotAvailable() throws Exception {
startRestAPIAtGenesis(SpecMilestone.ALTAIR);
when(syncService.getCurrentSyncState()).thenReturn(SyncState.SYNCING);
Response response = post(PostValidatorLiveness.ROUTE, jsonProvider.objectToJSON(""));
assertThat(response.code()).isEqualTo(SC_SERVICE_UNAVAILABLE);
}

@Test
public void shouldDetectActiveValidator() throws IOException {
final UInt64 epoch = UInt64.ZERO;
final UInt64 validatorIndex = UInt64.ONE;
startRestAPIAtGenesis(SpecMilestone.ALTAIR);
blockReceivedFromProposer(spec.computeStartSlotAtEpoch(epoch).plus(1), validatorIndex);
setCurrentSlot(12);
when(syncService.getCurrentSyncState()).thenReturn(SyncState.IN_SYNC);

final ValidatorLivenessRequest request =
new ValidatorLivenessRequest(epoch, List.of(validatorIndex));
Response response = post(PostValidatorLiveness.ROUTE, jsonProvider.objectToJSON(request));
assertThat(response.code()).isEqualTo(SC_OK);
final PostValidatorLivenessResponse result =
jsonProvider.jsonToObject(response.body().string(), PostValidatorLivenessResponse.class);
assertThat(result.data.get(0))
.isEqualTo(new ValidatorLivenessAtEpoch(validatorIndex, epoch, true));
}

private void blockReceivedFromProposer(final UInt64 slot, final UInt64 proposerIndex) {
// prime the cache for validator, would be the same as seeing a block come in.
final SignedBeaconBlock block = mock(SignedBeaconBlock.class);
when(block.getSlot()).thenReturn(slot);
when(block.getProposerIndex()).thenReturn(proposerIndex);

activeValidatorChannel.onBlockImported(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostSubscribeToBeaconCommitteeSubnet;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostSyncCommitteeSubscriptions;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostSyncDuties;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostValidatorLiveness;
import tech.pegasys.teku.beaconrestapi.handlers.v2.debug.GetState;
import tech.pegasys.teku.beaconrestapi.schema.BadRequest;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
Expand Down Expand Up @@ -384,6 +385,7 @@ private void addBeaconHandlers(final DataProvider dataProvider) {
app.get(GetVoluntaryExits.ROUTE, new GetVoluntaryExits(dataProvider, jsonProvider));
app.post(PostVoluntaryExit.ROUTE, new PostVoluntaryExit(dataProvider, jsonProvider));
app.post(PostSyncCommittees.ROUTE, new PostSyncCommittees(dataProvider, jsonProvider));
app.post(PostValidatorLiveness.ROUTE, new PostValidatorLiveness(dataProvider, jsonProvider));
}

private void addEventHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class BeaconRestApiConfig {
private final int restApiPort;
private final boolean restApiDocsEnabled;
private final boolean restApiEnabled;
private final boolean beaconLivenessTrackingEnabled;
private final String restApiInterface;
private final List<String> restApiHostAllowlist;
private final List<String> restApiCorsAllowedOrigins;
Expand All @@ -41,7 +42,8 @@ private BeaconRestApiConfig(
final Eth1Address eth1DepositContractAddress,
final int maxUrlLength,
final int maxPendingEvents,
final int validatorThreads) {
final int validatorThreads,
final boolean beaconLivenessTrackingEnabled) {
this.restApiPort = restApiPort;
this.restApiDocsEnabled = restApiDocsEnabled;
this.restApiEnabled = restApiEnabled;
Expand All @@ -52,6 +54,7 @@ private BeaconRestApiConfig(
this.maxUrlLength = maxUrlLength;
this.maxPendingEvents = maxPendingEvents;
this.validatorThreads = validatorThreads;
this.beaconLivenessTrackingEnabled = beaconLivenessTrackingEnabled;
}

public int getRestApiPort() {
Expand All @@ -66,6 +69,10 @@ public boolean isRestApiEnabled() {
return restApiEnabled;
}

public boolean isBeaconLivenessTrackingEnabled() {
return beaconLivenessTrackingEnabled;
}

public String getRestApiInterface() {
return restApiInterface;
}
Expand Down Expand Up @@ -103,6 +110,7 @@ public static final class BeaconRestApiConfigBuilder {
private int restApiPort;
private boolean restApiDocsEnabled;
private boolean restApiEnabled;
private boolean beaconLivenessTrackingEnabled;
private String restApiInterface;
private List<String> restApiHostAllowlist;
private List<String> restApiCorsAllowedOrigins;
Expand Down Expand Up @@ -157,6 +165,12 @@ public BeaconRestApiConfigBuilder maxPendingEvents(final int maxEventQueueSize)
return this;
}

public BeaconRestApiConfigBuilder beaconLivenessTrackingEnabled(
final boolean beaconLivenessTrackingEnabled) {
this.beaconLivenessTrackingEnabled = beaconLivenessTrackingEnabled;
return this;
}

public BeaconRestApiConfigBuilder validatorThreads(final int validatorThreads) {
this.validatorThreads = validatorThreads;
return this;
Expand All @@ -173,7 +187,8 @@ public BeaconRestApiConfig build() {
eth1DepositContractAddress,
maxUrlLength,
maxPendingEvents,
validatorThreads);
validatorThreads,
beaconLivenessTrackingEnabled);
}

public BeaconRestApiConfigBuilder maxUrlLength(final int maxUrlLength) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2021 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beaconrestapi.handlers.v1.validator;

import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
import static tech.pegasys.teku.beaconrestapi.RestApiConstants.RES_BAD_REQUEST;
import static tech.pegasys.teku.beaconrestapi.RestApiConstants.RES_INTERNAL_ERROR;
import static tech.pegasys.teku.beaconrestapi.RestApiConstants.RES_OK;
import static tech.pegasys.teku.beaconrestapi.RestApiConstants.RES_SERVICE_UNAVAILABLE;
import static tech.pegasys.teku.beaconrestapi.RestApiConstants.SERVICE_UNAVAILABLE;
import static tech.pegasys.teku.beaconrestapi.RestApiConstants.TAG_EXPERIMENTAL;
import static tech.pegasys.teku.infrastructure.async.SafeFuture.failedFuture;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Throwables;
import io.javalin.http.Context;
import io.javalin.plugin.openapi.annotations.HttpMethod;
import io.javalin.plugin.openapi.annotations.OpenApi;
import io.javalin.plugin.openapi.annotations.OpenApiContent;
import io.javalin.plugin.openapi.annotations.OpenApiRequestBody;
import io.javalin.plugin.openapi.annotations.OpenApiResponse;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.api.ChainDataProvider;
import tech.pegasys.teku.api.DataProvider;
import tech.pegasys.teku.api.NodeDataProvider;
import tech.pegasys.teku.api.SyncDataProvider;
import tech.pegasys.teku.api.request.v1.validator.ValidatorLivenessRequest;
import tech.pegasys.teku.api.response.v1.validator.PostValidatorLivenessResponse;
import tech.pegasys.teku.beaconrestapi.handlers.AbstractHandler;
import tech.pegasys.teku.beaconrestapi.schema.BadRequest;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.provider.JsonProvider;

public class PostValidatorLiveness extends AbstractHandler {
private static final Logger LOG = LogManager.getLogger();
public static final String ROUTE = "/eth/v1/validator/liveness";

public final ChainDataProvider chainDataProvider;
public final NodeDataProvider nodeDataProvider;
public final SyncDataProvider syncDataProvider;

public PostValidatorLiveness(
final ChainDataProvider chainDataProvider,
final NodeDataProvider nodeDataProvider,
final SyncDataProvider syncDataProvider,
final JsonProvider jsonProvider) {
super(jsonProvider);
this.chainDataProvider = chainDataProvider;
this.nodeDataProvider = nodeDataProvider;
this.syncDataProvider = syncDataProvider;
}

public PostValidatorLiveness(final DataProvider provider, final JsonProvider jsonProvider) {
this(
provider.getChainDataProvider(),
provider.getNodeDataProvider(),
provider.getSyncDataProvider(),
jsonProvider);
}

@OpenApi(
path = ROUTE,
method = HttpMethod.GET,
summary = "Get Validator Liveness",
tags = {TAG_EXPERIMENTAL},
requestBody =
@OpenApiRequestBody(content = {@OpenApiContent(from = ValidatorLivenessRequest.class)}),
description =
"Requests the beacon node to indicate if a validator has been"
+ " observed to be live in a given epoch. The beacon node might detect liveness by"
+ " observing messages from the validator on the network, in the beacon chain,"
+ " from its API or from any other source. It is important to note that the"
+ " values returned by the beacon node are not canonical; they are best-effort"
+ " and based upon a subjective view of the network.",
responses = {
@OpenApiResponse(
status = RES_OK,
content = @OpenApiContent(from = PostValidatorLivenessResponse.class)),
@OpenApiResponse(status = RES_BAD_REQUEST, description = "Invalid parameter supplied"),
@OpenApiResponse(status = RES_INTERNAL_ERROR),
@OpenApiResponse(status = RES_SERVICE_UNAVAILABLE, description = SERVICE_UNAVAILABLE)
})
@Override
public void handle(Context ctx) throws Exception {
if (!chainDataProvider.isStoreAvailable() || syncDataProvider.isSyncing()) {
ctx.status(SC_SERVICE_UNAVAILABLE);
return;
}
try {
final ValidatorLivenessRequest request =
parseRequestBody(ctx.body(), ValidatorLivenessRequest.class);
SafeFuture<Optional<PostValidatorLivenessResponse>> future =
nodeDataProvider.getValidatorLiveness(request, chainDataProvider.getCurrentEpoch());
handleOptionalResult(
ctx, future, this::handleResult, this::handleError, SC_SERVICE_UNAVAILABLE);
} catch (IllegalArgumentException ex) {
LOG.trace("Illegal argument in PostValidatorLiveness", ex);
ctx.status(SC_BAD_REQUEST);
ctx.result(BadRequest.badRequest(jsonProvider, ex.getMessage()));
}
}

private Optional<String> handleResult(Context ctx, final PostValidatorLivenessResponse response)
throws JsonProcessingException {
return Optional.of(jsonProvider.objectToJSON(response));
}

private SafeFuture<String> handleError(final Context ctx, final Throwable error) {
final Throwable rootCause = Throwables.getRootCause(error);
if (rootCause instanceof IllegalArgumentException) {
ctx.status(SC_BAD_REQUEST);
return SafeFuture.of(() -> BadRequest.badRequest(jsonProvider, rootCause.getMessage()));
} else {
return failedFuture(error);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorChannel;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.storage.client.MemoryOnlyRecentChainData;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand All @@ -62,6 +63,7 @@ class BeaconRestApiTest {
private final OperationPool<SignedVoluntaryExit> voluntaryExitPool = mock(OperationPool.class);
private final SyncCommitteeContributionPool syncCommitteeContributionPool =
mock(SyncCommitteeContributionPool.class);
private final ActiveValidatorChannel activeValidatorChannel = mock(ActiveValidatorChannel.class);

@BeforeEach
public void setup() {
Expand All @@ -84,6 +86,8 @@ public void setup() {
attestationPool,
blockManager,
attestationManager,
false,
activeValidatorChannel,
attesterSlashingPool,
proposerSlashingPool,
voluntaryExitPool,
Expand Down
Loading