Skip to content

Commit

Permalink
Performance tracker (#8851)
Browse files Browse the repository at this point in the history
* use attestation bits aggregator in performance tracker

* add electra tests

* fix spotless

* fix unit test

* always clear objects

* refactor bits aggregator or operation

* revert bits aggregator or operation return type
  • Loading branch information
mehdi-aouadi authored Nov 29, 2024
1 parent 9455985 commit 49d8d48
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntSet;
Expand All @@ -42,7 +43,6 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.logging.StatusLogger;
import tech.pegasys.teku.infrastructure.metrics.SettableGauge;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
Expand All @@ -51,6 +51,7 @@
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.statetransition.attestation.utils.AttestationBitsAggregator;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.validator.api.ValidatorPerformanceTrackingMode;
import tech.pegasys.teku.validator.coordinator.ActiveValidatorTracker;
Expand Down Expand Up @@ -169,6 +170,9 @@ private SafeFuture<?> reportBlockPerformance(final UInt64 currentEpoch) {
validatorPerformanceMetrics.updateBlockPerformanceMetrics(blockPerformance);
}
}
})
.alwaysRun(
() -> {
producedBlocksByEpoch.headMap(blockProductionEpoch, true).clear();
blockProductionAttemptsByEpoch.headMap(blockProductionEpoch, true).clear();
});
Expand Down Expand Up @@ -208,8 +212,8 @@ private SafeFuture<?> reportAttestationPerformance(final UInt64 currentEpoch) {
validatorPerformanceMetrics.updateAttestationPerformanceMetrics(
attestationPerformance);
}
producedAttestationsByEpoch.headMap(analyzedEpoch, true).clear();
});
})
.alwaysRun(() -> producedAttestationsByEpoch.headMap(analyzedEpoch, true).clear());
}

private SafeFuture<BlockPerformance> getBlockPerformanceForEpoch(final UInt64 currentEpoch) {
Expand Down Expand Up @@ -300,16 +304,23 @@ private AttestationPerformance calculateAttestationPerformance(

// Pre-process attestations included on chain to group them by
// data hash to inclusion slot to aggregation bitlist
final Map<Bytes32, NavigableMap<UInt64, SszBitlist>> slotAndBitlistsByAttestationDataHash =
new HashMap<>();
final Map<Bytes32, NavigableMap<UInt64, AttestationBitsAggregator>>
slotAndBitlistsByAttestationDataHash = new HashMap<>();
for (Map.Entry<UInt64, List<Attestation>> entry : attestationsIncludedOnChain.entrySet()) {
final Optional<Int2IntMap> committeesSize =
Optional.of(spec.getBeaconCommitteesSize(state, entry.getKey()));
for (Attestation attestation : entry.getValue()) {
final Bytes32 attestationDataHash = attestation.getData().hashTreeRoot();
final NavigableMap<UInt64, SszBitlist> slotToBitlists =
final NavigableMap<UInt64, AttestationBitsAggregator> slotToBitlists =
slotAndBitlistsByAttestationDataHash.computeIfAbsent(
attestationDataHash, __ -> new TreeMap<>());
slotToBitlists.merge(
entry.getKey(), attestation.getAggregationBits(), SszBitlist::nullableOr);
entry.getKey(),
AttestationBitsAggregator.of(attestation, committeesSize),
(firstBitsAggregator, secondBitsAggregator) -> {
firstBitsAggregator.or(secondBitsAggregator);
return firstBitsAggregator;
});
}
}

Expand All @@ -319,10 +330,10 @@ private AttestationPerformance calculateAttestationPerformance(
if (!slotAndBitlistsByAttestationDataHash.containsKey(sentAttestationDataHash)) {
continue;
}
final NavigableMap<UInt64, SszBitlist> slotAndBitlists =
final NavigableMap<UInt64, AttestationBitsAggregator> slotAndBitlists =
slotAndBitlistsByAttestationDataHash.get(sentAttestationDataHash);
for (UInt64 slot : slotAndBitlists.keySet()) {
if (slotAndBitlists.get(slot).isSuperSetOf(sentAttestation.getAggregationBits())) {
if (slotAndBitlists.get(slot).isSuperSetOf(sentAttestation)) {
inclusionDistances.add(slot.minus(sentAttestationSlot).intValue());
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,59 +24,72 @@
import static tech.pegasys.teku.validator.coordinator.performance.DefaultPerformanceTracker.ATTESTATION_INCLUSION_RANGE;

import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import tech.pegasys.infrastructure.logging.LogCaptor;
import tech.pegasys.teku.bls.BLSKeyGenerator;
import tech.pegasys.teku.bls.BLSKeyPair;
import tech.pegasys.teku.bls.BLSTestUtil;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.logging.StatusLogger;
import tech.pegasys.teku.infrastructure.metrics.SettableGauge;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.TestSpecContext;
import tech.pegasys.teku.spec.TestSpecInvocationContextProvider;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.generator.AttestationGenerator;
import tech.pegasys.teku.spec.generator.ChainBuilder;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.client.ChainHead;
import tech.pegasys.teku.storage.client.ChainUpdater;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder;
import tech.pegasys.teku.storage.storageSystem.StorageSystem;
import tech.pegasys.teku.validator.api.ValidatorPerformanceTrackingMode;
import tech.pegasys.teku.validator.coordinator.ActiveValidatorTracker;

@TestSpecContext(milestone = {SpecMilestone.PHASE0, SpecMilestone.ELECTRA})
public class DefaultPerformanceTrackerTest {

private static final List<BLSKeyPair> VALIDATOR_KEYS = BLSKeyGenerator.generateKeyPairs(64);
private final Spec spec = TestSpecFactory.createMinimalPhase0();
protected StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec);
protected ChainBuilder chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS);
protected ChainUpdater chainUpdater =
new ChainUpdater(storageSystem.recentChainData(), chainBuilder);

private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec);
private final StatusLogger log = mock(StatusLogger.class);
private final ActiveValidatorTracker validatorTracker = mock(ActiveValidatorTracker.class);
private final SyncCommitteePerformanceTracker syncCommitteePerformanceTracker =
mock(SyncCommitteePerformanceTracker.class);
private final ValidatorPerformanceMetrics validatorPerformanceMetrics =
mock(ValidatorPerformanceMetrics.class);

private final DefaultPerformanceTracker performanceTracker =
new DefaultPerformanceTracker(
storageSystem.combinedChainDataClient(),
log,
validatorPerformanceMetrics,
ValidatorPerformanceTrackingMode.ALL,
validatorTracker,
syncCommitteePerformanceTracker,
spec,
mock(SettableGauge.class));
private Spec spec;
private StorageSystem storageSystem;
private ChainBuilder chainBuilder;
private ChainUpdater chainUpdater;
private DataStructureUtil dataStructureUtil;
private DefaultPerformanceTracker performanceTracker;

@BeforeEach
void beforeEach() {
void beforeEach(final TestSpecInvocationContextProvider.SpecContext specContext) {
spec = specContext.getSpec();
dataStructureUtil = specContext.getDataStructureUtil();

storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec);
chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS);

chainUpdater = new ChainUpdater(storageSystem.recentChainData(), chainBuilder, spec);

performanceTracker =
new DefaultPerformanceTracker(
storageSystem.combinedChainDataClient(),
log,
validatorPerformanceMetrics,
ValidatorPerformanceTrackingMode.ALL,
validatorTracker,
syncCommitteePerformanceTracker,
spec,
mock(SettableGauge.class));

when(validatorTracker.getNumberOfValidatorsForEpoch(any())).thenReturn(0);
when(syncCommitteePerformanceTracker.calculatePerformance(any()))
.thenReturn(
Expand All @@ -85,7 +98,7 @@ void beforeEach() {
performanceTracker.start(UInt64.ZERO);
}

@Test
@TestTemplate
void shouldDisplayPerfectBlockInclusion() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(10));
performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(1)));
Expand All @@ -99,7 +112,7 @@ void shouldDisplayPerfectBlockInclusion() {
verify(log).performance(expectedBlockPerformance.toString());
}

@Test
@TestTemplate
void shouldDisplayBlockInclusionWhenProducedBlockIsChainHead() {
final UInt64 lastSlot = spec.computeStartSlotAtEpoch(UInt64.ONE);
final SignedBlockAndState bestBlock = chainUpdater.advanceChainUntil(2);
Expand All @@ -111,7 +124,7 @@ void shouldDisplayBlockInclusionWhenProducedBlockIsChainHead() {
verify(log).performance(expectedBlockPerformance.toString());
}

@Test
@TestTemplate
void shouldDisplayOneMissedBlock() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(10));
performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(1)));
Expand All @@ -128,7 +141,7 @@ void shouldDisplayOneMissedBlock() {
verify(log).performance(expectedBlockPerformance.toString());
}

@Test
@TestTemplate
void shouldDisplayPerfectAttestationInclusion() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1));

Expand All @@ -151,7 +164,7 @@ void shouldDisplayPerfectAttestationInclusion() {
verify(log).performance(expectedAttestationPerformance.toString());
}

@Test
@TestTemplate
void shouldDisplayInclusionDistanceOfMax2Min1() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1));

Expand Down Expand Up @@ -180,7 +193,7 @@ void shouldDisplayInclusionDistanceOfMax2Min1() {
verify(log).performance(expectedAttestationPerformance.toString());
}

@Test
@TestTemplate
void shouldDisplayIncorrectTargetRoot() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1));

Expand Down Expand Up @@ -219,7 +232,7 @@ void shouldDisplayIncorrectTargetRoot() {
verify(log).performance(expectedAttestationPerformance.toString());
}

@Test
@TestTemplate
void shouldDisplayIncorrectHeadBlockRoot() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1));

Expand All @@ -235,7 +248,9 @@ void shouldDisplayIncorrectHeadBlockRoot() {
chainUpdater.saveBlock(blockAndState1);
chainUpdater.updateBestBlock(blockAndState1);

SignedBlockAndState blockAndState = chainUpdaterFork.advanceChainUntil(8);
chainUpdaterFork.advanceChainUntil(7);
SignedBlockAndState blockAndState = chainBuilder.getBlockAndStateAtSlot(8);
chainUpdaterFork.updateBestBlock(blockAndState);
ChainBuilder.BlockOptions block2Options = ChainBuilder.BlockOptions.create();
AttestationGenerator attestationGenerator =
new AttestationGenerator(spec, chainBuilder.getValidatorKeys());
Expand All @@ -258,7 +273,7 @@ void shouldDisplayIncorrectHeadBlockRoot() {
verify(log).performance(expectedAttestationPerformance.toString());
}

@Test
@TestTemplate
void shouldClearOldSentObjects() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(10));
performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(1)));
Expand All @@ -267,20 +282,51 @@ void shouldClearOldSentObjects() {
chainUpdater.chainBuilder.getBlockAtSlot(1).getSlotAndBlockRoot());
performanceTracker.saveProducedBlock(
chainUpdater.chainBuilder.getBlockAtSlot(2).getSlotAndBlockRoot());
performanceTracker.saveProducedAttestation(
spec.getGenesisSchemaDefinitions()
.getAttestationSchema()
.create(
dataStructureUtil.randomBitlist(),
dataStructureUtil.randomAttestationData(UInt64.ONE),
BLSTestUtil.randomSignature(0)));
performanceTracker.saveProducedAttestation(dataStructureUtil.randomAttestation(UInt64.ONE));
performanceTracker.onSlot(spec.computeStartSlotAtEpoch(UInt64.valueOf(2)));
assertThat(performanceTracker.producedAttestationsByEpoch).isEmpty();
assertThat(performanceTracker.producedBlocksByEpoch).isEmpty();
assertThat(performanceTracker.blockProductionAttemptsByEpoch).isEmpty();
}

@TestTemplate
void shouldClearObjectsAfterFailure() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(10));
final CombinedChainDataClient combinedChainDataClientMock = mock(CombinedChainDataClient.class);
// Make the attestation performance calculation fail
when(combinedChainDataClientMock.getBestState())
.thenReturn(Optional.of(SafeFuture.failedFuture(new IllegalArgumentException("failure"))));
final ChainHead chainHeadMock = mock(ChainHead.class);
// Make the block performance calculation fail
when(chainHeadMock.asStateAndBlockSummary())
.thenReturn(SafeFuture.failedFuture(new IllegalArgumentException("failure")));
when(combinedChainDataClientMock.getChainHead()).thenReturn(Optional.of(chainHeadMock));
performanceTracker =
new DefaultPerformanceTracker(
combinedChainDataClientMock,
log,
validatorPerformanceMetrics,
ValidatorPerformanceTrackingMode.ALL,
validatorTracker,
syncCommitteePerformanceTracker,
spec,
mock(SettableGauge.class));
performanceTracker.start(UInt64.ZERO);
performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(1)));
performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(2)));
performanceTracker.saveProducedBlock(
chainUpdater.chainBuilder.getBlockAtSlot(1).getSlotAndBlockRoot());
performanceTracker.saveProducedBlock(
chainUpdater.chainBuilder.getBlockAtSlot(2).getSlotAndBlockRoot());
performanceTracker.saveProducedAttestation(dataStructureUtil.randomAttestation(UInt64.ZERO));
performanceTracker.saveProducedAttestation(dataStructureUtil.randomAttestation(UInt64.ONE));
performanceTracker.onSlot(spec.computeStartSlotAtEpoch(UInt64.valueOf(2)));
assertThat(performanceTracker.producedAttestationsByEpoch).isEmpty();
assertThat(performanceTracker.producedBlocksByEpoch).isEmpty();
assertThat(performanceTracker.blockProductionAttemptsByEpoch).isEmpty();
}

@Test
@TestTemplate
void shouldNotCountDuplicateAttestationsIncludedOnChain() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1));

Expand Down Expand Up @@ -308,7 +354,7 @@ void shouldNotCountDuplicateAttestationsIncludedOnChain() {
verify(log).performance(expectedAttestationPerformance.toString());
}

@Test
@TestTemplate
void shouldNotSkipValidationForAttestationsWithSameDataButDifferentBitlists() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1));

Expand Down Expand Up @@ -342,7 +388,7 @@ void shouldNotSkipValidationForAttestationsWithSameDataButDifferentBitlists() {
verify(log).performance(expectedAttestationPerformance.toString());
}

@Test
@TestTemplate
void shouldReportExpectedAttestationOnlyForTheGivenEpoch() {
when(validatorTracker.getNumberOfValidatorsForEpoch(UInt64.valueOf(2))).thenReturn(2);
when(validatorTracker.getNumberOfValidatorsForEpoch(UInt64.valueOf(3))).thenReturn(1);
Expand All @@ -354,14 +400,14 @@ void shouldReportExpectedAttestationOnlyForTheGivenEpoch() {
verify(log).performance(expectedAttestationPerformance.toString());
}

@Test
@TestTemplate
void shouldNotReportAttestationPerformanceIfNoValidatorsInEpoch() {
when(validatorTracker.getNumberOfValidatorsForEpoch(UInt64.valueOf(2))).thenReturn(0);
performanceTracker.onSlot(spec.computeStartSlotAtEpoch(ATTESTATION_INCLUSION_RANGE.plus(2)));
verify(log, never()).performance(anyString());
}

@Test
@TestTemplate
void shouldReportSyncCommitteePerformance() {
final UInt64 epoch = UInt64.valueOf(2);
final SyncCommitteePerformance performance = new SyncCommitteePerformance(epoch, 10, 9, 8, 7);
Expand All @@ -373,7 +419,7 @@ void shouldReportSyncCommitteePerformance() {
verify(validatorPerformanceMetrics).updateSyncCommitteePerformance(performance);
}

@Test
@TestTemplate
void shouldHandleErrorsWhenReportTasksFail() {
chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1));
final Attestation attestation = createAttestationForParentBlockOnSlot(1);
Expand Down

0 comments on commit 49d8d48

Please sign in to comment.