Skip to content

Commit

Permalink
validate uncommitted transactions against the mempool state
Browse files Browse the repository at this point in the history
* added extensible auth using build tags

* go mod tidied

* made gavins requested changes
  • Loading branch information
charithabandi committed Oct 10, 2023
1 parent 67fe11e commit a5a441f
Show file tree
Hide file tree
Showing 21 changed files with 600 additions and 65 deletions.
11 changes: 9 additions & 2 deletions internal/app/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {

bootstrapperModule := buildBootstrapper(d)

abciApp := buildAbci(d, closers, datasetsModule, validatorModule,
abciApp := buildAbci(d, closers, accs, datasetsModule, validatorModule,
ac, snapshotModule, bootstrapperModule)

cometBftNode := buildCometNode(d, closers, abciApp)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (c *closeFuncs) closeAll() error {
return err
}

func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
func buildAbci(d *coreDependencies, closer *closeFuncs, accountsModule abci.AccountsModule, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
atomicCommitter *sessions.AtomicCommitter, snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, kwild.ABCIInfoSubDirName)
badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{
Expand All @@ -157,6 +157,7 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.Data

genesisHash := d.genesisCfg.ComputeGenesisHash()
return abci.NewAbciApp(
accountsModule,
datasetsModule,
validatorModule,
atomicKv,
Expand Down Expand Up @@ -251,9 +252,15 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
closer.addCloser(db.Close)

joinExpiry := d.genesisCfg.ConsensusParams.Validator.JoinExpiry
feeMultiplier := 1
if d.genesisCfg.ConsensusParams.WithoutGasCosts {
feeMultiplier = 0
}

v, err := vmgr.NewValidatorMgr(d.ctx, db,
vmgr.WithLogger(*d.log.Named("validatorStore")),
vmgr.WithJoinExpiry(joinExpiry),
vmgr.WithFeeMultiplier(int64(feeMultiplier)),
)
if err != nil {
failBuild(err, "failed to build validator store")
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/grpc/txsvc/v1/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ func (s *Service) priceAction(ctx context.Context, txBody *txpb.Transaction_Body

// TODO: Later to be moved to validator module (or) manager
func (s *Service) priceValidatorJoin(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) {
return big.NewInt(10000000000000), nil
return s.vstore.PriceJoin(ctx)
}

func (s *Service) priceValidatorLeave(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) {
return big.NewInt(10000000000000), nil
return s.vstore.PriceLeave(ctx)
}

func (s *Service) priceValidatorApprove(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) {
return big.NewInt(10000000000000), nil
return s.vstore.PriceApprove(ctx)
}
3 changes: 3 additions & 0 deletions internal/controller/grpc/txsvc/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ type ValidatorReader interface {
CurrentValidators(ctx context.Context) ([]*validators.Validator, error)
ActiveVotes(ctx context.Context) ([]*validators.JoinRequest, error)
// JoinStatus(ctx context.Context, joiner []byte) ([]*JoinRequest, error)
PriceJoin(ctx context.Context) (*big.Int, error)
PriceLeave(ctx context.Context) (*big.Int, error)
PriceApprove(ctx context.Context) (*big.Int, error)
}
112 changes: 91 additions & 21 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (ds nilStringer) String() string {
return "no message"
}

func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, committer AtomicCommitter, snapshotter SnapshotModule,
func NewAbciApp(accounts AccountsModule, database DatasetsModule, vldtrs ValidatorModule, kv KVStore, committer AtomicCommitter, snapshotter SnapshotModule,
bootstrapper DBBootstrapModule, genesisHash []byte, opts ...AbciOpt) *AbciApp {
app := &AbciApp{
genesisAppHash: genesisHash,
Expand All @@ -74,9 +74,16 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com
},
bootstrapper: bootstrapper,
snapshotter: snapshotter,
accounts: accounts,

valAddrToKey: make(map[string][]byte),

mempool: &mempool{
nonceTracker: make(map[string]uint64),
accounts: make(map[string]*userAccount),
accountStore: accounts,
},

log: log.NewNoOp(),
}

Expand Down Expand Up @@ -120,9 +127,17 @@ type AbciApp struct {
snapshotter SnapshotModule

// bootstrapper is the bootstrapper module that handles bootstrapping the database
bootstrapper DBBootstrapModule
bootstrapper DBBootstrapModule

// metadataStore to track the app hash and block height
metadataStore *metadataStore

// accountStore is the store that maintains the account state
accounts AccountsModule

// mempool maintains in-memory account state to validate the unconfirmed transactions against.
mempool *mempool

log log.Logger

// Expected AppState after bootstrapping the node with a given snapshot,
Expand Down Expand Up @@ -193,34 +208,50 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response
func (a *AbciApp) CheckTx(incoming abciTypes.RequestCheckTx) abciTypes.ResponseCheckTx {
logger := a.log.With(zap.String("stage", "ABCI CheckTx"))
logger.Debug("check tx")
ctx := context.Background()
var err error
code := CodeOk
newTx := incoming.Type == abciTypes.CheckTxType_New

tx := &transactions.Transaction{}
err := tx.UnmarshalBinary(incoming.Tx)
err = tx.UnmarshalBinary(incoming.Tx)
if err != nil {
code = CodeEncodingError
logger.Error("failed to unmarshal transaction", zap.Error(err))
return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()}
return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}
}

logger.Debug("",
zap.String("sender", hex.EncodeToString(tx.Sender)),
zap.String("PayloadType", tx.Body.PayloadType.String()))

err = tx.Verify()
if err != nil {
logger.Error("failed to verify transaction", zap.Error(err))
return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()}
// For a new transaction (not re-check), before looking at execution cost or
// checking nonce validity, ensure the payload is recognized and signature is valid.
if newTx {
// Verify Payload type
if !tx.Body.PayloadType.Valid() {
code = CodeInvalidTxType
logger.Error("invalid payload type", zap.String("payloadType", tx.Body.PayloadType.String()))
return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: "invalid payload type"}
}

// Verify Signature
err = tx.Verify()
if err != nil {
code = CodeInvalidSignature
logger.Error("failed to verify transaction", zap.Error(err))
return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}
}
}

// TODO: CheckTx must reject transactions with a nonce <= currently mined
// account nonce. Also, with re-check happening (CheckTxType_Recheck) we
// must evict transactions that become invalid.
//
// Pathological cases:
// a. accept any txns to mempool -- that's a trivial mempool flooding attack.
// b. mine them (failed) -- that's a trivial block space attack.
// c. txns paying no fee -- flood, so a. and b. are crucial with no gas
err = a.mempool.applyTransaction(ctx, tx)
if err != nil {
code = CodeInvalidNonce
logger.Error("failed to verify transaction against local mempool state", zap.Error(err))
return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}
}

return abciTypes.ResponseCheckTx{Code: 0}
return abciTypes.ResponseCheckTx{Code: code.Uint32()}
}

func (a *AbciApp) DeliverTx(req abciTypes.RequestDeliverTx) abciTypes.ResponseDeliverTx {
Expand Down Expand Up @@ -473,6 +504,8 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit {
logger.Debug("start commit")
ctx := context.Background()

defer a.mempool.reset()

// generate the unique id for all changes occurred thus far
id, err := a.committer.ID(ctx)
if err != nil {
Expand Down Expand Up @@ -793,17 +826,54 @@ func (a *AbciApp) PrepareProposal(p0 abciTypes.RequestPrepareProposal) abciTypes
}
}

func (a *AbciApp) validateProposalTransactions(ctx context.Context, Txns [][]byte) error {
logger := a.log.With(zap.String("stage", "ABCI ProcessProposal"))
grouped, err := groupTxsBySender(Txns)
if err != nil {
logger.Error("failed to group transaction based on sender: ", zap.Error(err))
return err
}

// ensure there are no gaps in an account's nonce, either from the
// previous best confirmed or within this block. Our current transaction
// execution does not update an accounts nonce in state unless it is the
// next nonce. Delivering transactions to a block in that way cannot happen.
for sender, txs := range grouped {
acct, err := a.accounts.GetAccount(ctx, []byte(sender))
if err != nil {
return err
}
expectedNonce := uint64(acct.Nonce) + 1

for _, tx := range txs {
if tx.Body.Nonce != expectedNonce {
logger.Error("nonce mismatch", zap.Uint64("txNonce", tx.Body.Nonce), zap.Uint64("expectedNonce", expectedNonce))
return fmt.Errorf("nonce mismatch, ExpectedNonce: %d TxNonce: %d", expectedNonce, tx.Body.Nonce)
}
expectedNonce++
}
}
return nil
}

// ProcessProposal should validate the received blocks and reject the block if:
// 1. transactions are not ordered by nonces
// 2. nonce is less than the last committed nonce for the account
// 3. duplicates or gaps in the nonces
// 4. transaction size is greater than the max_tx_bytes
// else accept the proposed block.
func (a *AbciApp) ProcessProposal(p0 abciTypes.RequestProcessProposal) abciTypes.ResponseProcessProposal {
a.log.Debug("",
zap.String("stage", "ABCI ProcessProposal"),
zap.Int64("height", p0.Height),
zap.Int("txs", len(p0.Txs)))

// TODO: ensure there are no gaps in an account's nonce, either from the
// previous best confirmed or within this block. Our current transaction
// execution does not update an accounts nonce in state unless it is the
// next nonce. Delivering transactions to a block in that way cannot happen.
ctx := context.Background()
if err := a.validateProposalTransactions(ctx, p0.Txs); err != nil {
return abciTypes.ResponseProcessProposal{Status: abciTypes.ResponseProcessProposal_REJECT}
}

// TODO: Verify the Tx and Block sizes based on the genesis configuration
return abciTypes.ResponseProcessProposal{Status: abciTypes.ResponseProcessProposal_ACCEPT}
}

Expand Down
Loading

0 comments on commit a5a441f

Please sign in to comment.