Skip to content

Commit

Permalink
feat: implement silent block broadcasting
Browse files Browse the repository at this point in the history
fixed event emitter issue

fix: Add a delay function for silent block events

fix: silent block broadcasting

Fix: Use a more explicit event name for indexing blocks

feat: implement silent block broadcasting

fix: Make event emitter protected

feat: implement silent block broadcasting

fix: Add event emitter and gateway dependencies to tests
  • Loading branch information
seekersoftec committed Oct 22, 2024
1 parent 861eef8 commit 4caad8e
Show file tree
Hide file tree
Showing 16 changed files with 179 additions and 7 deletions.
1 change: 0 additions & 1 deletion config/dev.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ bitcoinCore:
rpcPass: polarpass
rpcUser: polaruser
rpcPort: 18445

72 changes: 72 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@nestjs/common": "^10.4.1",
"@nestjs/config": "^3.2.3",
"@nestjs/core": "^10.4.1",
"@nestjs/event-emitter": "^2.0.4",
"@nestjs/microservices": "^10.4.1",
"@nestjs/passport": "^10.0.0",
"@nestjs/platform-express": "^10.3.7",
Expand All @@ -36,6 +37,7 @@
"@nestjs/swagger": "^7.3.1",
"@nestjs/typeorm": "^10.0.2",
"@nestjs/websockets": "^10.3.7",
"@nestjs/platform-ws": "^10.4.4",
"axios": "^1.7.2",
"currency.js": "^2.0.4",
"js-yaml": "^4.1.0",
Expand All @@ -55,6 +57,7 @@
"@types/node": "18.15.11",
"@types/secp256k1": "^4.0.6",
"@types/supertest": "^2.0.11",
"@types/ws": "^8.5.12",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"bip32": "^2.0.0",
Expand Down
2 changes: 2 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import { SilentBlocksModule } from '@/silent-blocks/silent-blocks.module';
import { OperationStateModule } from '@/operation-state/operation-state.module';
import { ScheduleModule } from '@nestjs/schedule';
import { BlockProviderModule } from '@/block-data-providers/block-provider.module';
import { EventEmitterModule } from '@nestjs/event-emitter';

@Module({
imports: [
ScheduleModule.forRoot(),
EventEmitterModule.forRoot(),
ConfigModule.forRoot({
ignoreEnvFile: true,
load: [configuration],
Expand Down
2 changes: 2 additions & 0 deletions src/block-data-providers/base-block-data-provider.abstract.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { OperationStateService } from '@/operation-state/operation-state.service';
import { Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import {
IndexerService,
TransactionInput,
TransactionOutput,
} from '@/indexer/indexer.service';

export abstract class BaseBlockDataProvider<OperationState> {
protected readonly eventEmitter: EventEmitter2 = new EventEmitter2();
protected abstract readonly logger: Logger;
protected abstract readonly operationStateKey: string;

Expand Down
5 changes: 5 additions & 0 deletions src/block-data-providers/bitcoin-core/provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
rawTransactions,
} from '@/block-data-providers/bitcoin-core/provider-fixtures';
import { Test, TestingModule } from '@nestjs/testing';
import { EventEmitter2 } from '@nestjs/event-emitter';

describe('Bitcoin Core Provider', () => {
let provider: BitcoinCoreProvider;
Expand Down Expand Up @@ -46,6 +47,10 @@ describe('Bitcoin Core Provider', () => {
getOperationState: jest.fn(),
},
},
{
provide: EventEmitter2,
useValue: jest.fn(),
},
],
}).compile();

Expand Down
5 changes: 5 additions & 0 deletions src/block-data-providers/bitcoin-core/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import {
import { AxiosRequestConfig } from 'axios';
import * as currency from 'currency.js';
import { AxiosRetryConfig, makeRequest } from '@/common/request';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { INDEXED_BLOCK_EVENT } from '@/common/events';

@Injectable()
export class BitcoinCoreProvider
Expand All @@ -44,6 +46,7 @@ export class BitcoinCoreProvider
private readonly configService: ConfigService,
indexerService: IndexerService,
operationStateService: OperationStateService,
protected readonly eventEmitter: EventEmitter2,
) {
super(indexerService, operationStateService);

Expand Down Expand Up @@ -123,6 +126,8 @@ export class BitcoinCoreProvider

state.indexedBlockHeight = height;
await this.setState(state);

this.eventEmitter.emit(INDEXED_BLOCK_EVENT, height);
}
} finally {
this.isSyncing = false;
Expand Down
18 changes: 16 additions & 2 deletions src/block-data-providers/block-provider.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,45 @@ import { IndexerService } from '@/indexer/indexer.service';
import { ProviderType } from '@/common/enum';
import { BitcoinCoreProvider } from '@/block-data-providers/bitcoin-core/provider';
import { EsploraProvider } from '@/block-data-providers/esplora/provider';
import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter';

@Module({
imports: [OperationStateModule, IndexerModule, ConfigModule],
imports: [
OperationStateModule,
IndexerModule,
ConfigModule,
EventEmitterModule,
],
controllers: [],
providers: [
{
provide: 'BlockDataProvider',
inject: [ConfigService, IndexerService, OperationStateService],
inject: [
ConfigService,
IndexerService,
OperationStateService,
EventEmitter2,
],
useFactory: (
configService: ConfigService,
indexerService: IndexerService,
operationStateService: OperationStateService,
eventEmitter: EventEmitter2,
) => {
switch (configService.get<ProviderType>('providerType')) {
case ProviderType.ESPLORA:
return new EsploraProvider(
configService,
indexerService,
operationStateService,
eventEmitter,
);
case ProviderType.BITCOIN_CORE_RPC:
return new BitcoinCoreProvider(
configService,
indexerService,
operationStateService,
eventEmitter,
);
default:
throw Error('unrecognised provider type in config');
Expand Down
5 changes: 5 additions & 0 deletions src/block-data-providers/esplora/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
} from '@/block-data-providers/esplora/interface';
import { TAPROOT_ACTIVATION_HEIGHT } from '@/common/constants';
import { Cron, CronExpression } from '@nestjs/schedule';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { INDEXED_BLOCK_EVENT } from '@/common/events';

@Injectable()
export class EsploraProvider
Expand All @@ -29,6 +31,7 @@ export class EsploraProvider
private readonly configService: ConfigService,
indexerService: IndexerService,
operationStateService: OperationStateService,
protected readonly eventEmitter: EventEmitter2,
) {
super(indexerService, operationStateService);

Expand Down Expand Up @@ -157,6 +160,8 @@ export class EsploraProvider
state.indexedBlockHeight = height;
state.lastProcessedTxIndex = i + this.batchSize - 1;
await this.setState(state);

this.eventEmitter.emit(INDEXED_BLOCK_EVENT, height);
} catch (error) {
this.logger.error(
`Error processing transactions in block at height ${height}, hash ${hash}: ${error.message}`,
Expand Down
4 changes: 4 additions & 0 deletions src/common/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,7 @@ export const varIntSize = (value: number): number => {
else if (value <= 0xffffffff) return 5;
else return 9;
};

export const delay = (ms: number): Promise<void> => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
1 change: 1 addition & 0 deletions src/common/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const INDEXED_BLOCK_EVENT = 'INDEXED_BLOCK_EVENT';
2 changes: 2 additions & 0 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { NestFactory } from '@nestjs/core';
import { AppModule } from '@/app.module';
import { ConfigService } from '@nestjs/config';
import { WsAdapter } from '@nestjs/platform-ws';

declare const module: any;

async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));

const configService = app.get<ConfigService>(ConfigService);
const port = configService.get<number>('app.port');
Expand Down
36 changes: 36 additions & 0 deletions src/silent-blocks/silent-blocks.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Injectable, Logger } from '@nestjs/common';
import {
WebSocketGateway,
WebSocketServer,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, WebSocket } from 'ws';

@Injectable()
@WebSocketGateway()
export class SilentBlocksGateway
implements OnGatewayConnection, OnGatewayDisconnect
{
private readonly logger = new Logger(SilentBlocksGateway.name);

@WebSocketServer() server: Server;

handleConnection(client: WebSocket) {
const remoteAddress = (client as any)._socket.remoteAddress;
this.logger.debug(`Client connected: ${remoteAddress}`);
}

handleDisconnect(client: WebSocket) {
const remoteAddress = (client as any)._socket.remoteAddress;
this.logger.debug(`Client disconnected: ${remoteAddress}`);
}

broadcastSilentBlock(silentBlock: Buffer) {
for (const client of this.server.clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(silentBlock);
}
}
}
}
3 changes: 2 additions & 1 deletion src/silent-blocks/silent-blocks.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import { Transaction } from '@/transactions/transaction.entity';
import { TransactionsService } from '@/transactions/transactions.service';
import { SilentBlocksController } from '@/silent-blocks/silent-blocks.controller';
import { SilentBlocksService } from '@/silent-blocks/silent-blocks.service';
import { SilentBlocksGateway } from '@/silent-blocks/silent-blocks.gateway';

@Module({
imports: [TypeOrmModule.forFeature([Transaction])],
providers: [TransactionsService, SilentBlocksService],
providers: [TransactionsService, SilentBlocksService, SilentBlocksGateway],
controllers: [SilentBlocksController],
exports: [SilentBlocksService],
})
Expand Down
5 changes: 5 additions & 0 deletions src/silent-blocks/silent-blocks.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import { TransactionsService } from '@/transactions/transactions.service';
import { SilentBlocksService } from '@/silent-blocks/silent-blocks.service';
import { silentBlockEncodingFixture } from '@/silent-blocks/silent-blocks.service.fixtures';
import { SilentBlocksGateway } from '@/silent-blocks/silent-blocks.gateway';
import { DataSource, Repository } from 'typeorm';
import { Transaction } from '@/transactions/transaction.entity';
import { getRepositoryToken } from '@nestjs/typeorm';
Expand Down Expand Up @@ -31,6 +32,10 @@ describe('SilentBlocksService', () => {
provide: getRepositoryToken(Transaction),
useValue: transactionRepository,
},
{
provide: SilentBlocksGateway,
useValue: jest.fn(),
},
],
}).compile();

Expand Down
Loading

0 comments on commit 4caad8e

Please sign in to comment.