From 66b164f31ab00fd808f7fc8defcf677f8d77e7de Mon Sep 17 00:00:00 2001 From: charithabandi Date: Fri, 6 Oct 2023 13:59:31 -0600 Subject: [PATCH] validate uncommitted transactions against the mempool state * added extensible auth using build tags * go mod tidied * made gavins requested changes --- internal/app/kwild/server/build.go | 11 +- internal/controller/grpc/txsvc/v1/pricing.go | 6 +- internal/controller/grpc/txsvc/v1/service.go | 3 + pkg/abci/abci.go | 112 ++++++++-- pkg/abci/abci_test.go | 210 +++++++++++++++++++ pkg/abci/interfaces.go | 19 ++ pkg/abci/mempool.go | 115 ++++++++++ pkg/abci/txcode.go | 9 +- pkg/balances/accounts.go | 4 +- pkg/kv/atomic/committer.go | 3 +- pkg/modules/validators/interfaces.go | 5 + pkg/modules/validators/transactions.go | 48 ++++- pkg/modules/validators/validators.go | 19 +- pkg/sessions/errors.go | 1 + pkg/sessions/interfaces.go | 2 +- pkg/sessions/mock_test.go | 3 +- pkg/sessions/session.go | 22 +- pkg/sessions/sql-session/session.go | 15 +- pkg/validators/mgr.go | 3 + pkg/validators/opts.go | 6 + pkg/validators/pricing.go | 32 +++ 21 files changed, 587 insertions(+), 61 deletions(-) create mode 100644 pkg/abci/mempool.go create mode 100644 pkg/validators/pricing.go diff --git a/internal/app/kwild/server/build.go b/internal/app/kwild/server/build.go index 9414080d1..d30badf91 100644 --- a/internal/app/kwild/server/build.go +++ b/internal/app/kwild/server/build.go @@ -75,7 +75,7 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server { bootstrapperModule := buildBootstrapper(d) - abciApp := buildAbci(d, closers, datasetsModule, validatorModule, + abciApp := buildAbci(d, closers, accs, datasetsModule, validatorModule, ac, snapshotModule, bootstrapperModule) cometBftNode := buildCometNode(d, closers, abciApp) @@ -132,7 +132,7 @@ func (c *closeFuncs) closeAll() error { return err } -func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule, +func buildAbci(d *coreDependencies, closer *closeFuncs, accountsModule abci.AccountsModule, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule, atomicCommitter *sessions.AtomicCommitter, snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp { badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, kwild.ABCIInfoSubDirName) badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{ @@ -157,6 +157,7 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.Data genesisHash := d.genesisCfg.ComputeGenesisHash() return abci.NewAbciApp( + accountsModule, datasetsModule, validatorModule, atomicKv, @@ -251,9 +252,15 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions closer.addCloser(db.Close) joinExpiry := d.genesisCfg.ConsensusParams.Validator.JoinExpiry + feeMultiplier := 1 + if d.genesisCfg.ConsensusParams.WithoutGasCosts { + feeMultiplier = 0 + } + v, err := vmgr.NewValidatorMgr(d.ctx, db, vmgr.WithLogger(*d.log.Named("validatorStore")), vmgr.WithJoinExpiry(joinExpiry), + vmgr.WithFeeMultiplier(int64(feeMultiplier)), ) if err != nil { failBuild(err, "failed to build validator store") diff --git a/internal/controller/grpc/txsvc/v1/pricing.go b/internal/controller/grpc/txsvc/v1/pricing.go index aad4ed1a6..ff0416fc7 100644 --- a/internal/controller/grpc/txsvc/v1/pricing.go +++ b/internal/controller/grpc/txsvc/v1/pricing.go @@ -89,13 +89,13 @@ func (s *Service) priceAction(ctx context.Context, txBody *txpb.Transaction_Body // TODO: Later to be moved to validator module (or) manager func (s *Service) priceValidatorJoin(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) { - return big.NewInt(10000000000000), nil + return s.vstore.PriceJoin(ctx) } func (s *Service) priceValidatorLeave(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) { - return big.NewInt(10000000000000), nil + return s.vstore.PriceLeave(ctx) } func (s *Service) priceValidatorApprove(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) { - return big.NewInt(10000000000000), nil + return s.vstore.PriceApprove(ctx) } diff --git a/internal/controller/grpc/txsvc/v1/service.go b/internal/controller/grpc/txsvc/v1/service.go index 215aead18..ab8298228 100644 --- a/internal/controller/grpc/txsvc/v1/service.go +++ b/internal/controller/grpc/txsvc/v1/service.go @@ -65,4 +65,7 @@ type ValidatorReader interface { CurrentValidators(ctx context.Context) ([]*validators.Validator, error) ActiveVotes(ctx context.Context) ([]*validators.JoinRequest, error) // JoinStatus(ctx context.Context, joiner []byte) ([]*JoinRequest, error) + PriceJoin(ctx context.Context) (*big.Int, error) + PriceLeave(ctx context.Context) (*big.Int, error) + PriceApprove(ctx context.Context) (*big.Int, error) } diff --git a/pkg/abci/abci.go b/pkg/abci/abci.go index 479c971eb..3f0f36ad7 100644 --- a/pkg/abci/abci.go +++ b/pkg/abci/abci.go @@ -62,7 +62,7 @@ func (ds nilStringer) String() string { return "no message" } -func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, committer AtomicCommitter, snapshotter SnapshotModule, +func NewAbciApp(accounts AccountsModule, database DatasetsModule, vldtrs ValidatorModule, kv KVStore, committer AtomicCommitter, snapshotter SnapshotModule, bootstrapper DBBootstrapModule, genesisHash []byte, opts ...AbciOpt) *AbciApp { app := &AbciApp{ genesisAppHash: genesisHash, @@ -74,9 +74,16 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com }, bootstrapper: bootstrapper, snapshotter: snapshotter, + accounts: accounts, valAddrToKey: make(map[string][]byte), + mempool: &mempool{ + nonceTracker: make(map[string]uint64), + accounts: make(map[string]*userAccount), + accountStore: accounts, + }, + log: log.NewNoOp(), } @@ -120,9 +127,17 @@ type AbciApp struct { snapshotter SnapshotModule // bootstrapper is the bootstrapper module that handles bootstrapping the database - bootstrapper DBBootstrapModule + bootstrapper DBBootstrapModule + + // metadataStore to track the app hash and block height metadataStore *metadataStore + // accountStore is the store that maintains the account state + accounts AccountsModule + + // mempool maintains in-memory account state to validate the unconfirmed transactions against. + mempool *mempool + log log.Logger // Expected AppState after bootstrapping the node with a given snapshot, @@ -193,34 +208,50 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response func (a *AbciApp) CheckTx(incoming abciTypes.RequestCheckTx) abciTypes.ResponseCheckTx { logger := a.log.With(zap.String("stage", "ABCI CheckTx")) logger.Debug("check tx") + ctx := context.Background() + var err error + code := CodeOk + newTx := incoming.Type == abciTypes.CheckTxType_New tx := &transactions.Transaction{} - err := tx.UnmarshalBinary(incoming.Tx) + err = tx.UnmarshalBinary(incoming.Tx) if err != nil { + code = CodeEncodingError logger.Error("failed to unmarshal transaction", zap.Error(err)) - return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()} + return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()} } logger.Debug("", zap.String("sender", hex.EncodeToString(tx.Sender)), zap.String("PayloadType", tx.Body.PayloadType.String())) - err = tx.Verify() - if err != nil { - logger.Error("failed to verify transaction", zap.Error(err)) - return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()} + // For a new transaction (not re-check), before looking at execution cost or + // checking nonce validity, ensure the payload is recognized and signature is valid. + if newTx { + // Verify Payload type + if !tx.Body.PayloadType.Valid() { + code = CodeInvalidTxType + logger.Error("invalid payload type", zap.String("payloadType", tx.Body.PayloadType.String())) + return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: "invalid payload type"} + } + + // Verify Signature + err = tx.Verify() + if err != nil { + code = CodeInvalidSignature + logger.Error("failed to verify transaction", zap.Error(err)) + return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()} + } } - // TODO: CheckTx must reject transactions with a nonce <= currently mined - // account nonce. Also, with re-check happening (CheckTxType_Recheck) we - // must evict transactions that become invalid. - // - // Pathological cases: - // a. accept any txns to mempool -- that's a trivial mempool flooding attack. - // b. mine them (failed) -- that's a trivial block space attack. - // c. txns paying no fee -- flood, so a. and b. are crucial with no gas + err = a.mempool.applyTransaction(ctx, tx) + if err != nil { + code = CodeInvalidNonce + logger.Error("failed to verify transaction against local mempool state", zap.Error(err)) + return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()} + } - return abciTypes.ResponseCheckTx{Code: 0} + return abciTypes.ResponseCheckTx{Code: code.Uint32()} } func (a *AbciApp) DeliverTx(req abciTypes.RequestDeliverTx) abciTypes.ResponseDeliverTx { @@ -473,6 +504,8 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit { logger.Debug("start commit") ctx := context.Background() + defer a.mempool.reset() + // generate the unique id for all changes occurred thus far id, err := a.committer.ID(ctx) if err != nil { @@ -793,17 +826,54 @@ func (a *AbciApp) PrepareProposal(p0 abciTypes.RequestPrepareProposal) abciTypes } } +func (a *AbciApp) validateProposalTransactions(ctx context.Context, Txns [][]byte) error { + logger := a.log.With(zap.String("stage", "ABCI ProcessProposal")) + grouped, err := groupTxsBySender(Txns) + if err != nil { + logger.Error("failed to group transaction based on sender: ", zap.Error(err)) + return err + } + + // ensure there are no gaps in an account's nonce, either from the + // previous best confirmed or within this block. Our current transaction + // execution does not update an accounts nonce in state unless it is the + // next nonce. Delivering transactions to a block in that way cannot happen. + for sender, txs := range grouped { + acct, err := a.accounts.GetAccount(ctx, []byte(sender)) + if err != nil { + return err + } + expectedNonce := uint64(acct.Nonce) + 1 + + for _, tx := range txs { + if tx.Body.Nonce != expectedNonce { + logger.Error("nonce mismatch", zap.Uint64("txNonce", tx.Body.Nonce), zap.Uint64("expectedNonce", expectedNonce)) + return fmt.Errorf("nonce mismatch, ExpectedNonce: %d TxNonce: %d", expectedNonce, tx.Body.Nonce) + } + expectedNonce++ + } + } + return nil +} + +// ProcessProposal should validate the received blocks and reject the block if: +// 1. transactions are not ordered by nonces +// 2. nonce is less than the last committed nonce for the account +// 3. duplicates or gaps in the nonces +// 4. transaction size is greater than the max_tx_bytes +// else accept the proposed block. func (a *AbciApp) ProcessProposal(p0 abciTypes.RequestProcessProposal) abciTypes.ResponseProcessProposal { a.log.Debug("", zap.String("stage", "ABCI ProcessProposal"), zap.Int64("height", p0.Height), zap.Int("txs", len(p0.Txs))) - // TODO: ensure there are no gaps in an account's nonce, either from the - // previous best confirmed or within this block. Our current transaction - // execution does not update an accounts nonce in state unless it is the - // next nonce. Delivering transactions to a block in that way cannot happen. + ctx := context.Background() + if err := a.validateProposalTransactions(ctx, p0.Txs); err != nil { + return abciTypes.ResponseProcessProposal{Status: abciTypes.ResponseProcessProposal_REJECT} + } + // TODO: Verify the Tx and Block sizes based on the genesis configuration return abciTypes.ResponseProcessProposal{Status: abciTypes.ResponseProcessProposal_ACCEPT} } diff --git a/pkg/abci/abci_test.go b/pkg/abci/abci_test.go index 4a89597db..f4b662149 100644 --- a/pkg/abci/abci_test.go +++ b/pkg/abci/abci_test.go @@ -2,12 +2,16 @@ package abci import ( "bytes" + "context" "math/big" "testing" "github.com/kwilteam/kwil-db/pkg/auth" + "github.com/kwilteam/kwil-db/pkg/balances" "github.com/kwilteam/kwil-db/pkg/log" "github.com/kwilteam/kwil-db/pkg/transactions" + + "github.com/stretchr/testify/assert" ) func marshalTx(t *testing.T, tx *transactions.Transaction) []byte { @@ -38,6 +42,49 @@ func cloneTx(tx *transactions.Transaction) *transactions.Transaction { } } +type MockAccountStore struct { +} + +func (m *MockAccountStore) GetAccount(ctx context.Context, pubKey []byte) (*balances.Account, error) { + return &balances.Account{ + PublicKey: nil, + Balance: big.NewInt(0), + Nonce: 0, + }, nil +} + +func newTxBts(t *testing.T, nonce uint64, sender string) []byte { + tx := &transactions.Transaction{ + Signature: &auth.Signature{}, + Body: &transactions.TransactionBody{ + Description: "test", + Payload: []byte(`random payload`), + Fee: big.NewInt(0), + Nonce: nonce, + }, + Sender: []byte(sender), + } + + bts, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("could not marshal transaction! %v", err) + } + return bts +} + +func newTx(t *testing.T, nonce uint64, sender string) *transactions.Transaction { + return &transactions.Transaction{ + Signature: &auth.Signature{}, + Body: &transactions.TransactionBody{ + Description: "test", + Payload: []byte(`random payload`), + Fee: big.NewInt(0), + Nonce: nonce, + }, + Sender: []byte(sender), + } +} + func Test_prepareMempoolTxns(t *testing.T) { // To make these tests deterministic, we manually craft certain misorderings // and the known expected orderings. Also include some malformed @@ -171,3 +218,166 @@ func Test_prepareMempoolTxns(t *testing.T) { }) } } + +func Test_ProcessProposal_TxValidation(t *testing.T) { + ctx := context.Background() + abciApp := &AbciApp{ + accounts: &MockAccountStore{}, + } + logger := log.NewStdOut(log.DebugLevel) + + abciApp.log = logger + + txA1 := newTxBts(t, 1, "A") + txA2 := newTxBts(t, 2, "A") + txA3 := newTxBts(t, 3, "A") + txA4 := newTxBts(t, 4, "A") + txB1 := newTxBts(t, 1, "B") + txB2 := newTxBts(t, 2, "B") + txB3 := newTxBts(t, 3, "B") + + testcases := []struct { + name string + txs [][]byte + err bool // expect error + }{ + { + // Invalid ordering of nonces: 3, 1, 2 by single sender + name: "Invalid ordering on nonces by single sender", + txs: [][]byte{ + txA3, + txA1, + txA2, + }, + err: true, + }, + { + // A1, A3, B3, A2, B1, B2 + name: "Invalid ordering of nonces by multiple senders", + txs: [][]byte{ + txA1, + txA3, + txB3, + txA2, + txB1, + txB2, + }, + err: true, + }, + { + // Gaps in the ordering of nonces: 1, 3, 4 by single sender + name: "Gaps in the ordering of nonces by single sender", + txs: [][]byte{ + txA1, + txA3, + txA4, + }, + err: true, + }, + { + // Gaps in the ordering of nonces by multiple senders + name: "Gaps in the ordering of nonces by multiple senders", + txs: [][]byte{ + txA1, + txB1, + txA4, + txB3, + }, + err: true, + }, + { + // Duplicate Nonces: 1, 2, 2 by single sender + name: "Duplicate Nonces by single sender", + txs: [][]byte{ + txA1, + txA2, + txA2, + }, + err: true, + }, + { + // Duplicate Nonces: 1, 2, 2 by multiple senders + name: "Duplicate Nonces by multiple senders", + txs: [][]byte{ + txA1, + txA2, + txB1, + txB2, + txB2, + }, + err: true, + }, + { + // Valid ordering of nonces: 1, 2, 3 by single sender + name: "Valid ordering of nonces by single sender", + txs: [][]byte{ + txA1, + txA2, + txA3, + }, + err: false, + }, + { + // Valid ordering of nonces: 1, 2, 3 by multiple senders + name: "Valid ordering of nonces by multiple senders", + txs: [][]byte{ + txA1, + txA2, + txB1, + txB2, + txA3, + txB3, + }, + err: false, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + err := abciApp.validateProposalTransactions(ctx, tc.txs) + if tc.err { + assert.Error(t, err, "expected error due to %s", tc.name) + } else { + assert.NoError(t, err, "TC: %s, expected no error", tc.name) + } + }) + } +} + +func Test_CheckTx(t *testing.T) { + m := &mempool{ + accountStore: &MockAccountStore{}, + accounts: make(map[string]*userAccount), + nonceTracker: make(map[string]uint64), + } + ctx := context.Background() + + // Successful transaction A: 1 + err := m.applyTransaction(ctx, newTx(t, 1, "A")) + assert.NoError(t, err) + assert.EqualValues(t, m.nonceTracker["A"], 1) + + // Successful transaction A: 2 + err = m.applyTransaction(ctx, newTx(t, 2, "A")) + assert.NoError(t, err) + assert.EqualValues(t, m.nonceTracker["A"], 2) + + // Duplicate nonce failure + err = m.applyTransaction(ctx, newTx(t, 2, "A")) + assert.Error(t, err) + assert.EqualValues(t, m.nonceTracker["A"], 2) + + // Invalid order + err = m.applyTransaction(ctx, newTx(t, 4, "A")) + assert.Error(t, err) + assert.EqualValues(t, m.nonceTracker["A"], 2) + + err = m.applyTransaction(ctx, newTx(t, 3, "A")) + assert.NoError(t, err) + assert.EqualValues(t, m.nonceTracker["A"], 3) + + // Recheck nonce 4 transaction + err = m.applyTransaction(ctx, newTx(t, 4, "A")) + assert.NoError(t, err) + assert.EqualValues(t, m.nonceTracker["A"], 4) +} diff --git a/pkg/abci/interfaces.go b/pkg/abci/interfaces.go index f40bf0332..a267a65c0 100644 --- a/pkg/abci/interfaces.go +++ b/pkg/abci/interfaces.go @@ -2,10 +2,12 @@ package abci import ( "context" + "math/big" modDataset "github.com/kwilteam/kwil-db/pkg/modules/datasets" modVal "github.com/kwilteam/kwil-db/pkg/modules/validators" + "github.com/kwilteam/kwil-db/pkg/balances" "github.com/kwilteam/kwil-db/pkg/engine/types" "github.com/kwilteam/kwil-db/pkg/snapshots" "github.com/kwilteam/kwil-db/pkg/transactions" @@ -16,6 +18,10 @@ type DatasetsModule interface { Deploy(ctx context.Context, schema *types.Schema, tx *transactions.Transaction) (*modDataset.ExecutionResponse, error) Drop(ctx context.Context, dbid string, tx *transactions.Transaction) (*modDataset.ExecutionResponse, error) Execute(ctx context.Context, dbid string, action string, args [][]any, tx *transactions.Transaction) (*modDataset.ExecutionResponse, error) + + PriceDeploy(ctx context.Context, schema *types.Schema) (*big.Int, error) + PriceDrop(ctx context.Context, dbid string) (*big.Int, error) + PriceExecute(ctx context.Context, dbid string, action string, args [][]any) (*big.Int, error) } // ValidatorModule handles the processing of validator approve/join/leave @@ -56,6 +62,15 @@ type ValidatorModule interface { // Updates block height stored by the validator manager. Called in the abci Commit UpdateBlockHeight(ctx context.Context, blockHeight int64) + + // PriceJoin returns the price of a join transaction. + PriceJoin(ctx context.Context) (*big.Int, error) + + // PriceApprove returns the price of an approve transaction. + PriceApprove(ctx context.Context) (*big.Int, error) + + // PriceLeave returns the price of a leave transaction. + PriceLeave(ctx context.Context) (*big.Int, error) } // AtomicCommitter is an interface for a struct that implements atomic commits across multiple stores @@ -99,3 +114,7 @@ type DBBootstrapModule interface { // Signifies the end of the db restoration IsDBRestored() bool } + +type AccountsModule interface { + GetAccount(ctx context.Context, pubKey []byte) (*balances.Account, error) +} diff --git a/pkg/abci/mempool.go b/pkg/abci/mempool.go new file mode 100644 index 000000000..44004502f --- /dev/null +++ b/pkg/abci/mempool.go @@ -0,0 +1,115 @@ +package abci + +import ( + "context" + "encoding/hex" + "fmt" + "math/big" + "sync" + + "github.com/kwilteam/kwil-db/pkg/transactions" +) + +// mempoolState maintains in-memory account state to validate the transactions against. +type mempool struct { + accountStore AccountsModule + // nonceTracker tracks the last valid nonce for each account. nonce is + // unconfirmed if the value is greater than the nonce in the account store. + // Key: sender's public key, Value: last valid nonce + nonceTracker map[string]uint64 + + // in-memory account state to validate transactions against, purged at the end of commit. + accounts map[string]*userAccount + mu sync.Mutex +} + +type userAccount struct { + nonce int64 + balance *big.Int +} + +// accountInfo retrieves the account info from the mempool state or the account store. +// If the account is not found, it returns a dummy account with nonce 0 and balance 0. +func (m *mempool) accountInfo(ctx context.Context, pubKey []byte) (*userAccount, error) { + if acctInfo, ok := m.accounts[string(pubKey)]; ok { + return acctInfo, nil // there is an unconfirmed tx for this account + } + + // get account from account store + acct, err := m.accountStore.GetAccount(ctx, pubKey) + if err != nil { + return nil, err + } + + acctInfo := &userAccount{ + nonce: acct.Nonce, + balance: acct.Balance, + } + m.accounts[string(pubKey)] = acctInfo + + return acctInfo, nil +} + +// applyTransaction validates account specific info and applies valid transactions to the mempool state. +func (m *mempool) applyTransaction(ctx context.Context, tx *transactions.Transaction) error { + m.mu.Lock() + defer m.mu.Unlock() + + // get account info from mempool state or account store + acct, err := m.accountInfo(context.Background(), tx.Sender) + if err != nil { + return err + } + + hexKey := hex.EncodeToString(tx.Sender) + + // It is normally permissible to accept a transaction with the same nonce + // as a tx already in mempool (but not in a block), however without gas + // we would not want to allow that since there is no criteria + // for selecting the one to mine (normally higher fee). + if tx.Body.Nonce != uint64(acct.nonce)+1 { + return fmt.Errorf("invalid nonce for account %s: got %d, expected %d", hexKey, tx.Body.Nonce, acct.nonce+1) + } + + m.updateAccount(tx.Sender, tx.Body.Nonce) + return nil +} + +// updateAccount is called post-transaction validation, so that the effects of +// the transaction is reflected in the mempool's view of the account state. +// This ensures that the subsequent transactions are validated against this +// updated view of the account state, rather than the one from the account store. +func (m *mempool) updateAccount(pubKey []byte, txNonce uint64) { + publicKey := string(pubKey) + + m.accounts[publicKey].nonce++ + //acct.balance.Sub(acct.balance, fee) + + if txNonce > m.nonceTracker[publicKey] { + m.nonceTracker[publicKey] = txNonce + } +} + +// reset clears the in-memory unconfirmed account states. +// This should be done at the end of block commit. +func (m *mempool) reset() { + m.mu.Lock() + defer m.mu.Unlock() + + m.accounts = make(map[string]*userAccount) +} + +// groupTransactions groups the transactions by sender. +func groupTxsBySender(txns [][]byte) (map[string][]*transactions.Transaction, error) { + grouped := make(map[string][]*transactions.Transaction) + for _, tx := range txns { + t := &transactions.Transaction{} + err := t.UnmarshalBinary(tx) + if err != nil { + return nil, err + } + key := string(t.Sender) + grouped[key] = append(grouped[key], t) + } + return grouped, nil +} diff --git a/pkg/abci/txcode.go b/pkg/abci/txcode.go index 643805c0b..2da4efbd3 100644 --- a/pkg/abci/txcode.go +++ b/pkg/abci/txcode.go @@ -11,9 +11,12 @@ var ( type TxCode uint32 const ( - CodeOk TxCode = 0 - CodeEncodingError TxCode = 1 - CodeUnknownError TxCode = 2 // for now it's for all non-encoding error + CodeOk TxCode = 0 + CodeEncodingError TxCode = 1 + CodeInvalidTxType TxCode = 2 + CodeInvalidSignature TxCode = 3 + CodeInvalidNonce TxCode = 4 + CodeUnknownError TxCode = 5 // for now it's for all non-encoding error ) func (c TxCode) Uint32() uint32 { diff --git a/pkg/balances/accounts.go b/pkg/balances/accounts.go index 04ba45ae4..8805b853d 100644 --- a/pkg/balances/accounts.go +++ b/pkg/balances/accounts.go @@ -44,9 +44,9 @@ func (ac *Committable) ID(ctx context.Context) ([]byte, error) { // Wrapper around the Cancel method on the base Committable. // This reset the updates recorded in the account store within a commit session. -func (ac *Committable) Cancel(ctx context.Context) { +func (ac *Committable) Cancel(ctx context.Context) error { ac.resetDBHash() - ac.Committable.Cancel(ctx) + return ac.Committable.Cancel(ctx) } func NewAccountStore(ctx context.Context, datastore Datastore, opts ...AccountStoreOpts) (*AccountStore, error) { diff --git a/pkg/kv/atomic/committer.go b/pkg/kv/atomic/committer.go index 1247500fe..e53d17acf 100644 --- a/pkg/kv/atomic/committer.go +++ b/pkg/kv/atomic/committer.go @@ -167,7 +167,7 @@ func (k *AtomicKV) EndApply(ctx context.Context) error { return nil } -func (k *AtomicKV) Cancel(ctx context.Context) { +func (k *AtomicKV) Cancel(ctx context.Context) error { k.mu.Lock() defer k.mu.Unlock() @@ -178,6 +178,7 @@ func (k *AtomicKV) Cancel(ctx context.Context) { k.currentTx = nil k.inSession = false k.uncommittedData = make([]*keyValue, 0) + return nil } func (k *AtomicKV) ID(ctx context.Context) ([]byte, error) { diff --git a/pkg/modules/validators/interfaces.go b/pkg/modules/validators/interfaces.go index 987a9fbf2..44847c130 100644 --- a/pkg/modules/validators/interfaces.go +++ b/pkg/modules/validators/interfaces.go @@ -2,6 +2,7 @@ package validators import ( "context" + "math/big" "github.com/kwilteam/kwil-db/pkg/balances" "github.com/kwilteam/kwil-db/pkg/validators" @@ -20,4 +21,8 @@ type ValidatorMgr interface { Approve(ctx context.Context, joiner, approver []byte) error Finalize(ctx context.Context) []*validators.Validator // end of block processing requires providing list of updates to the node's consensus client UpdateBlockHeight(blockHeight int64) + + PriceJoin(ctx context.Context) (*big.Int, error) + PriceApprove(ctx context.Context) (*big.Int, error) + PriceLeave(ctx context.Context) (*big.Int, error) } diff --git a/pkg/modules/validators/transactions.go b/pkg/modules/validators/transactions.go index 8d695e993..4d2720709 100644 --- a/pkg/modules/validators/transactions.go +++ b/pkg/modules/validators/transactions.go @@ -2,6 +2,7 @@ package validators import ( "context" + "fmt" "math/big" "github.com/kwilteam/kwil-db/pkg/balances" @@ -14,6 +15,13 @@ type ExecutionResponse struct { GasUsed int64 } +func resp(fee *big.Int) *ExecutionResponse { + return &ExecutionResponse{ + Fee: fee, + GasUsed: 0, + } +} + // Join/Leave/Approve required a spend. There is currently no pricing associated // with the actions, although there probably should be for Join. @@ -29,8 +37,16 @@ func (vm *ValidatorModule) spend(ctx context.Context, acctPubKey []byte, // Join creates a join request for a prospective validator. func (vm *ValidatorModule) Join(ctx context.Context, joiner []byte, power int64, txn *transactions.Transaction) (*ExecutionResponse, error) { + price, err := vm.PriceJoin(ctx) + if err != nil { + return nil, err + } + + if txn.Body.Fee.Cmp(price) < 0 { + return nil, fmt.Errorf("insufficient fee: %d < %d", txn.Body.Fee, price) + } - err := vm.spend(ctx, joiner, txn.Body.Fee, txn.Body.Nonce) + err = vm.spend(ctx, joiner, txn.Body.Fee, txn.Body.Nonce) if err != nil { return nil, err } @@ -39,17 +55,22 @@ func (vm *ValidatorModule) Join(ctx context.Context, joiner []byte, power int64, return nil, err } - return &ExecutionResponse{ - Fee: txn.Body.Fee, - GasUsed: 0, - }, nil + return resp(txn.Body.Fee), nil } // Leave creates a leave request for a current validator. func (vm *ValidatorModule) Leave(ctx context.Context, leaver []byte, txn *transactions.Transaction) (*ExecutionResponse, error) { + price, err := vm.PriceLeave(ctx) + if err != nil { + return nil, err + } - err := vm.spend(ctx, leaver, txn.Body.Fee, txn.Body.Nonce) + if txn.Body.Fee.Cmp(price) < 0 { + return nil, fmt.Errorf("insufficient fee: %d < %d", txn.Body.Fee, price) + } + + err = vm.spend(ctx, leaver, txn.Body.Fee, txn.Body.Nonce) if err != nil { return nil, err } @@ -58,18 +79,23 @@ func (vm *ValidatorModule) Leave(ctx context.Context, leaver []byte, return nil, err } - return &ExecutionResponse{ - Fee: txn.Body.Fee, - GasUsed: 0, - }, nil + return resp(txn.Body.Fee), nil } // Approve records an approval transaction from a current validator.. func (vm *ValidatorModule) Approve(ctx context.Context, joiner []byte, txn *transactions.Transaction) (*ExecutionResponse, error) { approver := txn.Sender + price, err := vm.PriceApprove(ctx) + if err != nil { + return nil, err + } + + if txn.Body.Fee.Cmp(price) < 0 { + return nil, fmt.Errorf("insufficient fee: %d < %d", txn.Body.Fee, price) + } - err := vm.spend(ctx, approver, txn.Body.Fee, txn.Body.Nonce) + err = vm.spend(ctx, approver, txn.Body.Fee, txn.Body.Nonce) if err != nil { return nil, err } diff --git a/pkg/modules/validators/validators.go b/pkg/modules/validators/validators.go index f13811bdb..de5176668 100644 --- a/pkg/modules/validators/validators.go +++ b/pkg/modules/validators/validators.go @@ -2,7 +2,12 @@ // blockchain application using a pluggable validator manager and account store. package validators -import "github.com/kwilteam/kwil-db/pkg/log" +import ( + "context" + "math/big" + + "github.com/kwilteam/kwil-db/pkg/log" +) // NOTE: currently there is no pricing. Any fee is accepted (nonce update only) // if their account has the sufficient balance. @@ -42,3 +47,15 @@ func WithLogger(logger log.Logger) ValidatorModuleOpt { u.log = logger } } + +func (v *ValidatorModule) PriceJoin(ctx context.Context) (*big.Int, error) { + return v.mgr.PriceJoin(ctx) +} + +func (v *ValidatorModule) PriceLeave(ctx context.Context) (*big.Int, error) { + return v.mgr.PriceLeave(ctx) +} + +func (v *ValidatorModule) PriceApprove(ctx context.Context) (*big.Int, error) { + return v.mgr.PriceApprove(ctx) +} diff --git a/pkg/sessions/errors.go b/pkg/sessions/errors.go index 6d63e2567..dc9bf8728 100644 --- a/pkg/sessions/errors.go +++ b/pkg/sessions/errors.go @@ -18,6 +18,7 @@ var ( ErrAlreadyRegistered = errors.New("committable already registered") ErrUnknownCommittable = errors.New("unknown committable") ErrClosed = errors.New("session closed") + ErrCancel = errors.New("error cancelling committer") ) // wrapError wraps an error with a message. diff --git a/pkg/sessions/interfaces.go b/pkg/sessions/interfaces.go index a1da332e0..6bf6c11bb 100644 --- a/pkg/sessions/interfaces.go +++ b/pkg/sessions/interfaces.go @@ -37,7 +37,7 @@ type Committable interface { EndApply(ctx context.Context) error // Cancel is used to cancel a session. - Cancel(ctx context.Context) + Cancel(ctx context.Context) error // ID returns a unique ID representative of the state changes that have occurred so far for this committable. // It should be deterministic, and should change if and only if the committable has changed. diff --git a/pkg/sessions/mock_test.go b/pkg/sessions/mock_test.go index 974214cd0..68424e5f1 100644 --- a/pkg/sessions/mock_test.go +++ b/pkg/sessions/mock_test.go @@ -191,13 +191,14 @@ func (m *mockCommittable) EndApply(ctx context.Context) error { return nil } -func (m *mockCommittable) Cancel(ctx context.Context) { +func (m *mockCommittable) Cancel(ctx context.Context) error { m.isInCommit = false m.isInApply = false m.appliedData = map[string]any{} m.canceled = true + return nil } func (m *mockCommittable) ID(ctx context.Context) ([]byte, error) { diff --git a/pkg/sessions/session.go b/pkg/sessions/session.go index f9c3b4c5e..756a014b4 100644 --- a/pkg/sessions/session.go +++ b/pkg/sessions/session.go @@ -239,10 +239,14 @@ func (a *AtomicCommitter) apply(ctx context.Context) (err error) { return a.wal.Truncate(ctx) } -func (a *AtomicCommitter) cancel(ctx context.Context) { +func (a *AtomicCommitter) cancel(ctx context.Context) error { + var errs error for _, committable := range a.committables { - committable.Cancel(ctx) + if err := committable.Cancel(ctx); err != nil { + errs = errors.Join(errs, err) + } } + return errs } // handleErr checks if an error is nil or not. @@ -251,7 +255,9 @@ func (a *AtomicCommitter) cancel(ctx context.Context) { func (a *AtomicCommitter) handleErr(ctx context.Context, err *error) { if *err != nil { a.log.Error("error during atomic commit", zap.Error(*err)) - a.cancel(ctx) + if err := a.cancel(ctx); err != nil { + a.log.Error("error cancelling atomic commit", zap.Error(err)) + } a.phase = CommitPhaseNone } } @@ -295,8 +301,7 @@ func (a *AtomicCommitter) applyWal(ctx context.Context) (err error) { if truncErr != nil { return truncErr } - a.cancel(ctx) - return nil + return a.cancel(ctx) } if err != nil { return err @@ -452,7 +457,9 @@ func (a *AtomicCommitter) Unregister(ctx context.Context, id string) error { delete(a.committables, CommittableId(id)) if a.phase.InSession() { - committable.Cancel(ctx) + if err := committable.Cancel(ctx); err != nil { + return wrapError(ErrCancel, err) + } } return nil @@ -478,8 +485,7 @@ func (a *AtomicCommitter) Close() error { ctx, cancel := context.WithTimeout(context.Background(), a.timeout) defer cancel() - a.cancel(ctx) - return nil + return a.cancel(ctx) } type CommitPhase uint8 diff --git a/pkg/sessions/sql-session/session.go b/pkg/sessions/sql-session/session.go index 0397048a1..872d3e1c6 100644 --- a/pkg/sessions/sql-session/session.go +++ b/pkg/sessions/sql-session/session.go @@ -148,25 +148,26 @@ func (s *SqlCommitable) EndApply(ctx context.Context) error { // Cancel cancels the current session. // it deletes the session and rolls back the savepoint. // it will also enable foreign key constraints. -func (s *SqlCommitable) Cancel(ctx context.Context) { - errs := []error{} +func (s *SqlCommitable) Cancel(ctx context.Context) error { + var errs error if s.session != nil { - errs = append(errs, s.session.Delete()) + errs = errors.Join(errs, s.session.Delete()) s.session = nil } if s.savepoint != nil { - errs = append(errs, s.savepoint.Rollback()) + errs = errors.Join(errs, s.savepoint.Rollback()) s.savepoint = nil } // this can be called multiple times without issue if err := s.db.EnableForeignKey(); err != nil { - errs = append(errs, err) + errs = errors.Join(errs, err) } - if len(errs) > 0 { - s.log.Error("errors while cancelling session", zap.Error(errors.Join(errs...))) + if errs != nil { + s.log.Error("errors while cancelling session", zap.Error(errs)) } + return errs } // ID returns the ID of the current session. diff --git a/pkg/validators/mgr.go b/pkg/validators/mgr.go index d6e111ae3..c64bf237f 100644 --- a/pkg/validators/mgr.go +++ b/pkg/validators/mgr.go @@ -67,6 +67,9 @@ type ValidatorMgr struct { // opts joinExpiry int64 log log.Logger + + // pricing + feeMultiplier int64 } // NOTE: The SQLite validator/approval store is local and transparent to the diff --git a/pkg/validators/opts.go b/pkg/validators/opts.go index ac1e41b71..963cda4ad 100644 --- a/pkg/validators/opts.go +++ b/pkg/validators/opts.go @@ -16,3 +16,9 @@ func WithJoinExpiry(joinExpiry int64) ValidatorMgrOpt { v.joinExpiry = joinExpiry } } + +func WithFeeMultiplier(multiplier int64) ValidatorMgrOpt { + return func(v *ValidatorMgr) { + v.feeMultiplier = multiplier + } +} diff --git a/pkg/validators/pricing.go b/pkg/validators/pricing.go new file mode 100644 index 000000000..5dd0b7745 --- /dev/null +++ b/pkg/validators/pricing.go @@ -0,0 +1,32 @@ +package validators + +import ( + "context" + "math/big" +) + +var ( + defaultJoinPrice = big.NewInt(10000000000000) + defaultLeavePrice = big.NewInt(10000000000000) + defaultApprovePrice = big.NewInt(10000000000000) +) + +// applyFeeMultiplier applies the fee multiplier to the price. +func (mgr *ValidatorMgr) applyFeeMultiplier(price *big.Int) *big.Int { + return big.NewInt(0).Mul(price, big.NewInt(mgr.feeMultiplier)) +} + +// PriceJoin returns the price of issuing a join request. +func (mgr *ValidatorMgr) PriceJoin(ctx context.Context) (price *big.Int, err error) { + return mgr.applyFeeMultiplier(defaultJoinPrice), nil +} + +// PriceLeave returns the price of issuing a leave request. +func (mgr *ValidatorMgr) PriceLeave(ctx context.Context) (price *big.Int, err error) { + return mgr.applyFeeMultiplier(defaultLeavePrice), nil +} + +// PriceApprove returns the price of approving a join request. +func (mgr *ValidatorMgr) PriceApprove(ctx context.Context) (price *big.Int, err error) { + return mgr.applyFeeMultiplier(defaultApprovePrice), nil +}