diff --git a/packages/bitcore-node/src/services/event.ts b/packages/bitcore-node/src/services/event.ts index 627e885d548..ab202fe282f 100644 --- a/packages/bitcore-node/src/services/event.ts +++ b/packages/bitcore-node/src/services/event.ts @@ -1,16 +1,16 @@ +import { EventEmitter } from "events"; import logger from '../logger'; import { StorageService } from './storage'; import { LoggifyClass } from '../decorators/Loggify'; import { EventStorage, IEvent, EventModel } from '../models/events'; -import { PassThrough } from 'stream'; import { Storage } from './storage'; import { Config, ConfigService } from './config'; @LoggifyClass export class EventService { - txStream = new PassThrough({ objectMode: true }); - blockStream = new PassThrough({ objectMode: true }); - addressCoinStream = new PassThrough({ objectMode: true }); + txEvent = new EventEmitter(); + blockEvent = new EventEmitter(); + addressCoinEvent = new EventEmitter(); storageService: StorageService; configService: ConfigService; eventModel: EventModel; @@ -57,7 +57,7 @@ export class EventService { const txEvent = await txCursor.next(); if (txEvent) { const tx = txEvent.payload; - this.txStream.write(tx); + this.txEvent.emit('tx', tx); lastTxUpdate = new Date(); } } @@ -73,7 +73,7 @@ export class EventService { const blockEvent = await blockCursor.next(); if (blockEvent) { const block = blockEvent.payload; - this.blockStream.write(block); + this.blockEvent.emit('block', block); lastBlockUpdate = new Date(); } } @@ -89,7 +89,7 @@ export class EventService { const addressTx = await addressTxCursor.next(); if (addressTx) { const addressCoin = addressTx.payload; - this.addressCoinStream.write(addressCoin); + this.addressCoinEvent.emit('coin', addressCoin); lastAddressTxUpdate = new Date(); } } diff --git a/packages/bitcore-node/src/services/socket.ts b/packages/bitcore-node/src/services/socket.ts index 8b5b0a700f0..6b5e227bf78 100644 --- a/packages/bitcore-node/src/services/socket.ts +++ b/packages/bitcore-node/src/services/socket.ts @@ -66,7 +66,7 @@ export class SocketService { } async wireup() { - this.eventService.txStream.on('data', (tx: IEvent.TxEvent) => { + this.eventService.txEvent.on('tx', (tx: IEvent.TxEvent) => { if (this.io) { const { chain, network } = tx; const sanitizedTx = SanitizeWallet(tx); @@ -74,14 +74,14 @@ export class SocketService { } }); - this.eventService.blockStream.on('data', (block: IEvent.BlockEvent) => { + this.eventService.blockEvent.on('block', (block: IEvent.BlockEvent) => { if (this.io) { const { chain, network } = block; this.io.sockets.in(`/${chain}/${network}/inv`).emit('block', block); } }); - this.eventService.addressCoinStream.on('data', (addressCoin: IEvent.CoinEvent) => { + this.eventService.addressCoinEvent.on('coin', (addressCoin: IEvent.CoinEvent) => { if (this.io) { const { coin, address } = addressCoin; const { chain, network } = coin;