From 6c91804f8c8d232022ee9fbb75649f6608334b5d Mon Sep 17 00:00:00 2001 From: Brennan Lamey <66885902+brennanjl@users.noreply.github.com> Date: Fri, 6 Oct 2023 13:59:31 -0600 Subject: [PATCH] added extensible auth using build tags (#339) * added extensible auth using build tags * go mod tidied * made gavins requested changes --- internal/app/kwild/server/build.go | 12 +- internal/controller/grpc/txsvc/v1/pricing.go | 6 +- internal/controller/grpc/txsvc/v1/service.go | 3 + pkg/abci/abci.go | 149 +++++++++++++++-- pkg/abci/abci_test.go | 158 +++++++++++++++++++ pkg/abci/interfaces.go | 19 +++ pkg/abci/mempool.go | 139 ++++++++++++++++ pkg/abci/opts.go | 6 + pkg/balances/accounts.go | 4 +- pkg/kv/atomic/committer.go | 3 +- pkg/modules/validators/interfaces.go | 5 + pkg/modules/validators/transactions.go | 65 ++++++-- pkg/modules/validators/validators.go | 19 ++- pkg/sessions/errors.go | 1 + pkg/sessions/interfaces.go | 2 +- pkg/sessions/mock_test.go | 3 +- pkg/sessions/session.go | 11 +- pkg/sessions/sql-session/session.go | 15 +- pkg/validators/mgr.go | 3 + pkg/validators/opts.go | 6 + pkg/validators/pricing.go | 32 ++++ 21 files changed, 610 insertions(+), 51 deletions(-) create mode 100644 pkg/abci/mempool.go create mode 100644 pkg/validators/pricing.go diff --git a/internal/app/kwild/server/build.go b/internal/app/kwild/server/build.go index 9414080d1..fe4150a33 100644 --- a/internal/app/kwild/server/build.go +++ b/internal/app/kwild/server/build.go @@ -75,7 +75,7 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server { bootstrapperModule := buildBootstrapper(d) - abciApp := buildAbci(d, closers, datasetsModule, validatorModule, + abciApp := buildAbci(d, closers, accs, datasetsModule, validatorModule, ac, snapshotModule, bootstrapperModule) cometBftNode := buildCometNode(d, closers, abciApp) @@ -132,7 +132,7 @@ func (c *closeFuncs) closeAll() error { return err } -func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule, +func buildAbci(d *coreDependencies, closer *closeFuncs, accountsModule abci.AccountsModule, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule, atomicCommitter *sessions.AtomicCommitter, snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp { badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, kwild.ABCIInfoSubDirName) badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{ @@ -157,6 +157,7 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.Data genesisHash := d.genesisCfg.ComputeGenesisHash() return abci.NewAbciApp( + accountsModule, datasetsModule, validatorModule, atomicKv, @@ -165,6 +166,7 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.Data bootstrapper, genesisHash, abci.WithLogger(*d.log.Named("abci")), + abci.WithoutGasCosts(d.genesisCfg.ConsensusParams.WithoutGasCosts), ) } @@ -251,9 +253,15 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions closer.addCloser(db.Close) joinExpiry := d.genesisCfg.ConsensusParams.Validator.JoinExpiry + feeMultiplier := 1 + if d.genesisCfg.ConsensusParams.WithoutGasCosts { + feeMultiplier = 0 + } + v, err := vmgr.NewValidatorMgr(d.ctx, db, vmgr.WithLogger(*d.log.Named("validatorStore")), vmgr.WithJoinExpiry(joinExpiry), + vmgr.WithFeeMultiplier(int64(feeMultiplier)), ) if err != nil { failBuild(err, "failed to build validator store") diff --git a/internal/controller/grpc/txsvc/v1/pricing.go b/internal/controller/grpc/txsvc/v1/pricing.go index aad4ed1a6..ff0416fc7 100644 --- a/internal/controller/grpc/txsvc/v1/pricing.go +++ b/internal/controller/grpc/txsvc/v1/pricing.go @@ -89,13 +89,13 @@ func (s *Service) priceAction(ctx context.Context, txBody *txpb.Transaction_Body // TODO: Later to be moved to validator module (or) manager func (s *Service) priceValidatorJoin(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) { - return big.NewInt(10000000000000), nil + return s.vstore.PriceJoin(ctx) } func (s *Service) priceValidatorLeave(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) { - return big.NewInt(10000000000000), nil + return s.vstore.PriceLeave(ctx) } func (s *Service) priceValidatorApprove(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) { - return big.NewInt(10000000000000), nil + return s.vstore.PriceApprove(ctx) } diff --git a/internal/controller/grpc/txsvc/v1/service.go b/internal/controller/grpc/txsvc/v1/service.go index 215aead18..ab8298228 100644 --- a/internal/controller/grpc/txsvc/v1/service.go +++ b/internal/controller/grpc/txsvc/v1/service.go @@ -65,4 +65,7 @@ type ValidatorReader interface { CurrentValidators(ctx context.Context) ([]*validators.Validator, error) ActiveVotes(ctx context.Context) ([]*validators.JoinRequest, error) // JoinStatus(ctx context.Context, joiner []byte) ([]*JoinRequest, error) + PriceJoin(ctx context.Context) (*big.Int, error) + PriceLeave(ctx context.Context) (*big.Int, error) + PriceApprove(ctx context.Context) (*big.Int, error) } diff --git a/pkg/abci/abci.go b/pkg/abci/abci.go index 479c971eb..57e328604 100644 --- a/pkg/abci/abci.go +++ b/pkg/abci/abci.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "math/big" "sort" "github.com/kwilteam/kwil-db/pkg/crypto" @@ -62,7 +63,7 @@ func (ds nilStringer) String() string { return "no message" } -func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, committer AtomicCommitter, snapshotter SnapshotModule, +func NewAbciApp(accounts AccountsModule, database DatasetsModule, vldtrs ValidatorModule, kv KVStore, committer AtomicCommitter, snapshotter SnapshotModule, bootstrapper DBBootstrapModule, genesisHash []byte, opts ...AbciOpt) *AbciApp { app := &AbciApp{ genesisAppHash: genesisHash, @@ -72,11 +73,18 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com metadataStore: &metadataStore{ kv: kv, }, - bootstrapper: bootstrapper, - snapshotter: snapshotter, + bootstrapper: bootstrapper, + snapshotter: snapshotter, + accounts: accounts, + withoutGasCosts: true, valAddrToKey: make(map[string][]byte), + mempool: &mempool{ + nonceTracker: make(map[string]uint64), + accounts: make(map[string]*userAccount), + }, + log: log.NewNoOp(), } @@ -120,9 +128,18 @@ type AbciApp struct { snapshotter SnapshotModule // bootstrapper is the bootstrapper module that handles bootstrapping the database - bootstrapper DBBootstrapModule + bootstrapper DBBootstrapModule + + // metadataStore to track the app hash and block height metadataStore *metadataStore + // accountStore is the store that maintains the account state + accounts AccountsModule + withoutGasCosts bool + + // mempoolState maintains in-memory account state to validate the transactions against. + mempool *mempool + log log.Logger // Expected AppState after bootstrapping the node with a given snapshot, @@ -193,9 +210,11 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response func (a *AbciApp) CheckTx(incoming abciTypes.RequestCheckTx) abciTypes.ResponseCheckTx { logger := a.log.With(zap.String("stage", "ABCI CheckTx")) logger.Debug("check tx") + ctx := context.Background() + var err error tx := &transactions.Transaction{} - err := tx.UnmarshalBinary(incoming.Tx) + err = tx.UnmarshalBinary(incoming.Tx) if err != nil { logger.Error("failed to unmarshal transaction", zap.Error(err)) return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()} @@ -205,21 +224,77 @@ func (a *AbciApp) CheckTx(incoming abciTypes.RequestCheckTx) abciTypes.ResponseC zap.String("sender", hex.EncodeToString(tx.Sender)), zap.String("PayloadType", tx.Body.PayloadType.String())) - err = tx.Verify() + // For a new transaction (not re-check), before looking at execution cost or + // checking nonce validity, ensure the payload is recognized and signature is valid. + if incoming.Type == abciTypes.CheckTxType_New { + // Verify Payload type + if !tx.Body.PayloadType.Valid() { + logger.Error("invalid payload type", zap.String("payloadType", tx.Body.PayloadType.String())) + return abciTypes.ResponseCheckTx{Code: 1, Log: "invalid payload type"} + } + + // Verify Signature + err = tx.Verify() + if err != nil { + logger.Error("failed to verify transaction", zap.Error(err)) + return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()} + } + + // Estimate price of the transaction based on the payload type + if !a.withoutGasCosts { + var price *big.Int + price, err = a.estimateTxPrice(ctx, tx) + if err != nil { + logger.Error("failed to estimate tx price", zap.Error(err)) + return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()} + } + + // Check if tx fee is enough to cover the gas costs + if tx.Body.Fee.Cmp(price) < 0 { + logger.Error("insufficient fee", zap.String("fee", tx.Body.Fee.String()), zap.String("price", price.String())) + return abciTypes.ResponseCheckTx{Code: 1, Log: "insufficient fee"} + } + } + } + + a.mempool.mu.Lock() + defer a.mempool.mu.Unlock() + + acct, err := a.accountInfo(ctx, tx.Sender) if err != nil { - logger.Error("failed to verify transaction", zap.Error(err)) + logger.Error("failed to get account info", zap.Error(err)) return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()} } - // TODO: CheckTx must reject transactions with a nonce <= currently mined - // account nonce. Also, with re-check happening (CheckTxType_Recheck) we - // must evict transactions that become invalid. - // - // Pathological cases: - // a. accept any txns to mempool -- that's a trivial mempool flooding attack. - // b. mine them (failed) -- that's a trivial block space attack. - // c. txns paying no fee -- flood, so a. and b. are crucial with no gas + if incoming.Type == abciTypes.CheckTxType_New { + + // It is normally permissible to accept a transaction with the same nonce + // as a tx already in mempool (but not in a block), however without gas + // we would not want to allow that since there is no criteria + // for selecting the one to mine (normally higher fee). + if tx.Body.Nonce != uint64(acct.nonce)+1 { + logger.Error("nonce mismatch", zap.Uint64("txNonce", tx.Body.Nonce), zap.Int64("accountNonce", acct.nonce)) + return abciTypes.ResponseCheckTx{Code: 1, Log: "nonce mismatch"} + } + + a.mempool.nonceTracker[string(tx.Sender)] = tx.Body.Nonce + } else { + // Ensure nonce is higher than the last nonce for this account. + if tx.Body.Nonce <= uint64(acct.nonce) { + logger.Error("nonce mismatch", zap.Uint64("txNonce", tx.Body.Nonce), zap.Int64("accountNonce", acct.nonce)) + return abciTypes.ResponseCheckTx{Code: 1, Log: "nonce mismatch"} + } + } + if !a.withoutGasCosts { + // Check if the sender has enough balance to pay for the tx fee + if tx.Body.Fee.Cmp(acct.balance) > 0 { + logger.Error("insufficient account balance", zap.String("balance", acct.balance.String())) + return abciTypes.ResponseCheckTx{Code: 1, Log: "insufficient account balance"} + } + } + // spend fee and nonce + a.updateAccount(tx.Sender, tx.Body.Fee) return abciTypes.ResponseCheckTx{Code: 0} } @@ -522,6 +597,8 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit { // Unlock all the DBs } + a.mempool.reset() + return abciTypes.ResponseCommit{ Data: appHash, } @@ -793,6 +870,43 @@ func (a *AbciApp) PrepareProposal(p0 abciTypes.RequestPrepareProposal) abciTypes } } +func (a *AbciApp) validateProposalTransactions(ctx context.Context, Txns [][]byte) error { + grouped, err := groupTxsBySender(Txns) + if err != nil { + a.log.Error("failed to group transaction based on sender: ", zap.Error(err)) + return err + } + + for sender, txs := range grouped { + var expectedNonce uint64 = 1 + acct, err := a.accounts.GetAccount(ctx, []byte(sender)) + if err != nil { + return err + } + + if acct != nil { + expectedNonce = uint64(acct.Nonce) + 1 + } + + for _, tx := range txs { + if tx.Body.Nonce != expectedNonce { + a.log.Info("Nonce mismatch", + zap.String("stage", "ABCI ProcessProposal"), + zap.Uint64("txNonce", tx.Body.Nonce), + zap.Uint64("expectedNonce", expectedNonce)) + return fmt.Errorf("nonce mismatch, ExpectedNonce: %d TxNonce: %d", expectedNonce, tx.Body.Nonce) + } + expectedNonce++ + } + } + return nil +} + +// Reject the block if: +// 1. Any gap in the nonces +// 2. Any duplicate nonces +// 3. Any nonce is less than the last committed nonce for the account +// 4. transactions are not ordered by nonces func (a *AbciApp) ProcessProposal(p0 abciTypes.RequestProcessProposal) abciTypes.ResponseProcessProposal { a.log.Debug("", zap.String("stage", "ABCI ProcessProposal"), @@ -803,7 +917,12 @@ func (a *AbciApp) ProcessProposal(p0 abciTypes.RequestProcessProposal) abciTypes // previous best confirmed or within this block. Our current transaction // execution does not update an accounts nonce in state unless it is the // next nonce. Delivering transactions to a block in that way cannot happen. + ctx := context.Background() + if err := a.validateProposalTransactions(ctx, p0.Txs); err != nil { + return abciTypes.ResponseProcessProposal{Status: abciTypes.ResponseProcessProposal_REJECT} + } + // TODO: Verify the Tx and Block sizes based on the genesis configuration return abciTypes.ResponseProcessProposal{Status: abciTypes.ResponseProcessProposal_ACCEPT} } diff --git a/pkg/abci/abci_test.go b/pkg/abci/abci_test.go index 4a89597db..e37082403 100644 --- a/pkg/abci/abci_test.go +++ b/pkg/abci/abci_test.go @@ -2,12 +2,15 @@ package abci import ( "bytes" + "context" "math/big" "testing" "github.com/kwilteam/kwil-db/pkg/auth" + "github.com/kwilteam/kwil-db/pkg/balances" "github.com/kwilteam/kwil-db/pkg/log" "github.com/kwilteam/kwil-db/pkg/transactions" + "github.com/stretchr/testify/assert" ) func marshalTx(t *testing.T, tx *transactions.Transaction) []byte { @@ -38,6 +41,36 @@ func cloneTx(tx *transactions.Transaction) *transactions.Transaction { } } +type MockAccountStore struct { +} + +func (m *MockAccountStore) GetAccount(ctx context.Context, pubKey []byte) (*balances.Account, error) { + return &balances.Account{ + PublicKey: nil, + Balance: big.NewInt(0), + Nonce: 0, + }, nil +} + +func newTx(t *testing.T, nonce uint64, sender string) []byte { + tx := &transactions.Transaction{ + Signature: &auth.Signature{}, + Body: &transactions.TransactionBody{ + Description: "test", + Payload: []byte(`random payload`), + Fee: big.NewInt(0), + Nonce: nonce, + }, + Sender: []byte(sender), + } + + bts, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("could not marshal transaction! %v", err) + } + return bts +} + func Test_prepareMempoolTxns(t *testing.T) { // To make these tests deterministic, we manually craft certain misorderings // and the known expected orderings. Also include some malformed @@ -171,3 +204,128 @@ func Test_prepareMempoolTxns(t *testing.T) { }) } } + +func Test_ProcessProposal_TxValidation(t *testing.T) { + ctx := context.Background() + abciApp := &AbciApp{ + accounts: &MockAccountStore{}, + } + logger := log.NewStdOut(log.DebugLevel) + + abciApp.log = logger + + txA1 := newTx(t, 1, "A") + txA2 := newTx(t, 2, "A") + txA3 := newTx(t, 3, "A") + txA4 := newTx(t, 4, "A") + txB1 := newTx(t, 1, "B") + txB2 := newTx(t, 2, "B") + txB3 := newTx(t, 3, "B") + + testcases := []struct { + name string + txs [][]byte + err bool // expect error + }{ + { + // Invalid ordering of nonces: 3, 1, 2 by single sender + name: "Invalid ordering on nonces by single sender", + txs: [][]byte{ + txA3, + txA1, + txA2, + }, + err: true, + }, + { + // A1, A3, B3, A2, B1, B2 + name: "Invalid ordering of nonces by multiple senders", + txs: [][]byte{ + txA1, + txA3, + txB3, + txA2, + txB1, + txB2, + }, + err: true, + }, + { + // Gaps in the ordering of nonces: 1, 3, 4 by single sender + name: "Gaps in the ordering of nonces by single sender", + txs: [][]byte{ + txA1, + txA3, + txA4, + }, + err: true, + }, + { + // Gaps in the ordering of nonces by multiple senders + name: "Gaps in the ordering of nonces by multiple senders", + txs: [][]byte{ + txA1, + txB1, + txA4, + txB3, + }, + err: true, + }, + { + // Duplicate Nonces: 1, 2, 2 by single sender + name: "Duplicate Nonces by single sender", + txs: [][]byte{ + txA1, + txA2, + txA2, + }, + err: true, + }, + { + // Duplicate Nonces: 1, 2, 2 by multiple senders + name: "Duplicate Nonces by multiple senders", + txs: [][]byte{ + txA1, + txA2, + txB1, + txB2, + txB2, + }, + err: true, + }, + { + // Valid ordering of nonces: 1, 2, 3 by single sender + name: "Valid ordering of nonces by single sender", + txs: [][]byte{ + txA1, + txA2, + txA3, + }, + err: false, + }, + { + // Valid ordering of nonces: 1, 2, 3 by multiple senders + name: "Valid ordering of nonces by multiple senders", + txs: [][]byte{ + txA1, + txA2, + txB1, + txB2, + txA3, + txB3, + }, + err: false, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + err := abciApp.validateProposalTransactions(ctx, tc.txs) + if tc.err { + assert.Error(t, err, "expected error due to %s", tc.name) + } else { + assert.NoError(t, err, "TC: %s, expected no error", tc.name) + } + }) + } +} diff --git a/pkg/abci/interfaces.go b/pkg/abci/interfaces.go index f40bf0332..a267a65c0 100644 --- a/pkg/abci/interfaces.go +++ b/pkg/abci/interfaces.go @@ -2,10 +2,12 @@ package abci import ( "context" + "math/big" modDataset "github.com/kwilteam/kwil-db/pkg/modules/datasets" modVal "github.com/kwilteam/kwil-db/pkg/modules/validators" + "github.com/kwilteam/kwil-db/pkg/balances" "github.com/kwilteam/kwil-db/pkg/engine/types" "github.com/kwilteam/kwil-db/pkg/snapshots" "github.com/kwilteam/kwil-db/pkg/transactions" @@ -16,6 +18,10 @@ type DatasetsModule interface { Deploy(ctx context.Context, schema *types.Schema, tx *transactions.Transaction) (*modDataset.ExecutionResponse, error) Drop(ctx context.Context, dbid string, tx *transactions.Transaction) (*modDataset.ExecutionResponse, error) Execute(ctx context.Context, dbid string, action string, args [][]any, tx *transactions.Transaction) (*modDataset.ExecutionResponse, error) + + PriceDeploy(ctx context.Context, schema *types.Schema) (*big.Int, error) + PriceDrop(ctx context.Context, dbid string) (*big.Int, error) + PriceExecute(ctx context.Context, dbid string, action string, args [][]any) (*big.Int, error) } // ValidatorModule handles the processing of validator approve/join/leave @@ -56,6 +62,15 @@ type ValidatorModule interface { // Updates block height stored by the validator manager. Called in the abci Commit UpdateBlockHeight(ctx context.Context, blockHeight int64) + + // PriceJoin returns the price of a join transaction. + PriceJoin(ctx context.Context) (*big.Int, error) + + // PriceApprove returns the price of an approve transaction. + PriceApprove(ctx context.Context) (*big.Int, error) + + // PriceLeave returns the price of a leave transaction. + PriceLeave(ctx context.Context) (*big.Int, error) } // AtomicCommitter is an interface for a struct that implements atomic commits across multiple stores @@ -99,3 +114,7 @@ type DBBootstrapModule interface { // Signifies the end of the db restoration IsDBRestored() bool } + +type AccountsModule interface { + GetAccount(ctx context.Context, pubKey []byte) (*balances.Account, error) +} diff --git a/pkg/abci/mempool.go b/pkg/abci/mempool.go new file mode 100644 index 000000000..3bfd829b3 --- /dev/null +++ b/pkg/abci/mempool.go @@ -0,0 +1,139 @@ +package abci + +import ( + "context" + "errors" + "math/big" + "sync" + + engineTypes "github.com/kwilteam/kwil-db/pkg/engine/types" + modDataset "github.com/kwilteam/kwil-db/pkg/modules/datasets" + "github.com/kwilteam/kwil-db/pkg/transactions" +) + +// mempoolState maintains in-memory account state to validate the transactions against. +type mempool struct { + // nonceTracker tracks the last valid nonce for each account. + // nonce is unconfirmed if the value is greater than the nonce in the account store. + // implying that the transaction is not yet committed. + nonceTracker map[string]uint64 + + // in-memory account state to validate transactions against, purged at the end of commit. + accounts map[string]*userAccount + mu sync.RWMutex +} + +type userAccount struct { + nonce int64 + balance *big.Int + pubKey []byte +} + +// accountInfo retrieves the account info from the mempool state or the account store. +// If the account is not found, it returns a dummy account with nonce 0 and balance 0. +func (a *AbciApp) accountInfo(ctx context.Context, pubKey []byte) (*userAccount, error) { + if acctInfo, ok := a.mempool.accounts[string(pubKey)]; ok { + return acctInfo, nil + } + + acctInfo := &userAccount{ + nonce: 0, + balance: big.NewInt(0), + pubKey: pubKey, + } + a.mempool.accounts[string(pubKey)] = acctInfo + + // get account from account store + acct, err := a.accounts.GetAccount(ctx, pubKey) + if err != nil { + return nil, err + } + + if acct.PublicKey == nil { + return acctInfo, nil + } + + // Update the account info with the account store's view of the account. + acctInfo.balance = acct.Balance + acctInfo.nonce = acct.Nonce + return acctInfo, nil +} + +// updateAccount is called post-transaction validation, so that the effects of +// the transaction is reflected in the mempool's view of the account state. +// This ensures that the subsequent transactions are validated against this +// updated view of the account state, rather than the one from the account store. +func (a *AbciApp) updateAccount(pubKey []byte, fee *big.Int) { + acct, ok := a.mempool.accounts[string(pubKey)] + if !ok { + panic("account not found in mempool state") + } + + acct.balance.Sub(acct.balance, fee) + acct.nonce++ +} + +// At the end of commit, purge the mempool state of accounts as they might have been updated. +// If recheck is enabled, uncommitted transactions must be rechecked against the committed state. +func (m *mempool) reset() { + m.mu.Lock() + defer m.mu.Unlock() + + m.accounts = make(map[string]*userAccount) +} + +// estimateTxPrice estimates the price of the transaction based on the payload type. +func (a *AbciApp) estimateTxPrice(ctx context.Context, tx *transactions.Transaction) (*big.Int, error) { + switch tx.Body.PayloadType { + case transactions.PayloadTypeDeploySchema: + var schemaPayload transactions.Schema + err := schemaPayload.UnmarshalBinary(tx.Body.Payload) + if err != nil { + return nil, err + } + var schema *engineTypes.Schema + schema, err = modDataset.ConvertSchemaToEngine(&schemaPayload) + if err != nil { + return nil, err + } + return a.database.PriceDeploy(ctx, schema) + case transactions.PayloadTypeDropSchema: + var drop transactions.DropSchema + err := drop.UnmarshalBinary(tx.Body.Payload) + if err != nil { + return nil, err + } + return a.database.PriceDrop(ctx, drop.DBID) + case transactions.PayloadTypeExecuteAction: + var execution transactions.ActionExecution + err := execution.UnmarshalBinary(tx.Body.Payload) + if err != nil { + return nil, err + } + // TODO: replace nil with the actual args once pricing becomes schema dependent + return a.database.PriceExecute(ctx, execution.DBID, execution.Action, nil) + case transactions.PayloadTypeValidatorJoin: + return a.validators.PriceJoin(ctx) + case transactions.PayloadTypeValidatorLeave: + return a.validators.PriceLeave(ctx) + case transactions.PayloadTypeValidatorApprove: + return a.validators.PriceApprove(ctx) + default: + return nil, errors.New("unknown payload type") + } +} + +// groupTransactions groups the transactions by sender. +func groupTxsBySender(txns [][]byte) (map[string][]*transactions.Transaction, error) { + grouped := make(map[string][]*transactions.Transaction) + for _, tx := range txns { + t := &transactions.Transaction{} + err := t.UnmarshalBinary(tx) + if err != nil { + return nil, err + } + key := string(t.Sender) + grouped[key] = append(grouped[key], t) + } + return grouped, nil +} diff --git a/pkg/abci/opts.go b/pkg/abci/opts.go index c27cfd6ac..098f82640 100644 --- a/pkg/abci/opts.go +++ b/pkg/abci/opts.go @@ -15,3 +15,9 @@ func WithApplicationVersion(version uint64) AbciOpt { a.applicationVersion = version } } + +func WithoutGasCosts(gasCosts bool) AbciOpt { + return func(a *AbciApp) { + a.withoutGasCosts = gasCosts + } +} diff --git a/pkg/balances/accounts.go b/pkg/balances/accounts.go index 04ba45ae4..8805b853d 100644 --- a/pkg/balances/accounts.go +++ b/pkg/balances/accounts.go @@ -44,9 +44,9 @@ func (ac *Committable) ID(ctx context.Context) ([]byte, error) { // Wrapper around the Cancel method on the base Committable. // This reset the updates recorded in the account store within a commit session. -func (ac *Committable) Cancel(ctx context.Context) { +func (ac *Committable) Cancel(ctx context.Context) error { ac.resetDBHash() - ac.Committable.Cancel(ctx) + return ac.Committable.Cancel(ctx) } func NewAccountStore(ctx context.Context, datastore Datastore, opts ...AccountStoreOpts) (*AccountStore, error) { diff --git a/pkg/kv/atomic/committer.go b/pkg/kv/atomic/committer.go index 1247500fe..e53d17acf 100644 --- a/pkg/kv/atomic/committer.go +++ b/pkg/kv/atomic/committer.go @@ -167,7 +167,7 @@ func (k *AtomicKV) EndApply(ctx context.Context) error { return nil } -func (k *AtomicKV) Cancel(ctx context.Context) { +func (k *AtomicKV) Cancel(ctx context.Context) error { k.mu.Lock() defer k.mu.Unlock() @@ -178,6 +178,7 @@ func (k *AtomicKV) Cancel(ctx context.Context) { k.currentTx = nil k.inSession = false k.uncommittedData = make([]*keyValue, 0) + return nil } func (k *AtomicKV) ID(ctx context.Context) ([]byte, error) { diff --git a/pkg/modules/validators/interfaces.go b/pkg/modules/validators/interfaces.go index 987a9fbf2..44847c130 100644 --- a/pkg/modules/validators/interfaces.go +++ b/pkg/modules/validators/interfaces.go @@ -2,6 +2,7 @@ package validators import ( "context" + "math/big" "github.com/kwilteam/kwil-db/pkg/balances" "github.com/kwilteam/kwil-db/pkg/validators" @@ -20,4 +21,8 @@ type ValidatorMgr interface { Approve(ctx context.Context, joiner, approver []byte) error Finalize(ctx context.Context) []*validators.Validator // end of block processing requires providing list of updates to the node's consensus client UpdateBlockHeight(blockHeight int64) + + PriceJoin(ctx context.Context) (*big.Int, error) + PriceApprove(ctx context.Context) (*big.Int, error) + PriceLeave(ctx context.Context) (*big.Int, error) } diff --git a/pkg/modules/validators/transactions.go b/pkg/modules/validators/transactions.go index 8d695e993..fef333009 100644 --- a/pkg/modules/validators/transactions.go +++ b/pkg/modules/validators/transactions.go @@ -2,6 +2,7 @@ package validators import ( "context" + "fmt" "math/big" "github.com/kwilteam/kwil-db/pkg/balances" @@ -14,6 +15,13 @@ type ExecutionResponse struct { GasUsed int64 } +func resp(fee *big.Int) *ExecutionResponse { + return &ExecutionResponse{ + Fee: fee, + GasUsed: 0, + } +} + // Join/Leave/Approve required a spend. There is currently no pricing associated // with the actions, although there probably should be for Join. @@ -29,47 +37,74 @@ func (vm *ValidatorModule) spend(ctx context.Context, acctPubKey []byte, // Join creates a join request for a prospective validator. func (vm *ValidatorModule) Join(ctx context.Context, joiner []byte, power int64, txn *transactions.Transaction) (*ExecutionResponse, error) { + price, err := vm.PriceJoin(ctx) + if err != nil { + if price == nil { + price = big.NewInt(0) + } + return resp(price), err + } - err := vm.spend(ctx, joiner, txn.Body.Fee, txn.Body.Nonce) + if txn.Body.Fee.Cmp(price) < 0 { + return resp(price), fmt.Errorf("insufficient fee: %d < %d", txn.Body.Fee, price) + } + + err = vm.spend(ctx, joiner, txn.Body.Fee, txn.Body.Nonce) if err != nil { - return nil, err + return resp(price), err } if err = vm.mgr.Join(ctx, joiner, power); err != nil { - return nil, err + return resp(price), err } - return &ExecutionResponse{ - Fee: txn.Body.Fee, - GasUsed: 0, - }, nil + return resp(price), nil } // Leave creates a leave request for a current validator. func (vm *ValidatorModule) Leave(ctx context.Context, leaver []byte, txn *transactions.Transaction) (*ExecutionResponse, error) { + price, err := vm.PriceLeave(ctx) + if err != nil { + if price == nil { + price = big.NewInt(0) + } + return resp(price), err + } + + if txn.Body.Fee.Cmp(price) < 0 { + return resp(price), fmt.Errorf("insufficient fee: %d < %d", txn.Body.Fee, price) + } - err := vm.spend(ctx, leaver, txn.Body.Fee, txn.Body.Nonce) + err = vm.spend(ctx, leaver, txn.Body.Fee, txn.Body.Nonce) if err != nil { - return nil, err + return resp(price), err } if err = vm.mgr.Leave(ctx, leaver); err != nil { - return nil, err + return resp(price), err } - return &ExecutionResponse{ - Fee: txn.Body.Fee, - GasUsed: 0, - }, nil + return resp(price), nil } // Approve records an approval transaction from a current validator.. func (vm *ValidatorModule) Approve(ctx context.Context, joiner []byte, txn *transactions.Transaction) (*ExecutionResponse, error) { approver := txn.Sender + price, err := vm.PriceApprove(ctx) + if err != nil { + if price == nil { + price = big.NewInt(0) + } + return resp(price), err + } + + if txn.Body.Fee.Cmp(price) < 0 { + return resp(price), fmt.Errorf("insufficient fee: %d < %d", txn.Body.Fee, price) + } - err := vm.spend(ctx, approver, txn.Body.Fee, txn.Body.Nonce) + err = vm.spend(ctx, approver, txn.Body.Fee, txn.Body.Nonce) if err != nil { return nil, err } diff --git a/pkg/modules/validators/validators.go b/pkg/modules/validators/validators.go index f13811bdb..de5176668 100644 --- a/pkg/modules/validators/validators.go +++ b/pkg/modules/validators/validators.go @@ -2,7 +2,12 @@ // blockchain application using a pluggable validator manager and account store. package validators -import "github.com/kwilteam/kwil-db/pkg/log" +import ( + "context" + "math/big" + + "github.com/kwilteam/kwil-db/pkg/log" +) // NOTE: currently there is no pricing. Any fee is accepted (nonce update only) // if their account has the sufficient balance. @@ -42,3 +47,15 @@ func WithLogger(logger log.Logger) ValidatorModuleOpt { u.log = logger } } + +func (v *ValidatorModule) PriceJoin(ctx context.Context) (*big.Int, error) { + return v.mgr.PriceJoin(ctx) +} + +func (v *ValidatorModule) PriceLeave(ctx context.Context) (*big.Int, error) { + return v.mgr.PriceLeave(ctx) +} + +func (v *ValidatorModule) PriceApprove(ctx context.Context) (*big.Int, error) { + return v.mgr.PriceApprove(ctx) +} diff --git a/pkg/sessions/errors.go b/pkg/sessions/errors.go index 6d63e2567..dc9bf8728 100644 --- a/pkg/sessions/errors.go +++ b/pkg/sessions/errors.go @@ -18,6 +18,7 @@ var ( ErrAlreadyRegistered = errors.New("committable already registered") ErrUnknownCommittable = errors.New("unknown committable") ErrClosed = errors.New("session closed") + ErrCancel = errors.New("error cancelling committer") ) // wrapError wraps an error with a message. diff --git a/pkg/sessions/interfaces.go b/pkg/sessions/interfaces.go index a1da332e0..6bf6c11bb 100644 --- a/pkg/sessions/interfaces.go +++ b/pkg/sessions/interfaces.go @@ -37,7 +37,7 @@ type Committable interface { EndApply(ctx context.Context) error // Cancel is used to cancel a session. - Cancel(ctx context.Context) + Cancel(ctx context.Context) error // ID returns a unique ID representative of the state changes that have occurred so far for this committable. // It should be deterministic, and should change if and only if the committable has changed. diff --git a/pkg/sessions/mock_test.go b/pkg/sessions/mock_test.go index 974214cd0..68424e5f1 100644 --- a/pkg/sessions/mock_test.go +++ b/pkg/sessions/mock_test.go @@ -191,13 +191,14 @@ func (m *mockCommittable) EndApply(ctx context.Context) error { return nil } -func (m *mockCommittable) Cancel(ctx context.Context) { +func (m *mockCommittable) Cancel(ctx context.Context) error { m.isInCommit = false m.isInApply = false m.appliedData = map[string]any{} m.canceled = true + return nil } func (m *mockCommittable) ID(ctx context.Context) ([]byte, error) { diff --git a/pkg/sessions/session.go b/pkg/sessions/session.go index f9c3b4c5e..69f2ef66b 100644 --- a/pkg/sessions/session.go +++ b/pkg/sessions/session.go @@ -239,10 +239,13 @@ func (a *AtomicCommitter) apply(ctx context.Context) (err error) { return a.wal.Truncate(ctx) } -func (a *AtomicCommitter) cancel(ctx context.Context) { +func (a *AtomicCommitter) cancel(ctx context.Context) error { for _, committable := range a.committables { - committable.Cancel(ctx) + if err := committable.Cancel(ctx); err != nil { + return wrapError(ErrCancel, err) + } } + return nil } // handleErr checks if an error is nil or not. @@ -452,7 +455,9 @@ func (a *AtomicCommitter) Unregister(ctx context.Context, id string) error { delete(a.committables, CommittableId(id)) if a.phase.InSession() { - committable.Cancel(ctx) + if err := committable.Cancel(ctx); err != nil { + return wrapError(ErrCancel, err) + } } return nil diff --git a/pkg/sessions/sql-session/session.go b/pkg/sessions/sql-session/session.go index 0397048a1..872d3e1c6 100644 --- a/pkg/sessions/sql-session/session.go +++ b/pkg/sessions/sql-session/session.go @@ -148,25 +148,26 @@ func (s *SqlCommitable) EndApply(ctx context.Context) error { // Cancel cancels the current session. // it deletes the session and rolls back the savepoint. // it will also enable foreign key constraints. -func (s *SqlCommitable) Cancel(ctx context.Context) { - errs := []error{} +func (s *SqlCommitable) Cancel(ctx context.Context) error { + var errs error if s.session != nil { - errs = append(errs, s.session.Delete()) + errs = errors.Join(errs, s.session.Delete()) s.session = nil } if s.savepoint != nil { - errs = append(errs, s.savepoint.Rollback()) + errs = errors.Join(errs, s.savepoint.Rollback()) s.savepoint = nil } // this can be called multiple times without issue if err := s.db.EnableForeignKey(); err != nil { - errs = append(errs, err) + errs = errors.Join(errs, err) } - if len(errs) > 0 { - s.log.Error("errors while cancelling session", zap.Error(errors.Join(errs...))) + if errs != nil { + s.log.Error("errors while cancelling session", zap.Error(errs)) } + return errs } // ID returns the ID of the current session. diff --git a/pkg/validators/mgr.go b/pkg/validators/mgr.go index d6e111ae3..c64bf237f 100644 --- a/pkg/validators/mgr.go +++ b/pkg/validators/mgr.go @@ -67,6 +67,9 @@ type ValidatorMgr struct { // opts joinExpiry int64 log log.Logger + + // pricing + feeMultiplier int64 } // NOTE: The SQLite validator/approval store is local and transparent to the diff --git a/pkg/validators/opts.go b/pkg/validators/opts.go index ac1e41b71..963cda4ad 100644 --- a/pkg/validators/opts.go +++ b/pkg/validators/opts.go @@ -16,3 +16,9 @@ func WithJoinExpiry(joinExpiry int64) ValidatorMgrOpt { v.joinExpiry = joinExpiry } } + +func WithFeeMultiplier(multiplier int64) ValidatorMgrOpt { + return func(v *ValidatorMgr) { + v.feeMultiplier = multiplier + } +} diff --git a/pkg/validators/pricing.go b/pkg/validators/pricing.go new file mode 100644 index 000000000..5dd0b7745 --- /dev/null +++ b/pkg/validators/pricing.go @@ -0,0 +1,32 @@ +package validators + +import ( + "context" + "math/big" +) + +var ( + defaultJoinPrice = big.NewInt(10000000000000) + defaultLeavePrice = big.NewInt(10000000000000) + defaultApprovePrice = big.NewInt(10000000000000) +) + +// applyFeeMultiplier applies the fee multiplier to the price. +func (mgr *ValidatorMgr) applyFeeMultiplier(price *big.Int) *big.Int { + return big.NewInt(0).Mul(price, big.NewInt(mgr.feeMultiplier)) +} + +// PriceJoin returns the price of issuing a join request. +func (mgr *ValidatorMgr) PriceJoin(ctx context.Context) (price *big.Int, err error) { + return mgr.applyFeeMultiplier(defaultJoinPrice), nil +} + +// PriceLeave returns the price of issuing a leave request. +func (mgr *ValidatorMgr) PriceLeave(ctx context.Context) (price *big.Int, err error) { + return mgr.applyFeeMultiplier(defaultLeavePrice), nil +} + +// PriceApprove returns the price of approving a join request. +func (mgr *ValidatorMgr) PriceApprove(ctx context.Context) (price *big.Int, err error) { + return mgr.applyFeeMultiplier(defaultApprovePrice), nil +}