Skip to content

Commit

Permalink
Improve PS library handling of eth_getLogs calls
Browse files Browse the repository at this point in the history
Related to #75
  • Loading branch information
gabmontes committed Nov 9, 2021
1 parent 4ab7715 commit b6d7874
Showing 1 changed file with 65 additions and 4 deletions.
69 changes: 65 additions & 4 deletions packages/payment-streams/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,75 @@ const createPaymentStreams = function (web3, options = {}) {
)
}

// Helper to split the eth_getLogs calls into chunks and prevent timeouts if
// the node takes too long to respond.
const getPastEventsInChunks = function (contract, event, pastEventOptions) {
const {
fromBlock = contract.options.birthblock,
toBlock: _toBlock,
...restOfPastEventOptions
} = pastEventOptions

return Promise.resolve(_toBlock || web3.eth.getBlockNumber()).then(
function (toBlock) {
debug('Getting past %s events in chunks', event)

// Create an array containing all the chunk's from and to blocks along
// with the rest of the filter options.
const chunkSize = 5760 // 1 day of blocks
const chunksCount = Math.floor((toBlock - fromBlock) / chunkSize) + 1
const chunks = new Array(chunksCount).fill().map(function (_, i) {
const _fromBlock = fromBlock + chunkSize * i
return {
...restOfPastEventOptions,
fromBlock: _fromBlock,
toBlock: Math.min(toBlock, _fromBlock + chunkSize - 1)
}
})

// Then iterate over the chunks sending one logs query at a time to
// allow the nodes to handle even large queries in time.
return chunks.reduce(
(promiseChain, chunk) =>
promiseChain.then(function (events) {
debug(
'Getting chunk from %s to %s',
chunk.fromBlock,
chunk.toBlock
)
return contract
.getPastEvents(event, chunk)
.then(
pTap(function (/** @type {Array} */ newEvents) {
debug('Got %s events', newEvents.length)
})
)
.then(newEvents => events.concat(newEvents))
}),
Promise.resolve([])
)
}
)
}

// Gets all incoming streams by getting past StreamCreated events where the
// payee is the given address.
const getIncomingStreams = function (address) {
debug('Getting all incoming streams of %s', address)
return psfPromise
.then(psf =>
psf.getPastEvents('StreamCreated', {
getPastEventsInChunks(psf, 'StreamCreated', {
fromBlock: psf.options.birthblock,
filter: { payee: address }
})
)
.then(events =>
Promise.all(events.map(e => e.returnValues.id).map(getStream))
Promise.all(
events
.filter(e => !e.removed)
.map(e => e.returnValues.id)
.map(getStream)
)
)
.then(
pTap(function (streams) {
Expand All @@ -181,13 +237,18 @@ const createPaymentStreams = function (web3, options = {}) {
debug('Getting all outgoing streams of %s', address)
return psfPromise
.then(psf =>
psf.getPastEvents('StreamCreated', {
getPastEventsInChunks(psf, 'StreamCreated', {
fromBlock: psf.options.birthblock,
filter: { payer: address }
})
)
.then(events =>
Promise.all(events.map(e => e.returnValues.id).map(getStream))
Promise.all(
events
.filter(e => !e.removed)
.map(e => e.returnValues.id)
.map(getStream)
)
)
.then(
pTap(function (streams) {
Expand Down

0 comments on commit b6d7874

Please sign in to comment.