Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
♻️ Improve CCU storage management
Browse files Browse the repository at this point in the history
  • Loading branch information
ishantiw committed Feb 20, 2024
1 parent b21e624 commit 6aecb55
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
codec,
ChainAccount,
getMainchainID,
TransactionJSON,
} from 'lisk-sdk';
import { ChainAPIClient } from './chain_api_client';
import { ChainConnectorDB } from './db';
Expand Down Expand Up @@ -121,6 +122,7 @@ export class BlockEventHandler {
async (data?: Record<string, unknown>) => this._deleteBlockHandler(data),
);

// Initialize the receiving chain client in the end of load so not to miss the initial new blocks
this._initializeReceivingChainClient().catch(this._logger.error);
}

Expand All @@ -140,7 +142,7 @@ export class BlockEventHandler {
return;
}
let chainAccount: ChainAccount | undefined;
// Save blockHeader, aggregateCommit, validatorsData and cross chain messages if any.

// Fetch last certificate from the receiving chain and update the _lastCertificate
try {
chainAccount = await this._receivingChainAPIClient.getChainAccount(this._ownChainID);
Expand All @@ -163,27 +165,21 @@ export class BlockEventHandler {
return;
}

try {
this._lastCertificate = chainAccount.lastCertificate;
this._lastCertificate = chainAccount.lastCertificate;
this._logger.debug(
`Last certificate value has been set with height ${this._lastCertificate.height}`,
);

const numOfBlocksSinceLastCertificate = newBlockHeader.height - this._lastCertificate.height;
if (this._ccuFrequency > numOfBlocksSinceLastCertificate) {
this._logger.debug(
`Last certificate value has been set with height ${this._lastCertificate.height}`,
{
ccuFrequency: this._ccuFrequency,
nextPossibleCCUHeight: this._ccuFrequency - numOfBlocksSinceLastCertificate,
},
'No attempt to create CCU either due to provided ccuFrequency',
);

const numOfBlocksSinceLastCertificate = newBlockHeader.height - this._lastCertificate.height;
if (nodeInfo.syncing || this._ccuFrequency > numOfBlocksSinceLastCertificate) {
this._logger.debug(
{
syncing: nodeInfo.syncing,
ccuFrequency: this._ccuFrequency,
nextPossibleCCUHeight: this._ccuFrequency - numOfBlocksSinceLastCertificate,
},
'No attempt to create CCU either due to ccuFrequency or the node is syncing',
);

return;
}
} catch (error) {
this._logger.error(error, 'Failed while saving on new block');
return;
}

Expand All @@ -201,7 +197,6 @@ export class BlockEventHandler {
try {
// Compute CCU when there is no pending CCU that was sent earlier
if (this._lastSentCCUTxID === '') {
// When all the relevant data is saved successfully then try to create CCU
computedCCUParams = await this._ccuHandler.computeCCU(
this._lastCertificate,
this._lastIncludedCCMOnReceivingChain,
Expand Down Expand Up @@ -252,7 +247,6 @@ export class BlockEventHandler {
}

public async _saveOnNewBlock(newBlockHeader: BlockHeader) {
// Save block header if a new block header arrives
await this._db.saveToDBOnNewBlock(newBlockHeader);
// Check for events if any and store them
const events = await this._sendingChainAPIClient.getEvents(newBlockHeader.height);
Expand Down Expand Up @@ -392,80 +386,77 @@ export class BlockEventHandler {
}
await this._cleanup();
} catch (error) {
this._logger.debug(
error,
'No Channel or Chain Data: Sending chain is not registered yet on receiving chain.',
);
this._logger.debug(error, 'Error occured while receiving block from receiving chain.');
}
}

private async _cleanup() {
// Delete CCUs
// When given -1 then there is no limit
if (this._ccuSaveLimit !== -1) {
const listOfCCUs = await this._db.getListOfCCUs();
if (listOfCCUs.length > this._ccuSaveLimit) {
await this._db.setListOfCCUs(
// Takes the last ccuSaveLimit elements
listOfCCUs.slice(-this._ccuSaveLimit),
);
const { list: listOfCCUs, total } = await this._db.getListOfCCUs();
if (total > this._ccuSaveLimit) {
// listOfCCUs is a descending list of CCUs by nonce
for (let i = total; i > this._ccuSaveLimit; i -= 1) {
await this._db.deleteCCUTransaction(
chain.Transaction.fromJSON(listOfCCUs[i] as TransactionJSON).toObject(),
);
}
}
let finalizedInfoAtHeight = this._heightToDeleteIndex.get(
this._receivingChainFinalizedHeight,
);
if (!finalizedInfoAtHeight) {
for (let i = 1; i < this._heightToDeleteIndex.size; i += 1) {
if (this._heightToDeleteIndex.get(this._receivingChainFinalizedHeight - i)) {
finalizedInfoAtHeight = this._heightToDeleteIndex.get(
this._receivingChainFinalizedHeight - i,
);
break;
}
}
let finalizedInfoAtHeight = this._heightToDeleteIndex.get(this._receivingChainFinalizedHeight);
if (!finalizedInfoAtHeight) {
for (let i = 1; i < this._heightToDeleteIndex.size; i += 1) {
if (this._heightToDeleteIndex.get(this._receivingChainFinalizedHeight - i)) {
finalizedInfoAtHeight = this._heightToDeleteIndex.get(
this._receivingChainFinalizedHeight - i,
);
break;
}
}
}

const endDeletionHeightByLastCertificate = finalizedInfoAtHeight
? finalizedInfoAtHeight.lastCertificateHeight
: 0;

if (this._lastCertificate.height > 0) {
// Delete CCMs
await this._db.deleteCCMsBetweenHeight(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);

// Delete blockHeaders
await this._db.deleteBlockHeadersBetweenHeight(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);
const endDeletionHeightByLastCertificate = finalizedInfoAtHeight
? finalizedInfoAtHeight.lastCertificateHeight
: 0;

// Delete aggregateCommits
await this._db.deleteAggregateCommitsBetweenHeight(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);
if (this._lastCertificate.height > 0) {
// Delete CCMs
await this._db.deleteCCMsBetweenHeight(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);

// Delete validatorsHashPreimage
await this._db.deleteValidatorsHashBetweenHeights(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);
// Delete blockHeaders
await this._db.deleteBlockHeadersBetweenHeight(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);

this._lastDeletionHeight = endDeletionHeightByLastCertificate;
}
// Delete aggregateCommits
await this._db.deleteAggregateCommitsBetweenHeight(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);

this._logger.debug(
`Deleted data on cleanup between heights 1 and ${endDeletionHeightByLastCertificate}`,
// Delete validatorsHashPreimage
await this._db.deleteValidatorsHashBetweenHeights(
this._lastDeletionHeight,
endDeletionHeightByLastCertificate - 1,
);
// Delete info less than finalized height
this._heightToDeleteIndex.forEach((_, key) => {
if (key < this._receivingChainFinalizedHeight) {
this._heightToDeleteIndex.delete(key);
}
});

this._lastDeletionHeight = endDeletionHeightByLastCertificate;
}

this._logger.debug(
`Deleted data on cleanup between heights 1 and ${endDeletionHeightByLastCertificate}`,
);
// Delete info less than finalized height
this._heightToDeleteIndex.forEach((_, key) => {
if (key < this._receivingChainFinalizedHeight) {
this._heightToDeleteIndex.delete(key);
}
});
}

private async _deleteBlockHandler(data?: Record<string, unknown>) {
Expand All @@ -477,5 +468,6 @@ export class BlockEventHandler {
await this._db.deleteCCMsByHeight(deletedBlockHeader.height);
await this._db.deleteBlockHeaderByHeight(deletedBlockHeader.height);
await this._db.deleteAggregateCommitByHeight(deletedBlockHeader.height);
await this._db.deleteValidatorsHashByHeight(deletedBlockHeader.height);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ interface ComputeCCUInitArgs {
}

export class CCUHandler {
private readonly _registrationHeight: number;
private readonly _ownChainID: Buffer;
private readonly _receivingChainID: Buffer;
private readonly _maxCCUSize: number;
private readonly _isReceivingChainMainchain: boolean;
private readonly _isSaveCCU: boolean;
private readonly _ccuFee: string;
private _db!: ChainConnectorDB;
private _logger!: Logger;
private _sendingChainAPIClient!: ChainAPIClient;
private _receivingChainAPIClient!: ChainAPIClient;
private _lastCertificate!: LastCertificate;
private readonly _registrationHeight: number;
private readonly _ownChainID!: Buffer;
private readonly _receivingChainID!: Buffer;
private readonly _maxCCUSize!: number;
private readonly _isReceivingChainMainchain!: boolean;
private _interoperabilityMetadata!: ModuleMetadata;
private _outboxKeyForInclusionProof!: Buffer;
private readonly _isSaveCCU!: boolean;
private readonly _ccuFee!: string;

public constructor(config: ComputeCCUConfig) {
this._registrationHeight = config.registrationHeight;
Expand Down Expand Up @@ -167,7 +167,6 @@ export class CCUHandler {
};

// Use the old certificateThreshold

const validatorsDataAtLastCertificate = await this._db.getValidatorsDataByHash(
this._lastCertificate.validatorsHash,
);
Expand Down Expand Up @@ -222,8 +221,6 @@ export class CCUHandler {
certificateThreshold = validatorsDataAtLastCertificate?.certificateThreshold;
}

// Get the inclusionProof for outboxRoot on stateRoot

if (crossChainMessages.length === 0) {
outboxRootWitness = {
bitmap: EMPTY_BYTES,
Expand Down Expand Up @@ -252,7 +249,6 @@ export class CCUHandler {

certificate = codec.encode(certificateSchema, newCertificate);

// eslint-disable-next-line consistent-return
return {
ccuParams: {
sendingChainID: this._ownChainID,
Expand All @@ -270,7 +266,7 @@ export class CCUHandler {
}

private async _findCertificate() {
// Find certificate
// First certificate can be picked directly from first valid aggregateCommit taking registration height into account
if (this._lastCertificate.height === 0) {
const aggreggateCommits = await this._db.getAggregateCommitBetweenHeights(
this._registrationHeight,
Expand Down Expand Up @@ -366,14 +362,8 @@ export class CCUHandler {
} else {
result = await this._receivingChainAPIClient.postTransaction(tx.getBytes());
}
/**
* TODO: As of now we save it in memory but going forward it should be saved in DB,
* as the array size can grow after sometime.
*/
// Save the sent CCU
const listOfCCUs = await this._db.getListOfCCUs();
listOfCCUs.push(tx.toObject());
await this._db.setListOfCCUs(listOfCCUs);
await this._db.setCCUTransaction(tx.toObject());
// Update logs
this._logger.info({ transactionID: result.transactionId }, 'Sent CCU transaction');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class ChainConnectorPlugin extends BasePlugin<ChainConnectorPluginConfig>
public configSchema = configSchema;

private _chainConnectorPluginDB!: liskDB.Database;
private _chainConnectorStore!: ChainConnectorDB;
private _chainConnectorDB!: ChainConnectorDB;
private _receivingChainClient!: ChainAPIClient;
private _sendingChainClient!: ChainAPIClient;
private _ownChainID!: Buffer;
Expand All @@ -37,7 +37,6 @@ export class ChainConnectorPlugin extends BasePlugin<ChainConnectorPluginConfig>
return __filename;
}

// eslint-disable-next-line @typescript-eslint/require-await
public async init(context: PluginInitContext): Promise<void> {
await super.init(context);
if (this.config.maxCCUSize > CCU_TOTAL_CCM_SIZE) {
Expand All @@ -57,8 +56,8 @@ export class ChainConnectorPlugin extends BasePlugin<ChainConnectorPluginConfig>

public async load(): Promise<void> {
this._chainConnectorPluginDB = await getDBInstance(this.dataPath);
this._chainConnectorStore = new ChainConnectorDB(this._chainConnectorPluginDB);
this.endpoint.load(this.config, this._chainConnectorStore);
this._chainConnectorDB = new ChainConnectorDB(this._chainConnectorPluginDB);
this.endpoint.load(this.config, this._chainConnectorDB);

this._sendingChainClient = new ChainAPIClient({
ipcPath: this.appConfig.system.dataPath,
Expand All @@ -75,7 +74,7 @@ export class ChainConnectorPlugin extends BasePlugin<ChainConnectorPluginConfig>
wsConnectionString: this.config.receivingChainWsURL,
});
await this._blockEventHandler.load({
db: this._chainConnectorStore,
db: this._chainConnectorDB,
logger: this.logger,
receivingChainAPIClient: this._receivingChainClient,
sendingChainAPIClient: this._sendingChainClient,
Expand All @@ -90,6 +89,6 @@ export class ChainConnectorPlugin extends BasePlugin<ChainConnectorPluginConfig>
await this._sendingChainClient.disconnect();
}

this._chainConnectorStore.close();
this._chainConnectorDB.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ export const DEFAULT_LAST_CCM_SENT_NONCE = BigInt(-1);
export const DEFAULT_CCU_SAVE_LIMIT = 300;
export const DEFAULT_SENT_CCU_TIMEOUT = 3600000; // 1 hour

export const DB_KEY_CROSS_CHAIN_MESSAGES = Buffer.from([1]);
export const DB_KEY_BLOCK_HEADERS = Buffer.from([2]);
export const DB_KEY_AGGREGATE_COMMITS = Buffer.from([3]);
export const DB_KEY_VALIDATORS_HASH_PREIMAGE = Buffer.from([4]);
export const DB_KEY_LAST_SENT_CCM = Buffer.from([5]);
export const DB_KEY_LIST_OF_CCU = Buffer.from([6]);
export const DB_KEY_BLOCK_HEADER_BY_HEIGHT = Buffer.from([1]);
export const DB_KEY_AGGREGATE_COMMIT_BY_HEIGHT = Buffer.from([2]);
export const DB_KEY_VALIDATORS_DATA_BY_HASH = Buffer.from([3]);
export const DB_KEY_VALIDATORS_DATA_BY_HEIGHT = Buffer.from([4]);
export const DB_KEY_CROSS_CHAIN_MESSAGES = Buffer.from([5]);
export const DB_KEY_LAST_SENT_CCM = Buffer.from([6]);
export const DB_KEY_LIST_OF_CCU = Buffer.from([7]);

/**
* It’s not really MAX_CCU_SIZE, coz CCU includes other properties
Expand Down
Loading

0 comments on commit 6aecb55

Please sign in to comment.