feat! content addressable transaction pool #935

Closed · wants to merge 29 commits

Commits (29)
18db218
specification and implementation for content addressable transaction …
cmwaters Dec 2, 2022
cf6b66a
fix tests
cmwaters Dec 12, 2022
c39fb34
v2 transaction pool: add pull capability (#2)
cmwaters Dec 12, 2022
6d11e17
fix txpool test
cmwaters Dec 12, 2022
4cc87dc
add logic to handle malleated txs in Update
cmwaters Dec 12, 2022
dc66d27
add already seen tx metrics to both mempools to monitor duplication i…
cmwaters Dec 15, 2022
fc4fb61
fix nill pointer dereference
cmwaters Dec 19, 2022
88cf0e4
fix panic in map
cmwaters Dec 19, 2022
4cbf2eb
add more mempool metrics
cmwaters Dec 20, 2022
f317fb9
e2e: add ability to configure network connectivity
cmwaters Dec 20, 2022
6479e85
rerequest txs after disconnecting with a peer
cmwaters Dec 20, 2022
8725f21
fix panics and races
cmwaters Dec 21, 2022
7830047
fix recursion bug on exit
cmwaters Jan 2, 2023
3fa6e40
remove from field from seen tx
cmwaters Jan 12, 2023
4e06a09
complete initial draft of ADR009
cmwaters Jan 12, 2023
7ad7910
add more tests and fix p2p stuff after patch
cmwaters Jan 12, 2023
65673f6
remove adr
cmwaters Jan 12, 2023
6b01143
Merge branch 'v0.34.x-celestia' into feature/cat
cmwaters Jan 12, 2023
d234299
remove v1.toml
cmwaters Jan 12, 2023
9ea16f7
make proto-gen
cmwaters Jan 12, 2023
a0e71b6
apply suggestions
cmwaters Jan 13, 2023
8a444b4
apply matt's suggestions
cmwaters Jan 16, 2023
5f0236e
fix some tests
cmwaters Jan 16, 2023
9f740b8
Merge branch 'v0.34.x-celestia' into feature/cat
cmwaters Jan 16, 2023
2974d1f
add concurrency tests to the cache
cmwaters Jan 16, 2023
e55688f
add missing defer statements
cmwaters Jan 16, 2023
41e3dca
add small test
cmwaters Jan 16, 2023
37366e3
Merge branch 'feature/cat' of github.com:celestiaorg/celestia-core in…
cmwaters Jan 16, 2023
1ce8926
initial draft of cat pool ADR (#936)
cmwaters Feb 13, 2023
1 change: 1 addition & 0 deletions abci/types/types.pb.go

(Generated file; diff not rendered.)

7 changes: 3 additions & 4 deletions config/config.go
@@ -28,6 +28,7 @@ const (
// Default is v0.
MempoolV0 = "v0"
MempoolV1 = "v1"
MempoolV2 = "v2"
)

// NOTE: Most of the structs & relevant comments + the
@@ -687,9 +688,7 @@ type MempoolConfig struct {
// Mempool version to use:
// 1) "v0" - (default) FIFO mempool.
// 2) "v1" - prioritized mempool.
// WARNING: There's a known memory leak with the prioritized mempool
// that the team are working on. Read more here:
// https://github.com/tendermint/tendermint/issues/8775
// 3) "v2" - content addressable transaction pool
Version string `mapstructure:"version"`
RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"`
@@ -735,7 +734,7 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Version: MempoolV0,
Version: MempoolV2,
Recheck: true,
Broadcast: true,
WalPath: "",
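The net effect of this change is that new nodes default to the CAT pool. A minimal sketch of how an operator building the config in Go might inspect or pin the version (the override shown is an illustrative choice, not something this diff does):

```go
package main

import (
	"fmt"

	cfg "github.com/tendermint/tendermint/config"
)

func main() {
	// With this PR, DefaultMempoolConfig returns Version == cfg.MempoolV2 ("v2").
	memCfg := cfg.DefaultMempoolConfig()
	fmt.Println("default mempool version:", memCfg.Version)

	// Operators who want the previous behaviour can pin an older version
	// explicitly before starting the node.
	memCfg.Version = cfg.MempoolV1 // or cfg.MempoolV0
}
```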
1 change: 1 addition & 0 deletions config/toml.go
@@ -345,6 +345,7 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
# Mempool version to use:
# 1) "v0" - (default) FIFO mempool.
# 2) "v1" - prioritized mempool.
# 3) "v2" - content addressable transaction pool
version = "{{ .Mempool.Version }}"

recheck = {{ .Mempool.Recheck }}
10 changes: 10 additions & 0 deletions consensus/byzantine_test.go
@@ -23,6 +23,7 @@ import (
mempl "github.com/tendermint/tendermint/mempool"

cfg "github.com/tendermint/tendermint/config"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
@@ -88,6 +89,15 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}

if thisConfig.Consensus.WaitForTxs() {
17 changes: 16 additions & 1 deletion consensus/common_test.go
@@ -29,6 +29,7 @@ import (
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
@@ -418,6 +419,17 @@ func newStateWithConfigAndBlockStore(
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
logger := consensusLogger()
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithMetrics(memplMetrics),
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
@@ -706,7 +718,10 @@ func consensusLogger() log.Logger {
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
for i := 0; i < len(keyvals)-1; i += 2 {
if keyvals[i] == "validator" {
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
index, ok := keyvals[i+1].(int)
if ok {
return term.FgBgColor{Fg: term.Color(uint8(index + 1))}
}
}
}
return term.FgBgColor{}
42 changes: 17 additions & 25 deletions consensus/mempool_test.go
@@ -154,22 +154,12 @@ func TestMempoolRmBadTx(t *testing.T) {
assert.True(t, len(resCommit.Data) > 0)

emptyMempoolCh := make(chan struct{})
checkTxRespCh := make(chan struct{})
go func() {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
}, mempl.TxInfo{})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
}
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{})
require.Error(t, err)

// check for the tx
for {
@@ -182,18 +172,8 @@ func TestMempoolRmBadTx(t *testing.T) {
}
}()

// Wait until the tx returns
ticker := time.After(time.Second * 5)
select {
case <-checkTxRespCh:
// success
case <-ticker:
t.Errorf("timed out waiting for tx to return")
return
}

// Wait until the tx is removed
ticker = time.After(time.Second * 5)
ticker := time.After(time.Second * 5)
select {
case <-emptyMempoolCh:
// success
@@ -232,10 +212,20 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons

func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
txValue := txAsUint64(req.Tx)
if req.Type == abci.CheckTxType_Recheck {
if txValue >= uint64(app.txCount) {
return abci.ResponseCheckTx{Code: code.CodeTypeOK}
}
return abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue),
}
}
if txValue != uint64(app.mempoolTxCount) {
return abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue)}
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue),
}
}
app.mempoolTxCount++
return abci.ResponseCheckTx{Code: code.CodeTypeOK}
@@ -248,7 +238,9 @@ func txAsUint64(tx []byte) uint64 {
}

func (app *CounterApplication) Commit() abci.ResponseCommit {
app.mempoolTxCount = app.txCount
if app.mempoolTxCount < app.txCount {
app.mempoolTxCount = app.txCount
}
if app.txCount == 0 {
return abci.ResponseCommit{}
}
11 changes: 11 additions & 0 deletions consensus/reactor_test.go
@@ -30,6 +30,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
@@ -184,6 +185,16 @@ func TestReactorWithEvidence(t *testing.T) {
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
> Review comment (Member): [non-blocking] this switch statement for defining the mempool can probably be refactored into a helper. I think I've seen it 3x already.
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithMetrics(memplMetrics),
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
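The inline review comment above suggests collapsing the repeated version switch into a helper. A hypothetical sketch of what that could look like (the function name and exact argument list are illustrative, not part of this PR; constructor calls follow the signatures shown in these test diffs and in upstream Tendermint v0.34):

```go
package consensus

import (
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	mempl "github.com/tendermint/tendermint/mempool"
	mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
	mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
	mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
)

// newMempoolForTest is a hypothetical helper collapsing the version switch
// that the tests currently repeat inline.
func newMempoolForTest(
	logger log.Logger,
	conf *cfg.Config,
	appConn proxy.AppConnMempool,
	state sm.State,
	metrics *mempl.Metrics,
) mempl.Mempool {
	switch conf.Mempool.Version {
	case cfg.MempoolV2:
		return mempoolv2.NewTxPool(
			logger,
			conf.Mempool,
			appConn,
			state.LastBlockHeight,
			mempoolv2.WithMetrics(metrics),
			mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
			mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
		)
	case cfg.MempoolV1:
		return mempoolv1.NewTxMempool(
			logger,
			conf.Mempool,
			appConn,
			state.LastBlockHeight,
			mempoolv1.WithMetrics(metrics),
			mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
			mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
		)
	default: // cfg.MempoolV0
		mp := mempoolv0.NewCListMempool(
			conf.Mempool,
			appConn,
			state.LastBlockHeight,
			mempoolv0.WithMetrics(metrics),
			mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
			mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
		)
		mp.SetLogger(logger)
		return mp
	}
}
```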
96 changes: 96 additions & 0 deletions docs/celestia-architecture/adr-009-cat-pool.md
@@ -0,0 +1,96 @@
# ADR 009: Content addressable transaction pool

## Changelog

- 2023-01-11: Initial Draft (@cmwaters)

## Context

One of the criteria of success for Celestia as a reliable data availability layer is the ability to handle large transactional throughput. A component that plays a significant role in this is the mempool. Its purpose is to receive transactions from clients and broadcast them to all other nodes, eventually reaching the next block proposer, who includes them in their block. Given Celestia's aggregator-like role, whereby larger transactions, i.e. blobs, are expected to dominate network traffic, a content-addressable algorithm, common in many other [peer-to-peer file sharing protocols](https://en.wikipedia.org/wiki/InterPlanetary_File_System), could be far more beneficial than the transaction-flooding protocol that Tendermint currently uses.

This ADR describes the content addressable transaction protocol and, through a comparative analysis with the existing gossip protocol, presents the case for its adoption in Celestia.

## Decision

Use a content addressable transaction pool for disseminating transactions to nodes within the Celestia network.

## Detailed Design

The core idea is that each transaction can be referenced by a key, generated through a cryptographic hash function that reflects the content of the transaction. Nodes signal to one another which transactions they have via this key, and can request transactions they are missing through the key. This reduces the amount of duplicated transmission compared to a system which blindly sends received transactions to all other connected peers (as we will see in the consequences section).
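As a minimal sketch of the addressing scheme (assuming the key is a plain SHA-256 of the raw transaction bytes; the authoritative derivation is in the linked spec):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// txKey derives a fixed-size, content-derived key for a transaction.
// Any two nodes holding the same bytes derive the same key, so the key
// alone can signal "I have seen this tx" or "please send me this tx".
func txKey(tx []byte) [sha256.Size]byte {
	return sha256.Sum256(tx)
}

func main() {
	tx := []byte("example blob transaction")
	key := txKey(tx)
	fmt.Println("tx key:", hex.EncodeToString(key[:]))
}
```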

Full details on the exact protocol can be found in the [spec](../../mempool/cat/spec.md). Here, the document focuses on the main deciding points around the architecture:

- It is assumed clients submit transactions to a single node in the network. Thus a node that receives a transaction through RPC will immediately broadcast it to all connected peers.
- The new messages, `SeenTx` and `WantTx`, are broadcast over a new mempool channel `byte(0x31)` for backwards compatibility and to distinguish priorities. Nodes running the other mempools will not receive these messages and will be able to operate normally. Similarly, the interfaces used by Tendermint are not modified in any way, so a node operator can easily switch between mempool versions.
- Transaction gossiping takes priority over these "state" messages so as to avoid situations where we receive a `SeenTx` and respond with a `WantTx` while the transaction is still queued in the node's p2p buffer.
- The node only sends `SeenTx` to peers that haven't yet seen the transaction, using jitter (with an upper bound of 100ms) to stagger when `SeenTx`s are broadcast and avoid all messages being sent at once (see the sketch after this list).
- `WantTx`s are sent to one peer at a time. A timeout is used to deem when a peer is unresponsive and the `WantTx` should be sent to another peer. This is currently set to 200ms (an estimate of network round-trip time). It is not yet configurable, but we may want to change that in the future.
- A channel has been added to allow the `TxPool` to feed validated txs to the `Reactor` to be sent to all other peers.
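A rough sketch of the jittered `SeenTx` broadcast and the one-peer-at-a-time `WantTx` retry loop described above (all types, names, and constants here are illustrative; the real logic lives in the reactor and uses p2p peers and protobuf messages):

```go
package catsketch

import (
	"math/rand"
	"time"
)

// Illustrative stand-ins for the real p2p peer and protobuf message types.
type (
	TxKey  [32]byte
	SeenTx struct{ Key TxKey }
	WantTx struct{ Key TxKey }

	Peer interface {
		HasSeen(TxKey) bool
		Send(msg interface{})
	}
)

const (
	seenTxJitterMax   = 100 * time.Millisecond // upper bound on the SeenTx stagger
	wantTxRespTimeout = 200 * time.Millisecond // rough network round-trip estimate
)

// broadcastSeenTx announces a tx key only to peers that haven't seen it,
// staggering the sends with jitter so they don't all go out at once.
func broadcastSeenTx(peers []Peer, key TxKey) {
	for _, p := range peers {
		if p.HasSeen(key) {
			continue
		}
		go func(p Peer) {
			time.Sleep(time.Duration(rand.Int63n(int64(seenTxJitterMax))))
			p.Send(SeenTx{Key: key})
		}(p)
	}
}

// requestTx asks one candidate peer at a time for a missing tx, moving on to
// the next candidate if the current one doesn't deliver before the timeout.
func requestTx(candidates []Peer, key TxKey, delivered <-chan struct{}) {
	for _, p := range candidates {
		p.Send(WantTx{Key: key})
		select {
		case <-delivered:
			return
		case <-time.After(wantTxRespTimeout):
			// peer deemed unresponsive; re-request from the next peer
		}
	}
}
```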

A series of new metrics have been added to monitor effectiveness (a brief sketch of such counters follows the list):

- SuccessfulTxs: number of transactions committed in a block (to be used as a baseline)
- AlreadySeenTxs: transactions that are received more than once
- RequestedTxs: the number of initial requests for a transaction
- RerequestedTxs: the number of follow-up requests for a transaction. If this is high, it may indicate that the request timeout is too short.
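A minimal sketch of how two of these counters might look with the plain Prometheus client (names and namespace are illustrative; the PR wires its metrics through Tendermint's usual metrics plumbing):

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// Illustrative counters mirroring two of the metrics listed above.
var (
	alreadySeenTxs = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "mempool",
		Name:      "already_seen_txs",
		Help:      "Transactions received more than once.",
	})
	rerequestedTxs = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "mempool",
		Name:      "rerequested_txs",
		Help:      "Follow-up WantTx requests issued after a timeout.",
	})
)

func main() {
	prometheus.MustRegister(alreadySeenTxs, rerequestedTxs)

	// For example, on receiving a duplicate transaction:
	alreadySeenTxs.Inc()

	// Or when a WantTx times out and is re-sent to another peer:
	rerequestedTxs.Inc()
}
```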

The CAT pool has had numerous unit tests added. It has been tested in local e2e networks and put under strain in large, geographically dispersed 100-node networks.

## Alternative Approaches

A few variations on the design were prototyped and tested. An early implementation experimented with just `SeenTx`s: all nodes would gossip a `SeenTx` upon receiving a valid tx, and nodes would not relay received transactions to peers that had sent them a `SeenTx`. However, in many cases this would lead to a node sending a tx to a peer before that peer was able to receive the `SeenTx` the node had just sent. Even with a higher priority, a large amount of duplication still occurred.

Another trick was tested that involved adding a `From` field to the `SeenTx`. Nodes receiving the `SeenTx` would use the `NodeID` in `From` to check whether they were already connected to that peer and could therefore expect a transaction from them soon, instead of immediately issuing a `WantTx`. In large-scale tests, this proved to be surprisingly less efficient. This might be because a `SeenTx` rarely arrives from another node before the initial sender has broadcast to everyone. It may also be because in the testnets each node was only connected to 10 other nodes, decreasing the chance that the node was actually connected to the original sender. The `From` field also added an extra 40 bytes to the `SeenTx` message. In the results below, this experiment is shown as CAT2.

## Status

Proposed

## Consequences

To validate its effectiveness, the protocol was benchmarked against the existing mempool implementations. This was done in close-to-real network environments, using [testground](https://github.com/testground/testground) and the celestia-app binary (@ v0.11.0) to create 100-validator networks. The network was then subjected to PFBs from 1100 light nodes at 4kb per transaction. The network followed 15-second block times with a maximum block size of roughly 8MB (these blocks were being filled). This was run for 10 minutes before being torn down. The collected data was aggregated across the 100 nodes and is as follows:

| Version | Average Bandwidth | Standard Deviation | Finalized Bandwidth |
|-----|-----|------|------|
| v0 | 982.66MB/s | 113.91MB/s | 11MB/s |
| v1 | 999.89MB/s | 133.24MB/s | 11MB/s |
| v2 (CAT) | 98.90MB/s | 18.95MB/s | 11MB/s |
| v2 (CAT2) | 110.28MB/s | 33.49MB/s | 11MB/s |

> Finalized bandwidth is the number of bytes finalized by consensus per second, whereas the other measurements are per node.

Rather than just expressing the difference in bytes, this can also be viewed as a factor of duplication (i.e. the number of times a transaction is received by a node):

| Version | v0 | v1 | v2 (CAT) | v2 (CAT2) |
| --------|----|----|----------|-----------|
| Duplication | 17.61x | 17.21x | 1.75x | 1.85x |


This, of course, comes at the cost of additional message overhead, and there comes a point where transactions are small enough that the reduction in duplication no longer outweighs the extra state messages.
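As a rough back-of-the-envelope view of that break-even point (all figures here are illustrative assumptions, not benchmark outputs): per node and per transaction, CAT trades duplicated transaction bytes for state-message bytes, so it pays off roughly while

$$(d_{\text{flood}} - d_{\text{CAT}}) \cdot |tx| \gg p \cdot |SeenTx| + |WantTx|$$

With $d_{\text{flood}} \approx 17.6$, $d_{\text{CAT}} \approx 1.75$, $p = 10$ peers, and a `SeenTx` of roughly 40 bytes (a 32-byte key plus framing), the break-even transaction size is on the order of $10 \cdot 40 / 15.9 \approx 25$ bytes, far below the 4kb blobs used in these tests.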


### Positive

- Reduction in network bandwidth.
- Cross compatible and therefore easily reversible.

### Negative

- Extra network round trip when not directly receiving a transaction.
- Greater complexity than a simple flooding mechanism.

### Neutral

- Allows compact blocks to be implemented, as that design depends on the same push/pull functionality.

## Ongoing work

This section describes further work that may subsequently be undertaken in this area. The first is transaction bundling: if a node receives a lot of transactions from clients, instead of sending them off immediately one by one, it may wait for a fixed period (~100ms) and bundle them all together. The set of transactions can then be represented by a single key. This increases the content-to-key ratio and thus improves the performance of the protocol.
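A rough sketch of that bundling step (the window length, channel-based intake, and key construction are assumptions for illustration only):

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"time"
)

// bundleTxs collects transactions arriving within a fixed window and returns
// them as one batch plus a single content-derived key for the whole bundle.
func bundleTxs(incoming <-chan []byte, window time.Duration) (batch [][]byte, key [32]byte) {
	deadline := time.After(window)
	h := sha256.New()
	for {
		select {
		case tx := <-incoming:
			batch = append(batch, tx)
			h.Write(tx) // the bundle key commits to every tx in arrival order
		case <-deadline:
			copy(key[:], h.Sum(nil))
			return batch, key
		}
	}
}

func main() {
	txs := make(chan []byte, 2)
	txs <- []byte("tx-a")
	txs <- []byte("tx-b")

	batch, key := bundleTxs(txs, 100*time.Millisecond)
	fmt.Printf("bundled %d txs under key %x\n", len(batch), key)
}
```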

An area of further exploration is the concept of neighborhoods. Variations of this idea are present in both [GossipSub](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#gossipsub-the-gossiping-mesh-router) and Solana's Turbine. The concept entails shaping the network topology into many neighborhoods, where a node can be seen as strongly connected to nodes in its own neighborhood and weakly connected to peers in other neighborhoods. The idea behind a more structured topology is to make broadcasting more directed.

Outside of protocol development, work can be done to more accurately measure performance. Both protocols managed to sustain 15-second block times with mostly full blocks, i.e. the same output throughput. This indicates that the network was being artificially constrained. Either of these constraints needs to be lifted (ideally the max square size) so we are able to measure the underlying network speed.

## References

- [Content-addressable transaction pool spec](../../mempool/cat/spec.md)
11 changes: 6 additions & 5 deletions light/rpc/client.go
@@ -131,8 +131,8 @@ func (c *Client) ABCIQuery(ctx context.Context, path string, data tmbytes.HexByt

// ABCIQueryWithOptions returns an error if opts.Prove is false.
func (c *Client) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes,
opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {

opts rpcclient.ABCIQueryOptions,
) (*ctypes.ResultABCIQuery, error) {
// always request the proof
opts.Prove = true

@@ -541,7 +541,6 @@ func (c *Client) Validators(
height *int64,
pagePtr, perPagePtr *int,
) (*ctypes.ResultValidators, error) {

// Update the light client if we're behind and retrieve the light block at the
// requested height or at the latest height if no height is provided.
l, err := c.updateLightClientIfNeededTo(ctx, height)
@@ -563,15 +562,17 @@ func (c *Client) Validators(
BlockHeight: l.Height,
Validators: v,
Count: len(v),
Total: totalCount}, nil
Total: totalCount,
}, nil
}

func (c *Client) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) {
return c.next.BroadcastEvidence(ctx, ev)
}

func (c *Client) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
outCapacity ...int,
) (out <-chan ctypes.ResultEvent, err error) {
return c.next.Subscribe(ctx, subscriber, query, outCapacity...)
}
