Skip to content

Commit

Permalink
Use polling instead subscription in Kusama (#406)
Browse files Browse the repository at this point in the history
* uncomment polkadot

* bug fixes

* pollForNewBlock

* mod getBlockHeight

* add again commented api disconnect to not forget

* enqueueAndPublishBlockAdded

* re-subscribe on error

* stringify error

* stringify errors

* api.isReady

* Add delay to newBlockHandler

* Ana/add balances to kusama (#405)

* rudimentary balance

* add some comment

* Ana/switch polkadot to rest calls (#408)

* add rewards and overview functions. not working

* poll instead of subscribe to blocks

* switch over to http provider

* fix lint errors

* use httpprovider

* getting last block hash

* get blockhash and header. but no height

* back to normal. use mario node subscription

* successfully get block height polling with http

* get block hash from http. use polling

* uncomment all other networks

* pass the store to source classes

* create the api only once

* comment api.disconnect()

* added polling fix and quick querying

* fixed block height query

* clean up

* comment

* cleanup and optimization

Co-authored-by: Fabian Weber <[email protected]>
Co-authored-by: mariopino <[email protected]>

* remove unused dependency

Co-authored-by: Bitcoinera <[email protected]>
Co-authored-by: Ana G <[email protected]>
Co-authored-by: Fabian Weber <[email protected]>
  • Loading branch information
4 people authored Mar 8, 2020
1 parent 6e5fe7c commit 68744d5
Show file tree
Hide file tree
Showing 6 changed files with 711 additions and 89 deletions.
70 changes: 35 additions & 35 deletions data/networks.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,43 +186,43 @@ module.exports = [
enabled: true,
icon: 'https://app.lunie.io/img/networks/emoney-testnet.png',
slug: 'emoney-testnet'
},
{
id: 'polkadot-testnet',
title: 'Kusama',
chain_id: 'kusama-cc3',
api_url: 'https://host-01.polkascan.io/kusama/api/v1/',
rpc_url: 'wss://kusama-rpc.polkadot.io/',
bech32_prefix: ' ',
address_prefix: ' ',
ledger_app: 'polkadot',
address_creator: 'polkadot',
network_type: 'polkadot',
source_class_name: 'source/polkadotV0-source',
block_listener_class_name: 'block-listeners/polkadot-node-subscription',
testnet: true,
feature_session: true,
feature_explore: true,
feature_portfolio: true,
feature_validators: true,
feature_proposals: false,
feature_activity: false,
feature_explorer: false,
action_send: true,
action_claim_rewards: false,
action_delegate: true,
action_redelegate: true,
action_undelegate: true,
action_deposit: false,
action_vote: false,
action_proposal: false,
default: false,
stakingDenom: 'KSM',
enabled: true,
icon: 'https://app.lunie.io/img/networks/polkadot-testnet.png',
slug: 'kusama'
}
// {
// id: 'polkadot-testnet',
// title: 'Kusama',
// chain_id: 'kusama-cc3',
// api_url: 'https://host-01.polkascan.io/kusama/api/v1/',
// rpc_url: 'wss://kusama-rpc.polkadot.io/',
// bech32_prefix: ' ',
// address_prefix: ' ',
// ledger_app: 'polkadot',
// address_creator: 'polkadot',
// network_type: 'polkadot',
// source_class_name: 'source/polkadotV0-source',
// block_listener_class_name: 'block-listeners/polkadot-node-subscription',
// testnet: true,
// feature_session: true,
// feature_explore: true,
// feature_portfolio: true,
// feature_validators: true,
// feature_proposals: false,
// feature_activity: false,
// feature_explorer: false,
// action_send: true,
// action_claim_rewards: false,
// action_delegate: true,
// action_redelegate: true,
// action_undelegate: true,
// action_deposit: false,
// action_vote: false,
// action_proposal: false,
// default: false,
// stakingDenom: 'KSM',
// enabled: true,
// icon: 'https://app.lunie.io/img/networks/polkadot-testnet.png',
// slug: 'kusama'
// }
// {
// id: 'livepeer-mainnet',
// title: 'Livepeer',
// chain_id: 'ethereum-1',
Expand Down
73 changes: 46 additions & 27 deletions lib/block-listeners/polkadot-node-subscription.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const _ = require('lodash')
const { ApiPromise, WsProvider } = require('@polkadot/api')
const {
publishBlockAdded
// publishUserTransactionAdded,
Expand All @@ -8,12 +9,15 @@ const Sentry = require('@sentry/node')
const database = require('../database')
const config = require('../../config.js')

const POLLING_INTERVAL = 1000
// const NEW_BLOCK_DELAY = 2000

// This class polls for new blocks
// Used for listening to events, such as new blocks.
class PolkadotNodeSubscription {
constructor(network, PolkadotApiClass, store) {
this.network = network
this.polkadotAPI = new PolkadotApiClass(network)
this.polkadotAPI = new PolkadotApiClass(network, store)
this.store = store
this.validators = []
this.sessionValidators = []
Expand All @@ -22,36 +26,52 @@ class PolkadotNodeSubscription {
this.height = 0
this.currentSessionIndex = 0
this.blockQueue = []
this.subscribeForNewBlock()
this.pollForNewBlock()
}

// here we init the polkadot rpc once for all processes
// the class gets stored in the store to be used by all instances
async initPolkadotRPC() {
this.api = new ApiPromise({
provider: new WsProvider(this.network.rpc_url)
})
this.store.polkadotRPC = this.api
await this.api.isReady
}

async subscribeForNewBlock() {
const api = await this.polkadotAPI.getAPIPromise()
// poll latest block height and handle all blocks unknown to this API yet in order
async checkForNewBlock() {
try {
const blockHeight = await this.polkadotAPI.getBlockHeight()

// if we get a newer block then expected query for all the outstanding blocks
while (blockHeight > this.height) {
this.height = this.height ? this.height++ : blockHeight

this.newBlockHandler(this.height)

// Subscribe to new block headers
await api.rpc.chain.subscribeNewHeads(async blockHeader => {
const blockHeight = blockHeader.number.toNumber()
if (this.height < blockHeight) {
this.height = blockHeight
console.log(`\x1b[36mNew kusama block #${blockHeight}\x1b[0m`)
this.newBlockHandler(blockHeight)
// we are safe, that the chain produced a block so it didn't hang up
if (this.chainHangup) clearTimeout(this.chainHangup)
}
})
} catch (error) {
console.error('Failed to check for a new block', error)
Sentry.captureException(error)
}
}

// Sometimes blocks get published unordered so we need to enqueue
// them before publish to ensure correct order. This adds 3 blocks delay.
enqueueAndPublishBlockAdded(newBlock) {
this.blockQueue.push(newBlock)
if (this.blockQueue.length > 2) {
this.blockQueue.sort((a, b) =>
a.height > b.height ? 1 : b.height > a.height ? -1 : 0
)
console.log(
`\x1b[36mPublishing new kusama block #${newBlock.height}\x1b[0m`
)
publishBlockAdded(this.network.id, this.blockQueue.shift())
async pollForNewBlock() {
// here we init the polkadot rpc once for all processes
if (!this.api) {
await this.initPolkadotRPC()
}

// immediatly check and not wait the polling delay
await this.checkForNewBlock()

this.pollingTimeout = setTimeout(async () => {
await this.checkForNewBlock()
this.pollForNewBlock()
}, POLLING_INTERVAL)
}

// For each block event, we fetch the block information and publish a message.
Expand All @@ -64,8 +84,7 @@ class PolkadotNodeSubscription {
})

const block = await this.polkadotAPI.getBlockByHeight(blockHeight)
// publishBlockAdded(this.network.id, block)
this.enqueueAndPublishBlockAdded(block)
publishBlockAdded(this.network.id, block)

// We dont need to fetch validators on every new block.
// Validator list only changes on new sessions
Expand Down Expand Up @@ -106,7 +125,7 @@ class PolkadotNodeSubscription {
// publishEvent(this.network.id, 'transaction', address, tx)
// })
} catch (error) {
console.error(`newBlockHandler failed: ${error}`)
console.error(`newBlockHandler failed`, JSON.stringify(error, null, 2))
Sentry.captureException(error)
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/network-container.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class NetworkContainer {
createDataSource() {
if (this.sourceClass) {
return {
api: new this.sourceClass(this.network),
api: new this.sourceClass(this.network, this.store),
store: this.store
}
}
Expand Down
19 changes: 18 additions & 1 deletion lib/reducers/polkadotV0-reducers.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
const BigNumber = require('bignumber.js')

const POLKADOT_CONVERSION = 1000000000000

function blockReducer(
networkId,
blockHeight,
Expand Down Expand Up @@ -49,7 +53,20 @@ function validatorReducer(network, validator) {
}
}

function balanceReducer(balance) {
// hack. We convert the balance into an Array to make it an Iterable
return [
{
amount: BigNumber(balance)
.div(POLKADOT_CONVERSION)
.toFixed(4),
denom: `KSM` // hardcoded for now. Looking how to do a more complete query
}
]
}

module.exports = {
blockReducer,
validatorReducer
validatorReducer,
balanceReducer
}
70 changes: 50 additions & 20 deletions lib/source/polkadotV0-source.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
const { ApiPromise, WsProvider } = require('@polkadot/api')
const BigNumber = require('bignumber.js')

class polkadotAPI {
constructor(network) {
constructor(network, store) {
this.network = network
this.setReducers()
this.store = store
}

setReducers() {
this.reducers = require('../reducers/polkadotV0-reducers')
}

async getAPIPromise() {
return await ApiPromise.create({
provider: new WsProvider(this.network.rpc_url)
})
// rpc initialization is async so we always need to assume we need to wait for it to be initialized
async getAPI() {
const api = this.store.polkadotRPC
await api.isReady
return api
}

async getBlockByHeight(blockHeight) {
const api = await this.getAPIPromise()
async getBlockHeight() {
const api = await this.getAPI()
const block = await api.rpc.chain.getBlock()
return JSON.parse(JSON.stringify(block.block.header.number))
}

const [blockHash, sessionIndex] = await Promise.all([
api.rpc.chain.getBlockHash(blockHeight),
async getBlockByHeight(blockHeight) {
const api = await this.getAPI()

// heavy nesting to provide optimal parallelization here
const [
[{ author }, blockEvents, blockHash],
sessionIndex
] = await Promise.all([
api.rpc.chain.getBlockHash(blockHeight).then(async blockHash => {
const [{ author }, blockEvents] = await Promise.all([
api.derive.chain.getHeader(blockHash),
api.query.system.events.at(blockHash)
])

return [{ author }, blockEvents, blockHash]
}),
api.query.babe.epochIndex()
])

const [{ author }, blockEvents] = await Promise.all([
api.derive.chain.getHeader(blockHash),
api.query.system.events.at(blockHash)
])

api.disconnect()

return this.reducers.blockReducer(
this.networkId,
blockHeight,
Expand Down Expand Up @@ -99,7 +110,7 @@ class polkadotAPI {
}

async getAllValidators() {
const api = await this.getAPIPromise()
const api = await this.getAPI()

// Fetch validator addresses for current session
const validatorAddresses = await api.query.session.validators()
Expand Down Expand Up @@ -145,8 +156,6 @@ class polkadotAPI {
}
})

api.disconnect()

return validators.map(validator =>
this.reducers.validatorReducer(this.network, validator)
)
Expand All @@ -156,6 +165,27 @@ class polkadotAPI {
async getExpectedReturns() {
return 0
}

async getBalancesFromAddress(address) {
const api = await this.getAPI()
// the documentation points at api.query.system.account as the preferred query. Looking into it
const balance = await api.query.balances.freeBalance(address)
return this.reducers.balanceReducer(JSON.stringify(balance))
}

async getRewards(delegatorAddress) {
const api = await this.getAPI()
const rewards = api.derive.staking.stakerRewards(delegatorAddress)
// currently not working. It always returns []
console.log('REWARDS', rewards)
return rewards
}

async getOverview(delegatorAddress) {
const api = await this.getAPI()
const accountBalances = await api.query.staking.bonded(delegatorAddress) // or stashId?
console.log('ACCOUNT', JSON.stringify(accountBalances))
}
}

module.exports = polkadotAPI
Loading

0 comments on commit 68744d5

Please sign in to comment.