-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chain streaming interfaces #102
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
854f42d
to
f608c3e
Compare
a7e348f
to
17d4e70
Compare
17d4e70
to
1110a02
Compare
type contractConfig struct { | ||
fromBlock int | ||
contractAddress common.Address | ||
topics []common.Hash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these topics the same as the topics that users would query by? Is this included so that users can query the blockchain directly? For nodes, I imagine they would just firehose everything
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, "topics" is an ethereum event term as well. In this case, the topic is actually the hash of the signature of the event itself so that the indexer can pull the right events in contracts that might emit multiple event types.
} | ||
|
||
highestBlockCanProcess := int(highestBlock) - LAG_FROM_HIGHEST_BLOCK | ||
numOfBlocksToProcess := highestBlockCanProcess - fromBlock + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it +1 here? Is the fromBlock already processed, or not processed yet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the number of blocks you want to have in a single query. So even if you have from: 5, to: 5
you are actually processing 1 block since it seems both the from and to are inclusive.
return nil, nil, err | ||
} | ||
|
||
nextBlockNumber := to + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, so it looks like the fromBlock
is not processed? I wonder if it would be easier to understand/math if we use the cursor pattern and make this lastSeenBlock
instead of fromBlock
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fromBlock
should get processed here. Both from and to are inclusive in ethereum
topics := [][]common.Hash{} | ||
for _, topic := range contractConfig.topics { | ||
topics = append(topics, []common.Hash{topic}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noob question, why is this a 2D array? What does each dimension represent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great question. Here's the docs around it.
https://pkg.go.dev/github.com/ethereum/go-ethereum#FilterQuery
func (r *RpcLogStreamer) watchContract(watcher contractConfig) { | ||
fromBlock := int(watcher.fromBlock) | ||
logger := r.logger.With(zap.String("contractAddress", watcher.contractAddress.Hex())) | ||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an infinite poll loop without any interval between polls right? Curious if there's a rationale for having no interval?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call out. I can make it sleep once it gets to the point of having no new logs.
No pauses makes sense when you are backfilling historical data, but it's way too aggressive once you are caught up.
tl;dr
Notes
There are many ways to get messages from a blockchain, each with trade-offs.
eth_getLogs
on an archive node to retrieve log messageseth_getLogs
on an archive node to retrieve old log messages, then subscribe for new log messages in real-timeI have opted for 1 here, since it's the simplest and most reliable. We can move to 2 if we want to improve latency for new blocks.
Moving to 3 is more complicated. StreamingFast has a Firehose package that can be used on top of a node to rapidly ingest logs, and deal with complicated cases like reorgs. But that takes a bunch of infrastructure that I'm not sure node operators are going to be up to. Options 1 and 2 work with any blockchain node.
It may be possible to work with StreamingFast to get a hosted Firehose deployed that node operators can work with, possibly via Substreams to sink directly into a DB. But that's going to take time.