Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate transactions against local mempool state #338

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions internal/app/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {

bootstrapperModule := buildBootstrapper(d)

abciApp := buildAbci(d, closers, datasetsModule, validatorModule,
abciApp := buildAbci(d, closers, accs, datasetsModule, validatorModule,
ac, snapshotModule, bootstrapperModule)

cometBftNode := buildCometNode(d, closers, abciApp)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (c *closeFuncs) closeAll() error {
return err
}

func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
func buildAbci(d *coreDependencies, closer *closeFuncs, accountsModule abci.AccountsModule, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
atomicCommitter *sessions.AtomicCommitter, snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, kwild.ABCIInfoSubDirName)
badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{
Expand All @@ -157,6 +157,7 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.Data

genesisHash := d.genesisCfg.ComputeGenesisHash()
return abci.NewAbciApp(
accountsModule,
charithabandi marked this conversation as resolved.
Show resolved Hide resolved
datasetsModule,
validatorModule,
atomicKv,
Expand Down Expand Up @@ -251,9 +252,15 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
closer.addCloser(db.Close)

joinExpiry := d.genesisCfg.ConsensusParams.Validator.JoinExpiry
feeMultiplier := 1
if d.genesisCfg.ConsensusParams.WithoutGasCosts {
feeMultiplier = 0
}

v, err := vmgr.NewValidatorMgr(d.ctx, db,
vmgr.WithLogger(*d.log.Named("validatorStore")),
vmgr.WithJoinExpiry(joinExpiry),
vmgr.WithFeeMultiplier(int64(feeMultiplier)),
)
if err != nil {
failBuild(err, "failed to build validator store")
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/grpc/txsvc/v1/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ func (s *Service) priceAction(ctx context.Context, txBody *txpb.Transaction_Body

// TODO: Later to be moved to validator module (or) manager
func (s *Service) priceValidatorJoin(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) {
return big.NewInt(10000000000000), nil
return s.vstore.PriceJoin(ctx)
}

func (s *Service) priceValidatorLeave(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) {
return big.NewInt(10000000000000), nil
return s.vstore.PriceLeave(ctx)
}

func (s *Service) priceValidatorApprove(ctx context.Context, txBody *txpb.Transaction_Body) (*big.Int, error) {
return big.NewInt(10000000000000), nil
return s.vstore.PriceApprove(ctx)
}
3 changes: 3 additions & 0 deletions internal/controller/grpc/txsvc/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ type ValidatorReader interface {
CurrentValidators(ctx context.Context) ([]*validators.Validator, error)
ActiveVotes(ctx context.Context) ([]*validators.JoinRequest, error)
// JoinStatus(ctx context.Context, joiner []byte) ([]*JoinRequest, error)
PriceJoin(ctx context.Context) (*big.Int, error)
PriceLeave(ctx context.Context) (*big.Int, error)
PriceApprove(ctx context.Context) (*big.Int, error)
}
112 changes: 91 additions & 21 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (ds nilStringer) String() string {
return "no message"
}

func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, committer AtomicCommitter, snapshotter SnapshotModule,
func NewAbciApp(accounts AccountsModule, database DatasetsModule, vldtrs ValidatorModule, kv KVStore, committer AtomicCommitter, snapshotter SnapshotModule,
bootstrapper DBBootstrapModule, genesisHash []byte, opts ...AbciOpt) *AbciApp {
app := &AbciApp{
genesisAppHash: genesisHash,
Expand All @@ -74,9 +74,16 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com
},
bootstrapper: bootstrapper,
snapshotter: snapshotter,
accounts: accounts,

valAddrToKey: make(map[string][]byte),

mempool: &mempool{
nonceTracker: make(map[string]uint64),
accounts: make(map[string]*userAccount),
accountStore: accounts,
},

log: log.NewNoOp(),
}

Expand Down Expand Up @@ -120,9 +127,17 @@ type AbciApp struct {
snapshotter SnapshotModule

// bootstrapper is the bootstrapper module that handles bootstrapping the database
bootstrapper DBBootstrapModule
bootstrapper DBBootstrapModule

// metadataStore to track the app hash and block height
metadataStore *metadataStore

// accountStore is the store that maintains the account state
accounts AccountsModule

// mempool maintains in-memory account state to validate the unconfirmed transactions against.
mempool *mempool

log log.Logger

// Expected AppState after bootstrapping the node with a given snapshot,
Expand Down Expand Up @@ -193,34 +208,50 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response
func (a *AbciApp) CheckTx(incoming abciTypes.RequestCheckTx) abciTypes.ResponseCheckTx {
logger := a.log.With(zap.String("stage", "ABCI CheckTx"))
logger.Debug("check tx")
ctx := context.Background()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an observation.
It seems all AbciApp.*Tx methods, won't provide us a context through parameter, so it's really to us to manage context.
And all our abci modules methods receive Context.
So it's bit awkward to see in those AbciApp.*Tx methods, we just created a context.Background() context, without any kind of context management, I guess we'll add later ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we should manage our own contexts. Will raise an Issue on it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is genuinely good, it feels right ABCI call the app with context

var err error
code := CodeOk
newTx := incoming.Type == abciTypes.CheckTxType_New

tx := &transactions.Transaction{}
err := tx.UnmarshalBinary(incoming.Tx)
err = tx.UnmarshalBinary(incoming.Tx)
if err != nil {
code = CodeEncodingError
logger.Error("failed to unmarshal transaction", zap.Error(err))
return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()}
return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}
}

logger.Debug("",
zap.String("sender", hex.EncodeToString(tx.Sender)),
zap.String("PayloadType", tx.Body.PayloadType.String()))

err = tx.Verify()
if err != nil {
logger.Error("failed to verify transaction", zap.Error(err))
return abciTypes.ResponseCheckTx{Code: 1, Log: err.Error()}
// For a new transaction (not re-check), before looking at execution cost or
// checking nonce validity, ensure the payload is recognized and signature is valid.
if newTx {
// Verify Payload type
if !tx.Body.PayloadType.Valid() {
code = CodeInvalidTxType
logger.Error("invalid payload type", zap.String("payloadType", tx.Body.PayloadType.String()))
return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: "invalid payload type"}
}

// Verify Signature
err = tx.Verify()
charithabandi marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
code = CodeInvalidSignature
logger.Error("failed to verify transaction", zap.Error(err))
return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}
}
}

// TODO: CheckTx must reject transactions with a nonce <= currently mined
// account nonce. Also, with re-check happening (CheckTxType_Recheck) we
// must evict transactions that become invalid.
//
// Pathological cases:
// a. accept any txns to mempool -- that's a trivial mempool flooding attack.
// b. mine them (failed) -- that's a trivial block space attack.
// c. txns paying no fee -- flood, so a. and b. are crucial with no gas
err = a.mempool.applyTransaction(ctx, tx)
if err != nil {
code = CodeInvalidNonce
logger.Error("failed to verify transaction against local mempool state", zap.Error(err))
return abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}
}

return abciTypes.ResponseCheckTx{Code: 0}
return abciTypes.ResponseCheckTx{Code: code.Uint32()}
}

func (a *AbciApp) DeliverTx(req abciTypes.RequestDeliverTx) abciTypes.ResponseDeliverTx {
Expand Down Expand Up @@ -473,6 +504,8 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit {
logger.Debug("start commit")
ctx := context.Background()

defer a.mempool.reset()

// generate the unique id for all changes occurred thus far
id, err := a.committer.ID(ctx)
if err != nil {
Expand Down Expand Up @@ -793,17 +826,54 @@ func (a *AbciApp) PrepareProposal(p0 abciTypes.RequestPrepareProposal) abciTypes
}
}

func (a *AbciApp) validateProposalTransactions(ctx context.Context, Txns [][]byte) error {
logger := a.log.With(zap.String("stage", "ABCI ProcessProposal"))
grouped, err := groupTxsBySender(Txns)
if err != nil {
logger.Error("failed to group transaction based on sender: ", zap.Error(err))
return err
}

// ensure there are no gaps in an account's nonce, either from the
// previous best confirmed or within this block. Our current transaction
// execution does not update an accounts nonce in state unless it is the
// next nonce. Delivering transactions to a block in that way cannot happen.
for sender, txs := range grouped {
acct, err := a.accounts.GetAccount(ctx, []byte(sender))
if err != nil {
return err
}
expectedNonce := uint64(acct.Nonce) + 1

for _, tx := range txs {
if tx.Body.Nonce != expectedNonce {
logger.Error("nonce mismatch", zap.Uint64("txNonce", tx.Body.Nonce), zap.Uint64("expectedNonce", expectedNonce))
return fmt.Errorf("nonce mismatch, ExpectedNonce: %d TxNonce: %d", expectedNonce, tx.Body.Nonce)
}
expectedNonce++
}
}
return nil
}

// ProcessProposal should validate the received blocks and reject the block if:
// 1. transactions are not ordered by nonces
// 2. nonce is less than the last committed nonce for the account
// 3. duplicates or gaps in the nonces
// 4. transaction size is greater than the max_tx_bytes
// else accept the proposed block.
func (a *AbciApp) ProcessProposal(p0 abciTypes.RequestProcessProposal) abciTypes.ResponseProcessProposal {
a.log.Debug("",
zap.String("stage", "ABCI ProcessProposal"),
zap.Int64("height", p0.Height),
zap.Int("txs", len(p0.Txs)))

// TODO: ensure there are no gaps in an account's nonce, either from the
// previous best confirmed or within this block. Our current transaction
// execution does not update an accounts nonce in state unless it is the
// next nonce. Delivering transactions to a block in that way cannot happen.
ctx := context.Background()
if err := a.validateProposalTransactions(ctx, p0.Txs); err != nil {
return abciTypes.ResponseProcessProposal{Status: abciTypes.ResponseProcessProposal_REJECT}
}

// TODO: Verify the Tx and Block sizes based on the genesis configuration
return abciTypes.ResponseProcessProposal{Status: abciTypes.ResponseProcessProposal_ACCEPT}
}

Expand Down
Loading