From 6c6fa5c1a6694c3085541c3f29a664b594633684 Mon Sep 17 00:00:00 2001 From: Nguyen Sy Thanh Son Date: Thu, 27 Dec 2018 08:57:25 +0000 Subject: [PATCH 1/3] persitence crawler --- crawl.js | 142 +++++++++++++++++-------------- models/blockchain/blockSigner.js | 10 ++- models/blockchain/validator.js | 9 +- models/blockchain/web3ws.js | 16 ++-- 4 files changed, 99 insertions(+), 78 deletions(-) diff --git a/crawl.js b/crawl.js index 9d691cd4..39d84a88 100644 --- a/crawl.js +++ b/crawl.js @@ -1,46 +1,51 @@ 'use strict' -const validator = require('./models/blockchain/validator') -const blockSigner = require('./models/blockchain/blockSigner') -const web3 = require('./models/blockchain/web3ws') +const Validator = require('./models/blockchain/validator') +const BlockSigner = require('./models/blockchain/blockSigner') +const Web3Ws = require('./models/blockchain/web3ws') const config = require('config') const db = require('./models/mongodb') const BigNumber = require('bignumber.js') const EventEmitter = require('events').EventEmitter const moment = require('moment') const logger = require('./helpers/logger') -const emitter = new EventEmitter() process.setMaxListeners(100) +var web3 = new Web3Ws() +var validator = new Validator(web3) +var blockSigner = new BlockSigner(web3) +var cpBlockSigner = 0 +var cpValidator = 0 + async function watchValidator () { try { - const blockNumber = await web3.eth.getBlockNumber() - logger.info('TomoValidator %s - Listen events from block number %s ...', + const blockNumber = cpValidator || await web3.eth.getBlockNumber() + logger.debug('TomoValidator %s - Listen events from block number %s ...', config.get('blockchain.validatorAddress'), blockNumber) - validator.events.allEvents({ + return validator.getPastEvents('allEvents', { fromBlock: blockNumber, toBlock: 'latest' - }, async function (error, result) { - if (error) { - logger.error('watchValidator %s %s', error, result) - return false - } else { - logger.info('Event %s in block %s', result.event, result.blockNumber) + }).then(async events => { + let map = events.map(async (event) => { + let result = event + logger.debug('Event %s in block %s', result.event, result.blockNumber) if (result.event === 'Withdraw') { let owner = (result.returnValues._owner || '').toLowerCase() let blockNumber = result.blockNumber let capacity = result.returnValues._cap - let wd = new db.Withdraw({ - smartContractAddress: config.get('blockchain.validatorAddress'), - blockNumber: blockNumber, - tx: result.transactionHash, - owner: owner, - capacity: capacity - }) - wd.save() - return true + return db.Withdraw.updateOne({ + tx: result.transactionHash + }, { + $set: { + smartContractAddress: config.get('blockchain.validatorAddress'), + blockNumber: blockNumber, + tx: result.transactionHash, + owner: owner, + capacity: capacity + } + }, { upsert: true }) } else { let candidate = (result.returnValues._candidate || '').toLowerCase() let voter = (result.returnValues._voter || '').toLowerCase() @@ -48,18 +53,23 @@ async function watchValidator () { let capacity = result.returnValues._cap let blk = await web3.eth.getBlock(result.blockNumber) let createdAt = moment.unix(blk.timestamp).utc() - let tx = new db.Transaction({ - smartContractAddress: config.get('blockchain.validatorAddress'), - tx: result.transactionHash, - event: result.event, - voter: voter, - owner: owner, - candidate: candidate, - capacity: capacity, - blockNumber: blockNumber, - createdAt: createdAt + await db.Transaction.updateOne({ + tx: result.transactionHash + }, { + $set: { + smartContractAddress: config.get('blockchain.validatorAddress'), + tx: result.transactionHash, + event: result.event, + voter: voter, + owner: owner, + candidate: candidate, + capacity: capacity, + blockNumber: blockNumber, + createdAt: createdAt + } + }, { + upsert: true }) - tx.save() if (result.event === 'Vote' || result.event === 'Unvote') { await updateVoterCap(candidate, voter) } @@ -68,11 +78,16 @@ async function watchValidator () { } await updateCandidateInfo(candidate) } - } + }) + + return Promise.all(map).then(() => { + return web3.eth.getBlockNumber(n => cpValidator = n) + }) + }).catch(e => { + logger.error('watchValidator %s', e) }) } catch (e) { logger.error('watchValidator2 %s', e) - emitter.emit('error', e) } } @@ -82,7 +97,7 @@ async function updateCandidateInfo (candidate) { let owner = (await validator.methods.getCandidateOwner(candidate).call() || '').toLowerCase() let status = await validator.methods.isCandidate(candidate).call() let result - logger.info('Update candidate %s capacity %s %s', candidate, String(capacity), status) + logger.debug('Update candidate %s capacity %s %s', candidate, String(capacity), status) if (candidate !== '0x0000000000000000000000000000000000000000') { result = await db.Candidate.updateOne({ smartContractAddress: config.get('blockchain.validatorAddress'), @@ -116,7 +131,7 @@ async function updateCandidateInfo (candidate) { async function updateVoterCap (candidate, voter) { try { let capacity = await validator.methods.getVoterCap(candidate, voter).call() - logger.info('Update voter %s for candidate %s capacity %s', voter, candidate, String(capacity)) + logger.debug('Update voter %s for candidate %s capacity %s', voter, candidate, String(capacity)) return await db.Voter.updateOne({ smartContractAddress: config.get('blockchain.validatorAddress'), candidate: candidate, @@ -196,6 +211,9 @@ async function updatePenalties () { await Promise.all(blks.map(blk => getPenalty(blk))) } catch (e) { logger.error('updatePenalties %s', e) + web3 = new Web3Ws() + validator = new Validator(web3) + blockSigner = new BlockSigner(web3) } } @@ -233,35 +251,36 @@ let sleep = (time) => new Promise((resolve) => setTimeout(resolve, time)) async function watchNewBlock () { while (true) { try { - logger.info('Update signers after sleeping 10 seconds') + logger.info('Update signers after sleeping 5 seconds') await updateSigners() await updatePenalties() + await updateLatestSignedBlock() + await watchValidator() } catch (e) { logger.error('watchNewBlock %s', e) - emitter.emit('error', e) } - await sleep(10000) + await sleep(5000) } } async function updateLatestSignedBlock () { try { - let blockNumber = await web3.eth.getBlockNumber() - logger.info('BlockSigner %s - Listen events from block number %s ...', + let blockNumber = cpBlockSigner || await web3.eth.getBlockNumber() + cpBlockSigner = await web3.eth.getBlockNumber() + + logger.debug('BlockSigner %s - Listen events from block number %s ...', config.get('blockchain.blockSignerAddress'), blockNumber) - blockSigner.events.allEvents({ + return blockSigner.getPastEvents('Sign', { fromBlock: blockNumber, toBlock: 'latest' - }, async function (error, result) { - if (error) { - logger.error(error, result) - return false - } else { + }).then(async (events) => { + let map = events.map(event => { + let result = event let signer = result.returnValues._signer let bN = String(result.returnValues._blockNumber) - logger.info('%s sign block %s with tx %s', signer, result.blockNumber, result.transactionHash) + logger.debug('%s sign block %s with tx %s', signer, result.blockNumber, result.transactionHash) - await db.Candidate.updateOne({ + return db.Candidate.updateOne({ smartContractAddress: config.get('blockchain.validatorAddress'), candidate: signer.toLowerCase() }, { @@ -269,10 +288,17 @@ async function updateLatestSignedBlock () { latestSignedBlock: bN } }, { upsert: false }) - } + }) + + + return Promise.all(map).then(() => { + return web3.eth.getBlockNumber(n => cpBlockSigner = n) + }) + }).catch(e => { + logger.error('updateLatestSignedBlock2 %s', e) }) } catch (e) { - emitter.emit('error', e) + logger.error('updateLatestSignedBlock %s', e) } } @@ -281,7 +307,7 @@ async function getPastEvent () { let lastBlockTx = await db.Transaction.findOne().sort({ blockNumber: -1 }) let lb = (lastBlockTx && lastBlockTx.blockNumber) ? lastBlockTx.blockNumber : 0 - logger.info('Get all past event from block', lb, 'to block', blockNumber) + logger.debug('Get all past event from block', lb, 'to block', blockNumber) validator.getPastEvents('allEvents', { fromBlock: lb, toBlock: blockNumber }, async function (error, events) { if (error) { logger.error(error) @@ -333,15 +359,7 @@ db.Candidate.updateMany({}, { $set: { status: 'RESIGNED' } }).then(() => { }).then(() => { return getPastEvent().then(() => { watchNewBlock() - watchValidator() - updateLatestSignedBlock() }) }).catch(e => { - logger.error(e) - process.exit(1) -}) - -emitter.on('error', e => { - logger.error('ERROR!!!', e) - process.exit(1) + logger.error('Start error %s', e) }) diff --git a/models/blockchain/blockSigner.js b/models/blockchain/blockSigner.js index bf3453d5..901f180c 100644 --- a/models/blockchain/blockSigner.js +++ b/models/blockchain/blockSigner.js @@ -1,8 +1,12 @@ 'use strict' const BlockSignerABI = require('../../build/contracts/BlockSigner') -const web3 = require('./web3ws') +const Web3Ws = require('./web3ws') const config = require('config') -const blockSigner = new web3.eth.Contract(BlockSignerABI.abi, config.get('blockchain.blockSignerAddress')) -module.exports = blockSigner +function BlockSigner (web3) { + let blockSigner = new web3.eth.Contract(BlockSignerABI.abi, config.get('blockchain.blockSignerAddress')) + return blockSigner +} + +module.exports = BlockSigner diff --git a/models/blockchain/validator.js b/models/blockchain/validator.js index 61b4f9f1..bcfb61c3 100644 --- a/models/blockchain/validator.js +++ b/models/blockchain/validator.js @@ -1,8 +1,11 @@ 'use strict' const ValidatorABI = require('../../build/contracts/TomoValidator') -const web3 = require('./web3ws') const config = require('config') -const validator = new web3.eth.Contract(ValidatorABI.abi, config.get('blockchain.validatorAddress')) -module.exports = validator +function Validator (web3) { + const validator = new web3.eth.Contract(ValidatorABI.abi, config.get('blockchain.validatorAddress')) + return validator +} + +module.exports = Validator diff --git a/models/blockchain/web3ws.js b/models/blockchain/web3ws.js index 3b70bb10..56f7ec96 100644 --- a/models/blockchain/web3ws.js +++ b/models/blockchain/web3ws.js @@ -2,17 +2,13 @@ const Web3 = require('web3') const config = require('config') +const logger = require('../../helpers/logger') -const provider = new Web3.providers.WebsocketProvider(config.get('blockchain.ws')) -const web3 = new Web3(provider) -const restart = function (e) { - console.error('Connection error', e) - console.error('Restart process') - process.exit(1) +function Web3Ws () { + let provider = new Web3.providers.WebsocketProvider(config.get('blockchain.ws')) + let web3 = new Web3(provider) + return web3 } -provider.on('error', (e) => restart(e)) -provider.on('end', (e) => restart(e)) - -module.exports = web3 +module.exports = Web3Ws From 9252443fbc75479cf66c111bc80c79c186d48a54 Mon Sep 17 00:00:00 2001 From: Nguyen Sy Thanh Son Date: Thu, 27 Dec 2018 09:00:01 +0000 Subject: [PATCH 2/3] lint fix --- crawl.js | 10 ++++++---- models/blockchain/blockSigner.js | 1 - models/blockchain/web3ws.js | 2 -- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/crawl.js b/crawl.js index 39d84a88..9e810ec8 100644 --- a/crawl.js +++ b/crawl.js @@ -6,7 +6,6 @@ const Web3Ws = require('./models/blockchain/web3ws') const config = require('config') const db = require('./models/mongodb') const BigNumber = require('bignumber.js') -const EventEmitter = require('events').EventEmitter const moment = require('moment') const logger = require('./helpers/logger') @@ -81,7 +80,9 @@ async function watchValidator () { }) return Promise.all(map).then(() => { - return web3.eth.getBlockNumber(n => cpValidator = n) + return web3.eth.getBlockNumber(n => { + cpValidator = n + }) }) }).catch(e => { logger.error('watchValidator %s', e) @@ -290,9 +291,10 @@ async function updateLatestSignedBlock () { }, { upsert: false }) }) - return Promise.all(map).then(() => { - return web3.eth.getBlockNumber(n => cpBlockSigner = n) + return web3.eth.getBlockNumber(n => { + cpBlockSigner = n + }) }) }).catch(e => { logger.error('updateLatestSignedBlock2 %s', e) diff --git a/models/blockchain/blockSigner.js b/models/blockchain/blockSigner.js index 901f180c..a6e271b2 100644 --- a/models/blockchain/blockSigner.js +++ b/models/blockchain/blockSigner.js @@ -1,7 +1,6 @@ 'use strict' const BlockSignerABI = require('../../build/contracts/BlockSigner') -const Web3Ws = require('./web3ws') const config = require('config') function BlockSigner (web3) { diff --git a/models/blockchain/web3ws.js b/models/blockchain/web3ws.js index 56f7ec96..af14edbc 100644 --- a/models/blockchain/web3ws.js +++ b/models/blockchain/web3ws.js @@ -2,8 +2,6 @@ const Web3 = require('web3') const config = require('config') -const logger = require('../../helpers/logger') - function Web3Ws () { let provider = new Web3.providers.WebsocketProvider(config.get('blockchain.ws')) From 7afb74a0ececf01d939e81f48958f5e3216681c7 Mon Sep 17 00:00:00 2001 From: Nguyen Sy Thanh Son Date: Thu, 27 Dec 2018 09:03:44 +0000 Subject: [PATCH 3/3] log info --- crawl.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crawl.js b/crawl.js index 9e810ec8..7f2d429e 100644 --- a/crawl.js +++ b/crawl.js @@ -20,7 +20,7 @@ var cpValidator = 0 async function watchValidator () { try { const blockNumber = cpValidator || await web3.eth.getBlockNumber() - logger.debug('TomoValidator %s - Listen events from block number %s ...', + logger.info('TomoValidator %s - Listen events from block number %s ...', config.get('blockchain.validatorAddress'), blockNumber) return validator.getPastEvents('allEvents', { @@ -269,7 +269,7 @@ async function updateLatestSignedBlock () { let blockNumber = cpBlockSigner || await web3.eth.getBlockNumber() cpBlockSigner = await web3.eth.getBlockNumber() - logger.debug('BlockSigner %s - Listen events from block number %s ...', + logger.info('BlockSigner %s - Listen events from block number %s ...', config.get('blockchain.blockSignerAddress'), blockNumber) return blockSigner.getPastEvents('Sign', { fromBlock: blockNumber,