Skip to content
This repository has been archived by the owner on Jul 25, 2024. It is now read-only.

Handling Errors from Blockchain #72

Merged
merged 22 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 54 additions & 54 deletions apps/worker/src/monitor/tx.status.monitor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ import { Processor, InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Job, Queue } from 'bullmq';
import Redis from 'ioredis';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { MILLISECONDS_PER_SECOND } from 'time-constants';
import { BlockHash, Hash } from '@polkadot/types/interfaces';
import { RegistryError } from '@polkadot/types/types';
import { BlockchainService } from '../../../../libs/common/src/blockchain/blockchain.service';
import { ConfigService } from '../../../../libs/common/src/config/config.service';
import { ITxMonitorJob } from '../interfaces/status-monitor.interface';
import { QueueConstants } from '../../../../libs/common/src';
import { SECONDS_PER_BLOCK } from '../../../../libs/common/src/constants';
Expand All @@ -21,11 +19,9 @@ import { BaseConsumer } from '../BaseConsumer';
export class TxStatusMonitoringService extends BaseConsumer {
constructor(
@InjectRedis() private cacheManager: Redis,
@InjectQueue(QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME) private txReceiptQueue,
@InjectQueue(QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME) private txReceiptQueue: Queue,
@InjectQueue(QueueConstants.PUBLISH_QUEUE_NAME) private publishQueue: Queue,
private blockchainService: BlockchainService,
private configService: ConfigService,
private eventEmitter: EventEmitter2,
) {
super();
}
Expand All @@ -38,82 +34,86 @@ export class TxStatusMonitoringService extends BaseConsumer {
const previousKnownBlockNumber = (await this.blockchainService.getBlock(job.data.lastFinalizedBlockHash)).block.header.number.toBigInt();
const currentFinalizedBlockNumber = await this.blockchainService.getLatestFinalizedBlockNumber();
const blockList: bigint[] = [];
const blockDelay = 1 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;

for (let i = previousKnownBlockNumber; i <= currentFinalizedBlockNumber && i < previousKnownBlockNumber + numberBlocksToParse; i += 1n) {
blockList.push(i);
}
const txBlockHash = await this.crawlBlockList(job.data.txHash, txCapacityEpoch, blockList);
const txResult = await this.blockchainService.crawlBlockListForTx(job.data.txHash, blockList, [{ pallet: 'messages', event: 'MessageStored' }]);

if (txBlockHash) {
this.logger.verbose(`Successfully completed job ${job.id}`);
return { success: true };
// if tx has not yet included in a block, throw error to retry till max attempts
if (!txResult.blockHash && !txResult.error) {
throw new Error(`Tx not found in block list, retrying (attempts=${job.attemptsMade})`);
}

// handle failure to find tx in block list after
// TODO - handle requeing of publish job in case of failure
// Issue: https://github.com/AmplicaLabs/content-publishing-service/issues/18
if (!txBlockHash && job.attemptsMade >= (job.opts.attempts ?? 3)) {
this.logger.error(`Job ${job.id} failed after ${job.attemptsMade} attempts`);
return { success: false };
this.setEpochCapacity(txCapacityEpoch, BigInt(txResult.capacityWithDrawn ?? 0n));

if (txResult.error && job.attemptsMade <= (job.opts.attempts ?? 3)) {
this.logger.debug(`Error found in tx result: ${JSON.stringify(txResult.error)}`);
const errorReport = await this.handleMessagesFailure(job.data.id, txResult.error);
const failedError = new Error(`Job ${job.data.id} failed with error ${JSON.stringify(txResult.error)}`);

if (errorReport.pause) {
await this.publishQueue.pause();
}

if (errorReport.retry) {
this.logger.debug(`Retrying job ${job.data.id}`);
await this.publishQueue.removeRepeatableByKey(job.data.referencePublishJob.id);
await this.publishQueue.add(job.data.referencePublishJob.id, job.data.referencePublishJob, { delay: blockDelay });
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved
}
}
throw new Error(`Job ${job.id} failed, retrying`);
await this.txReceiptQueue.removeRepeatableByKey(job.data.id);
throw new Error(`Job ${job.data.id} failed with error ${JSON.stringify(txResult.error)}`);
} catch (e) {
this.logger.error(`Job ${job.id} failed (attempts=${job.attemptsMade}) with error: ${e}`);
this.logger.error(e);
throw e;
} finally {
// do some stuff
}
}

private async crawlBlockList(txHash: Hash, epoch: string, blockList: bigint[]): Promise<BlockHash | undefined> {
const txReceiptPromises: Promise<BlockHash | undefined>[] = blockList.map(async (blockNumber) => {
const blockHash = await this.blockchainService.getBlockHash(blockNumber);
const block = await this.blockchainService.getBlock(blockHash);
const txInfo = block.block.extrinsics.find((extrinsic) => extrinsic.hash.toString() === txHash.toString());
this.logger.debug(`Extrinsics: ${block.block.extrinsics[0]}`);

if (txInfo !== undefined) {
this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`);
const at = await this.blockchainService.api.at(blockHash.toHex());
const events = await at.query.system.events();
events.subscribe((records) => {
records.forEach(async (record) => {
const { event } = record;
const eventName = event.section;
const { method } = event;
const { data } = event;
this.logger.debug(`Received event: ${eventName} ${method} ${data}`);
if (eventName.search('capacity') !== -1 && method.search('Withdrawn') !== -1) {
const capacityWithDrawn = BigInt(data[1].toString());
this.logger.debug(`Capacity withdrawn: ${capacityWithDrawn}`);
this.setEpochCapacity(epoch, capacityWithDrawn);
}
});
});
return blockHash;
private async handleMessagesFailure(jobId: string, moduleError: RegistryError): Promise<{ pause: boolean; retry: boolean }> {
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved
try {
switch (moduleError.method) {
case 'TooManyMessagesInBlock':
// Re-try the job in the publish queue
return { pause: false, retry: true };
case 'UnAuthorizedDelegate':
// Re-try the job in the publish, could be a signing error
return { pause: false, retry: true };
case 'InvalidMessageSourceAccount':
case 'InvalidSchemaId':
case 'ExceedsMaxMessagePayloadSizeBytes':
case 'InvalidPayloadLocation':
case 'UnsupportedCid':
case 'InvalidCid':
return { pause: false, retry: false };
default:
this.logger.error(`Unknown module error ${moduleError}`);
break;
}
return undefined;
});
} catch (error) {
this.logger.error(`Error handling module error: ${error}`);
}
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved

const results = await Promise.all(txReceiptPromises);
const result = results.find((blockHash) => blockHash !== undefined);
return result;
// unknown error, pause the queue
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved
return { pause: false, retry: false };
}

private async setEpochCapacity(epoch: string, capacityWithdrew: bigint) {
private async setEpochCapacity(epoch: string, capacityWithdrew: bigint): Promise<void> {
const epochCapacityKey = `epochCapacity:${epoch}`;

try {
const epochCapacity = BigInt((await this.cacheManager.get(epochCapacityKey)) ?? 0);
const savedCapacity = await this.cacheManager.get(epochCapacityKey);
const epochCapacity = BigInt(savedCapacity ?? 0);
const newEpochCapacity = epochCapacity + capacityWithdrew;

const epochDurationBlocks = await this.blockchainService.getCurrentEpochLength();
const epochDuration = epochDurationBlocks * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;

await this.cacheManager.setex(epochCapacityKey, epochDuration, newEpochCapacity.toString());
} catch (error) {
this.logger.error(`Error setting epoch capacity: ${error}`);

throw error;
}
}
}
2 changes: 1 addition & 1 deletion apps/worker/src/publisher/publishing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot
};
// add a delay of 1 block to allow the tx reciept to go through before checking
const delay = 1 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;
await this.txReceiptQueue.add(`Tx Receipt Job - ${job.id}`, job, { jobId: job.id, removeOnFail: false, removeOnComplete: 1000, delay });
await this.txReceiptQueue.add(job.id, job, { jobId: job.id, removeOnFail: false, removeOnComplete: 1000, delay });
}

private async checkCapacity(): Promise<void> {
Expand Down
76 changes: 71 additions & 5 deletions libs/common/src/blockchain/blockchain.service.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
/* eslint-disable no-underscore-dangle */
import { Injectable, Logger, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common';
import { ApiPromise, ApiRx, HttpProvider, WsProvider } from '@polkadot/api';
import { firstValueFrom } from 'rxjs';
import { firstValueFrom, from } from 'rxjs';
import { options } from '@frequency-chain/api-augment';
import { KeyringPair } from '@polkadot/keyring/types';
import { BlockHash, BlockNumber, SignedBlock } from '@polkadot/types/interfaces';
import { BlockHash, BlockNumber, DispatchError, DispatchInfo, Hash, SignedBlock } from '@polkadot/types/interfaces';
import { SubmittableExtrinsic } from '@polkadot/api/types';
import { AnyNumber, ISubmittableResult } from '@polkadot/types/types';
import { u32, Option } from '@polkadot/types';
import { AnyNumber, ISubmittableResult, RegistryError } from '@polkadot/types/types';
import { u32, Option, u128 } from '@polkadot/types';
import { PalletCapacityCapacityDetails, PalletCapacityEpochInfo, PalletSchemasSchema } from '@polkadot/types/lookup';
import { Hash } from 'crypto';
import { ConfigService } from '../config/config.service';
import { Extrinsic } from './extrinsic';

Expand Down Expand Up @@ -157,4 +156,71 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
const schema: PalletSchemasSchema = await this.query('schemas', 'schemas', schemaId);
return schema;
}

public async crawlBlockListForTx(
txHash: Hash,
blockList: bigint[],
successEvents: [{ pallet: string; event: string }],
): Promise<{ success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }> {
const txReceiptPromises: Promise<{ success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }>[] = blockList.map(async (blockNumber) => {
const blockHash = await this.getBlockHash(blockNumber);
const block = await this.getBlock(blockHash);
const txInfo = block.block.extrinsics.find((extrinsic) => extrinsic.hash.toString() === txHash.toString());

if (!txInfo) {
return { success: false };
}

this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`);
const at = await this.api.at(blockHash.toHex());
const eventsPromise = firstValueFrom(at.query.system.events());

let isTxSuccess = false;
let totalBlockCapacity: bigint = 0n;
let txError: RegistryError | undefined;

try {
const events = await eventsPromise;

events.forEach((record) => {
const { event } = record;
const eventName = event.section;
const { method } = event;
const { data } = event;
this.logger.debug(`Received event: ${eventName} ${method} ${data}`);

// find capacity withdrawn event
if (eventName.search('capacity') !== -1 && method.search('Withdrawn') !== -1) {
// allow lowercase constructor for eslint
// eslint-disable-next-line new-cap
const currentCapacity: u128 = new u128(this.api.registry, data[1]);
totalBlockCapacity += currentCapacity.toBigInt();
}

// check custom success events
if (successEvents.find((successEvent) => successEvent.pallet === eventName && successEvent.event === method)) {
this.logger.debug(`Found success event ${eventName} ${method}`);
isTxSuccess = true;
}

// check for system extrinsic failure
if (eventName.search('system') !== -1 && method.search('ExtrinsicFailed') !== -1) {
const dispatchError = data[0] as DispatchError;
const moduleThatErrored = dispatchError.asModule;
const moduleError = dispatchError.registry.findMetaError(moduleThatErrored);
txError = moduleError;
this.logger.error(`Extrinsic failed with error: ${JSON.stringify(moduleError)}`);
}
});
} catch (error) {
this.logger.error(error);
}
this.logger.debug(`Total capacity withdrawn in block: ${totalBlockCapacity.toString()}`);
return { success: isTxSuccess, blockHash, capacityWithDrawn: totalBlockCapacity.toString(), error: txError };
});
const results = await Promise.all(txReceiptPromises);
const result = results.find((receipt) => receipt.blockHash !== undefined);
this.logger.debug(`Found tx receipt: ${JSON.stringify(result)}`);
return result ?? { success: false };
}
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved
}