Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat! content addressable transaction pool #935

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
18db218
specification and implementation for content addressable transaction …
cmwaters Dec 2, 2022
cf6b66a
fix tests
cmwaters Dec 12, 2022
c39fb34
v2 transaction pool: add pull capability (#2)
cmwaters Dec 12, 2022
6d11e17
fix txpool test
cmwaters Dec 12, 2022
4cc87dc
add logic to handle malleated txs in Update
cmwaters Dec 12, 2022
dc66d27
add already seen tx metrics to both mempools to monitor duplication i…
cmwaters Dec 15, 2022
fc4fb61
fix nill pointer dereference
cmwaters Dec 19, 2022
88cf0e4
fix panic in map
cmwaters Dec 19, 2022
4cbf2eb
add more mempool metrics
cmwaters Dec 20, 2022
f317fb9
e2e: add ability to configure network connectivity
cmwaters Dec 20, 2022
6479e85
rerequest txs after disconnecting with a peer
cmwaters Dec 20, 2022
8725f21
fix panics and races
cmwaters Dec 21, 2022
7830047
fix recursion bug on exit
cmwaters Jan 2, 2023
3fa6e40
remove from field from seen tx
cmwaters Jan 12, 2023
4e06a09
complete initial draft of ADR009
cmwaters Jan 12, 2023
7ad7910
add more tests and fix p2p stuff after patch
cmwaters Jan 12, 2023
65673f6
remove adr
cmwaters Jan 12, 2023
6b01143
Merge branch 'v0.34.x-celestia' into feature/cat
cmwaters Jan 12, 2023
d234299
remove v1.toml
cmwaters Jan 12, 2023
9ea16f7
make proto-gen
cmwaters Jan 12, 2023
a0e71b6
apply suggestions
cmwaters Jan 13, 2023
8a444b4
apply matt's suggestions
cmwaters Jan 16, 2023
5f0236e
fix some tests
cmwaters Jan 16, 2023
9f740b8
Merge branch 'v0.34.x-celestia' into feature/cat
cmwaters Jan 16, 2023
2974d1f
add concurrency tests to the cache
cmwaters Jan 16, 2023
e55688f
add missing defer statements
cmwaters Jan 16, 2023
41e3dca
add small test
cmwaters Jan 16, 2023
37366e3
Merge branch 'feature/cat' of github.com:celestiaorg/celestia-core in…
cmwaters Jan 16, 2023
1ce8926
initial draft of cat pool ADR (#936)
cmwaters Feb 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions abci/types/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
// Default is v0.
MempoolV0 = "v0"
MempoolV1 = "v1"
MempoolV2 = "v2"
)

// NOTE: Most of the structs & relevant comments + the
Expand Down Expand Up @@ -687,9 +688,7 @@ type MempoolConfig struct {
// Mempool version to use:
// 1) "v0" - (default) FIFO mempool.
// 2) "v1" - prioritized mempool.
// WARNING: There's a known memory leak with the prioritized mempool
// that the team are working on. Read more here:
// https://github.com/tendermint/tendermint/issues/8775
// 3) "v2" - content addressable transaction pool
Version string `mapstructure:"version"`
RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"`
Expand Down Expand Up @@ -735,7 +734,7 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Version: MempoolV0,
Version: MempoolV2,
Recheck: true,
Broadcast: true,
WalPath: "",
Expand Down
1 change: 1 addition & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
# Mempool version to use:
# 1) "v0" - (default) FIFO mempool.
# 2) "v1" - prioritized mempool.
# 3) "v2" - content addressable transaction pool
version = "{{ .Mempool.Version }}"

recheck = {{ .Mempool.Recheck }}
Expand Down
10 changes: 10 additions & 0 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
mempl "github.com/tendermint/tendermint/mempool"

cfg "github.com/tendermint/tendermint/config"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
Expand Down Expand Up @@ -88,6 +89,15 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}

if thisConfig.Consensus.WaitForTxs() {
Expand Down
17 changes: 16 additions & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
Expand Down Expand Up @@ -418,6 +419,17 @@ func newStateWithConfigAndBlockStore(
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
logger := consensusLogger()
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithMetrics(memplMetrics),
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
Expand Down Expand Up @@ -706,7 +718,10 @@ func consensusLogger() log.Logger {
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
for i := 0; i < len(keyvals)-1; i += 2 {
if keyvals[i] == "validator" {
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
index, ok := keyvals[i+1].(int)
if ok {
return term.FgBgColor{Fg: term.Color(uint8(index + 1))}
}
}
}
return term.FgBgColor{}
Expand Down
42 changes: 17 additions & 25 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,12 @@ func TestMempoolRmBadTx(t *testing.T) {
assert.True(t, len(resCommit.Data) > 0)

emptyMempoolCh := make(chan struct{})
checkTxRespCh := make(chan struct{})
go func() {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
}, mempl.TxInfo{})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
}
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{})
require.Error(t, err)

// check for the tx
for {
Expand All @@ -182,18 +172,8 @@ func TestMempoolRmBadTx(t *testing.T) {
}
}()

// Wait until the tx returns
ticker := time.After(time.Second * 5)
select {
case <-checkTxRespCh:
// success
case <-ticker:
t.Errorf("timed out waiting for tx to return")
return
}

// Wait until the tx is removed
ticker = time.After(time.Second * 5)
ticker := time.After(time.Second * 5)
select {
case <-emptyMempoolCh:
// success
Expand Down Expand Up @@ -232,10 +212,20 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons

func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
txValue := txAsUint64(req.Tx)
if req.Type == abci.CheckTxType_Recheck {
if txValue >= uint64(app.txCount) {
return abci.ResponseCheckTx{Code: code.CodeTypeOK}
}
return abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue),
}
}
if txValue != uint64(app.mempoolTxCount) {
return abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue)}
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue),
}
}
app.mempoolTxCount++
return abci.ResponseCheckTx{Code: code.CodeTypeOK}
Expand All @@ -248,7 +238,9 @@ func txAsUint64(tx []byte) uint64 {
}

func (app *CounterApplication) Commit() abci.ResponseCommit {
app.mempoolTxCount = app.txCount
if app.mempoolTxCount < app.txCount {
app.mempoolTxCount = app.txCount
}
if app.txCount == 0 {
return abci.ResponseCommit{}
}
Expand Down
11 changes: 11 additions & 0 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
Expand Down Expand Up @@ -184,6 +185,16 @@ func TestReactorWithEvidence(t *testing.T) {
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[non-blocking] this switch statement for defining the mempool can probably be refactored into a helper. I think I've seen it 3x already.

mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithMetrics(memplMetrics),
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
Expand Down
11 changes: 6 additions & 5 deletions light/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func (c *Client) ABCIQuery(ctx context.Context, path string, data tmbytes.HexByt

// ABCIQueryWithOptions returns an error if opts.Prove is false.
func (c *Client) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes,
opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {

opts rpcclient.ABCIQueryOptions,
) (*ctypes.ResultABCIQuery, error) {
// always request the proof
opts.Prove = true

Expand Down Expand Up @@ -516,7 +516,6 @@ func (c *Client) Validators(
height *int64,
pagePtr, perPagePtr *int,
) (*ctypes.ResultValidators, error) {

// Update the light client if we're behind and retrieve the light block at the
// requested height or at the latest height if no height is provided.
l, err := c.updateLightClientIfNeededTo(ctx, height)
Expand All @@ -538,15 +537,17 @@ func (c *Client) Validators(
BlockHeight: l.Height,
Validators: v,
Count: len(v),
Total: totalCount}, nil
Total: totalCount,
}, nil
}

func (c *Client) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) {
return c.next.BroadcastEvidence(ctx, ev)
}

func (c *Client) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
outCapacity ...int,
) (out <-chan ctypes.ResultEvent, err error) {
return c.next.Subscribe(ctx, subscriber, query, outCapacity...)
}

Expand Down
Loading