Skip to content

Commit

Permalink
go/consensus/tendermint: Expire txes when CheckTx is disabled
Browse files Browse the repository at this point in the history
When CheckTx is disabled (for debug purposes only, e.g. in E2E tests), we still
need to periodically remove old transactions as otherwise the mempool will fill
up. Keep track of transactions were added and invalidate them when they expire.
  • Loading branch information
kostko committed Feb 25, 2020
1 parent f0536ab commit 7be9849
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
6 changes: 6 additions & 0 deletions .changelog/2720.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go/consensus/tendermint: Expire txes when CheckTx is disabled

When CheckTx is disabled (for debug purposes only, e.g. in E2E tests), we
still need to periodically remove old transactions as otherwise the mempool
will fill up. Keep track of transactions were added and invalidate them when
they expire.
62 changes: 50 additions & 12 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const (
stateKeyInitChainEvents = "OasisInitChainEvents"

metricsUpdateInterval = 10 * time.Second

// debugTxLifetime is the transaction mempool lifetime when CheckTx is disabled (debug only).
debugTxLifetime = 1 * time.Minute
)

var (
Expand Down Expand Up @@ -302,6 +305,9 @@ type abciMux struct {
// invalidatedTxs maps transaction hashes (hash.Hash) to a subscriber
// waiting for that transaction to become invalid.
invalidatedTxs sync.Map
// debugExpiringTxs maps transaction hashes to the time at which they were created. This is only
// used in case CheckTx is disabled (for debug purposes only).
debugExpiringTxs map[hash.Hash]time.Time
}

type invalidatedTxSubscription struct {
Expand Down Expand Up @@ -682,9 +688,46 @@ func (mux *abciMux) EstimateGas(caller signature.PublicKey, tx *transaction.Tran
return ctx.Gas().GasUsed(), nil
}

func (mux *abciMux) notifyInvalidatedCheckTx(txHash hash.Hash, err error) {
if item, exists := mux.invalidatedTxs.Load(txHash); exists {
// Notify subscriber.
sub := item.(*invalidatedTxSubscription)
select {
case sub.resultCh <- err:
default:
}
close(sub.resultCh)

mux.invalidatedTxs.Delete(txHash)
}
}

func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
if mux.state.disableCheckTx {
// Blindly accept all transactions if configured to do so.
// Blindly accept all transactions if configured to do so. We still need to periodically
// remove old transactions as otherwise the mempool will fill up, so keep track of when
// transactions were added and invalidate them after the configured interval.
var txHash hash.Hash
txHash.FromBytes(req.Tx)

if req.Type == types.CheckTxType_Recheck {
// Check timestamp.
if ts, ok := mux.debugExpiringTxs[txHash]; ok && mux.currentTime.Sub(ts) > debugTxLifetime {
delete(mux.debugExpiringTxs, txHash)

err := fmt.Errorf("mux: transaction expired (debug only)")
mux.notifyInvalidatedCheckTx(txHash, err)

return types.ResponseCheckTx{
Codespace: errors.UnknownModule,
Code: 1,
Log: err.Error(),
}
}
} else {
mux.debugExpiringTxs[txHash] = mux.currentTime
}

return types.ResponseCheckTx{
Code: types.CodeTypeOK,
}
Expand All @@ -706,17 +749,7 @@ func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
var txHash hash.Hash
txHash.FromBytes(req.Tx)

if item, exists := mux.invalidatedTxs.Load(txHash); exists {
// Notify subscriber.
sub := item.(*invalidatedTxSubscription)
select {
case sub.resultCh <- err:
default:
}
close(sub.resultCh)

mux.invalidatedTxs.Delete(txHash)
}
mux.notifyInvalidatedCheckTx(txHash, err)
}

return types.ResponseCheckTx{
Expand Down Expand Up @@ -911,6 +944,11 @@ func newABCIMux(ctx context.Context, upgrader upgrade.Backend, cfg *ApplicationC
lastBeginBlock: -1,
}

// Create a map of expiring transactions if CheckTx is disabled (debug only).
if state.disableCheckTx {
mux.debugExpiringTxs = make(map[hash.Hash]time.Time)
}

mux.logger.Debug("ABCI multiplexer initialized",
"block_height", state.BlockHeight(),
"block_hash", hex.EncodeToString(state.BlockHash()),
Expand Down

0 comments on commit 7be9849

Please sign in to comment.