Skip to content

Commit

Permalink
Merge pull request #421 from thanhson1085/master
Browse files Browse the repository at this point in the history
 persitence crawler
  • Loading branch information
thanhson1085 authored Dec 27, 2018
2 parents 6a07855 + 555e9ec commit a55e56e
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 78 deletions.
142 changes: 81 additions & 61 deletions crawl.js
Original file line number Diff line number Diff line change
@@ -1,65 +1,74 @@
'use strict'

const validator = require('./models/blockchain/validator')
const blockSigner = require('./models/blockchain/blockSigner')
const web3 = require('./models/blockchain/web3ws')
const Validator = require('./models/blockchain/validator')
const BlockSigner = require('./models/blockchain/blockSigner')
const Web3Ws = require('./models/blockchain/web3ws')
const config = require('config')
const db = require('./models/mongodb')
const BigNumber = require('bignumber.js')
const EventEmitter = require('events').EventEmitter
const moment = require('moment')
const logger = require('./helpers/logger')
const emitter = new EventEmitter()

process.setMaxListeners(100)

var web3 = new Web3Ws()
var validator = new Validator(web3)
var blockSigner = new BlockSigner(web3)
var cpBlockSigner = 0
var cpValidator = 0

async function watchValidator () {
try {
const blockNumber = await web3.eth.getBlockNumber()
const blockNumber = cpValidator || await web3.eth.getBlockNumber()
logger.info('TomoValidator %s - Listen events from block number %s ...',
config.get('blockchain.validatorAddress'), blockNumber)

validator.events.allEvents({
return validator.getPastEvents('allEvents', {
fromBlock: blockNumber,
toBlock: 'latest'
}, async function (error, result) {
if (error) {
logger.error('watchValidator %s %s', error, result)
return false
} else {
logger.info('Event %s in block %s', result.event, result.blockNumber)
}).then(async events => {
let map = events.map(async (event) => {
let result = event
logger.debug('Event %s in block %s', result.event, result.blockNumber)
if (result.event === 'Withdraw') {
let owner = (result.returnValues._owner || '').toLowerCase()
let blockNumber = result.blockNumber
let capacity = result.returnValues._cap
let wd = new db.Withdraw({
smartContractAddress: config.get('blockchain.validatorAddress'),
blockNumber: blockNumber,
tx: result.transactionHash,
owner: owner,
capacity: capacity
})
wd.save()
return true
return db.Withdraw.updateOne({
tx: result.transactionHash
}, {
$set: {
smartContractAddress: config.get('blockchain.validatorAddress'),
blockNumber: blockNumber,
tx: result.transactionHash,
owner: owner,
capacity: capacity
}
}, { upsert: true })
} else {
let candidate = (result.returnValues._candidate || '').toLowerCase()
let voter = (result.returnValues._voter || '').toLowerCase()
let owner = (result.returnValues._owner || '').toLowerCase()
let capacity = result.returnValues._cap
let blk = await web3.eth.getBlock(result.blockNumber)
let createdAt = moment.unix(blk.timestamp).utc()
let tx = new db.Transaction({
smartContractAddress: config.get('blockchain.validatorAddress'),
tx: result.transactionHash,
event: result.event,
voter: voter,
owner: owner,
candidate: candidate,
capacity: capacity,
blockNumber: blockNumber,
createdAt: createdAt
await db.Transaction.updateOne({
tx: result.transactionHash
}, {
$set: {
smartContractAddress: config.get('blockchain.validatorAddress'),
tx: result.transactionHash,
event: result.event,
voter: voter,
owner: owner,
candidate: candidate,
capacity: capacity,
blockNumber: blockNumber,
createdAt: createdAt
}
}, {
upsert: true
})
tx.save()
if (result.event === 'Vote' || result.event === 'Unvote') {
await updateVoterCap(candidate, voter)
}
Expand All @@ -68,11 +77,18 @@ async function watchValidator () {
}
await updateCandidateInfo(candidate)
}
}
})

return Promise.all(map).then(() => {
return web3.eth.getBlockNumber(n => {
cpValidator = n
})
})
}).catch(e => {
logger.error('watchValidator %s', e)
})
} catch (e) {
logger.error('watchValidator2 %s', e)
emitter.emit('error', e)
}
}

Expand All @@ -82,7 +98,7 @@ async function updateCandidateInfo (candidate) {
let owner = (await validator.methods.getCandidateOwner(candidate).call() || '').toLowerCase()
let status = await validator.methods.isCandidate(candidate).call()
let result
logger.info('Update candidate %s capacity %s %s', candidate, String(capacity), status)
logger.debug('Update candidate %s capacity %s %s', candidate, String(capacity), status)
if (candidate !== '0x0000000000000000000000000000000000000000') {
result = await db.Candidate.updateOne({
smartContractAddress: config.get('blockchain.validatorAddress'),
Expand Down Expand Up @@ -116,7 +132,7 @@ async function updateCandidateInfo (candidate) {
async function updateVoterCap (candidate, voter) {
try {
let capacity = await validator.methods.getVoterCap(candidate, voter).call()
logger.info('Update voter %s for candidate %s capacity %s', voter, candidate, String(capacity))
logger.debug('Update voter %s for candidate %s capacity %s', voter, candidate, String(capacity))
return await db.Voter.updateOne({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: candidate,
Expand Down Expand Up @@ -196,6 +212,9 @@ async function updatePenalties () {
await Promise.all(blks.map(blk => getPenalty(blk)))
} catch (e) {
logger.error('updatePenalties %s', e)
web3 = new Web3Ws()
validator = new Validator(web3)
blockSigner = new BlockSigner(web3)
}
}

Expand Down Expand Up @@ -233,46 +252,55 @@ let sleep = (time) => new Promise((resolve) => setTimeout(resolve, time))
async function watchNewBlock () {
while (true) {
try {
logger.info('Update signers after sleeping 10 seconds')
logger.info('Update signers after sleeping 5 seconds')
await updateSigners()
await updatePenalties()
await updateLatestSignedBlock()
await watchValidator()
} catch (e) {
logger.error('watchNewBlock %s', e)
emitter.emit('error', e)
}
await sleep(10000)
await sleep(5000)
}
}

async function updateLatestSignedBlock () {
try {
let blockNumber = await web3.eth.getBlockNumber()
let blockNumber = cpBlockSigner || await web3.eth.getBlockNumber()
cpBlockSigner = await web3.eth.getBlockNumber()

logger.info('BlockSigner %s - Listen events from block number %s ...',
config.get('blockchain.blockSignerAddress'), blockNumber)
blockSigner.events.allEvents({
return blockSigner.getPastEvents('Sign', {
fromBlock: blockNumber,
toBlock: 'latest'
}, async function (error, result) {
if (error) {
logger.error(error, result)
return false
} else {
}).then(async (events) => {
let map = events.map(event => {
let result = event
let signer = result.returnValues._signer
let bN = String(result.returnValues._blockNumber)
logger.info('%s sign block %s with tx %s', signer, result.blockNumber, result.transactionHash)
logger.debug('%s sign block %s with tx %s', signer, result.blockNumber, result.transactionHash)

await db.Candidate.updateOne({
return db.Candidate.updateOne({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: signer.toLowerCase()
}, {
$set: {
latestSignedBlock: bN
}
}, { upsert: false })
}
})

return Promise.all(map).then(() => {
return web3.eth.getBlockNumber(n => {
cpBlockSigner = n
})
})
}).catch(e => {
logger.error('updateLatestSignedBlock2 %s', e)
})
} catch (e) {
emitter.emit('error', e)
logger.error('updateLatestSignedBlock %s', e)
}
}

Expand All @@ -281,7 +309,7 @@ async function getPastEvent () {
let lastBlockTx = await db.Transaction.findOne().sort({ blockNumber: -1 })
let lb = (lastBlockTx && lastBlockTx.blockNumber) ? lastBlockTx.blockNumber : 0

logger.info('Get all past event from block', lb, 'to block', blockNumber)
logger.debug('Get all past event from block', lb, 'to block', blockNumber)
validator.getPastEvents('allEvents', { fromBlock: lb, toBlock: blockNumber }, async function (error, events) {
if (error) {
logger.error(error)
Expand Down Expand Up @@ -333,15 +361,7 @@ db.Candidate.updateMany({}, { $set: { status: 'RESIGNED' } }).then(() => {
}).then(() => {
return getPastEvent().then(() => {
watchNewBlock()
watchValidator()
updateLatestSignedBlock()
})
}).catch(e => {
logger.error(e)
process.exit(1)
})

emitter.on('error', e => {
logger.error('ERROR!!!', e)
process.exit(1)
logger.error('Start error %s', e)
})
9 changes: 6 additions & 3 deletions models/blockchain/blockSigner.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict'

const BlockSignerABI = require('../../build/contracts/BlockSigner')
const web3 = require('./web3ws')
const config = require('config')
const blockSigner = new web3.eth.Contract(BlockSignerABI.abi, config.get('blockchain.blockSignerAddress'))

module.exports = blockSigner
function BlockSigner (web3) {
let blockSigner = new web3.eth.Contract(BlockSignerABI.abi, config.get('blockchain.blockSignerAddress'))
return blockSigner
}

module.exports = BlockSigner
9 changes: 6 additions & 3 deletions models/blockchain/validator.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict'

const ValidatorABI = require('../../build/contracts/TomoValidator')
const web3 = require('./web3ws')
const config = require('config')
const validator = new web3.eth.Contract(ValidatorABI.abi, config.get('blockchain.validatorAddress'))

module.exports = validator
function Validator (web3) {
const validator = new web3.eth.Contract(ValidatorABI.abi, config.get('blockchain.validatorAddress'))
return validator
}

module.exports = Validator
16 changes: 5 additions & 11 deletions models/blockchain/web3ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,10 @@
const Web3 = require('web3')
const config = require('config')

const provider = new Web3.providers.WebsocketProvider(config.get('blockchain.ws'))
const web3 = new Web3(provider)

const restart = function (e) {
console.error('Connection error', e)
console.error('Restart process')
process.exit(1)
function Web3Ws () {
let provider = new Web3.providers.WebsocketProvider(config.get('blockchain.ws'))
let web3 = new Web3(provider)
return web3
}

provider.on('error', (e) => restart(e))
provider.on('end', (e) => restart(e))

module.exports = web3
module.exports = Web3Ws

0 comments on commit a55e56e

Please sign in to comment.