Skip to content

Commit

Permalink
feat: handling the vertex_removed event (#182)
Browse files Browse the repository at this point in the history
* feat: handling the vertex_removed event

* fix: handling empty arrays on IN conditions

* feat: handling VERTEX_REMOVED event

* tests: fix last event

* feat: voiding and removing transaction on vertex_removed event

* tests: added tests for empty array errors

* chore: using port 8085

* chore: actually using 8085

* refactor: removed misleading comment
  • Loading branch information
andreabadesso authored Oct 7, 2024
1 parent b9e6d6f commit 44b7615
Show file tree
Hide file tree
Showing 14 changed files with 247 additions and 43 deletions.
6 changes: 6 additions & 0 deletions packages/daemon/__tests__/db/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ describe('address and wallet related tests', () => {
};
await updateAddressTablesWithTx(mysql, txId5, timestamp5, addrMap5);
await expect(checkAddressBalanceTable(mysql, 5, address1, 'token1', 5, 7, lockExpires - 1, 5)).resolves.toBe(true);

// We shouldn't throw if the addressBalanceMap is empty:
await expect(updateAddressTablesWithTx(mysql, txId5, timestamp5, {})).resolves.not.toThrow();
});

test('updateAddressLockedBalance', async () => {
Expand Down Expand Up @@ -715,6 +718,9 @@ describe('address and wallet related tests', () => {

const addressWalletMap = await getAddressWalletInfo(mysql, Object.keys(finalMap));
expect(addressWalletMap).toStrictEqual(finalMap);

// Should not throw on empty addresses list
await expect(getAddressWalletInfo(mysql, [])).resolves.not.toThrow();
});

test('updateWalletLockedBalance', async () => {
Expand Down
53 changes: 53 additions & 0 deletions packages/daemon/__tests__/integration/balances.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ import { cleanDatabase, fetchAddressBalances, validateBalances } from './utils';
import unvoidedScenarioBalances from './scenario_configs/unvoided_transactions.balances';
import reorgScenarioBalances from './scenario_configs/reorg.balances';
import singleChainBlocksAndTransactionsBalances from './scenario_configs/single_chain_blocks_and_transactions.balances';
import invalidMempoolBalances from './scenario_configs/invalid_mempool_transaction.balances';
import {
DB_NAME,
DB_USER,
DB_PORT,
DB_PASS,
DB_ENDPOINT,
INVALID_MEMPOOL_TRANSACTION_PORT,
UNVOIDED_SCENARIO_PORT,
UNVOIDED_SCENARIO_LAST_EVENT,
REORG_SCENARIO_PORT,
REORG_SCENARIO_LAST_EVENT,
SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_PORT,
SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_LAST_EVENT,
INVALID_MEMPOOL_TRANSACTION_LAST_EVENT,
} from './config';

jest.mock('../../src/config', () => {
Expand Down Expand Up @@ -220,3 +223,53 @@ describe('single chain blocks and transactions scenario', () => {
});
});
});

describe('invalid mempool transactions scenario', () => {
beforeAll(() => {
jest.spyOn(Services, 'fetchMinRewardBlocks').mockImplementation(async () => 300);
});

it('should do a full sync and the balances should match', async () => {
// @ts-ignore
getConfig.mockReturnValue({
NETWORK: 'testnet',
SERVICE_NAME: 'daemon-test',
CONSOLE_LEVEL: 'debug',
TX_CACHE_SIZE: 100,
BLOCK_REWARD_LOCK: 300,
FULLNODE_PEER_ID: 'simulator_peer_id',
STREAM_ID: 'simulator_stream_id',
FULLNODE_NETWORK: 'simulator_network',
FULLNODE_HOST: `127.0.0.1:${INVALID_MEMPOOL_TRANSACTION_PORT}`,
USE_SSL: false,
DB_ENDPOINT,
DB_NAME,
DB_USER,
DB_PASS,
DB_PORT,
});

const machine = interpret(SyncMachine);

await new Promise<void>((resolve) => {
machine.onTransition(async (state) => {
const addressBalances = await fetchAddressBalances(mysql);
if (state.matches('CONNECTED.idle')) {
// @ts-ignore
const lastSyncedEvent = await getLastSyncedEvent(mysql);
console.log(lastSyncedEvent);
if (lastSyncedEvent?.last_event_id === INVALID_MEMPOOL_TRANSACTION_LAST_EVENT) {
// @ts-ignore
expect(validateBalances(addressBalances, invalidMempoolBalances));

machine.stop();

resolve();
}
}
});

machine.start();
});
});
});
7 changes: 6 additions & 1 deletion packages/daemon/__tests__/integration/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ export const REORG_SCENARIO_PORT = 8082;
// Same as the comment on the unvoided scenario last event
export const REORG_SCENARIO_LAST_EVENT = 19;


// single chain blocks and transactions port
export const SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_PORT = 8083;
// Same as the comment on the unvoided scenario last event
export const SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_LAST_EVENT = 37;

export const SCENARIOS = ['UNVOIDED_SCENARIO', 'REORG_SCENARIO', 'SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS'];

export const INVALID_MEMPOOL_TRANSACTION_PORT = 8085;
export const INVALID_MEMPOOL_TRANSACTION_LAST_EVENT = 40;

export const SCENARIOS = ['UNVOIDED_SCENARIO', 'REORG_SCENARIO', 'SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS', 'INVALID_MEMPOOL_TRANSACTION'];
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export default {
"HVayMofEDh4XGsaQJeRJKhutYxYodYNop6": 100000000000,
"HFtz2f59Lms4p3Jfgtsr73s97MbJHsRENh": 6400,
"HJQbEERnD5Ak3f2dsi8zAmsZrCWTT8FZns": 6400,
"HRSYchTEsFFpZAkgSTMsohNGQ6eLPyhXvJ": 6400,
"HQfVqxyxQV4BHwnsMnRXpZGmwPYiNSVmMu": 6400,
"HPnkpR2vnBuCoZCEnRZNHMBtf8ygeSidbW": 6400,
"HPNvtPZaDF44i6CL91u4BvZPu6z2xPNt26": 6400,
"HQijr325t63VJFdc4vYkaTyd87oeBLpSed": 6400,
"H8fCNrYGkj4B6VzKgtRiHBgoSxM31d65JR": 6400,
"HRQe4CXj8AZXzSmuNztU8iQR74QTQMbnTs": 6400,
"HNqTfEASfdx7H4vMUGzfD2HyD3GeKuxjTJ": 0,
"HAqrADnn7GyyT68fSX8zmtsRNFyabPzoRQ": 0,
"HRTH6uGo7zn3LWrosBYn7eXkwAeHAHTRh8": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ export default {
"HNqTfEASfdx7H4vMUGzfD2HyD3GeKuxjTJ": 5400,
"HRQe4CXj8AZXzSmuNztU8iQR74QTQMbnTs": 1000,
"HVayMofEDh4XGsaQJeRJKhutYxYodYNop6": 100000000000
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ services:
]
ports:
- "8083:8080"
invalid_mempool_transaction:
image: hathornetwork/hathor-core:stable
command: [
"events_simulator",
"--scenario", "INVALID_MEMPOOL_TRANSACTION",
"--seed", "1"
]
ports:
- "8085:8080"

networks:
database:
2 changes: 1 addition & 1 deletion packages/daemon/__tests__/integration/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export const validateBalances = async (
if (totalBalanceA !== balanceB) {
console.log(totalBalanceA);
console.log(balanceB);
throw new Error(`Balances are not equal for address: ${address}`);
throw new Error(`Balances are not equal for address: ${address}, expected: ${balanceB}, received: ${totalBalanceA}`);
}
}
};
4 changes: 2 additions & 2 deletions packages/daemon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"build": "tsc -b",
"start": "node dist/index.js",
"watch": "tsc -w",
"test_images_up": "docker-compose -f ./__tests__/integration/scripts/docker-compose.yml up -d",
"test_images_down": "docker-compose -f ./__tests__/integration/scripts/docker-compose.yml down",
"test_images_up": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml up -d",
"test_images_down": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml down",
"test_images_integration": "jest --config ./jest_integration.config.js --runInBand --forceExit",
"test_images_migrate": "NODE_ENV=test DB_NAME=hathor DB_PORT=3380 DB_PASS=hathor DB_USER=hathor yarn run sequelize-cli --migrations-path ../../db/migrations --config ./__tests__/integration/scripts/sequelize-db-config.js db:migrate",
"test_images_wait_for_db": "yarn dlx ts-node ./__tests__/integration/scripts/wait-for-db-up.ts",
Expand Down
26 changes: 20 additions & 6 deletions packages/daemon/src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ export const updateAddressTablesWithTx = async (
timestamp: number,
addressBalanceMap: StringMap<TokenBalanceMap>,
): Promise<void> => {
if (Object.keys(addressBalanceMap).length === 0) {
// No need to do anything here
return;
}
/*
* update address table
*
Expand All @@ -506,12 +510,14 @@ export const updateAddressTablesWithTx = async (
* If address is already present, just increment the transactions counter.
*/
const addressEntries = Object.keys(addressBalanceMap).map((address) => [address, 1]);
await mysql.query(
`INSERT INTO \`address\`(\`address\`, \`transactions\`)
VALUES ?
ON DUPLICATE KEY UPDATE transactions = transactions + 1`,
[addressEntries],
);
if (addressEntries.length > 0) {
await mysql.query(
`INSERT INTO \`address\`(\`address\`, \`transactions\`)
VALUES ?
ON DUPLICATE KEY UPDATE transactions = transactions + 1`,
[addressEntries],
);
}

const entries = [];
for (const [address, tokenMap] of Object.entries(addressBalanceMap)) {
Expand Down Expand Up @@ -769,6 +775,10 @@ export const updateAddressLockedBalance = async (
* @returns A map of address and corresponding wallet information
*/
export const getAddressWalletInfo = async (mysql: MysqlConnection, addresses: string[]): Promise<StringMap<Wallet>> => {
if (addresses.length === 0) {
return {};
}

const addressWalletMap: StringMap<Wallet> = {};
const [results] = await mysql.query(
`SELECT DISTINCT a.\`address\`,
Expand Down Expand Up @@ -1036,6 +1046,10 @@ export const incrementTokensTxCount = async (
mysql: MysqlConnection,
tokenList: string[],
): Promise<void> => {
if (tokenList.length === 0) {
return;
}

await mysql.query(`
UPDATE \`token\`
SET \`transactions\` = \`transactions\` + 1
Expand Down
13 changes: 13 additions & 0 deletions packages/daemon/src/guards/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,19 @@ export const websocketDisconnected = (_context: Context, event: Event) => {
return false;
};

/*
* This guard is used in the `idle` state to detect if the transaction in the
* received event is a vertex removed event, indicating that we should remove
* the transaction from our database
*/
export const vertexRemoved = (_context: Context, event: Event) => {
if (event.type !== EventTypes.FULLNODE_EVENT) {
throw new Error(`Invalid event type on vertexRemvoed guard: ${event.type}`);
}

return event.event.event.type === FullNodeEventTypes.VERTEX_REMOVED;
};

/*
* This guard is used in the `idle` state to detect if the transaction in the
* received event is voided, this can serve many functions, one of them is to
Expand Down
21 changes: 21 additions & 0 deletions packages/daemon/src/machines/SyncMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '../types';
import {
handleVertexAccepted,
handleVertexRemoved,
metadataDiff,
handleVoidedTx,
handleTxFirstBlock,
Expand All @@ -39,6 +40,7 @@ import {
websocketDisconnected,
voided,
unchanged,
vertexRemoved,
} from '../guards';
import {
storeInitialState,
Expand Down Expand Up @@ -70,6 +72,7 @@ export const CONNECTED_STATES = {
handlingUnhandledEvent: 'handlingUnhandledEvent',
handlingMetadataChanged: 'handlingMetadataChanged',
handlingVertexAccepted: 'handlingVertexAccepted',
handlingVertexRemoved: 'handlingVertexRemoved',
handlingVoidedTx: 'handlingVoidedTx',
handlingUnvoidedTx: 'handlingUnvoidedTx',
handlingFirstBlock: 'handlingFirstBlock',
Expand Down Expand Up @@ -155,6 +158,10 @@ const SyncMachine = Machine<Context, any, Event>({
*/
cond: 'voided',
target: CONNECTED_STATES.idle,
}, {
actions: ['storeEvent'],
cond: 'vertexRemoved',
target: CONNECTED_STATES.handlingVertexRemoved,
}, {
actions: ['storeEvent'],
cond: 'vertexAccepted',
Expand Down Expand Up @@ -210,6 +217,18 @@ const SyncMachine = Machine<Context, any, Event>({
onError: `#${SYNC_MACHINE_STATES.ERROR}`,
},
},
[CONNECTED_STATES.handlingVertexRemoved]: {
id: CONNECTED_STATES.handlingVertexRemoved,
invoke: {
src: 'handleVertexRemoved',
data: (_context: Context, event: Event) => event,
onDone: {
target: 'idle',
actions: ['sendAck', 'storeEvent'],
},
onError: `#${SYNC_MACHINE_STATES.ERROR}`,
},
},
[CONNECTED_STATES.handlingVoidedTx]: {
id: CONNECTED_STATES.handlingVoidedTx,
invoke: {
Expand Down Expand Up @@ -278,6 +297,7 @@ const SyncMachine = Machine<Context, any, Event>({
websocketDisconnected,
voided,
unchanged,
vertexRemoved,
},
delays: { BACKOFF_DELAYED_RECONNECT },
actions: {
Expand All @@ -298,6 +318,7 @@ const SyncMachine = Machine<Context, any, Event>({
handleVoidedTx,
handleUnvoidedTx,
handleVertexAccepted,
handleVertexRemoved,
handleTxFirstBlock,
metadataDiff,
updateLastSyncedEvent,
Expand Down
Loading

0 comments on commit 44b7615

Please sign in to comment.