Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handling the vertex_removed event #182

Merged
merged 9 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/daemon/__tests__/db/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ describe('address and wallet related tests', () => {
};
await updateAddressTablesWithTx(mysql, txId5, timestamp5, addrMap5);
await expect(checkAddressBalanceTable(mysql, 5, address1, 'token1', 5, 7, lockExpires - 1, 5)).resolves.toBe(true);

// We shouldn't throw if the addressBalanceMap is empty:
await expect(updateAddressTablesWithTx(mysql, txId5, timestamp5, {})).resolves.not.toThrow();
});

test('updateAddressLockedBalance', async () => {
Expand Down Expand Up @@ -715,6 +718,9 @@ describe('address and wallet related tests', () => {

const addressWalletMap = await getAddressWalletInfo(mysql, Object.keys(finalMap));
expect(addressWalletMap).toStrictEqual(finalMap);

// Should not throw on empty addresses list
await expect(getAddressWalletInfo(mysql, [])).resolves.not.toThrow();
});

test('updateWalletLockedBalance', async () => {
Expand Down
53 changes: 53 additions & 0 deletions packages/daemon/__tests__/integration/balances.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ import { cleanDatabase, fetchAddressBalances, validateBalances } from './utils';
import unvoidedScenarioBalances from './scenario_configs/unvoided_transactions.balances';
import reorgScenarioBalances from './scenario_configs/reorg.balances';
import singleChainBlocksAndTransactionsBalances from './scenario_configs/single_chain_blocks_and_transactions.balances';
import invalidMempoolBalances from './scenario_configs/invalid_mempool_transaction.balances';
import {
DB_NAME,
DB_USER,
DB_PORT,
DB_PASS,
DB_ENDPOINT,
INVALID_MEMPOOL_TRANSACTION_PORT,
UNVOIDED_SCENARIO_PORT,
UNVOIDED_SCENARIO_LAST_EVENT,
REORG_SCENARIO_PORT,
REORG_SCENARIO_LAST_EVENT,
SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_PORT,
SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_LAST_EVENT,
INVALID_MEMPOOL_TRANSACTION_LAST_EVENT,
} from './config';

jest.mock('../../src/config', () => {
Expand Down Expand Up @@ -220,3 +223,53 @@ describe('single chain blocks and transactions scenario', () => {
});
});
});

describe('invalid mempool transactions scenario', () => {
beforeAll(() => {
jest.spyOn(Services, 'fetchMinRewardBlocks').mockImplementation(async () => 300);
});

it('should do a full sync and the balances should match', async () => {
// @ts-ignore
getConfig.mockReturnValue({
NETWORK: 'testnet',
SERVICE_NAME: 'daemon-test',
CONSOLE_LEVEL: 'debug',
TX_CACHE_SIZE: 100,
BLOCK_REWARD_LOCK: 300,
FULLNODE_PEER_ID: 'simulator_peer_id',
STREAM_ID: 'simulator_stream_id',
FULLNODE_NETWORK: 'simulator_network',
FULLNODE_HOST: `127.0.0.1:${INVALID_MEMPOOL_TRANSACTION_PORT}`,
USE_SSL: false,
DB_ENDPOINT,
DB_NAME,
DB_USER,
DB_PASS,
DB_PORT,
});

const machine = interpret(SyncMachine);

await new Promise<void>((resolve) => {
machine.onTransition(async (state) => {
const addressBalances = await fetchAddressBalances(mysql);
if (state.matches('CONNECTED.idle')) {
// @ts-ignore
const lastSyncedEvent = await getLastSyncedEvent(mysql);
console.log(lastSyncedEvent);
if (lastSyncedEvent?.last_event_id === INVALID_MEMPOOL_TRANSACTION_LAST_EVENT) {
// @ts-ignore
expect(validateBalances(addressBalances, invalidMempoolBalances));

machine.stop();

resolve();
}
}
});

machine.start();
});
});
});
7 changes: 6 additions & 1 deletion packages/daemon/__tests__/integration/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ export const REORG_SCENARIO_PORT = 8082;
// Same as the comment on the unvoided scenario last event
export const REORG_SCENARIO_LAST_EVENT = 19;


// single chain blocks and transactions port
export const SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_PORT = 8083;
// Same as the comment on the unvoided scenario last event
export const SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS_LAST_EVENT = 37;

export const SCENARIOS = ['UNVOIDED_SCENARIO', 'REORG_SCENARIO', 'SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS'];

export const INVALID_MEMPOOL_TRANSACTION_PORT = 8085;
export const INVALID_MEMPOOL_TRANSACTION_LAST_EVENT = 40;

export const SCENARIOS = ['UNVOIDED_SCENARIO', 'REORG_SCENARIO', 'SINGLE_CHAIN_BLOCKS_AND_TRANSACTIONS', 'INVALID_MEMPOOL_TRANSACTION'];
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export default {
"HVayMofEDh4XGsaQJeRJKhutYxYodYNop6": 100000000000,
"HFtz2f59Lms4p3Jfgtsr73s97MbJHsRENh": 6400,
"HJQbEERnD5Ak3f2dsi8zAmsZrCWTT8FZns": 6400,
"HRSYchTEsFFpZAkgSTMsohNGQ6eLPyhXvJ": 6400,
"HQfVqxyxQV4BHwnsMnRXpZGmwPYiNSVmMu": 6400,
"HPnkpR2vnBuCoZCEnRZNHMBtf8ygeSidbW": 6400,
"HPNvtPZaDF44i6CL91u4BvZPu6z2xPNt26": 6400,
"HQijr325t63VJFdc4vYkaTyd87oeBLpSed": 6400,
"H8fCNrYGkj4B6VzKgtRiHBgoSxM31d65JR": 6400,
"HRQe4CXj8AZXzSmuNztU8iQR74QTQMbnTs": 6400,
"HNqTfEASfdx7H4vMUGzfD2HyD3GeKuxjTJ": 0,
"HAqrADnn7GyyT68fSX8zmtsRNFyabPzoRQ": 0,
"HRTH6uGo7zn3LWrosBYn7eXkwAeHAHTRh8": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ export default {
"HNqTfEASfdx7H4vMUGzfD2HyD3GeKuxjTJ": 5400,
"HRQe4CXj8AZXzSmuNztU8iQR74QTQMbnTs": 1000,
"HVayMofEDh4XGsaQJeRJKhutYxYodYNop6": 100000000000
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ services:
]
ports:
- "8083:8080"
invalid_mempool_transaction:
image: hathornetwork/hathor-core:stable
command: [
"events_simulator",
"--scenario", "INVALID_MEMPOOL_TRANSACTION",
"--seed", "1"
]
ports:
- "8085:8080"

networks:
database:
2 changes: 1 addition & 1 deletion packages/daemon/__tests__/integration/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export const validateBalances = async (
if (totalBalanceA !== balanceB) {
console.log(totalBalanceA);
console.log(balanceB);
throw new Error(`Balances are not equal for address: ${address}`);
throw new Error(`Balances are not equal for address: ${address}, expected: ${balanceB}, received: ${totalBalanceA}`);
}
}
};
4 changes: 2 additions & 2 deletions packages/daemon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"build": "tsc -b",
"start": "node dist/index.js",
"watch": "tsc -w",
"test_images_up": "docker-compose -f ./__tests__/integration/scripts/docker-compose.yml up -d",
"test_images_down": "docker-compose -f ./__tests__/integration/scripts/docker-compose.yml down",
"test_images_up": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml up -d",
"test_images_down": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml down",
"test_images_integration": "jest --config ./jest_integration.config.js --runInBand --forceExit",
"test_images_migrate": "NODE_ENV=test DB_NAME=hathor DB_PORT=3380 DB_PASS=hathor DB_USER=hathor yarn run sequelize-cli --migrations-path ../../db/migrations --config ./__tests__/integration/scripts/sequelize-db-config.js db:migrate",
"test_images_wait_for_db": "yarn dlx ts-node ./__tests__/integration/scripts/wait-for-db-up.ts",
Expand Down
26 changes: 20 additions & 6 deletions packages/daemon/src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ export const updateAddressTablesWithTx = async (
timestamp: number,
addressBalanceMap: StringMap<TokenBalanceMap>,
): Promise<void> => {
if (Object.keys(addressBalanceMap).length === 0) {
// No need to do anything here
return;
}
/*
* update address table
*
Expand All @@ -506,12 +510,14 @@ export const updateAddressTablesWithTx = async (
* If address is already present, just increment the transactions counter.
*/
const addressEntries = Object.keys(addressBalanceMap).map((address) => [address, 1]);
await mysql.query(
`INSERT INTO \`address\`(\`address\`, \`transactions\`)
VALUES ?
ON DUPLICATE KEY UPDATE transactions = transactions + 1`,
[addressEntries],
);
if (addressEntries.length > 0) {
await mysql.query(
`INSERT INTO \`address\`(\`address\`, \`transactions\`)
VALUES ?
ON DUPLICATE KEY UPDATE transactions = transactions + 1`,
[addressEntries],
);
}

const entries = [];
for (const [address, tokenMap] of Object.entries(addressBalanceMap)) {
Expand Down Expand Up @@ -769,6 +775,10 @@ export const updateAddressLockedBalance = async (
* @returns A map of address and corresponding wallet information
*/
export const getAddressWalletInfo = async (mysql: MysqlConnection, addresses: string[]): Promise<StringMap<Wallet>> => {
if (addresses.length === 0) {
return {};
}

const addressWalletMap: StringMap<Wallet> = {};
const [results] = await mysql.query(
`SELECT DISTINCT a.\`address\`,
Expand Down Expand Up @@ -1036,6 +1046,10 @@ export const incrementTokensTxCount = async (
mysql: MysqlConnection,
tokenList: string[],
): Promise<void> => {
if (tokenList.length === 0) {
return;
}

await mysql.query(`
UPDATE \`token\`
SET \`transactions\` = \`transactions\` + 1
Expand Down
13 changes: 13 additions & 0 deletions packages/daemon/src/guards/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,19 @@ export const websocketDisconnected = (_context: Context, event: Event) => {
return false;
};

/*
* This guard is used in the `idle` state to detect if the transaction in the
* received event is a vertex removed event, indicating that we should remove
* the transaction from our database
*/
export const vertexRemoved = (_context: Context, event: Event) => {
if (event.type !== EventTypes.FULLNODE_EVENT) {
throw new Error(`Invalid event type on vertexRemvoed guard: ${event.type}`);
}

return event.event.event.type === FullNodeEventTypes.VERTEX_REMOVED;
};

/*
* This guard is used in the `idle` state to detect if the transaction in the
* received event is voided, this can serve many functions, one of them is to
Expand Down
21 changes: 21 additions & 0 deletions packages/daemon/src/machines/SyncMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '../types';
import {
handleVertexAccepted,
handleVertexRemoved,
metadataDiff,
handleVoidedTx,
handleTxFirstBlock,
Expand All @@ -39,6 +40,7 @@ import {
websocketDisconnected,
voided,
unchanged,
vertexRemoved,
} from '../guards';
import {
storeInitialState,
Expand Down Expand Up @@ -70,6 +72,7 @@ export const CONNECTED_STATES = {
handlingUnhandledEvent: 'handlingUnhandledEvent',
handlingMetadataChanged: 'handlingMetadataChanged',
handlingVertexAccepted: 'handlingVertexAccepted',
handlingVertexRemoved: 'handlingVertexRemoved',
handlingVoidedTx: 'handlingVoidedTx',
handlingUnvoidedTx: 'handlingUnvoidedTx',
handlingFirstBlock: 'handlingFirstBlock',
Expand Down Expand Up @@ -155,6 +158,10 @@ const SyncMachine = Machine<Context, any, Event>({
*/
cond: 'voided',
target: CONNECTED_STATES.idle,
}, {
actions: ['storeEvent'],
cond: 'vertexRemoved',
target: CONNECTED_STATES.handlingVertexRemoved,
}, {
actions: ['storeEvent'],
cond: 'vertexAccepted',
Expand Down Expand Up @@ -210,6 +217,18 @@ const SyncMachine = Machine<Context, any, Event>({
onError: `#${SYNC_MACHINE_STATES.ERROR}`,
},
},
[CONNECTED_STATES.handlingVertexRemoved]: {
id: CONNECTED_STATES.handlingVertexRemoved,
invoke: {
src: 'handleVertexRemoved',
data: (_context: Context, event: Event) => event,
onDone: {
target: 'idle',
actions: ['sendAck', 'storeEvent'],
},
onError: `#${SYNC_MACHINE_STATES.ERROR}`,
},
},
[CONNECTED_STATES.handlingVoidedTx]: {
id: CONNECTED_STATES.handlingVoidedTx,
invoke: {
Expand Down Expand Up @@ -278,6 +297,7 @@ const SyncMachine = Machine<Context, any, Event>({
websocketDisconnected,
voided,
unchanged,
vertexRemoved,
},
delays: { BACKOFF_DELAYED_RECONNECT },
actions: {
Expand All @@ -298,6 +318,7 @@ const SyncMachine = Machine<Context, any, Event>({
handleVoidedTx,
handleUnvoidedTx,
handleVertexAccepted,
handleVertexRemoved,
handleTxFirstBlock,
metadataDiff,
updateLastSyncedEvent,
Expand Down
Loading
Loading