Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax locking contention #427

Merged
merged 2 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@

TmConfig *tmcfg.Config

TracingInfo *tracing.Info
TracingInfo *tracing.Info
TracingEnabled bool

concurrencyWorkers int
occEnabled bool
Expand Down Expand Up @@ -244,7 +245,8 @@
tp := trace.NewNoopTracerProvider()
otel.SetTracerProvider(trace.NewNoopTracerProvider())
tr := tp.Tracer("component-main")
if tracingEnabled := cast.ToBool(appOpts.Get(tracing.FlagTracing)); tracingEnabled {
tracingEnabled := cast.ToBool(appOpts.Get(tracing.FlagTracing))
if tracingEnabled {
tp, err := tracing.DefaultTracerProvider()
if err != nil {
panic(err)
Expand All @@ -267,8 +269,9 @@
grpcQueryRouter: NewGRPCQueryRouter(),
msgServiceRouter: NewMsgServiceRouter(),
},
txDecoder: txDecoder,
TmConfig: tmConfig,
txDecoder: txDecoder,
TmConfig: tmConfig,
TracingEnabled: tracingEnabled,
TracingInfo: &tracing.Info{
Tracer: &tr,
},
Expand Down Expand Up @@ -316,8 +319,8 @@
}

// ConcurrencyWorkers returns the number of concurrent workers for the BaseApp.
func (app *BaseApp) ConcurrencyWorkers() int {
return app.concurrencyWorkers

Check warning on line 323 in baseapp/baseapp.go

View check run for this annotation

Codecov / codecov/patch

baseapp/baseapp.go#L322-L323

Added lines #L322 - L323 were not covered by tests
}

// OccEnabled returns the whether OCC is enabled for the BaseApp.
Expand Down Expand Up @@ -613,8 +616,8 @@
}
}

func (app *BaseApp) prepareProcessProposalState(headerHash []byte) {
app.processProposalState.SetContext(app.processProposalState.Context().

Check warning on line 620 in baseapp/baseapp.go

View check run for this annotation

Codecov / codecov/patch

baseapp/baseapp.go#L619-L620

Added lines #L619 - L620 were not covered by tests
WithHeaderHash(headerHash).
WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())))

Expand Down Expand Up @@ -844,11 +847,13 @@
// resources are acceessed by the ante handlers and message handlers.
defer acltypes.SendAllSignalsForTx(ctx.TxCompletionChannels())
acltypes.WaitForAllSignalsForTx(ctx.TxBlockingChannels())
// check for existing parent tracer, and if applicable, use it
spanCtx, span := app.TracingInfo.StartWithContext("RunTx", ctx.TraceSpanContext())
defer span.End()
ctx = ctx.WithTraceSpanContext(spanCtx)
span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(txBytes))))
if app.TracingEnabled {
// check for existing parent tracer, and if applicable, use it
spanCtx, span := app.TracingInfo.StartWithContext("RunTx", ctx.TraceSpanContext())
defer span.End()
ctx = ctx.WithTraceSpanContext(spanCtx)
span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(txBytes))))
}

Check warning on line 856 in baseapp/baseapp.go

View check run for this annotation

Codecov / codecov/patch

baseapp/baseapp.go#L851-L856

Added lines #L851 - L856 were not covered by tests

// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
Expand Down Expand Up @@ -881,9 +886,12 @@
}

if app.anteHandler != nil {
// trace AnteHandler
_, anteSpan := app.TracingInfo.StartWithContext("AnteHandler", ctx.TraceSpanContext())
defer anteSpan.End()
var anteSpan trace.Span
if app.TracingEnabled {
// trace AnteHandler
_, anteSpan := app.TracingInfo.StartWithContext("AnteHandler", ctx.TraceSpanContext())
defer anteSpan.End()
}

Check warning on line 894 in baseapp/baseapp.go

View check run for this annotation

Codecov / codecov/patch

baseapp/baseapp.go#L891-L894

Added lines #L891 - L894 were not covered by tests
var (
anteCtx sdk.Context
msCache sdk.CacheMultiStore
Expand Down Expand Up @@ -927,7 +935,7 @@
storeAccessOpEvents := msCache.GetEvents()
accessOps := ctx.TxMsgAccessOps()[acltypes.ANTE_MSG_INDEX]

// TODO: (occ) This is an example of where we do our current validation. Note that this validation operates on the declared dependencies for a TX / antehandler + the utilized dependencies, whereas the validation

Check warning on line 938 in baseapp/baseapp.go

View check run for this annotation

Codecov / codecov/patch

baseapp/baseapp.go#L938

Added line #L938 was not covered by tests
missingAccessOps := ctx.MsgValidator().ValidateAccessOperations(accessOps, storeAccessOpEvents)
if len(missingAccessOps) != 0 {
for op := range missingAccessOps {
Expand All @@ -942,7 +950,9 @@
priority = ctx.Priority()
msCache.Write()
anteEvents = events.ToABCIEvents()
anteSpan.End()
if app.TracingEnabled {
anteSpan.End()
}

Check warning on line 955 in baseapp/baseapp.go

View check run for this annotation

Codecov / codecov/patch

baseapp/baseapp.go#L954-L955

Added lines #L954 - L955 were not covered by tests
}

// Create a new Context based off of the existing Context with a MultiStore branch
Expand Down Expand Up @@ -987,9 +997,11 @@
panic(err)
}
}()
spanCtx, span := app.TracingInfo.StartWithContext("RunMsgs", ctx.TraceSpanContext())
defer span.End()
ctx = ctx.WithTraceSpanContext(spanCtx)
if app.TracingEnabled {
spanCtx, span := app.TracingInfo.StartWithContext("RunMsgs", ctx.TraceSpanContext())
defer span.End()
ctx = ctx.WithTraceSpanContext(spanCtx)
}

Check warning on line 1004 in baseapp/baseapp.go

View check run for this annotation

Codecov / codecov/patch

baseapp/baseapp.go#L1001-L1004

Added lines #L1001 - L1004 were not covered by tests
msgLogs := make(sdk.ABCIMessageLogs, 0, len(msgs))
events := sdk.EmptyEvents()
txMsgData := &sdk.TxMsgData{
Expand Down Expand Up @@ -1069,8 +1081,8 @@
storeAccessOpEvents := msgMsCache.GetEvents()
accessOps := ctx.TxMsgAccessOps()[i]
missingAccessOps := ctx.MsgValidator().ValidateAccessOperations(accessOps, storeAccessOpEvents)
// TODO: (occ) This is where we are currently validating our per message dependencies,
// whereas validation will be done holistically based on the mvkv for OCC approach

Check warning on line 1085 in baseapp/baseapp.go

View check run for this annotation

Codecov / codecov/patch

baseapp/baseapp.go#L1084-L1085

Added lines #L1084 - L1085 were not covered by tests
if len(missingAccessOps) != 0 {
for op := range missingAccessOps {
ctx.Logger().Info((fmt.Sprintf("eventMsgName=%s Missing Access Operation:%s ", eventMsgName, op.String())))
Expand Down
23 changes: 8 additions & 15 deletions store/cachekv/mergeiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (
//
// TODO: Optimize by memoizing.
type cacheMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool
storeKey sdktypes.StoreKey
eventManager *sdktypes.EventManager
parent types.Iterator
cache types.Iterator
ascending bool
storeKey sdktypes.StoreKey
}

var _ types.Iterator = (*cacheMergeIterator)(nil)
Expand All @@ -29,14 +28,12 @@ func NewCacheMergeIterator(
parent, cache types.Iterator,
ascending bool,
storeKey sdktypes.StoreKey,
eventManager *sdktypes.EventManager,
) *cacheMergeIterator {
iter := &cacheMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
storeKey: storeKey,
eventManager: eventManager,
parent: parent,
cache: cache,
ascending: ascending,
storeKey: storeKey,
}

return iter
Expand Down Expand Up @@ -138,14 +135,12 @@ func (iter *cacheMergeIterator) Value() []byte {
// If parent is invalid, get the cache value.
if !iter.parent.Valid() {
value := iter.cache.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, iter.cache.Key(), value)
return value
}

// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
value := iter.parent.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, iter.parent.Key(), value)
return value
}

Expand All @@ -156,11 +151,9 @@ func (iter *cacheMergeIterator) Value() []byte {
switch cmp {
case -1: // parent < cache
value := iter.parent.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, keyP, value)
return value
case 0, 1: // parent >= cache
value := iter.cache.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, keyC, value)
return value
default:
panic("invalid comparison result")
Expand Down
28 changes: 6 additions & 22 deletions store/cachekv/mergeiterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
)

func TestMangerIterator(t *testing.T) {
// initiate mock kvstore
mem := dbadapter.Store{DB: dbm.NewMemDB()}
eventManager := sdktypes.NewEventManager()
kvstore := cachekv.NewStore(mem, types.NewKVStoreKey("CacheKvTest"), types.DefaultCacheSizeLimit)
value := randSlice(defaultValueSizeBz)
startKey := randSlice(32)
Expand All @@ -29,27 +27,13 @@ func TestMangerIterator(t *testing.T) {
cache := kvstore.Iterator(nil, nil)
for ; cache.Valid(); cache.Next() {
}
iter := cachekv.NewCacheMergeIterator(parent, cache, true, types.NewKVStoreKey("CacheKvTest"), eventManager)
iter := cachekv.NewCacheMergeIterator(parent, cache, true, types.NewKVStoreKey("CacheKvTest"))

// get the next value
iter.Value()

// assert the resource access is still emitted correctly when the cache store is unavailable
require.Equal(t, "access_type", string(eventManager.Events()[0].Attributes[0].Key))
require.Equal(t, "read", string(eventManager.Events()[0].Attributes[0].Value))
require.Equal(t, "store_key", string(eventManager.Events()[0].Attributes[1].Key))
require.Equal(t, "CacheKvTest", string(eventManager.Events()[0].Attributes[1].Value))

// assert event emission when cache is available
cache = kvstore.Iterator(keys[1], keys[2])
iter = cachekv.NewCacheMergeIterator(parent, cache, true, types.NewKVStoreKey("CacheKvTest"), eventManager)
// get the next value and it should not be nil
nextValue := iter.Value()
require.NotNil(t, nextValue)

// get the next value
iter.Value()

// assert the resource access is still emitted correctly when the cache store is available
require.Equal(t, "access_type", string(eventManager.Events()[0].Attributes[0].Key))
require.Equal(t, "read", string(eventManager.Events()[0].Attributes[0].Value))
require.Equal(t, "store_key", string(eventManager.Events()[0].Attributes[1].Key))
require.Equal(t, "CacheKvTest", string(eventManager.Events()[0].Attributes[1].Value))
nextValue = iter.Value()
require.NotNil(t, nextValue)
}
7 changes: 1 addition & 6 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func (store *Store) GetEvents() []abci.Event {

// Implements Store
func (store *Store) ResetEvents() {
store.mtx.Lock()
defer store.mtx.Unlock()
store.eventManager = sdktypes.NewEventManager()
}

Expand All @@ -77,7 +75,6 @@ func (store *Store) getFromCache(key []byte) []byte {
// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
types.AssertValidKey(key)
store.eventManager.EmitResourceAccessReadEvent("get", store.storeKey, key, value)
return store.getFromCache(key)
}

Expand All @@ -86,13 +83,11 @@ func (store *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
types.AssertValidValue(value)
store.setCacheValue(key, value, false, true)
store.eventManager.EmitResourceAccessWriteEvent("set", store.storeKey, key, value)
}

// Has implements types.KVStore.
func (store *Store) Has(key []byte) bool {
value := store.Get(key)
store.eventManager.EmitResourceAccessReadEvent("has", store.storeKey, key, value)
return value != nil
}

Expand Down Expand Up @@ -194,7 +189,7 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
}()
store.dirtyItems(start, end)
cache = newMemIterator(start, end, store.sortedCache, store.deleted, ascending, store.eventManager, store.storeKey)
return NewCacheMergeIterator(parent, cache, ascending, store.storeKey, store.eventManager)
return NewCacheMergeIterator(parent, cache, ascending, store.storeKey)
}

func findStartIndex(strL []string, startQ string) int {
Expand Down
Loading