Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add catch all panic handler in dex endblock #1196

Merged
merged 3 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions x/dex/contract/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
handleDeposits(spanCtx, cachedCtx, env, keeper, tracer)

runner := NewParallelRunner(func(contract types.ContractInfoV2) {
orderMatchingRunnable(spanCtx, cachedCtx, env, keeper, contract, tracer)
OrderMatchingRunnable(spanCtx, cachedCtx, env, keeper, contract, tracer)

Check warning on line 61 in x/dex/contract/abci.go

View check run for this annotation

Codecov / codecov/patch

x/dex/contract/abci.go#L61

Added line #L61 was not covered by tests
}, validContractsInfo, cachedCtx)

_, err := logging.LogIfNotDoneAfter(ctx.Logger(), func() (struct{}, error) {
Expand Down Expand Up @@ -227,8 +227,18 @@
}
}

func orderMatchingRunnable(ctx context.Context, sdkContext sdk.Context, env *environment, keeper *keeper.Keeper, contractInfo types.ContractInfoV2, tracer *otrace.Tracer) {
_, span := (*tracer).Start(ctx, "orderMatchingRunnable")
func OrderMatchingRunnable(ctx context.Context, sdkContext sdk.Context, env *environment, keeper *keeper.Keeper, contractInfo types.ContractInfoV2, tracer *otrace.Tracer) {
defer func() {
if err := recover(); err != nil {
telemetry.IncrCounter(1, "recovered_panics")
msg := fmt.Sprintf("PANIC RECOVERED during order matching: %s", err)
sdkContext.Logger().Error(msg)
if env != nil {
env.addError(contractInfo.ContractAddr, errors.New(msg))
}

Check warning on line 238 in x/dex/contract/abci.go

View check run for this annotation

Codecov / codecov/patch

x/dex/contract/abci.go#L237-L238

Added lines #L237 - L238 were not covered by tests
}
}()
_, span := (*tracer).Start(ctx, "OrderMatchingRunnable")
defer span.End()
defer telemetry.MeasureSince(time.Now(), "dex", "order_matching_runnable")
defer func() {
Expand Down
9 changes: 9 additions & 0 deletions x/dex/contract/abci_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package contract_test

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -32,3 +33,11 @@ func TestTransferRentFromDexToCollector(t *testing.T) {
collectorBalance := bankkeeper.GetBalance(ctx, testApp.AccountKeeper.GetModuleAddress(authtypes.FeeCollectorName), "usei")
require.Equal(t, int64(80), collectorBalance.Amount.Int64())
}

func TestOrderMatchingRunnablePanicHandler(t *testing.T) {
testApp := keepertest.TestApp()
ctx := testApp.BaseApp.NewContext(false, tmproto.Header{Time: time.Now()})
require.NotPanics(t, func() {
contract.OrderMatchingRunnable(context.Background(), ctx, nil, nil, types.ContractInfoV2{}, nil)
})
}
18 changes: 9 additions & 9 deletions x/dex/contract/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package contract
import (
"context"
"fmt"
"sync"
"time"

otrace "go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -130,16 +129,17 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee
cancelResults := []*types.Cancellation{}
settlements := []*types.SettlementEntry{}

mu := sync.Mutex{}
wg := sync.WaitGroup{}
// mu := sync.Mutex{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We leave them here so that we can revert them in case we need?

// wg := sync.WaitGroup{}

for _, pair := range registeredPairs {
wg.Add(1)
// wg.Add(1)

pair := pair
pairCtx := ctx.WithMultiStore(multi.NewStore(ctx.MultiStore(), GetPerPairWhitelistMap(contractAddr, pair))).WithEventManager(sdk.NewEventManager())
go func() {
defer wg.Done()
// go func() {
func() {
// defer wg.Done()
pairCopy := pair
pairStr := types.GetPairString(&pairCopy)
orderbook, found := orderBooks.Load(pairStr)
Expand All @@ -150,8 +150,8 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee
orderIDToSettledQuantities := GetOrderIDToSettledQuantities(pairSettlements)
PrepareCancelUnfulfilledMarketOrders(pairCtx, typedContractAddr, pairCopy, orderIDToSettledQuantities)

mu.Lock()
defer mu.Unlock()
// mu.Lock()
// defer mu.Unlock()
orders, cancels := GetMatchResults(ctx, typedContractAddr, pairCopy)
orderResults = append(orderResults, orders...)
cancelResults = append(cancelResults, cancels...)
Expand All @@ -160,7 +160,7 @@ func ExecutePairsInParallel(ctx sdk.Context, contractAddr string, dexkeeper *kee
ctx.EventManager().EmitEvents(pairCtx.EventManager().Events())
}()
}
wg.Wait()
// wg.Wait()
dexkeeper.SetMatchResult(ctx, contractAddr, types.NewMatchResult(orderResults, cancelResults, settlements))

return settlements
Expand Down
24 changes: 16 additions & 8 deletions x/dex/contract/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync/atomic"
"time"

"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/sei-protocol/sei-chain/utils/datastructures"
"github.com/sei-protocol/sei-chain/utils/logging"
Expand Down Expand Up @@ -134,6 +135,21 @@ func (r *ParallelRunner) Run() {
}

func (r *ParallelRunner) wrapRunnable(contractAddr types.ContractAddress) {
defer func() {
if err := recover(); err != nil {
telemetry.IncrCounter(1, "recovered_panics")
r.sdkCtx.Logger().Error(fmt.Sprintf("panic in parallel runner recovered: %s", err))
}

atomic.AddInt64(&r.inProgressCnt, -1) // this has to happen after any potential increment to readyCnt
select {
case r.someContractFinished <- struct{}{}:
case <-r.done:
// make sure other goroutines can also receive from 'done'
r.done <- struct{}{}
}
}()

contractInfo, _ := r.contractAddrToInfo.Load(contractAddr)
r.runnable(*contractInfo)

Expand All @@ -159,12 +175,4 @@ func (r *ParallelRunner) wrapRunnable(contractAddr types.ContractAddress) {
}
}
}

atomic.AddInt64(&r.inProgressCnt, -1) // this has to happen after any potential increment to readyCnt
select {
case r.someContractFinished <- struct{}{}:
case <-r.done:
// make sure other goroutines can also receive from 'done'
r.done <- struct{}{}
}
}
13 changes: 13 additions & 0 deletions x/dex/contract/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func idleRunnable(_ types.ContractInfoV2) {
atomic.AddInt64(&counter, 1)
}

func panicRunnable(_ types.ContractInfoV2) {
panic("")
}

func dependencyCheckRunnable(contractInfo types.ContractInfoV2) {
if contractInfo.ContractAddr == "C" {
_, hasA := dependencyCheck.Load("A")
Expand Down Expand Up @@ -126,3 +130,12 @@ func TestRunnerParallelContractWithInvalidDependency(t *testing.T) {
_, hasC := dependencyCheck.Load("C")
require.False(t, hasC)
}

func TestRunnerPanicContract(t *testing.T) {
contractInfo := types.ContractInfoV2{
ContractAddr: "A",
NumIncomingDependencies: 0,
}
runner := contract.NewParallelRunner(panicRunnable, []types.ContractInfoV2{contractInfo}, sdkCtx)
require.NotPanics(t, runner.Run)
}
6 changes: 6 additions & 0 deletions x/dex/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@
// EndBlock executes all ABCI EndBlock logic respective to the capability module. It
// returns no validator updates.
func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) (ret []abci.ValidatorUpdate) {
defer func() {
if err := recover(); err != nil {
telemetry.IncrCounter(1, "recovered_panics")
ctx.Logger().Error(fmt.Sprintf("panic in endblock recovered: %s", err))
}

Check warning on line 311 in x/dex/module.go

View check run for this annotation

Codecov / codecov/patch

x/dex/module.go#L309-L311

Added lines #L309 - L311 were not covered by tests
}()
_, span := am.tracingInfo.Start("DexEndBlock")
defer span.End()
defer dexutils.GetMemState(ctx.Context()).Clear(ctx)
Expand Down
Loading