diff --git a/packages/daemon/__tests__/db/index.test.ts b/packages/daemon/__tests__/db/index.test.ts index de037f41..b9a389bc 100644 --- a/packages/daemon/__tests__/db/index.test.ts +++ b/packages/daemon/__tests__/db/index.test.ts @@ -562,6 +562,9 @@ describe('address and wallet related tests', () => { }; await updateAddressTablesWithTx(mysql, txId5, timestamp5, addrMap5); await expect(checkAddressBalanceTable(mysql, 5, address1, 'token1', 5, 7, lockExpires - 1, 5)).resolves.toBe(true); + + // We shouldn't throw if the addressBalanceMap is empty: + await expect(updateAddressTablesWithTx(mysql, txId5, timestamp5, {})).resolves.not.toThrow(); }); test('updateAddressLockedBalance', async () => { @@ -715,6 +718,9 @@ describe('address and wallet related tests', () => { const addressWalletMap = await getAddressWalletInfo(mysql, Object.keys(finalMap)); expect(addressWalletMap).toStrictEqual(finalMap); + + // Should not throw on empty addresses list + await expect(getAddressWalletInfo(mysql, [])).resolves.not.toThrow(); }); test('updateWalletLockedBalance', async () => { diff --git a/packages/daemon/__tests__/integration/balances.test.ts b/packages/daemon/__tests__/integration/balances.test.ts index 55580f5e..97355e64 100644 --- a/packages/daemon/__tests__/integration/balances.test.ts +++ b/packages/daemon/__tests__/integration/balances.test.ts @@ -13,18 +13,21 @@ import { cleanDatabase, fetchAddressBalances, validateBalances } from './utils'; import unvoidedScenarioBalances from './scenario_configs/unvoided_transactions.balances'; import reorgScenarioBalances from './scenario_configs/reorg.balances'; import singleChainBlocksAndTransactionsBalances from './scenario_configs/single_chain_blocks_and_transactions.balances'; +import invalidMempoolBalances from './scenario_configs/invalid_mempool_transaction.balances'; import { DB_NAME, DB_USER, DB_PORT, DB_PASS, DB_ENDPOINT, + INVALID_MEMPOOL_TRANSACTION_PORT, UNVOIDED_SCENARIO_PORT, UNVOIDED_SCENARIO_LAST_EVENT, REORG_SCENARIO_PORT, REORG_SCENARIO_LAST_EVENT, SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_PORT, SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_LAST_EVENT, + INVALID_MEMPOOL_TRANSACTION_LAST_EVENT, } from './config'; jest.mock('../../src/config', () => { @@ -220,3 +223,53 @@ describe('single chain blocks and transactions scenario', () => { }); }); }); + +describe('invalid mempool transactions scenario', () => { + beforeAll(() => { + jest.spyOn(Services, 'fetchMinRewardBlocks').mockImplementation(async () => 300); + }); + + it('should do a full sync and the balances should match', async () => { + // @ts-ignore + getConfig.mockReturnValue({ + NETWORK: 'testnet', + SERVICE_NAME: 'daemon-test', + CONSOLE_LEVEL: 'debug', + TX_CACHE_SIZE: 100, + BLOCK_REWARD_LOCK: 300, + FULLNODE_PEER_ID: 'simulator_peer_id', + STREAM_ID: 'simulator_stream_id', + FULLNODE_NETWORK: 'simulator_network', + FULLNODE_HOST: `127.0.0.1:${INVALID_MEMPOOL_TRANSACTION_PORT}`, + USE_SSL: false, + DB_ENDPOINT, + DB_NAME, + DB_USER, + DB_PASS, + DB_PORT, + }); + + const machine = interpret(SyncMachine); + + await new Promise((resolve) => { + machine.onTransition(async (state) => { + const addressBalances = await fetchAddressBalances(mysql); + if (state.matches('CONNECTED.idle')) { + // @ts-ignore + const lastSyncedEvent = await getLastSyncedEvent(mysql); + console.log(lastSyncedEvent); + if (lastSyncedEvent?.last_event_id === INVALID_MEMPOOL_TRANSACTION_LAST_EVENT) { + // @ts-ignore + expect(validateBalances(addressBalances, invalidMempoolBalances)); + + machine.stop(); + + resolve(); + } + } + }); + + machine.start(); + }); + }); +}); diff --git a/packages/daemon/__tests__/integration/config.ts b/packages/daemon/__tests__/integration/config.ts index 73f49b41..53619d93 100644 --- a/packages/daemon/__tests__/integration/config.ts +++ b/packages/daemon/__tests__/integration/config.ts @@ -18,9 +18,14 @@ export const REORG_SCENARIO_PORT = 8082; // Same as the comment on the unvoided scenario last event export const REORG_SCENARIO_LAST_EVENT = 19; + // single chain blocks and transactions port export const SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_PORT = 8083; // Same as the comment on the unvoided scenario last event export const SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_LAST_EVENT = 37; -export const SCENARIOS = ['UNVOIDED_SCENARIO', 'REORG_SCENARIO', 'SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS']; + +export const INVALID_MEMPOOL_TRANSACTION_PORT = 8085; +export const INVALID_MEMPOOL_TRANSACTION_LAST_EVENT = 40; + +export const SCENARIOS = ['UNVOIDED_SCENARIO', 'REORG_SCENARIO', 'SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS', 'INVALID_MEMPOOL_TRANSACTION']; diff --git a/packages/daemon/__tests__/integration/scenario_configs/invalid_mempool_transaction.balances.ts b/packages/daemon/__tests__/integration/scenario_configs/invalid_mempool_transaction.balances.ts new file mode 100644 index 00000000..216f40f2 --- /dev/null +++ b/packages/daemon/__tests__/integration/scenario_configs/invalid_mempool_transaction.balances.ts @@ -0,0 +1,15 @@ +export default { + "HVayMofEDh4XGsaQJeRJKhutYxYodYNop6": 100000000000, + "HFtz2f59Lms4p3Jfgtsr73s97MbJHsRENh": 6400, + "HJQbEERnD5Ak3f2dsi8zAmsZrCWTT8FZns": 6400, + "HRSYchTEsFFpZAkgSTMsohNGQ6eLPyhXvJ": 6400, + "HQfVqxyxQV4BHwnsMnRXpZGmwPYiNSVmMu": 6400, + "HPnkpR2vnBuCoZCEnRZNHMBtf8ygeSidbW": 6400, + "HPNvtPZaDF44i6CL91u4BvZPu6z2xPNt26": 6400, + "HQijr325t63VJFdc4vYkaTyd87oeBLpSed": 6400, + "H8fCNrYGkj4B6VzKgtRiHBgoSxM31d65JR": 6400, + "HRQe4CXj8AZXzSmuNztU8iQR74QTQMbnTs": 6400, + "HNqTfEASfdx7H4vMUGzfD2HyD3GeKuxjTJ": 0, + "HAqrADnn7GyyT68fSX8zmtsRNFyabPzoRQ": 0, + "HRTH6uGo7zn3LWrosBYn7eXkwAeHAHTRh8": 0 +} diff --git a/packages/daemon/__tests__/integration/scenario_configs/unvoided_transactions.balances.ts b/packages/daemon/__tests__/integration/scenario_configs/unvoided_transactions.balances.ts index 58e6059c..971612a7 100644 --- a/packages/daemon/__tests__/integration/scenario_configs/unvoided_transactions.balances.ts +++ b/packages/daemon/__tests__/integration/scenario_configs/unvoided_transactions.balances.ts @@ -13,4 +13,4 @@ export default { "HNqTfEASfdx7H4vMUGzfD2HyD3GeKuxjTJ": 5400, "HRQe4CXj8AZXzSmuNztU8iQR74QTQMbnTs": 1000, "HVayMofEDh4XGsaQJeRJKhutYxYodYNop6": 100000000000 -} +}; diff --git a/packages/daemon/__tests__/integration/scripts/docker-compose.yml b/packages/daemon/__tests__/integration/scripts/docker-compose.yml index 9262e1bb..8296ba8a 100644 --- a/packages/daemon/__tests__/integration/scripts/docker-compose.yml +++ b/packages/daemon/__tests__/integration/scripts/docker-compose.yml @@ -36,6 +36,15 @@ services: ] ports: - "8083:8080" + invalid_mempool_transaction: + image: hathornetwork/hathor-core:stable + command: [ + "events_simulator", + "--scenario", "INVALID_MEMPOOL_TRANSACTION", + "--seed", "1" + ] + ports: + - "8085:8080" networks: database: diff --git a/packages/daemon/__tests__/integration/utils/index.ts b/packages/daemon/__tests__/integration/utils/index.ts index 836aa063..238e0eb4 100644 --- a/packages/daemon/__tests__/integration/utils/index.ts +++ b/packages/daemon/__tests__/integration/utils/index.ts @@ -70,7 +70,7 @@ export const validateBalances = async ( if (totalBalanceA !== balanceB) { console.log(totalBalanceA); console.log(balanceB); - throw new Error(`Balances are not equal for address: ${address}`); + throw new Error(`Balances are not equal for address: ${address}, expected: ${balanceB}, received: ${totalBalanceA}`); } } }; diff --git a/packages/daemon/package.json b/packages/daemon/package.json index d950264f..8ea7395f 100644 --- a/packages/daemon/package.json +++ b/packages/daemon/package.json @@ -14,8 +14,8 @@ "build": "tsc -b", "start": "node dist/index.js", "watch": "tsc -w", - "test_images_up": "docker-compose -f ./__tests__/integration/scripts/docker-compose.yml up -d", - "test_images_down": "docker-compose -f ./__tests__/integration/scripts/docker-compose.yml down", + "test_images_up": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml up -d", + "test_images_down": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml down", "test_images_integration": "jest --config ./jest_integration.config.js --runInBand --forceExit", "test_images_migrate": "NODE_ENV=test DB_NAME=hathor DB_PORT=3380 DB_PASS=hathor DB_USER=hathor yarn run sequelize-cli --migrations-path ../../db/migrations --config ./__tests__/integration/scripts/sequelize-db-config.js db:migrate", "test_images_wait_for_db": "yarn dlx ts-node ./__tests__/integration/scripts/wait-for-db-up.ts", diff --git a/packages/daemon/src/db/index.ts b/packages/daemon/src/db/index.ts index 1367514d..4148792b 100644 --- a/packages/daemon/src/db/index.ts +++ b/packages/daemon/src/db/index.ts @@ -497,6 +497,10 @@ export const updateAddressTablesWithTx = async ( timestamp: number, addressBalanceMap: StringMap, ): Promise => { + if (Object.keys(addressBalanceMap).length === 0) { + // No need to do anything here + return; + } /* * update address table * @@ -506,12 +510,14 @@ export const updateAddressTablesWithTx = async ( * If address is already present, just increment the transactions counter. */ const addressEntries = Object.keys(addressBalanceMap).map((address) => [address, 1]); - await mysql.query( - `INSERT INTO \`address\`(\`address\`, \`transactions\`) - VALUES ? - ON DUPLICATE KEY UPDATE transactions = transactions + 1`, - [addressEntries], - ); + if (addressEntries.length > 0) { + await mysql.query( + `INSERT INTO \`address\`(\`address\`, \`transactions\`) + VALUES ? + ON DUPLICATE KEY UPDATE transactions = transactions + 1`, + [addressEntries], + ); + } const entries = []; for (const [address, tokenMap] of Object.entries(addressBalanceMap)) { @@ -769,6 +775,10 @@ export const updateAddressLockedBalance = async ( * @returns A map of address and corresponding wallet information */ export const getAddressWalletInfo = async (mysql: MysqlConnection, addresses: string[]): Promise> => { + if (addresses.length === 0) { + return {}; + } + const addressWalletMap: StringMap = {}; const [results] = await mysql.query( `SELECT DISTINCT a.\`address\`, @@ -1036,6 +1046,10 @@ export const incrementTokensTxCount = async ( mysql: MysqlConnection, tokenList: string[], ): Promise => { + if (tokenList.length === 0) { + return; + } + await mysql.query(` UPDATE \`token\` SET \`transactions\` = \`transactions\` + 1 diff --git a/packages/daemon/src/guards/index.ts b/packages/daemon/src/guards/index.ts index 56bf8aaf..4a062741 100644 --- a/packages/daemon/src/guards/index.ts +++ b/packages/daemon/src/guards/index.ts @@ -171,6 +171,19 @@ export const websocketDisconnected = (_context: Context, event: Event) => { return false; }; +/* + * This guard is used in the `idle` state to detect if the transaction in the + * received event is a vertex removed event, indicating that we should remove + * the transaction from our database + */ +export const vertexRemoved = (_context: Context, event: Event) => { + if (event.type !== EventTypes.FULLNODE_EVENT) { + throw new Error(`Invalid event type on vertexRemvoed guard: ${event.type}`); + } + + return event.event.event.type === FullNodeEventTypes.VERTEX_REMOVED; +}; + /* * This guard is used in the `idle` state to detect if the transaction in the * received event is voided, this can serve many functions, one of them is to diff --git a/packages/daemon/src/machines/SyncMachine.ts b/packages/daemon/src/machines/SyncMachine.ts index 10c7982d..1a125d11 100644 --- a/packages/daemon/src/machines/SyncMachine.ts +++ b/packages/daemon/src/machines/SyncMachine.ts @@ -18,6 +18,7 @@ import { } from '../types'; import { handleVertexAccepted, + handleVertexRemoved, metadataDiff, handleVoidedTx, handleTxFirstBlock, @@ -39,6 +40,7 @@ import { websocketDisconnected, voided, unchanged, + vertexRemoved, } from '../guards'; import { storeInitialState, @@ -70,6 +72,7 @@ export const CONNECTED_STATES = { handlingUnhandledEvent: 'handlingUnhandledEvent', handlingMetadataChanged: 'handlingMetadataChanged', handlingVertexAccepted: 'handlingVertexAccepted', + handlingVertexRemoved: 'handlingVertexRemoved', handlingVoidedTx: 'handlingVoidedTx', handlingUnvoidedTx: 'handlingUnvoidedTx', handlingFirstBlock: 'handlingFirstBlock', @@ -155,6 +158,10 @@ const SyncMachine = Machine({ */ cond: 'voided', target: CONNECTED_STATES.idle, + }, { + actions: ['storeEvent'], + cond: 'vertexRemoved', + target: CONNECTED_STATES.handlingVertexRemoved, }, { actions: ['storeEvent'], cond: 'vertexAccepted', @@ -210,6 +217,18 @@ const SyncMachine = Machine({ onError: `#${SYNC_MACHINE_STATES.ERROR}`, }, }, + [CONNECTED_STATES.handlingVertexRemoved]: { + id: CONNECTED_STATES.handlingVertexRemoved, + invoke: { + src: 'handleVertexRemoved', + data: (_context: Context, event: Event) => event, + onDone: { + target: 'idle', + actions: ['sendAck', 'storeEvent'], + }, + onError: `#${SYNC_MACHINE_STATES.ERROR}`, + }, + }, [CONNECTED_STATES.handlingVoidedTx]: { id: CONNECTED_STATES.handlingVoidedTx, invoke: { @@ -278,6 +297,7 @@ const SyncMachine = Machine({ websocketDisconnected, voided, unchanged, + vertexRemoved, }, delays: { BACKOFF_DELAYED_RECONNECT }, actions: { @@ -298,6 +318,7 @@ const SyncMachine = Machine({ handleVoidedTx, handleUnvoidedTx, handleVertexAccepted, + handleVertexRemoved, handleTxFirstBlock, metadataDiff, updateLastSyncedEvent, diff --git a/packages/daemon/src/services/index.ts b/packages/daemon/src/services/index.ts index 71a45c42..128f2812 100644 --- a/packages/daemon/src/services/index.ts +++ b/packages/daemon/src/services/index.ts @@ -7,6 +7,7 @@ // @ts-ignore import hathorLib from '@hathor/wallet-lib'; +import { Connection as MysqlConnection } from 'mysql2/promise'; import axios from 'axios'; import { get } from 'lodash'; import { NftUtils } from '@wallet-service/common'; @@ -19,6 +20,8 @@ import { Event, Context, FullNodeEvent, + EventTxInput, + EventTxOutput, } from '../types'; import { TxInput, @@ -225,7 +228,9 @@ export const handleVertexAccepted = async (context: Context, _event: Event) => { const blockRewardOutput = outputs[0]; // add miner to the miners table - await addMiner(mysql, blockRewardOutput.decoded.address, hash); + if (blockRewardOutput.decoded) { + await addMiner(mysql, blockRewardOutput.decoded.address, hash); + } // here we check if we have any utxos on our database that is locked but // has its timelock < now @@ -377,7 +382,7 @@ export const handleVertexAccepted = async (context: Context, _event: Event) => { } }; -export const handleVoidedTx = async (context: Context) => { +export const handleVertexRemoved = async (context: Context, _event: Event) => { const mysql = await getDbConnection(); await mysql.beginTransaction(); @@ -391,37 +396,93 @@ export const handleVoidedTx = async (context: Context) => { tokens, } = fullNodeEvent.event.data; - logger.debug(`Will handle voided tx for ${hash}`); + const dbTx: DbTransaction | null = await getTransactionById(mysql, hash); - const dbTxOutputs: DbTxOutput[] = await getTxOutputsFromTx(mysql, hash); - const txOutputs: TxOutputWithIndex[] = prepareOutputs(outputs, tokens); - const txInputs: TxInput[] = prepareInputs(inputs, tokens); + if (!dbTx) { + throw new Error(`VERTEX_REMOVED event received, but transaction ${hash} was not in the database.`); + } - const txOutputsWithLocked = txOutputs.map((output) => { - const dbTxOutput = dbTxOutputs.find((_output) => _output.index === output.index); + logger.info(`[VertexRemoved] Voiding tx: ${hash}`); + await voidTx( + mysql, + hash, + inputs, + outputs, + tokens, + ); - if (!dbTxOutput) { - throw new Error('Transaction output different from database output!'); - } + logger.info(`[VertexRemoved] Removing tx from database: ${hash}`); + await cleanupVoidedTx(mysql, hash); + await dbUpdateLastSyncedEvent(mysql, fullNodeEvent.event.id); + await mysql.commit(); + } catch (e) { + logger.debug(e); + await mysql.rollback(); - return { - ...output, - locked: dbTxOutput.locked, - }; - }); + throw e; + } finally { + mysql.destroy(); + } +}; - const addressBalanceMap: StringMap = getAddressBalanceMap(txInputs, txOutputsWithLocked); - await voidTransaction(mysql, hash, addressBalanceMap); - await markUtxosAsVoided(mysql, dbTxOutputs); +export const voidTx = async ( + mysql: MysqlConnection, + hash: string, + inputs: EventTxInput[], + outputs: EventTxOutput[], + tokens: string[], +) => { + const dbTxOutputs: DbTxOutput[] = await getTxOutputsFromTx(mysql, hash); + const txOutputs: TxOutputWithIndex[] = prepareOutputs(outputs, tokens); + const txInputs: TxInput[] = prepareInputs(inputs, tokens); + + const txOutputsWithLocked = txOutputs.map((output) => { + const dbTxOutput = dbTxOutputs.find((_output) => _output.index === output.index); + + if (!dbTxOutput) { + throw new Error('Transaction output different from database output!'); + } - const addresses = Object.keys(addressBalanceMap); - await validateAddressBalances(mysql, addresses); + return { + ...output, + locked: dbTxOutput.locked, + }; + }); - await dbUpdateLastSyncedEvent(mysql, fullNodeEvent.event.id); + const addressBalanceMap: StringMap = getAddressBalanceMap(txInputs, txOutputsWithLocked); + await voidTransaction(mysql, hash, addressBalanceMap); + await markUtxosAsVoided(mysql, dbTxOutputs); + + const addresses = Object.keys(addressBalanceMap); + await validateAddressBalances(mysql, addresses); +}; + +export const handleVoidedTx = async (context: Context) => { + const mysql = await getDbConnection(); + await mysql.beginTransaction(); + try { + const fullNodeEvent = context.event as FullNodeEvent; + + const { + hash, + outputs, + inputs, + tokens, + } = fullNodeEvent.event.data; + + logger.debug(`Will handle voided tx for ${hash}`); + await voidTx( + mysql, + hash, + inputs, + outputs, + tokens + ); logger.debug(`Voided tx ${hash}`); await mysql.commit(); + await dbUpdateLastSyncedEvent(mysql, fullNodeEvent.event.id); } catch (e) { logger.debug(e); await mysql.rollback(); diff --git a/packages/daemon/src/types/event.ts b/packages/daemon/src/types/event.ts index ca2685c6..83cdc958 100644 --- a/packages/daemon/src/types/event.ts +++ b/packages/daemon/src/types/event.ts @@ -9,11 +9,6 @@ export type WebSocketEvent = | { type: 'CONNECTED' } | { type: 'DISCONNECTED' }; -export type MetadataDecidedEvent = { - type: 'TX_VOIDED' | 'TX_UNVOIDED' | 'TX_NEW' | 'TX_FIRST_BLOCK' | 'IGNORE'; - originalEvent: FullNodeEvent; -} - export type WebSocketSendEvent = | { type: 'START_STREAM'; @@ -40,10 +35,17 @@ export enum EventTypes { export enum FullNodeEventTypes { VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED', + VERTEX_REMOVED = 'VERTEX_REMOVED', NEW_VERTEX_ACCEPTED = 'NEW_VERTEX_ACCEPTED', LOAD_STARTED = 'LOAD_STARTED', LOAD_FINISHED = 'LOAD_FINISHED', - REORG_STARTED = 'REORG_FINISHED', + REORG_STARTED = 'REORG_STARTED', + REORG_FINISHED= 'REORG_FINISHED', +} + +export type MetadataDecidedEvent = { + type: 'TX_VOIDED' | 'TX_UNVOIDED' | 'TX_NEW' | 'TX_FIRST_BLOCK' | 'IGNORE'; + originalEvent: FullNodeEvent; } export type Event = @@ -53,6 +55,11 @@ export type Event = | { type: EventTypes.WEBSOCKET_SEND_EVENT, event: WebSocketSendEvent } | { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent}; + +export interface VertexRemovedEventData { + vertex_id: string; +} + export type FullNodeEvent = { stream_id: string; peer_id: string; diff --git a/packages/daemon/src/utils/hash.ts b/packages/daemon/src/utils/hash.ts index 7e985308..8d041ef8 100644 --- a/packages/daemon/src/utils/hash.ts +++ b/packages/daemon/src/utils/hash.ts @@ -9,7 +9,7 @@ import * as crypto from 'crypto'; /** * Generates an MD5 hash of the provided string data. - * + * * @param data - The string data to hash. * @returns - The MD5 hash of the data in hexadecimal format. */ @@ -21,7 +21,7 @@ export const md5Hash = (data: string): string => { /** * Serializes select transaction metadata attributes into a string format. - * + * * @param meta - The transaction metadata to serialize. * @returns - A serialized string representing specific fields of the metadata. */ @@ -31,9 +31,9 @@ export const serializeTxData = (meta: unknown): string => /** * Hashes transaction metadata using MD5. - * + * * Serializes the relevant fields of transaction metadata and then computes its MD5 hash. - * + * * @param meta - The transaction metadata to hash. * @returns - The MD5 hash of the serialized metadata. */