feat! content addressable transaction pool #935
Conversation
…n message passing (#875) This commit adds two metrics, AlreadySeenTxs and SuccessfulTxs, to both mempool implementations. A high ratio of AlreadySeenTxs to total txs indicates a high degree of duplication in the messages sent across the wire and validates the hypothesis that a content-addressable network would be advantageous in saving bandwidth (especially given the expected size of a tx). Co-authored-by: Rootul P <[email protected]>
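As a rough illustration of how these counters feed that ratio (the method and field names below are assumptions; only the metric names come from the commit):

```go
// Illustrative only: count a duplicate when an incoming tx is already known,
// otherwise count a successful addition. The AlreadySeenTxs / total-txs ratio
// can then be graphed to estimate duplication on the wire.
func (txmp *TxMempool) recordIncomingTx(alreadySeen bool) {
	if alreadySeen {
		txmp.metrics.AlreadySeenTxs.Add(1)
		return
	}
	txmp.metrics.SuccessfulTxs.Add(1)
}
```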
Wondering how much code here was copied from the existing implementations.
v2 started as a complete copy of v1, so there's a significant amount, although the protocol does make some substantial changes to information flow. For example, you no longer have a goroutine for every peer you connect with.
Reviewed 14/41 files but I'm pausing review, so I wanted to submit this first batch of comments.
Since the cat mempool code seems inspired by the priority mempool, I wonder if we need to address any of the performance issues Sergio alluded to in informalsystems/audit-celestia#36.
mempool/cat/cache.go
```go
// GetList returns the underlying linked-list that backs the LRU cache. Note,
// this should be used for testing purposes only!
```
[question] since `cache_test.go` lives inside the `cat` package, what are your thoughts on un-exporting this method? The question is motivated by an effort to enforce: "Note, this should be used for testing purposes only!"
Good point.
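A minimal sketch of the un-exported variant, reusing the receiver and field names quoted elsewhere in this review:

```go
// getList returns the underlying linked list that backs the LRU cache.
// Unexported so it is only reachable from tests inside the cat package.
func (c *LRUTxCache) getList() *list.List {
	return c.list
}
```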
mempool/cat/pool.go
```go
	// - We send multiple requests and the first peer eventually responds after the second peer has already
	if txmp.IsRejectedTx(key) {
		// The peer has sent us a transaction that we have previously marked as invalid. Since `CheckTx` can
		// be non-deterministic, we don't punish the peer but instead just ignore the msg
```
[nit] I think the mempool only deals with transactions and doesn't inspect the contents (i.e. messages)
Suggested change:

```diff
-		// be non-deterministic, we don't punish the peer but instead just ignore the msg
+		// be non-deterministic, we don't punish the peer but instead just ignore the tx
```
I still need to take a second look at the tests, but overall this is really really good. The spec and the ADR are excellently written. Really top shelf stuff.
```go
	if err != nil {
		// TODO: find a more polite way of handling this error
		panic(err)
	}
```
[optional] perhaps just a simple error log and an `os.Exit` call?
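A rough sketch of that alternative; the logger field and error message are assumptions, not the PR's code:

```go
	if err != nil {
		// Log and terminate instead of panicking; illustrative only.
		txmp.logger.Error("unrecoverable mempool error", "err", err)
		os.Exit(1)
	}
```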
```go
	// Add jitter to when the node broadcasts its seen txs to stagger when nodes
	// in the network broadcast their seenTx messages.
	time.Sleep(time.Duration(rand.Intn(10)*10) * time.Millisecond)
```
[question]
how critical is this to the protocol working? What about natural network latencies?
Non-critical. We could remove it. Staggering when we send these messages may reduce the amount we need to send, but it might mean in the worst case that a node becomes aware of a transaction's existence later than necessary.
```go
func (r *requestScheduler) ForTx(key types.TxKey) uint16 {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	return r.requestsByTx[key]
}
```
[comment]
fwiw I found this name confusing, not sure I have a better suggestion tho.
The name of the method?
yeah, just the name of the method
```go
	}

	numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge)
	txmp.metrics.EvictedTxs.Add(float64(numExpired))
```
[question]
are we adding to the evicted transaction metrics here but not adding them to the evicted cache?
Yeah, I remember thinking that there's a difference between eviction due to a TTL and eviction due to overflow but now I'm not sure if I agree with my earlier self.
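If TTL eviction should be treated the same as overflow eviction, a hedged sketch might look like the following; it assumes `purgeExpiredTxs` were changed to return the purged keys and that the evicted cache exposes a push-style method, neither of which is the PR's current API:

```go
	// Hypothetical variant: record TTL-purged txs in the evicted cache as
	// well as in the metric.
	purgedKeys := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge)
	for _, key := range purgedKeys {
		txmp.evictedTxs.Push(key) // assumed evicted-cache method
	}
	txmp.metrics.EvictedTxs.Add(float64(len(purgedKeys)))
```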
We've been running the priority mempool in 100-node networks without any noticeable problems with performance. If there are any known problems then let me know and we can work them out. I'm also indifferent if we want to base CAT off the FIFO mempool. My understanding was that priority-based block production and eviction was an important design requirement.
```
@@ -184,6 +185,16 @@ func TestReactorWithEvidence(t *testing.T) {
		mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
		mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
	)
case cfg.MempoolV2:
```
[non-blocking] this switch statement for defining the mempool can probably be refactored into a helper. I think I've seen it 3x already.
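A hedged sketch of such a helper, reusing the constructor calls quoted above; the signature, the v2 (CAT) constructor call, and the default branch are assumptions:

```go
// makeMempool collects the repeated mempool-construction switch; illustrative only.
func makeMempool(
	conf *cfg.Config,
	logger log.Logger,
	appConn proxy.AppConnMempool,
	state sm.State,
) mempool.Mempool {
	switch conf.Mempool.Version {
	case cfg.MempoolV1:
		return mempoolv1.NewTxMempool(
			logger,
			conf.Mempool,
			appConn,
			state.LastBlockHeight,
			mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
			mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
		)
	case cfg.MempoolV2:
		// assumed CAT constructor; adjust to the real cat constructor signature
		return cat.NewTxPool(logger, conf.Mempool, appConn, state.LastBlockHeight)
	default:
		panic(fmt.Sprintf("unknown mempool version %q", conf.Mempool.Version))
	}
}
```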
```
@@ -0,0 +1,97 @@
package cat
```
[non-blocking] to properly test the thread-safe nature of the cache we should add a test that spins up a number of goroutines and goes through the various actions.
Something like: goroutine A is adding txs, goroutine B is removing txs, goroutine C is checking and evicting txs. They all have random sleeps in between actions, and then a common done channel that closes them and the test down after 10s or something.
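A rough sketch of such a test; `NewLRUTxCache` and `Push` are assumed names, while `Remove`, `Has`, and `Reset` are quoted elsewhere in this review. It relies on the race detector (`go test -race`) rather than assertions:

```go
func TestLRUTxCacheConcurrentAccess(t *testing.T) {
	cache := NewLRUTxCache(100) // assumed constructor name

	done := make(chan struct{})
	var wg sync.WaitGroup

	randomKey := func() types.TxKey {
		var key types.TxKey
		rand.Read(key[:])
		return key
	}

	// run spawns a worker that repeats op with a small random sleep until
	// the shared done channel is closed.
	run := func(op func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-done:
					return
				default:
					op()
					time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
				}
			}
		}()
	}

	run(func() { cache.Push(randomKey()) })   // goroutine A: adding txs (Push is assumed)
	run(func() { cache.Remove(randomKey()) }) // goroutine B: removing txs
	run(func() { cache.Has(randomKey()) })    // goroutine C: checking txs
	run(func() { cache.Reset() })             // periodic wipe, standing in for eviction

	// Shut the workers and the test down after a fixed duration.
	time.Sleep(2 * time.Second)
	close(done)
	wg.Wait()
}
```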
Some initial thoughts; 12/42 files reviewed.
mempool/cat/cache.go
```go
	mtx      tmsync.Mutex
	size     int
	cacheMap map[types.TxKey]*list.Element
	list     *list.List
```
If I'm understanding this correctly, the size is static. I'm not sure if we have any standards around prefixes cc @evan-forbes but I'd recommend something like `static` so that it is clear that this field doesn't require acquiring the mutex in order to access it. Similarly the list should also have the `static` prefix as it manages its own mutex and doesn't require the `LRUTxCache` mutex for its operations to be threadsafe.
Suggested change:

```diff
-	mtx      tmsync.Mutex
-	size     int
-	cacheMap map[types.TxKey]*list.Element
-	list     *list.List
+	staticSize int
+	staticList *list.List
+	mtx        tmsync.Mutex
+	cacheMap   map[types.TxKey]*list.Element
```
Yeah you're right. I think the convention is to place the mutex only above the fields it guards. Here this was just copied from the v1 mempool implementation.
> Similarly the list should also have the static prefix as it manages its own mutex and doesn't require the LRUTxCache mutex for its operations to be threadsafe.

`list.List` does not have its own mutex and is managed by the LRUTxCache's mutex, so I will leave it as is.
I am currently going through some of the pull requests that are related to my upcoming tasks, to familiarize myself with the codebase. I'm not able to give a detailed review at this time, as I am still gathering more information and background. I'll make some non-critical suggestions and comments as I read through the code. :)
```go
	mtx tmsync.Mutex
```
Suggested change (the original and suggested lines appear to differ only in whitespace alignment):

```go
	mtx tmsync.Mutex
```
```go
	}
}

func (c *LRUTxCache) Reset() {
```
Some brief function descriptions for the LRUTxCache methods would be helpful.
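For instance, a hedged sketch of what a comment on `Reset` could say; the body shown follows the v1-style implementation and is an assumption, the point is just the doc comment:

```go
// Reset clears the cache: it drops every entry from the lookup map and
// re-initializes the backing linked list.
func (c *LRUTxCache) Reset() {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	c.cacheMap = make(map[types.TxKey]*list.Element, c.size)
	c.list.Init()
}
```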
```go
	delete(c.cacheMap, txKey)

	if e != nil {
		c.list.Remove(e)
	}
```
Curious to know why the `delete` operation is not moved into the following `if` block? The current version does not error out, as `delete` is a no-op for invalid and non-existing keys, but logically it would make sense for both the `Remove` and `delete` operations to be conditioned on `e` not being nil, wdyt?
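Something along these lines; the lookup line is assumed from the surrounding Remove implementation:

```go
	// Only touch the map and the list when the key is actually present.
	e := c.cacheMap[txKey] // assumed to exist just above the quoted snippet
	if e != nil {
		delete(c.cacheMap, txKey)
		c.list.Remove(e)
	}
```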
```go
}

func (c *LRUTxCache) Remove(txKey types.TxKey) {
	c.mtx.Lock()
```
Following what has been implemented in the `Has` method (where it checks the cache size), a similar check can be done here too:
```diff
-	c.mtx.Lock()
+	if c.staticSize == 0 {
+		return
+	}
+	c.mtx.Lock()
```
Closing this as all the constituent parts have been completed.
Closes: #884