Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
♻️ Manage validatorsHash by height
Browse files Browse the repository at this point in the history
  • Loading branch information
ishantiw committed Feb 19, 2024
1 parent 7abf244 commit b21e624
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
*/
/* eslint-disable no-bitwise */
import { ActiveValidator, utils, ActiveValidatorsUpdate } from 'lisk-sdk';
import { ValidatorsData } from './types';
import { ValidatorsDataWithHeight } from './types';

/**
* @see https://github.com/LiskHQ/lips/blob/main/proposals/lip-0053.md#computing-the-validators-update
*/

export const calculateActiveValidatorsUpdate = (
validatorsDataAtLastCertificate: ValidatorsData,
validatorsDataAtNewCertificate: ValidatorsData,
validatorsDataAtLastCertificate: ValidatorsDataWithHeight,
validatorsDataAtNewCertificate: ValidatorsDataWithHeight,
): { activeValidatorsUpdate: ActiveValidatorsUpdate; certificateThreshold: bigint } => {
let certificateThreshold;
// If the certificate threshold is not changed from last certificate then we assign zero
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
import { ChainAPIClient } from './chain_api_client';
import { ChainConnectorDB } from './db';
import { BlockHeader, LastSentCCM, Logger, ModuleMetadata } from './types';
import { CCM_PROCESSED, CCM_SEND_SUCCESS } from './constants';
import { CCM_PROCESSED, CCM_SEND_SUCCESS, DEFAULT_SENT_CCU_TIMEOUT } from './constants';
import { CCUHandler } from './ccu_handler';

export interface NewBlockHandlerConfig {
Expand Down Expand Up @@ -54,23 +54,24 @@ type FinalizedHeightInfo = { inboxSize: number; lastCertificateHeight: number };
export class BlockEventHandler {
private readonly _ownChainID!: Buffer;
private readonly _ccuHandler!: CCUHandler;
private readonly _ccuFrequency!: number;
private readonly _ccuSaveLimit: number;
private readonly _receivingChainID: Buffer;
private readonly _isReceivingChainMainchain!: boolean;
private _db!: ChainConnectorDB;
private _logger!: Logger;
private _sendingChainAPIClient!: ChainAPIClient;
private _receivingChainAPIClient!: ChainAPIClient;
private _lastCertificate!: LastCertificate;
private _interoperabilityMetadata!: ModuleMetadata;
private _heightToDeleteIndex!: Map<number, FinalizedHeightInfo>;
private readonly _ccuFrequency!: number;
private _receivingChainFinalizedHeight!: number;
private readonly _ccuSaveLimit: number;
private readonly _receivingChainID: Buffer;
private _isReceivingChainRegistered = false;
private _lastSentCCUTxID = '';
private _lastSentCCM!: LastSentCCM;
private _lastIncludedCCMOnReceivingChain!: LastSentCCM | undefined;
private readonly _isReceivingChainMainchain!: boolean;
private _lastDeletionHeight!: number;
private _sentCCUTxTimeout!: NodeJS.Timer;

public constructor(config: NewBlockHandlerConfig) {
this._ownChainID = config.ownChainID;
Expand All @@ -94,11 +95,10 @@ export class BlockEventHandler {
this._db = args.db;
this._sendingChainAPIClient = args.sendingChainAPIClient;
this._receivingChainAPIClient = args.receivingChainAPIClient;
await this._receivingChainAPIClient.connect();
this._heightToDeleteIndex = new Map();
this._interoperabilityMetadata = await this._sendingChainAPIClient.getMetadataByModuleName(
MODULE_NAME_INTEROPERABILITY,
);

this._ccuHandler.load({
db: args.db,
lastCertificate: this._lastCertificate,
Expand All @@ -107,7 +107,8 @@ export class BlockEventHandler {
sendingChainAPIClient: args.sendingChainAPIClient,
interoperabilityMetadata: this._interoperabilityMetadata,
});
this._heightToDeleteIndex = new Map();

await this._receivingChainAPIClient.connect();
this._lastIncludedCCMOnReceivingChain = await this._db.getLastSentCCM();
// On a new block start with CCU creation process
this._sendingChainAPIClient.subscribe(
Expand All @@ -125,17 +126,26 @@ export class BlockEventHandler {

public async handleNewBlock(data?: Record<string, unknown>) {
const { blockHeader: receivedBlock } = data as unknown as Data;

const newBlockHeader = chain.BlockHeader.fromJSON(receivedBlock).toObject();

try {
await this._saveOnNewBlock(newBlockHeader);
} catch (error) {
this._logger.error({ err: error as Error }, 'Error occurred while saving data on new block.');
return;
}
const nodeInfo = await this._sendingChainAPIClient.getNodeInfo();

if (nodeInfo.syncing) {
return;
}
let chainAccount: ChainAccount | undefined;
// Save blockHeader, aggregateCommit, validatorsData and cross chain messages if any.
const nodeInfo = await this._sendingChainAPIClient.getNodeInfo();
// Fetch last certificate from the receiving chain and update the _lastCertificate
try {
chainAccount = await this._receivingChainAPIClient.getChainAccount(this._ownChainID);
} catch (error) {
// If receivingChainAPIClient is not ready then still save data on new block
await this._saveOnNewBlock(newBlockHeader);
await this._initializeReceivingChainClient();
this._logger.error(
{ err: error as Error },
Expand All @@ -150,15 +160,6 @@ export class BlockEventHandler {
this._logger.info(
'Sending chain is not registered to the receiving chain yet and has no chain data.',
);
try {
await this._saveOnNewBlock(newBlockHeader);
} catch (error) {
this._logger.error(
{ err: error as Error },
'Error occurred while saving data on new block.',
);
}

return;
}

Expand All @@ -168,8 +169,6 @@ export class BlockEventHandler {
`Last certificate value has been set with height ${this._lastCertificate.height}`,
);

await this._saveOnNewBlock(newBlockHeader);

const numOfBlocksSinceLastCertificate = newBlockHeader.height - this._lastCertificate.height;
if (nodeInfo.syncing || this._ccuFrequency > numOfBlocksSinceLastCertificate) {
this._logger.debug(
Expand Down Expand Up @@ -229,6 +228,11 @@ export class BlockEventHandler {
);
if (ccuSubmitResult) {
this._lastSentCCUTxID = ccuSubmitResult;
// Wait until 1 hour
this._sentCCUTxTimeout = setTimeout(() => {
this._lastSentCCUTxID = '';
clearTimeout(this._sentCCUTxTimeout);
}, DEFAULT_SENT_CCU_TIMEOUT);
// If CCU was sent successfully then save the lastSentCCM if any
if (computedCCUParams.lastCCMToBeSent) {
this._lastSentCCM = computedCCUParams.lastCCMToBeSent;
Expand All @@ -249,9 +253,7 @@ export class BlockEventHandler {

public async _saveOnNewBlock(newBlockHeader: BlockHeader) {
// Save block header if a new block header arrives
await this._db.saveOnNewBlock(newBlockHeader);

this._logger.info('Saved data on new block');
await this._db.saveToDBOnNewBlock(newBlockHeader);
// Check for events if any and store them
const events = await this._sendingChainAPIClient.getEvents(newBlockHeader.height);

Expand Down Expand Up @@ -328,8 +330,11 @@ export class BlockEventHandler {
newBlockHeader.height,
);

await this._db.setValidatorsDataByHash(validatorsData.validatorsHash, validatorsData);
this._logger.info('set getBFTParametersAtHeight');
await this._db.setValidatorsDataByHash(
validatorsData.validatorsHash,
{ ...validatorsData, height: newBlockHeader.height },
newBlockHeader.height,
);
}

private async _initializeReceivingChainClient() {
Expand All @@ -349,7 +354,11 @@ export class BlockEventHandler {

private async _newBlockReceivingChainHandler(_?: Record<string, unknown>) {
try {
const { finalizedHeight } = await this._receivingChainAPIClient.getNodeInfo();
const { finalizedHeight, syncing } = await this._receivingChainAPIClient.getNodeInfo();
// If receiving node is syncing then return
if (syncing) {
return;
}
this._receivingChainFinalizedHeight = finalizedHeight;
const { inbox } = await this._receivingChainAPIClient.getChannelAccount(this._ownChainID);
if (!inbox) {
Expand All @@ -371,6 +380,7 @@ export class BlockEventHandler {
);
// Reset last sent CCU to be blank
this._lastSentCCUTxID = '';
clearTimeout(this._sentCCUTxTimeout);
// Update last included CCM if there was any in the last sent CCU
if (this._lastSentCCM) {
this._lastIncludedCCMOnReceivingChain = this._lastSentCCM;
Expand Down Expand Up @@ -436,7 +446,12 @@ export class BlockEventHandler {
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);

// Delete validatorsHashPreimage
await this._db.deleteValidatorsHashBetweenHeights(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);

this._lastDeletionHeight = endDeletionHeightByLastCertificate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export const CHAIN_ID_LENGTH = 4;
export const DEFAULT_REGISTRATION_HEIGHT = 1;
export const DEFAULT_LAST_CCM_SENT_NONCE = BigInt(-1);
export const DEFAULT_CCU_SAVE_LIMIT = 300;
export const DEFAULT_SENT_CCU_TIMEOUT = 3600000; // 1 hour

export const DB_KEY_CROSS_CHAIN_MESSAGES = Buffer.from([1]);
export const DB_KEY_BLOCK_HEADERS = Buffer.from([2]);
Expand Down
92 changes: 82 additions & 10 deletions framework-plugins/lisk-framework-chain-connector-plugin/src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import {
listOfCCUsSchema,
validatorsDataSchema,
} from './schemas';
import { BlockHeader, CCMWithHeight, LastSentCCM, ValidatorsData } from './types';
import { BlockHeader, CCMWithHeight, LastSentCCM, ValidatorsDataWithHeight } from './types';

const { Database } = liskDB;
type KVStore = liskDB.Database;

const DB_KEY_BLOCK_HEADER_BY_HEIGHT = Buffer.from([10]);
const DB_KEY_AGGREGATE_COMMIT_BY_HEIGHT = Buffer.from([20]);
const DB_KEY_VALIDATORS_DATA = Buffer.from([30]);
const DB_KEY_VALIDATORS_DATA_BY_HASH = Buffer.from([30]);
const DB_KEY_VALIDATORS_DATA_BY_HEIGHT = Buffer.from([40]);

export const getDBInstance = async (
dataPath: string,
Expand Down Expand Up @@ -81,7 +82,7 @@ export class ChainConnectorDB {
return this._privateKey;
}

public async saveOnNewBlock(blockHeader: BlockHeader) {
public async saveToDBOnNewBlock(blockHeader: BlockHeader) {
const heightBuf = uint32BE(blockHeader.height);
const batch = new liskDB.Batch();
const newBlockHeaderBytes = codec.encode(blockHeaderSchemaWithID, blockHeader);
Expand Down Expand Up @@ -253,9 +254,11 @@ export class ChainConnectorDB {

public async getValidatorsDataByHash(validatorsHash: Buffer) {
try {
const bytes = await this._db.get(concatDBKeys(DB_KEY_VALIDATORS_DATA, validatorsHash));
const bytes = await this._db.get(
concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HASH, validatorsHash),
);

return codec.decode<ValidatorsData>(validatorsDataSchema, bytes);
return codec.decode<ValidatorsDataWithHeight>(validatorsDataSchema, bytes);
} catch (error) {
if (!(error instanceof liskDB.NotFoundError)) {
throw error;
Expand All @@ -265,15 +268,84 @@ export class ChainConnectorDB {
}
}

public async setValidatorsDataByHash(validatorsHash: Buffer, validatorsData: ValidatorsData) {
public async setValidatorsDataByHash(
validatorsHash: Buffer,
validatorsData: ValidatorsDataWithHeight,
height: number,
) {
const bytes = codec.encode(validatorsDataSchema, validatorsData);
await this._db.set(concatDBKeys(DB_KEY_VALIDATORS_DATA, validatorsHash), bytes);
await this._db.set(concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HASH, validatorsHash), bytes);
await this._db.set(
concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HEIGHT, uint32BE(height)),
validatorsHash,
);
}
public async getValidatorsDataByHeight(height: number) {
try {
const validatorHash = await this._db.get(
concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HEIGHT, uint32BE(height)),
);

return this.getValidatorsDataByHash(
concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HASH, validatorHash),
);
} catch (error) {
if (!(error instanceof liskDB.NotFoundError)) {
throw error;
}

return undefined;
}
}

public async deleteValidatorsHashByHeight(height: number) {
const validatorHash = await this._db.get(
concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HEIGHT, uint32BE(height)),
);
await this.deleteValidatorsDataByHash(concatDBKeys(validatorHash));
}

public async deleteValidatorsHashBetweenHeights(fromHeight: number, toHeight: number) {
const stream = this._db.createReadStream({
gte: concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HEIGHT, uint32BE(fromHeight)),
lte: concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HEIGHT, uint32BE(toHeight)),
reverse: true,
});
const validatorsHashes = await new Promise<Buffer[]>((resolve, reject) => {
const list: Buffer[] = [];
stream
.on('data', ({ key }: { key: Buffer }) => {
list.push(key);
})
.on('error', error => {
reject(error);
})
.on('end', () => {
resolve(list);
});
});

for (const hash of validatorsHashes) {
await this.deleteValidatorsDataByHash(hash);
}
}

public async deleteValidatorsDataByHash(validatorsHash: Buffer) {
const validatorData = await this.getValidatorsDataByHash(validatorsHash);

if (!validatorData) {
return;
}
await this._db.del(concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HASH, validatorsHash));
await this._db.del(
concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HEIGHT, uint32BE(validatorData.height)),
);
}

public async getAllValidatorsData() {
const stream = this._db.createReadStream({
gte: concatDBKeys(DB_KEY_VALIDATORS_DATA, Buffer.alloc(4, 0)),
lte: concatDBKeys(DB_KEY_VALIDATORS_DATA, Buffer.alloc(4, 255)),
gte: concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HASH, Buffer.alloc(4, 0)),
lte: concatDBKeys(DB_KEY_VALIDATORS_DATA_BY_HASH, Buffer.alloc(4, 255)),
reverse: true,
});
const validatorsData = await new Promise<Buffer[]>((resolve, reject) => {
Expand All @@ -290,7 +362,7 @@ export class ChainConnectorDB {
});
});

return validatorsData.map(v => codec.decode<ValidatorsData>(validatorsDataSchema, v));
return validatorsData.map(v => codec.decode<ValidatorsDataWithHeight>(validatorsDataSchema, v));
}

public async getCCMsByHeight(height: number): Promise<CCMsg[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
ChainConnectorPluginConfig,
LastSentCCMWithHeightJSON,
SentCCUsJSON,
ValidatorsDataJSON,
ValidatorsDataHeightJSON,
} from './types';
import { aggregateCommitToJSON, ccmsWithHeightToJSON, validatorsHashPreimagetoJSON } from './utils';
import { authorizeRequestSchema } from './schemas';
Expand Down Expand Up @@ -94,7 +94,9 @@ export class ChainConnectorEndpoint extends BasePluginEndpoint {
};
}

public async getValidatorsdata(_context: PluginEndpointContext): Promise<ValidatorsDataJSON[]> {
public async getValidatorsdata(
_context: PluginEndpointContext,
): Promise<ValidatorsDataHeightJSON[]> {
const validatorsHashPreimage = await this.db.getAllValidatorsData();
return validatorsHashPreimagetoJSON(validatorsHashPreimage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export const validatorsDataSchema = {
},
certificateThreshold: { dataType: 'uint64', fieldNumber: 2 },
validatorsHash: { dataType: 'bytes', fieldNumber: 3 },
height: { dataType: 'uint32', fieldNumber: 4 },
},
};

Expand Down
Loading

0 comments on commit b21e624

Please sign in to comment.