Skip to content

Commit

Permalink
Add catch all panic handler in dex endblock (#1196)
Browse files Browse the repository at this point in the history
* Add catch all panic handler in dex endblock

* test

* Add a new counter metric for recovered panics

---------

Co-authored-by: yzang2019 <[email protected]>
  • Loading branch information
codchen and yzang2019 authored Dec 29, 2023
1 parent 3a53526 commit 3734b34
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 20 deletions.
16 changes: 13 additions & 3 deletions x/dex/contract/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func EndBlockerAtomic(ctx sdk.Context, keeper *keeper.Keeper, validContractsInfo
handleDeposits(spanCtx, cachedCtx, env, keeper, tracer)

runner := NewParallelRunner(func(contract types.ContractInfoV2) {
orderMatchingRunnable(spanCtx, cachedCtx, env, keeper, contract, tracer)
OrderMatchingRunnable(spanCtx, cachedCtx, env, keeper, contract, tracer)
}, validContractsInfo, cachedCtx)

_, err := logging.LogIfNotDoneAfter(ctx.Logger(), func() (struct{}, error) {
Expand Down Expand Up @@ -227,8 +227,18 @@ func handleUnfulfilledMarketOrders(ctx context.Context, sdkCtx sdk.Context, env
}
}

func orderMatchingRunnable(ctx context.Context, sdkContext sdk.Context, env *environment, keeper *keeper.Keeper, contractInfo types.ContractInfoV2, tracer *otrace.Tracer) {
_, span := (*tracer).Start(ctx, "orderMatchingRunnable")
func OrderMatchingRunnable(ctx context.Context, sdkContext sdk.Context, env *environment, keeper *keeper.Keeper, contractInfo types.ContractInfoV2, tracer *otrace.Tracer) {
defer func() {
if err := recover(); err != nil {
telemetry.IncrCounter(1, "recovered_panics")
msg := fmt.Sprintf("PANIC RECOVERED during order matching: %s", err)
sdkContext.Logger().Error(msg)
if env != nil {
env.addError(contractInfo.ContractAddr, errors.New(msg))
}
}
}()
_, span := (*tracer).Start(ctx, "OrderMatchingRunnable")
defer span.End()
defer telemetry.MeasureSince(time.Now(), "dex", "order_matching_runnable")
defer func() {
Expand Down
9 changes: 9 additions & 0 deletions x/dex/contract/abci_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package contract_test

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -32,3 +33,11 @@ func TestTransferRentFromDexToCollector(t *testing.T) {
collectorBalance := bankkeeper.GetBalance(ctx, testApp.AccountKeeper.GetModuleAddress(authtypes.FeeCollectorName), "usei")
require.Equal(t, int64(80), collectorBalance.Amount.Int64())
}

func TestOrderMatchingRunnablePanicHandler(t *testing.T) {
testApp := keepertest.TestApp()
ctx := testApp.BaseApp.NewContext(false, tmproto.Header{Time: time.Now()})
require.NotPanics(t, func() {
contract.OrderMatchingRunnable(context.Background(), ctx, nil, nil, types.ContractInfoV2{}, nil)
})
}
18 changes: 9 additions & 9 deletions x/dex/contract/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package contract
import (
"context"
"fmt"
"sync"
"time"

otrace "go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -130,16 +129,17 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee
cancelResults := []*types.Cancellation{}
settlements := []*types.SettlementEntry{}

mu := sync.Mutex{}
wg := sync.WaitGroup{}
// mu := sync.Mutex{}
// wg := sync.WaitGroup{}

for _, pair := range registeredPairs {
wg.Add(1)
// wg.Add(1)

pair := pair
pairCtx := ctx.WithMultiStore(multi.NewStore(ctx.MultiStore(), GetPerPairWhitelistMap(contractAddr, pair))).WithEventManager(sdk.NewEventManager())
go func() {
defer wg.Done()
// go func() {
func() {
// defer wg.Done()
pairCopy := pair
pairStr := types.GetPairString(&pairCopy)
orderbook, found := orderBooks.Load(pairStr)
Expand All @@ -150,8 +150,8 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee
orderIDToSettledQuantities := GetOrderIDToSettledQuantities(pairSettlements)
PrepareCancelUnfulfilledMarketOrders(pairCtx, typedContractAddr, pairCopy, orderIDToSettledQuantities)

mu.Lock()
defer mu.Unlock()
// mu.Lock()
// defer mu.Unlock()
orders, cancels := GetMatchResults(ctx, typedContractAddr, pairCopy)
orderResults = append(orderResults, orders...)
cancelResults = append(cancelResults, cancels...)
Expand All @@ -160,7 +160,7 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee
ctx.EventManager().EmitEvents(pairCtx.EventManager().Events())
}()
}
wg.Wait()
// wg.Wait()
dexkeeper.SetMatchResult(ctx, contractAddr, types.NewMatchResult(orderResults, cancelResults, settlements))

return settlements
Expand Down
24 changes: 16 additions & 8 deletions x/dex/contract/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync/atomic"
"time"

"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/sei-protocol/sei-chain/utils/datastructures"
"github.com/sei-protocol/sei-chain/utils/logging"
Expand Down Expand Up @@ -134,6 +135,21 @@ func (r *ParallelRunner) Run() {
}

func (r *ParallelRunner) wrapRunnable(contractAddr types.ContractAddress) {
defer func() {
if err := recover(); err != nil {
telemetry.IncrCounter(1, "recovered_panics")
r.sdkCtx.Logger().Error(fmt.Sprintf("panic in parallel runner recovered: %s", err))
}

atomic.AddInt64(&r.inProgressCnt, -1) // this has to happen after any potential increment to readyCnt
select {
case r.someContractFinished <- struct{}{}:
case <-r.done:
// make sure other goroutines can also receive from 'done'
r.done <- struct{}{}
}
}()

contractInfo, _ := r.contractAddrToInfo.Load(contractAddr)
r.runnable(*contractInfo)

Expand All @@ -159,12 +175,4 @@ func (r *ParallelRunner) wrapRunnable(contractAddr types.ContractAddress) {
}
}
}

atomic.AddInt64(&r.inProgressCnt, -1) // this has to happen after any potential increment to readyCnt
select {
case r.someContractFinished <- struct{}{}:
case <-r.done:
// make sure other goroutines can also receive from 'done'
r.done <- struct{}{}
}
}
13 changes: 13 additions & 0 deletions x/dex/contract/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func idleRunnable(_ types.ContractInfoV2) {
atomic.AddInt64(&counter, 1)
}

func panicRunnable(_ types.ContractInfoV2) {
panic("")
}

func dependencyCheckRunnable(contractInfo types.ContractInfoV2) {
if contractInfo.ContractAddr == "C" {
_, hasA := dependencyCheck.Load("A")
Expand Down Expand Up @@ -126,3 +130,12 @@ func TestRunnerParallelContractWithInvalidDependency(t *testing.T) {
_, hasC := dependencyCheck.Load("C")
require.False(t, hasC)
}

func TestRunnerPanicContract(t *testing.T) {
contractInfo := types.ContractInfoV2{
ContractAddr: "A",
NumIncomingDependencies: 0,
}
runner := contract.NewParallelRunner(panicRunnable, []types.ContractInfoV2{contractInfo}, sdkCtx)
require.NotPanics(t, runner.Run)
}
6 changes: 6 additions & 0 deletions x/dex/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ func (am AppModule) getPriceToDelete(
// EndBlock executes all ABCI EndBlock logic respective to the capability module. It
// returns no validator updates.
func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) (ret []abci.ValidatorUpdate) {
defer func() {
if err := recover(); err != nil {
telemetry.IncrCounter(1, "recovered_panics")
ctx.Logger().Error(fmt.Sprintf("panic in endblock recovered: %s", err))
}
}()
_, span := am.tracingInfo.Start("DexEndBlock")
defer span.End()
defer dexutils.GetMemState(ctx.Context()).Clear(ctx)
Expand Down

0 comments on commit 3734b34

Please sign in to comment.