Skip to content

Commit

Permalink
Add a cache to track liveness of validators (#4489)
Browse files Browse the repository at this point in the history
* Add a cache to track liveness of validators

This cache is specifically needed to service rest api requests, so is not required if we are not using the rest api.

The rest endpoint is yet to be approved, so leaving it marked experimental until the endpoint is finalized.

Currently feature flagged with 'Xbeacon-liveness-tracking-enabled', which is a hidden CLI option.

Enabling will keep a 2d array of as much as 3x (the max validator index+1000) to maintain where validators have been seen.

partially addresses #4453

Signed-off-by: Paul Harris <[email protected]>
  • Loading branch information
rolfyone authored Oct 18, 2021
1 parent 50afa93 commit cb8446b
Show file tree
Hide file tree
Showing 23 changed files with 918 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorChannel;
import tech.pegasys.teku.storage.api.StorageQueryChannel;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand Down Expand Up @@ -89,6 +90,7 @@ public abstract class AbstractBeaconRestAPIIntegrationTest {
protected DataProvider dataProvider;
protected BeaconRestApi beaconRestApi;
protected OkHttpClient client;
private final ActiveValidatorChannel activeValidatorChannel = mock(ActiveValidatorChannel.class);

@BeforeEach
public void setup() {
Expand All @@ -103,6 +105,8 @@ public void setup() {
attestationPool,
blockManager,
attestationManager,
false,
activeValidatorChannel,
attesterSlashingPool,
proposerSlashingPool,
voluntaryExitPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorCache;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorChannel;
import tech.pegasys.teku.storage.client.ChainUpdater;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand All @@ -78,6 +80,9 @@ public abstract class AbstractDataBackedRestAPIIntegrationTest {
protected Spec spec = TestSpecFactory.createMinimalPhase0();
protected SpecConfig specConfig = spec.getGenesisSpecConfig();

protected final ActiveValidatorChannel activeValidatorChannel =
new ActiveValidatorCache(spec, 10);

private static final okhttp3.MediaType JSON =
okhttp3.MediaType.parse("application/json; charset=utf-8");
private static final BeaconRestApiConfig CONFIG =
Expand All @@ -87,6 +92,7 @@ public abstract class AbstractDataBackedRestAPIIntegrationTest {
.restApiDocsEnabled(true)
.restApiHostAllowlist(List.of("127.0.0.1", "localhost"))
.restApiCorsAllowedOrigins(new ArrayList<>())
.beaconLivenessTrackingEnabled(true)
.eth1DepositContractAddress(Eth1Address.ZERO)
.build();

Expand All @@ -104,7 +110,7 @@ public abstract class AbstractDataBackedRestAPIIntegrationTest {
protected final AggregatingAttestationPool attestationPool =
mock(AggregatingAttestationPool.class);
protected final BlockManager blockManager = mock(BlockManager.class);
private final AttestationManager attestationManager = mock(AttestationManager.class);
protected final AttestationManager attestationManager = mock(AttestationManager.class);
protected final OperationPool<AttesterSlashing> attesterSlashingPool = mock(OperationPool.class);
protected final OperationPool<ProposerSlashing> proposerSlashingPool = mock(OperationPool.class);
protected final OperationPool<SignedVoluntaryExit> voluntaryExitPool = mock(OperationPool.class);
Expand Down Expand Up @@ -171,6 +177,8 @@ private void setupAndStartRestAPI(BeaconRestApiConfig config) {
attestationPool,
blockManager,
attestationManager,
true,
activeValidatorChannel,
attesterSlashingPool,
proposerSlashingPool,
voluntaryExitPool,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2021 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beaconrestapi.v1.validator;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.http.HttpStatusCodes.SC_BAD_REQUEST;
import static tech.pegasys.teku.infrastructure.http.HttpStatusCodes.SC_OK;
import static tech.pegasys.teku.infrastructure.http.HttpStatusCodes.SC_SERVICE_UNAVAILABLE;

import java.io.IOException;
import java.util.List;
import okhttp3.Response;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.api.request.v1.validator.ValidatorLivenessRequest;
import tech.pegasys.teku.api.response.v1.validator.PostValidatorLivenessResponse;
import tech.pegasys.teku.api.response.v1.validator.ValidatorLivenessAtEpoch;
import tech.pegasys.teku.beaconrestapi.AbstractDataBackedRestAPIIntegrationTest;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostValidatorLiveness;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.sync.events.SyncState;

public class PostValidatorLivenessIntegrationTest extends AbstractDataBackedRestAPIIntegrationTest {

@Test
public void shouldReturnBadRequestWhenRequestBodyIsEmpty() throws Exception {
startRestAPIAtGenesis(SpecMilestone.ALTAIR);
when(syncService.getCurrentSyncState()).thenReturn(SyncState.IN_SYNC);
Response response = post(PostValidatorLiveness.ROUTE, jsonProvider.objectToJSON(""));
assertThat(response.code()).isEqualTo(SC_BAD_REQUEST);
}

@Test
public void shouldReturnUnavailableWhenChainDataNotAvailable() throws Exception {
startRestAPIAtGenesis(SpecMilestone.ALTAIR);
when(syncService.getCurrentSyncState()).thenReturn(SyncState.SYNCING);
Response response = post(PostValidatorLiveness.ROUTE, jsonProvider.objectToJSON(""));
assertThat(response.code()).isEqualTo(SC_SERVICE_UNAVAILABLE);
}

@Test
public void shouldDetectActiveValidator() throws IOException {
final UInt64 epoch = UInt64.ZERO;
final UInt64 validatorIndex = UInt64.ONE;
startRestAPIAtGenesis(SpecMilestone.ALTAIR);
blockReceivedFromProposer(spec.computeStartSlotAtEpoch(epoch).plus(1), validatorIndex);
setCurrentSlot(12);
when(syncService.getCurrentSyncState()).thenReturn(SyncState.IN_SYNC);

final ValidatorLivenessRequest request =
new ValidatorLivenessRequest(epoch, List.of(validatorIndex));
Response response = post(PostValidatorLiveness.ROUTE, jsonProvider.objectToJSON(request));
assertThat(response.code()).isEqualTo(SC_OK);
final PostValidatorLivenessResponse result =
jsonProvider.jsonToObject(response.body().string(), PostValidatorLivenessResponse.class);
assertThat(result.data.get(0))
.isEqualTo(new ValidatorLivenessAtEpoch(validatorIndex, epoch, true));
}

private void blockReceivedFromProposer(final UInt64 slot, final UInt64 proposerIndex) {
// prime the cache for validator, would be the same as seeing a block come in.
final SignedBeaconBlock block = mock(SignedBeaconBlock.class);
when(block.getSlot()).thenReturn(slot);
when(block.getProposerIndex()).thenReturn(proposerIndex);

activeValidatorChannel.onBlockImported(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.jetty.server.Server;
import tech.pegasys.teku.api.DataProvider;
import tech.pegasys.teku.api.exceptions.BadRequestException;
import tech.pegasys.teku.api.exceptions.ServiceUnavailableException;
import tech.pegasys.teku.beaconrestapi.handlers.tekuv1.admin.Liveness;
import tech.pegasys.teku.beaconrestapi.handlers.tekuv1.admin.PutLogLevel;
import tech.pegasys.teku.beaconrestapi.handlers.tekuv1.admin.Readiness;
Expand Down Expand Up @@ -89,6 +90,7 @@
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostSubscribeToBeaconCommitteeSubnet;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostSyncCommitteeSubscriptions;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostSyncDuties;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostValidatorLiveness;
import tech.pegasys.teku.beaconrestapi.handlers.v2.debug.GetState;
import tech.pegasys.teku.beaconrestapi.schema.BadRequest;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
Expand Down Expand Up @@ -183,18 +185,9 @@ private void addHostAllowlistHandler(final BeaconRestApiConfig configuration) {

private void addExceptionHandlers() {
app.exception(ChainDataUnavailableException.class, (e, ctx) -> ctx.status(SC_NO_CONTENT));
app.exception(
NodeSyncingException.class,
(e, ctx) -> {
ctx.status(SC_SERVICE_UNAVAILABLE);
setErrorBody(ctx, () -> BadRequest.serviceUnavailable(jsonProvider));
});
app.exception(
BadRequestException.class,
(e, ctx) -> {
ctx.status(SC_BAD_REQUEST);
setErrorBody(ctx, () -> BadRequest.badRequest(jsonProvider, e.getMessage()));
});
app.exception(NodeSyncingException.class, this::serviceUnavailable);
app.exception(ServiceUnavailableException.class, this::serviceUnavailable);
app.exception(BadRequestException.class, this::badRequest);
// Add catch-all handler
app.exception(
Exception.class,
Expand All @@ -206,6 +199,16 @@ private void addExceptionHandlers() {
});
}

private void serviceUnavailable(final Throwable throwable, final Context context) {
context.status(SC_SERVICE_UNAVAILABLE);
setErrorBody(context, () -> BadRequest.serviceUnavailable(jsonProvider));
}

private void badRequest(final Throwable throwable, final Context context) {
context.status(SC_BAD_REQUEST);
setErrorBody(context, () -> BadRequest.badRequest(jsonProvider, throwable.getMessage()));
}

private void setErrorBody(final Context ctx, final ExceptionThrowingSupplier<String> body) {
try {
ctx.result(body.get());
Expand Down Expand Up @@ -384,6 +387,7 @@ private void addBeaconHandlers(final DataProvider dataProvider) {
app.get(GetVoluntaryExits.ROUTE, new GetVoluntaryExits(dataProvider, jsonProvider));
app.post(PostVoluntaryExit.ROUTE, new PostVoluntaryExit(dataProvider, jsonProvider));
app.post(PostSyncCommittees.ROUTE, new PostSyncCommittees(dataProvider, jsonProvider));
app.post(PostValidatorLiveness.ROUTE, new PostValidatorLiveness(dataProvider, jsonProvider));
}

private void addEventHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class BeaconRestApiConfig {
private final int restApiPort;
private final boolean restApiDocsEnabled;
private final boolean restApiEnabled;
private final boolean beaconLivenessTrackingEnabled;
private final String restApiInterface;
private final List<String> restApiHostAllowlist;
private final List<String> restApiCorsAllowedOrigins;
Expand All @@ -41,7 +42,8 @@ private BeaconRestApiConfig(
final Eth1Address eth1DepositContractAddress,
final int maxUrlLength,
final int maxPendingEvents,
final int validatorThreads) {
final int validatorThreads,
final boolean beaconLivenessTrackingEnabled) {
this.restApiPort = restApiPort;
this.restApiDocsEnabled = restApiDocsEnabled;
this.restApiEnabled = restApiEnabled;
Expand All @@ -52,6 +54,7 @@ private BeaconRestApiConfig(
this.maxUrlLength = maxUrlLength;
this.maxPendingEvents = maxPendingEvents;
this.validatorThreads = validatorThreads;
this.beaconLivenessTrackingEnabled = beaconLivenessTrackingEnabled;
}

public int getRestApiPort() {
Expand All @@ -66,6 +69,10 @@ public boolean isRestApiEnabled() {
return restApiEnabled;
}

public boolean isBeaconLivenessTrackingEnabled() {
return beaconLivenessTrackingEnabled;
}

public String getRestApiInterface() {
return restApiInterface;
}
Expand Down Expand Up @@ -103,6 +110,7 @@ public static final class BeaconRestApiConfigBuilder {
private int restApiPort;
private boolean restApiDocsEnabled;
private boolean restApiEnabled;
private boolean beaconLivenessTrackingEnabled;
private String restApiInterface;
private List<String> restApiHostAllowlist;
private List<String> restApiCorsAllowedOrigins;
Expand Down Expand Up @@ -157,6 +165,12 @@ public BeaconRestApiConfigBuilder maxPendingEvents(final int maxEventQueueSize)
return this;
}

public BeaconRestApiConfigBuilder beaconLivenessTrackingEnabled(
final boolean beaconLivenessTrackingEnabled) {
this.beaconLivenessTrackingEnabled = beaconLivenessTrackingEnabled;
return this;
}

public BeaconRestApiConfigBuilder validatorThreads(final int validatorThreads) {
this.validatorThreads = validatorThreads;
return this;
Expand All @@ -173,7 +187,8 @@ public BeaconRestApiConfig build() {
eth1DepositContractAddress,
maxUrlLength,
maxPendingEvents,
validatorThreads);
validatorThreads,
beaconLivenessTrackingEnabled);
}

public BeaconRestApiConfigBuilder maxUrlLength(final int maxUrlLength) {
Expand Down
Loading

0 comments on commit cb8446b

Please sign in to comment.