diff --git a/x/dex/keeper/price.go b/x/dex/keeper/price.go index ad6c472e74..d18cc1ce0f 100644 --- a/x/dex/keeper/price.go +++ b/x/dex/keeper/price.go @@ -16,12 +16,12 @@ func (k Keeper) SetPriceState(ctx sdk.Context, price types.Price, contractAddr s func (k Keeper) DeletePriceStateBefore(ctx sdk.Context, contractAddr string, timestamp uint64, pair types.Pair) { store := prefix.NewStore(ctx.KVStore(k.storeKey), types.PricePrefix(contractAddr, pair.PriceDenom, pair.AssetDenom)) - for _, key := range k.getPriceKeysToDelete(store, timestamp) { + for _, key := range k.GetPriceKeysToDelete(store, timestamp) { store.Delete(key) } } -func (k Keeper) getPriceKeysToDelete(store sdk.KVStore, timestamp uint64) [][]byte { +func (k Keeper) GetPriceKeysToDelete(store sdk.KVStore, timestamp uint64) [][]byte { keys := [][]byte{} iterator := sdk.KVStorePrefixIterator(store, []byte{}) defer iterator.Close() diff --git a/x/dex/module.go b/x/dex/module.go index d14087c0f6..231f8b3137 100644 --- a/x/dex/module.go +++ b/x/dex/module.go @@ -4,15 +4,15 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" "github.com/CosmWasm/wasmd/x/wasm" + "github.com/cosmos/cosmos-sdk/store/prefix" "github.com/cosmos/cosmos-sdk/telemetry" "github.com/gorilla/mux" "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/spf13/cobra" - "go.opentelemetry.io/otel/attribute" - abci "github.com/tendermint/tendermint/abci/types" "github.com/cosmos/cosmos-sdk/client" @@ -20,7 +20,6 @@ import ( cdctypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/module" - seisync "github.com/sei-protocol/sei-chain/sync" "github.com/sei-protocol/sei-chain/utils" "github.com/sei-protocol/sei-chain/utils/datastructures" "github.com/sei-protocol/sei-chain/utils/tracing" @@ -256,30 +255,57 @@ func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock) { am.keeper.SetEpoch(ctx, currentEpoch) } cachedCtx, cachedStore := store.GetCachedContext(ctx) - gasLimit := am.keeper.GetParams(ctx).BeginBlockGasLimit - for _, contract := range am.getAllContractInfo(ctx) { - am.beginBlockForContract(cachedCtx, contract, gasLimit) + priceRetention := am.keeper.GetParams(ctx).PriceSnapshotRetention + cutOffTime := uint64(ctx.BlockTime().Unix()) - priceRetention + wg := sync.WaitGroup{} + mutex := sync.Mutex{} + allContracts := am.getAllContractInfo(ctx) + allPricesToDelete := make(map[string][]*types.PriceStore, len(allContracts)) + + // Parallelize the logic to find all prices to delete + for _, contract := range allContracts { + wg.Add(1) + go func(contract types.ContractInfoV2) { + priceKeysToDelete := am.getPriceToDelete(cachedCtx, contract, cutOffTime) + mutex.Lock() + allPricesToDelete[contract.ContractAddr] = priceKeysToDelete + mutex.Unlock() + wg.Done() + }(contract) + } + wg.Wait() + + // Execute the deletion in order + for _, contract := range allContracts { + if priceStores, found := allPricesToDelete[contract.ContractAddr]; found { + for _, priceStore := range priceStores { + for _, key := range priceStore.PriceKeys { + priceStore.Store.Delete(key) + } + } + } } // only write if all contracts have been processed cachedStore.Write() } -func (am AppModule) beginBlockForContract(ctx sdk.Context, contract types.ContractInfoV2, gasLimit uint64) { - _, span := am.tracingInfo.Start("DexBeginBlock") - contractAddr := contract.ContractAddr - span.SetAttributes(attribute.String("contract", contractAddr)) - defer span.End() - - ctx = ctx.WithGasMeter(seisync.NewGasWrapper(dexutils.GetGasMeterForLimit(gasLimit))) - +func (am AppModule) getPriceToDelete( + ctx sdk.Context, + contract types.ContractInfoV2, + timestamp uint64, +) []*types.PriceStore { + var result []*types.PriceStore if contract.NeedOrderMatching { - currentTimestamp := uint64(ctx.BlockTime().Unix()) - ctx.Logger().Debug(fmt.Sprintf("Removing stale prices for ts %d", currentTimestamp)) - priceRetention := am.keeper.GetParams(ctx).PriceSnapshotRetention - for _, pair := range am.keeper.GetAllRegisteredPairs(ctx, contractAddr) { - am.keeper.DeletePriceStateBefore(ctx, contractAddr, currentTimestamp-priceRetention, pair) + for _, pair := range am.keeper.GetAllRegisteredPairs(ctx, contract.ContractAddr) { + store := prefix.NewStore(ctx.KVStore(am.keeper.GetStoreKey()), types.PricePrefix(contract.ContractAddr, pair.PriceDenom, pair.AssetDenom)) + keysToDelete := am.keeper.GetPriceKeysToDelete(store, timestamp) + result = append(result, &types.PriceStore{ + Store: store, + PriceKeys: keysToDelete, + }) } } + return result } // EndBlock executes all ABCI EndBlock logic respective to the capability module. It diff --git a/x/dex/types/orders.go b/x/dex/types/orders.go index 6405561803..e608bb4f8d 100644 --- a/x/dex/types/orders.go +++ b/x/dex/types/orders.go @@ -1,6 +1,7 @@ package types import ( + "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/sei-protocol/sei-chain/utils" ) @@ -151,6 +152,11 @@ func (m *LongBook) SetPrice(p sdk.Dec) { m.Price = p } +type PriceStore struct { + Store prefix.Store + PriceKeys [][]byte +} + func (m *LongBook) GetPrice() sdk.Dec { return m.Price }