diff --git a/internal/app/kwild/server/cometbft.go b/internal/app/kwild/server/cometbft.go index 2a0719ce3..7a4320030 100644 --- a/internal/app/kwild/server/cometbft.go +++ b/internal/app/kwild/server/cometbft.go @@ -82,7 +82,7 @@ func newCometConfig(cfg *config.KwildConfig) *cmtCfg.Config { func extractGenesisDoc(g *config.GenesisConfig) (*cmttypes.GenesisDoc, error) { consensusParams := &cmttypes.ConsensusParams{ - Block: cmttypes.BlockParams{ + Block: cmttypes.BlockParams{ // TODO: set MaxBytes to -1 so we can do the truncation in PrepareProposal after our other processing MaxBytes: g.ConsensusParams.Block.MaxBytes, MaxGas: g.ConsensusParams.Block.MaxGas, }, diff --git a/pkg/abci/abci.go b/pkg/abci/abci.go index 96b31749d..13c3fabc4 100644 --- a/pkg/abci/abci.go +++ b/pkg/abci/abci.go @@ -1,11 +1,13 @@ package abci import ( + "bytes" "context" "encoding/binary" "encoding/hex" "errors" "fmt" + "slices" "github.com/kwilteam/kwil-db/pkg/crypto" engineTypes "github.com/kwilteam/kwil-db/pkg/engine/types" @@ -101,6 +103,11 @@ func pubkeyToAddr(pubkey []byte) (string, error) { type AbciApp struct { genesisAppHash []byte + + // TODO: MempoolModule. Policy for acceptance to (and eviction from) mempool + // depends on information on account balances and nonces, and other factors + // that are likely configurable. + // database is the database module that handles database deployment, dropping, and execution database DatasetsModule @@ -169,6 +176,25 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response return abciTypes.ResponseBeginBlock{} } +// CheckTx is the "Guardian of the mempool: every node runs CheckTx before +// letting a transaction into its local mempool". Also "The transaction may come +// from an external user or another node". Further "CheckTx validates the +// transaction against the current state of the application, for example, +// checking signatures and account balances, but does not apply any of the state +// changes described in the transaction." +// +// This method must reject transactions that are invalid and/or may be crafted +// to attack the network by flooding the mempool or filling blocks with rejected +// transactions. +// +// This method is also used to re-check mempool transactions after blocks are +// mined. This is used to *evict* previously accepted transactions that become +// invalid, which may happen for a variety of reason only the application can +// decide, such as changes in account balance and last mined nonce. +// +// It is important to use this method rather than include failing transactions +// in blocks, particularly if the failure mode involves the transaction author +// spending no gas or achieving including in the block with little effort. func (a *AbciApp) CheckTx(incoming abciTypes.RequestCheckTx) abciTypes.ResponseCheckTx { logger := a.log.With(zap.String("stage", "ABCI CheckTx")) logger.Debug("check tx") @@ -190,6 +216,15 @@ func (a *AbciApp) CheckTx(incoming abciTypes.RequestCheckTx) abciTypes.ResponseC return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()} } + // TODO: CheckTx must reject transactions with a nonce <= currently mined + // account nonce. Also, with re-check happening (CheckTxType_Recheck) we + // must evict transactions that become invalid. + // + // Pathological cases: + // a. accept any txns to mempool -- that's a trivial mempool flooding attack. + // b. mine them (failed) -- that's a trivial block space attack. + // c. txns paying no fee -- flood, so a. and b. are crucial with no gas + return abciTypes.ResponseCheckTx{Code: 0} } @@ -652,16 +687,68 @@ func (a *AbciApp) OfferSnapshot(p0 abciTypes.RequestOfferSnapshot) abciTypes.Res return abciTypes.ResponseOfferSnapshot{Result: abciTypes.ResponseOfferSnapshot_ACCEPT} } +// prepareMempoolTxns prepares the transactions for the block we are proposing. +// The input transactions are from mempool direct from cometbft, and we modify +// the list for our purposes. This includes ensuring transactions from the same +// sender in ascending nonce-order, enforcing the max bytes limit, etc. +// +// This will either need to become part of a MempoolModule, or use an +// AccountsModule. +func prepareMempoolTxns(txs [][]byte, maxBytes int, log *log.Logger) [][]byte { + type indexedTxn struct { + i int // index in p0.Txs so we can pull the bytes without re-marshalling + *transactions.Transaction + } + + var okTxns []*indexedTxn + for i, txB := range txs { + tx := &transactions.Transaction{} + err := tx.UnmarshalBinary(txB) + if err != nil { + log.Error("failed to unmarshal transaction that was previously accepted to mempool", zap.Error(err)) + continue // should not have passed CheckTx to get into mempool + } + okTxns = append(okTxns, &indexedTxn{i, tx}) + } + + slices.SortStableFunc(okTxns, func(a, b *indexedTxn) int { + if !bytes.Equal(a.Sender, b.Sender) { + return 0 // only swap txns from the same sender + } + if a.Body.Nonce == b.Body.Nonce { + return 0 // don't reorder, the second one will fail to execute + } + if a.Body.Nonce < b.Body.Nonce { + return -1 + } + return 1 + }) + + // TODO: truncate based on our max block size since we'll have to set + // ConsensusParams.Block.MaxBytes to -1 so that we get ALL transactions even + // if it goes beyond max_tx_bytes. See: + // https://github.com/cometbft/cometbft/pull/1003 + // https://docs.cometbft.com/v0.38/spec/abci/abci++_methods#prepareproposal + // https://github.com/cometbft/cometbft/issues/980 + + finalTxns := make([][]byte, len(okTxns)) + for i, tx := range okTxns { + finalTxns[i] = txs[tx.i] + } + + return finalTxns +} + func (a *AbciApp) PrepareProposal(p0 abciTypes.RequestPrepareProposal) abciTypes.ResponsePrepareProposal { a.log.Debug("", zap.String("stage", "ABCI PrepareProposal"), zap.Int64("height", p0.Height), zap.Int("txs", len(p0.Txs))) - // TODO: do something with the txs? + okTxns := prepareMempoolTxns(p0.Txs, int(p0.MaxTxBytes), &a.log) return abciTypes.ResponsePrepareProposal{ - Txs: p0.Txs, + Txs: okTxns, } } diff --git a/pkg/abci/abci_test.go b/pkg/abci/abci_test.go new file mode 100644 index 000000000..d8d0e336f --- /dev/null +++ b/pkg/abci/abci_test.go @@ -0,0 +1,87 @@ +package abci + +import ( + "bytes" + "math/big" + "testing" + + "github.com/kwilteam/kwil-db/pkg/crypto" + "github.com/kwilteam/kwil-db/pkg/log" + "github.com/kwilteam/kwil-db/pkg/transactions" +) + +func marshalTx(t *testing.T, tx *transactions.Transaction) []byte { + b, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("could not marshal transaction! %v", err) + } + return b +} + +func cloneTx(tx *transactions.Transaction) *transactions.Transaction { + sig := make([]byte, len(tx.Signature.Signature)) + copy(sig, tx.Signature.Signature) + sender := make([]byte, len(tx.Sender)) + copy(sender, tx.Sender) + body := *tx.Body // same nonce + fee := *tx.Body.Fee + body.Fee = &fee + body.Payload = make([]byte, len(body.Payload)) + copy(body.Payload, tx.Body.Payload) + return &transactions.Transaction{ + Signature: &crypto.Signature{ + Signature: sig, + Type: tx.Signature.Type, + }, + Body: &body, + Serialization: tx.Serialization, + Sender: sender, + } +} + +func Test_prepareMempoolTxns(t *testing.T) { + tA := &transactions.Transaction{ + Signature: &crypto.Signature{ + Signature: []byte{}, + Type: crypto.SignatureTypeEd25519, + }, + Body: &transactions.TransactionBody{ + Description: "tests", + Payload: []byte(`nada`), + Fee: big.NewInt(0), + Nonce: 0, + }, + Sender: []byte(`notapubkey`), + } + tAb := marshalTx(t, tA) + + tB := cloneTx(tA) + tB.Body.Nonce++ + tBb := marshalTx(t, tB) + + logger := log.NewStdOut(log.DebugLevel) + tests := []struct { + name string + txs [][]byte + want [][]byte + }{ + { + "one valid", + [][]byte{tBb, tAb}, + [][]byte{tAb, tBb}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := prepareMempoolTxns(tt.txs, 1e6, &logger) + if len(got) != len(tt.want) { + t.Errorf("got %d txns, expected %d", len(got), len(tt.want)) + } + for i, txi := range got { + if !bytes.Equal(txi, tt.want[i]) { + t.Errorf("mismatched tx %d", i) + } + } + }) + } +}