diff --git a/x/dex/contract/abci.go b/x/dex/contract/abci.go index 8e452cda63..190ac321e1 100644 --- a/x/dex/contract/abci.go +++ b/x/dex/contract/abci.go @@ -58,7 +58,7 @@ func EndBlockerAtomic(ctx sdk.Context, keeper *keeper.Keeper, validContractsInfo handleDeposits(spanCtx, cachedCtx, env, keeper, tracer) runner := NewParallelRunner(func(contract types.ContractInfoV2) { - orderMatchingRunnable(spanCtx, cachedCtx, env, keeper, contract, tracer) + OrderMatchingRunnable(spanCtx, cachedCtx, env, keeper, contract, tracer) }, validContractsInfo, cachedCtx) _, err := logging.LogIfNotDoneAfter(ctx.Logger(), func() (struct{}, error) { @@ -227,8 +227,18 @@ func handleUnfulfilledMarketOrders(ctx context.Context, sdkCtx sdk.Context, env } } -func orderMatchingRunnable(ctx context.Context, sdkContext sdk.Context, env *environment, keeper *keeper.Keeper, contractInfo types.ContractInfoV2, tracer *otrace.Tracer) { - _, span := (*tracer).Start(ctx, "orderMatchingRunnable") +func OrderMatchingRunnable(ctx context.Context, sdkContext sdk.Context, env *environment, keeper *keeper.Keeper, contractInfo types.ContractInfoV2, tracer *otrace.Tracer) { + defer func() { + if err := recover(); err != nil { + telemetry.IncrCounter(1, "recovered_panics") + msg := fmt.Sprintf("PANIC RECOVERED during order matching: %s", err) + sdkContext.Logger().Error(msg) + if env != nil { + env.addError(contractInfo.ContractAddr, errors.New(msg)) + } + } + }() + _, span := (*tracer).Start(ctx, "OrderMatchingRunnable") defer span.End() defer telemetry.MeasureSince(time.Now(), "dex", "order_matching_runnable") defer func() { diff --git a/x/dex/contract/abci_test.go b/x/dex/contract/abci_test.go index 194743acd4..3d0b7650b4 100644 --- a/x/dex/contract/abci_test.go +++ b/x/dex/contract/abci_test.go @@ -1,6 +1,7 @@ package contract_test import ( + "context" "testing" "time" @@ -32,3 +33,11 @@ func TestTransferRentFromDexToCollector(t *testing.T) { collectorBalance := bankkeeper.GetBalance(ctx, testApp.AccountKeeper.GetModuleAddress(authtypes.FeeCollectorName), "usei") require.Equal(t, int64(80), collectorBalance.Amount.Int64()) } + +func TestOrderMatchingRunnablePanicHandler(t *testing.T) { + testApp := keepertest.TestApp() + ctx := testApp.BaseApp.NewContext(false, tmproto.Header{Time: time.Now()}) + require.NotPanics(t, func() { + contract.OrderMatchingRunnable(context.Background(), ctx, nil, nil, types.ContractInfoV2{}, nil) + }) +} diff --git a/x/dex/contract/execution.go b/x/dex/contract/execution.go index 087e6ff54e..5aa0bf7263 100644 --- a/x/dex/contract/execution.go +++ b/x/dex/contract/execution.go @@ -3,7 +3,6 @@ package contract import ( "context" "fmt" - "sync" "time" otrace "go.opentelemetry.io/otel/trace" @@ -130,16 +129,17 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee cancelResults := []*types.Cancellation{} settlements := []*types.SettlementEntry{} - mu := sync.Mutex{} - wg := sync.WaitGroup{} + // mu := sync.Mutex{} + // wg := sync.WaitGroup{} for _, pair := range registeredPairs { - wg.Add(1) + // wg.Add(1) pair := pair pairCtx := ctx.WithMultiStore(multi.NewStore(ctx.MultiStore(), GetPerPairWhitelistMap(contractAddr, pair))).WithEventManager(sdk.NewEventManager()) - go func() { - defer wg.Done() + // go func() { + func() { + // defer wg.Done() pairCopy := pair pairStr := types.GetPairString(&pairCopy) orderbook, found := orderBooks.Load(pairStr) @@ -150,8 +150,8 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee orderIDToSettledQuantities := GetOrderIDToSettledQuantities(pairSettlements) PrepareCancelUnfulfilledMarketOrders(pairCtx, typedContractAddr, pairCopy, orderIDToSettledQuantities) - mu.Lock() - defer mu.Unlock() + // mu.Lock() + // defer mu.Unlock() orders, cancels := GetMatchResults(ctx, typedContractAddr, pairCopy) orderResults = append(orderResults, orders...) cancelResults = append(cancelResults, cancels...) @@ -160,7 +160,7 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee ctx.EventManager().EmitEvents(pairCtx.EventManager().Events()) }() } - wg.Wait() + // wg.Wait() dexkeeper.SetMatchResult(ctx, contractAddr, types.NewMatchResult(orderResults, cancelResults, settlements)) return settlements diff --git a/x/dex/contract/runner.go b/x/dex/contract/runner.go index 44d384c81c..6e3797f3f0 100644 --- a/x/dex/contract/runner.go +++ b/x/dex/contract/runner.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/cosmos/cosmos-sdk/telemetry" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/sei-protocol/sei-chain/utils/datastructures" "github.com/sei-protocol/sei-chain/utils/logging" @@ -134,6 +135,21 @@ func (r *ParallelRunner) Run() { } func (r *ParallelRunner) wrapRunnable(contractAddr types.ContractAddress) { + defer func() { + if err := recover(); err != nil { + telemetry.IncrCounter(1, "recovered_panics") + r.sdkCtx.Logger().Error(fmt.Sprintf("panic in parallel runner recovered: %s", err)) + } + + atomic.AddInt64(&r.inProgressCnt, -1) // this has to happen after any potential increment to readyCnt + select { + case r.someContractFinished <- struct{}{}: + case <-r.done: + // make sure other goroutines can also receive from 'done' + r.done <- struct{}{} + } + }() + contractInfo, _ := r.contractAddrToInfo.Load(contractAddr) r.runnable(*contractInfo) @@ -159,12 +175,4 @@ func (r *ParallelRunner) wrapRunnable(contractAddr types.ContractAddress) { } } } - - atomic.AddInt64(&r.inProgressCnt, -1) // this has to happen after any potential increment to readyCnt - select { - case r.someContractFinished <- struct{}{}: - case <-r.done: - // make sure other goroutines can also receive from 'done' - r.done <- struct{}{} - } } diff --git a/x/dex/contract/runner_test.go b/x/dex/contract/runner_test.go index 5a45d776f2..bf7e8c1d70 100644 --- a/x/dex/contract/runner_test.go +++ b/x/dex/contract/runner_test.go @@ -29,6 +29,10 @@ func idleRunnable(_ types.ContractInfoV2) { atomic.AddInt64(&counter, 1) } +func panicRunnable(_ types.ContractInfoV2) { + panic("") +} + func dependencyCheckRunnable(contractInfo types.ContractInfoV2) { if contractInfo.ContractAddr == "C" { _, hasA := dependencyCheck.Load("A") @@ -126,3 +130,12 @@ func TestRunnerParallelContractWithInvalidDependency(t *testing.T) { _, hasC := dependencyCheck.Load("C") require.False(t, hasC) } + +func TestRunnerPanicContract(t *testing.T) { + contractInfo := types.ContractInfoV2{ + ContractAddr: "A", + NumIncomingDependencies: 0, + } + runner := contract.NewParallelRunner(panicRunnable, []types.ContractInfoV2{contractInfo}, sdkCtx) + require.NotPanics(t, runner.Run) +} diff --git a/x/dex/module.go b/x/dex/module.go index 8b8705ec70..40dae28bd3 100644 --- a/x/dex/module.go +++ b/x/dex/module.go @@ -304,6 +304,12 @@ func (am AppModule) getPriceToDelete( // EndBlock executes all ABCI EndBlock logic respective to the capability module. It // returns no validator updates. func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) (ret []abci.ValidatorUpdate) { + defer func() { + if err := recover(); err != nil { + telemetry.IncrCounter(1, "recovered_panics") + ctx.Logger().Error(fmt.Sprintf("panic in endblock recovered: %s", err)) + } + }() _, span := am.tracingInfo.Start("DexEndBlock") defer span.End() defer dexutils.GetMemState(ctx.Context()).Clear(ctx)