Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new getCheckpointStateOrBytes() api and clone states from cache #6504

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-trans
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";

/**
* Minimum number of epochs between single temp archived states
Expand Down Expand Up @@ -83,13 +84,26 @@ export class StatesArchiver {
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
const finalizedState = this.regen.getCheckpointStateSync(finalized);
if (!finalizedState) {
throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch);
// starting from Mar 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
});
}
await this.db.stateArchive.put(finalizedState.slot, finalizedState);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {finalizedEpoch: finalized.epoch});
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ export async function importBlock(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// Note: in-lined code from previos handler of ChainEvent.checkpoint
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface IStateRegenerator extends IStateRegeneratorInternal {
dumpCacheSummary(): routes.lodestar.StateCacheItem[];
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null;
getPreStateSync(block: allForks.BeaconBlock): CachedBeaconStateAllForks | null;
getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null;
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
Expand Down
16 changes: 14 additions & 2 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16;

type QueuedStateRegeneratorModules = RegenModules & {
signal: AbortSignal;
logger: Logger;
};

type RegenRequestKey = keyof IStateRegeneratorInternal;
Expand Down Expand Up @@ -54,6 +53,12 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.logger = modules.logger;
}

async init(): Promise<void> {
if (this.checkpointStateCache.init) {
return this.checkpointStateCache.init();
}
}

canAcceptWork(): boolean {
return this.jobQueue.jobLen < REGEN_CAN_ACCEPT_WORK_THRESHOLD;
}
Expand Down Expand Up @@ -105,6 +110,10 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return null;
}

async getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null> {
return this.checkpointStateCache.getStateOrBytes(cp);
}

getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null {
return this.checkpointStateCache.get(cp);
}
Expand Down Expand Up @@ -145,10 +154,13 @@ export class QueuedStateRegenerator implements IStateRegenerator {
} else {
// Trigger regen on head change if necessary
this.logger.warn("Head state not available, triggering regen", {stateRoot: newHeadStateRoot});
// it's important to reload state to regen head state here
const allowDiskReload = true;
// head has changed, so the existing cached head state is no longer useful. Set strong reference to null to free
// up memory for regen step below. During regen, node won't be functional but eventually head will be available
// for legacy StateContextCache only
this.stateCache.setHeadState(null);
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock).then(
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, allowDiskReload).then(
(headStateRegen) => this.stateCache.setHeadState(headStateRegen),
(e) => this.logger.error("Error on head state regen", {}, e)
);
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ async function processSlotsToNearestCheckpoint(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// this avoids keeping our node busy processing blocks
await sleep(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item;
return item.clone(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
async getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp);
if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) {
return stateOrStateBytesData;
return stateOrStateBytesData?.clone(true) ?? null;
}
const {persistedKey, stateBytes} = stateOrStateBytesData;
const logMeta = {persistedKey: toHexString(persistedKey)};
Expand Down Expand Up @@ -242,7 +242,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey});
this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex);
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch
return newCachedState;
return newCachedState.clone(true);
} catch (e) {
this.logger.debug("Reload: error loading cached state", logMeta, e as Error);
return null;
Expand Down Expand Up @@ -312,7 +312,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
if (isInMemoryCacheItem(cacheItem)) {
const {state} = cacheItem;
this.metrics?.stateClonedCount.observe(state.clonedCount);
return state;
return state.clone(true);
}

return null;
Expand Down Expand Up @@ -352,9 +352,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
.filter((e) => e <= maxEpoch);
for (const epoch of epochs) {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
const inMemoryState = this.get({rootHex, epoch});
if (inMemoryState) {
return inMemoryState;
const inMemoryClonedState = this.get({rootHex, epoch});
if (inMemoryClonedState) {
return inMemoryClonedState;
}
}
}
Expand All @@ -376,9 +376,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
for (const epoch of epochs) {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
try {
const state = await this.getOrReload({rootHex, epoch});
if (state) {
return state;
const clonedState = await this.getOrReload({rootHex, epoch});
if (clonedState) {
return clonedState;
}
} catch (e) {
this.logger.debug("Error get or reload state", {epoch, rootHex}, e as Error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class StateContextCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item;
return item.clone(true);
}

add(item: CachedBeaconStateAllForks): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class CheckpointStateCache implements CheckpointStateCacheInterface {

this.metrics?.stateClonedCount.observe(item.clonedCount);

return item;
return item.clone(true);
}

add(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void {
Expand Down
8 changes: 6 additions & 2 deletions packages/beacon-node/src/util/multifork.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {ChainForkConfig} from "@lodestar/config";
import {allForks} from "@lodestar/types";
import {Slot, allForks} from "@lodestar/types";
import {bytesToInt} from "@lodestar/utils";
import {getSlotFromSignedBeaconBlockSerialized} from "./sszBytes.js";

Expand Down Expand Up @@ -36,10 +36,14 @@ export function getStateTypeFromBytes(
config: ChainForkConfig,
bytes: Buffer | Uint8Array
): allForks.AllForksSSZTypes["BeaconState"] {
const slot = bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_STATE, SLOT_BYTES_POSITION_IN_STATE + SLOT_BYTE_COUNT));
const slot = getStateSlotFromBytes(bytes);
return config.getForkTypes(slot).BeaconState;
}

export function getStateSlotFromBytes(bytes: Uint8Array): Slot {
return bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_STATE, SLOT_BYTES_POSITION_IN_STATE + SLOT_BYTE_COUNT));
}

/**
* First field in update is beacon, first field in beacon is slot
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ describe("PersistentCheckpointStateCache", function () {

it("pruneFinalized and getStateOrBytes", async function () {
cache.add(cp2, states["cp2"]);
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]);
expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual(
states["cp0b"].hashTreeRoot()
);
expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1);
// cp0 is persisted
expect(fileApisBuffer.size).toEqual(1);
Expand Down Expand Up @@ -484,7 +486,9 @@ describe("PersistentCheckpointStateCache", function () {

// regen needs to reload cp0b
cache.add(cp0b, states["cp0b"]);
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]);
expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual(
states["cp0b"].hashTreeRoot()
);

// regen generates cp1b
const cp1b = {epoch: 21, root: root0b};
Expand Down Expand Up @@ -670,7 +674,9 @@ describe("PersistentCheckpointStateCache", function () {

// simulate regen
cache.add(cp0b, states["cp0b"]);
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]);
expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual(
states["cp0b"].hashTreeRoot()
);
// root2, regen cp0b
const cp1bState = states["cp0b"].clone();
cp1bState.slot = 21 * SLOTS_PER_EPOCH;
Expand Down Expand Up @@ -847,7 +853,9 @@ describe("PersistentCheckpointStateCache", function () {

// simulate reload cp1b
cache.add(cp0b, states["cp0b"]);
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]);
expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual(
states["cp0b"].hashTreeRoot()
);
const root1b = Buffer.alloc(32, 101);
const state1b = states["cp0b"].clone();
state1b.slot = state1a.slot + 1;
Expand Down
Loading