Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into feat/mempool-sync
Browse files Browse the repository at this point in the history
* origin/dev:
  refactor: sending transactions before blocks (#12)
  feat: handle reorg on the daemon (#7)
  • Loading branch information
r4mmer committed Jul 1, 2021
2 parents 8aaa67e + 333ad21 commit 46eb0d8
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 52 deletions.
16 changes: 9 additions & 7 deletions src/api/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export const lambdaCall = (fnName: string, payload: any): Promise<any> => new Pr

lambda.invoke(params, (err, data) => {
if (err) {
logger.error('err', err);
logger.error(`Erroed on ${fnName} method call with payload: ${payload}`);
logger.error(err);
reject(err);
} else {
if (data.StatusCode !== 200) {
Expand All @@ -53,24 +54,25 @@ export const lambdaCall = (fnName: string, payload: any): Promise<any> => new Pr

resolve(body);
} catch(e) {
logger.error('Erroed parsing response body: ', data);
logger.error(`Erroed parsing response body: ${data}`);

return reject(e.message);
}
}
});
});

/**
* Calls the onHandleReorgRequest lambda function
*/
export const invokeReorg = async (): Promise<ApiResponse> => lambdaCall('onHandleReorgRequest', {});

/**
* Calls the onNewTxRequest lambda function with a PreparedTx
*
* @param tx - The prepared transaction to be sent
*/
export const sendTx = async (tx: PreparedTx): Promise<ApiResponse> => {
const response = await lambdaCall('onNewTxRequest', tx);

return response;
};
export const sendTx = async (tx: PreparedTx): Promise<ApiResponse> => lambdaCall('onNewTxRequest', tx);

/**
* Calls the getLatestBlock lambda function from the wallet-service returning
Expand Down
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import logger from './logger';

// @ts-ignore
const machine = interpret(SyncMachine).onTransition(state => {
logger.info(`Sync on state: ${state.value}`);
if (state.changed) {
logger.debug('Transitioned to state: ', state.value);
}
});

const handleMessage = (message: any) => {
Expand Down
33 changes: 27 additions & 6 deletions src/machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
SyncSchema,
} from './types';
import logger from './logger';
import { invokeReorg } from './api/lambda';

// @ts-ignore
export const syncHandler = () => (callback, onReceive) => {
Expand All @@ -37,16 +38,18 @@ export const syncHandler = () => (callback, onReceive) => {
}

if (value && !value.success) {
if (value.type === 'reorg') {
logger.warn('A reorg happened: ', value.message);
callback('REORG');
return;
}

logger.error(value.message);
callback('ERROR');
return;
}

if (value.type === 'reorg') {
logger.info('A reorg happened: ', value.message);
callback('REORG');
return;
} else if (value.type === 'finished') {
if (value.type === 'finished') {
logger.info('Sync generator finished.');
callback('DONE');
} else if (value.type === 'block_success') {
Expand Down Expand Up @@ -200,7 +203,25 @@ export const SyncMachine = Machine<SyncContext, SyncSchema>({
],
},
reorg: {
type: 'final',
invoke: {
id: 'invokeReorg',
src: (_context, _event) => async () => {
const response = await invokeReorg();

if (!response.success) {
logger.error(response);
throw new Error('Reorg failed');
}

return;
},
onDone: {
target: 'idle',
},
onError: {
target: 'failure',
},
}
},
failure: {
type: 'final',
Expand Down
87 changes: 52 additions & 35 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ import logger from './logger';

dotenv.config();

export const IGNORE_TXS: {[key: string] : string[]} = {
mainnet: [
export const IGNORE_TXS: Map<string, string[]> = new Map<string, string[]>([
['mainnet', [
'000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc',
'0002d4d2a15def7604688e1878ab681142a7b155cbe52a6b4e031250ae96db0a',
'0002ad8d1519daaddc8e1a37b14aac0b045129c01832281fb1c02d873c7abbf9',
],
testnet: [
]],
['testnet', [
'0000033139d08176d1051fb3a272c3610457f0c7f686afbe0afe3d37f966db85',
'00e161a6b0bee1781ea9300680913fb76fd0fac4acab527cd9626cc1514abdc9',
'00975897028ceb037307327c953f5e7ad4d3f42402d71bd3d11ecb63ac39f01a',
],
};
]],
]);


const TX_CACHE_SIZE: number = parseInt(process.env.TX_CACHE_SIZE as string) || 200;
Expand All @@ -68,7 +68,14 @@ export const downloadTxFromId = async (txId: string): Promise<FullTx | null> =>
const network: string = process.env.NETWORK || 'mainnet';

// Do not download genesis transactions
if (network in IGNORE_TXS && IGNORE_TXS[network].includes(txId)) return null;
if (IGNORE_TXS.has(network)) {
const networkTxs: string[] = IGNORE_TXS.get(network) as string[];

if (networkTxs.includes(txId)) {
// Skip
return null;
}
}

const txData: RawTxResponse = await downloadTx(txId);
const { tx, meta } = txData;
Expand All @@ -83,16 +90,16 @@ export const downloadTxFromId = async (txId: string): Promise<FullTx | null> =>
* @param txIds - List of transactions to download
* @param data - Downloaded transactions, used while being called recursively
*/
export const recursivelyDownloadTx = async (blockId: string, txIds: string[] = [], data: FullTx[] = []): Promise<FullTx[]> => {
export const recursivelyDownloadTx = async (blockId: string, txIds: string[] = [], data = new Map<string, FullTx>()): Promise<Map<string, FullTx>> => {
if (txIds.length === 0) {
return data;
}

const txId: string = txIds.pop() as string;
const network: string = process.env.NETWORK || 'mainnet';

if (network in IGNORE_TXS) {
const networkTxs: string[] = IGNORE_TXS[network];
if (IGNORE_TXS.has(network)) {
const networkTxs: string[] = IGNORE_TXS.get(network) as string[];

if (networkTxs.includes(txId)) {
// Skip
Expand All @@ -119,9 +126,18 @@ export const recursivelyDownloadTx = async (blockId: string, txIds: string[] = [
return recursivelyDownloadTx(blockId, txIds, data);
}

const newParents = parsedTx.parents.filter((parent) => txIds.indexOf(parent) < 0 && parent !== txId);

return recursivelyDownloadTx(blockId, [...txIds, ...newParents], [...data, parsedTx]);
// check if we have already downloaded the parents
const newParents = parsedTx.parents.filter((parent) => {
return txIds.indexOf(parent) < 0 &&
/* Removing the current tx from the list of transactions to download: */
parent !== txId &&
/* Data works as our return list on the recursion and also as a "seen" list on the BFS.
* We don't want to download a transaction that is already on our seen list.
*/
!data.has(parent)
});

return recursivelyDownloadTx(blockId, [...txIds, ...newParents], data.set(parsedTx.txId, parsedTx));
};

/**
Expand Down Expand Up @@ -340,11 +356,12 @@ export async function* syncToLatestBlock(): AsyncGenerator<StatusEvent> {
const ourBestBlockInFullNode = await getBlockByTxId(ourBestBlock.txId, true);

if (!ourBestBlockInFullNode.success) {
logger.warn(ourBestBlockInFullNode.message);

yield {
type: 'error',
type: 'reorg',
success: false,
message: 'Best block not found in the full-node. Reorg?',
error: ourBestBlockInFullNode.message,
};

return;
Expand Down Expand Up @@ -389,27 +406,9 @@ export async function* syncToLatestBlock(): AsyncGenerator<StatusEvent> {
];

// Download block transactions
let txs: FullTx[] = await recursivelyDownloadTx(block.txId, blockTxs);

txs = txs.sort((x, y) => x.timestamp - y.timestamp);

// We will send the block only after all transactions were downloaded
// to be sure that all downloads were succesfull since there is no
// ROLLBACK yet on the wallet-service.
const sendBlockResponse: ApiResponse = await sendTx(preparedBlock);

if (!sendBlockResponse.success) {
logger.debug(sendBlockResponse);
yield {
type: 'error',
success: false,
message: `Failure on block ${preparedBlock.tx_id}`,
};

success = false;
const txList: Map<string, FullTx> = await recursivelyDownloadTx(block.txId, blockTxs);

break;
}
const txs: FullTx[] = Array.from(txList.values()).sort((x, y) => x.timestamp - y.timestamp);

// Exclude duplicates:
const uniqueTxs: Record<string, FullTx> = txs.reduce((acc: Record<string, FullTx>, tx: FullTx) => {
Expand Down Expand Up @@ -446,6 +445,24 @@ export async function* syncToLatestBlock(): AsyncGenerator<StatusEvent> {
}
}

// We will send the block only after all transactions were sent
// to be sure that all downloads were succesfull since there is no
// ROLLBACK yet on the wallet-service.
const sendBlockResponse: ApiResponse = await sendTx(preparedBlock);

if (!sendBlockResponse.success) {
logger.debug(sendBlockResponse);
yield {
type: 'error',
success: false,
message: `Failure on block ${preparedBlock.tx_id}`,
};

success = false;

break;
}

yield {
type: 'block_success',
success: true,
Expand Down
13 changes: 10 additions & 3 deletions test/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import {
MOCK_CREATE_TOKEN_TX,
generateBlock,
} from './utils';

import {
FullTx,
} from '../src/types';
import {
prepareTx,
parseTx,
Expand Down Expand Up @@ -93,7 +97,7 @@ test('syncToLatestBlockGen should yield an error when it fails to send a block',
getBlockByTxIdSpy.mockReturnValue(Promise.resolve(OUR_BEST_BLOCK_API_RESPONSE));
sendTxSpy.mockReturnValue(Promise.resolve({ success: false, message: 'generic error message' }));
downloadBlockByHeightSpy.mockReturnValue(Promise.resolve(BLOCK_BY_HEIGHT));
recursivelyDownloadTxSpy.mockReturnValue(Promise.resolve([]));
recursivelyDownloadTxSpy.mockReturnValue(Promise.resolve(new Map<string, FullTx>()));

const iterator = syncToLatestBlock();

Expand All @@ -119,7 +123,10 @@ test('syncToLatestBlockGen should yield an error when it fails to send a transac
getBlockByTxIdSpy.mockReturnValue(Promise.resolve(OUR_BEST_BLOCK_API_RESPONSE));
// sendTxSpy.mockReturnValue(Promise.resolve({ success: false, message: 'generic error message' }));
downloadBlockByHeightSpy.mockReturnValue(Promise.resolve(BLOCK_BY_HEIGHT));
recursivelyDownloadTxSpy.mockReturnValue(Promise.resolve([MOCK_FULL_TXS[0]]));
recursivelyDownloadTxSpy.mockReturnValue(Promise.resolve(new Map<string, FullTx>([[
MOCK_FULL_TXS[0].txId as string,
MOCK_FULL_TXS[0] as FullTx,
]])));

const mockSendTxImplementation = jest.fn((tx) => {
if (hathorLib.helpers.isBlock(tx)) {
Expand Down Expand Up @@ -161,7 +168,7 @@ test('syncToLatestBlockGen should sync from our current height until the best bl
getFullNodeBestBlockSpy.mockReturnValue(Promise.resolve(generateBlock(MOCK_TXS[0], 3)));
getBlockByTxIdSpy.mockReturnValue(Promise.resolve(OUR_BEST_BLOCK_API_RESPONSE));
sendTxSpy.mockReturnValue(Promise.resolve({ success: true, message: 'ok' }));
recursivelyDownloadTxSpy.mockReturnValue(Promise.resolve([]));
recursivelyDownloadTxSpy.mockReturnValue(Promise.resolve(new Map<string, FullTx>()));

const mockBlockHeightImplementation = jest.fn((height: number) => {
return Promise.resolve({
Expand Down

0 comments on commit 46eb0d8

Please sign in to comment.