Skip to content

Commit

Permalink
feat(stf): change router service to extract the router at runtime rat…
Browse files Browse the repository at this point in the history
…her than build time (#20724)

Co-authored-by: unknown unknown <unknown@unknown>
  • Loading branch information
testinginprod and unknown unknown authored Jun 20, 2024
1 parent 7b5d3e5 commit 11b8f11
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 137 deletions.
22 changes: 9 additions & 13 deletions runtime/v2/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,12 @@ func (a *AppBuilder) Build(opts ...AppBuilderOption) (*App, error) {
return nil, err
}

stfMsgHandler, err := a.app.msgRouterBuilder.Build()
if err != nil {
return nil, fmt.Errorf("failed to build STF message handler: %w", err)
}

stfQueryHandler, err := a.app.queryRouterBuilder.Build()
if err != nil {
return nil, fmt.Errorf("failed to build query handler: %w", err)
}

endBlocker, valUpdate := a.app.moduleManager.EndBlock()

a.app.stf = stf.NewSTF[transaction.Tx](
stfMsgHandler,
stfQueryHandler,
stf, err := stf.NewSTF[transaction.Tx](
a.app.logger.With("module", "stf"),
a.app.msgRouterBuilder,
a.app.queryRouterBuilder,
a.app.moduleManager.PreBlocker(),
a.app.moduleManager.BeginBlock(),
endBlocker,
Expand All @@ -119,6 +110,11 @@ func (a *AppBuilder) Build(opts ...AppBuilderOption) (*App, error) {
a.postTxExec,
a.branch,
)
if err != nil {
return nil, fmt.Errorf("failed to create STF: %w", err)
}

a.app.stf = stf

rs, err := rootstore.CreateRootStore(a.storeOptions)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions runtime/v2/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ func ProvideEnvironment(logger log.Logger, config *runtimev2.Module, key depinje
EventService: stf.NewEventService(),
GasService: stf.NewGasMeterService(),
HeaderService: stf.HeaderService{},
QueryRouterService: stf.NewQueryRouterService(appBuilder.app.queryRouterBuilder),
MsgRouterService: stf.NewMsgRouterService(appBuilder.app.msgRouterBuilder),
QueryRouterService: stf.NewQueryRouterService(),
MsgRouterService: stf.NewMsgRouterService([]byte(key.Name())),
TransactionService: services.NewContextAwareTransactionService(),
KVStoreService: kvService,
MemStoreService: memKvService,
Expand Down
13 changes: 6 additions & 7 deletions server/v2/stf/core_branch_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,17 @@ import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/wrapperspb"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/transaction"
"cosmossdk.io/server/v2/stf/branch"
"cosmossdk.io/server/v2/stf/gas"
"cosmossdk.io/server/v2/stf/mock"
)

func TestBranchService(t *testing.T) {
s := &STF[mock.Tx]{
handleMsg: func(ctx context.Context, msg transaction.Msg) (msgResp transaction.Msg, err error) {
kvSet(t, ctx, "exec")
return nil, nil
},
handleQuery: nil,
doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil },
doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil },
doBeginBlock: func(ctx context.Context) error {
kvSet(t, ctx, "begin-block")
return nil
Expand All @@ -43,6 +38,10 @@ func TestBranchService(t *testing.T) {
makeGasMeter: gas.DefaultGasMeter,
makeGasMeteredState: gas.DefaultWrapWithGasMeter,
}
addMsgHandlerToSTF(t, s, func(ctx context.Context, msg *wrapperspb.BoolValue) (*wrapperspb.BoolValue, error) {
kvSet(t, ctx, "exec")
return nil, nil
})

makeContext := func() *executionContext {
state := mock.DB()
Expand Down
92 changes: 22 additions & 70 deletions server/v2/stf/core_router_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,120 +2,72 @@ package stf

import (
"context"
"errors"
"fmt"
"reflect"
"strings"

"google.golang.org/protobuf/runtime/protoiface"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/router"
"cosmossdk.io/core/transaction"
)

// NewMsgRouterService implements router.Service.
func NewMsgRouterService(msgRouterBuilder *MsgRouterBuilder) router.Service {
msgRouter, err := msgRouterBuilder.Build()
if err != nil {
panic(fmt.Errorf("cannot create msgRouter: %w", err))
}

return &msgRouterService{
builder: msgRouterBuilder,
handler: msgRouter,
}
func NewMsgRouterService(identity transaction.Identity) router.Service {
return msgRouterService{identity: identity}
}

var _ router.Service = (*msgRouterService)(nil)

type msgRouterService struct {
builder *MsgRouterBuilder
handler appmodulev2.Handler
// TODO(tip): the identity sits here for the purpose of disallowing modules to impersonate others (sudo).
// right now this is not used, but it serves the reminder of something that we should be eventually
// looking into.
identity []byte
}

// CanInvoke returns an error if the given message cannot be invoked.
func (m *msgRouterService) CanInvoke(ctx context.Context, typeURL string) error {
if typeURL == "" {
return errors.New("missing type url")
}

typeURL = strings.TrimPrefix(typeURL, "/")
if exists := m.builder.HandlerExists(typeURL); exists {
return fmt.Errorf("unknown request: %s", typeURL)
}

return nil
func (m msgRouterService) CanInvoke(ctx context.Context, typeURL string) error {
return ctx.(*executionContext).msgRouter.CanInvoke(ctx, typeURL)
}

// InvokeTyped execute a message and fill-in a response.
// The response must be known and passed as a parameter.
// Use InvokeUntyped if the response type is not known.
func (m *msgRouterService) InvokeTyped(ctx context.Context, msg, resp protoiface.MessageV1) error {
// see https://github.com/cosmos/cosmos-sdk/pull/20349
panic("not implemented")
func (m msgRouterService) InvokeTyped(ctx context.Context, msg, resp protoiface.MessageV1) error {
return ctx.(*executionContext).msgRouter.InvokeTyped(ctx, msg, resp)
}

// InvokeUntyped execute a message and returns a response.
func (m *msgRouterService) InvokeUntyped(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error) {
return m.handler(ctx, msg)
func (m msgRouterService) InvokeUntyped(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error) {
return ctx.(*executionContext).msgRouter.InvokeUntyped(ctx, msg)
}

// NewQueryRouterService implements router.Service.
func NewQueryRouterService(queryRouterBuilder *MsgRouterBuilder) router.Service {
return &queryRouterService{
builder: queryRouterBuilder,
}
func NewQueryRouterService() router.Service {
return queryRouterService{}
}

var _ router.Service = (*queryRouterService)(nil)

type queryRouterService struct {
builder *MsgRouterBuilder
handler appmodulev2.Handler
}
type queryRouterService struct{}

// CanInvoke returns an error if the given request cannot be invoked.
func (m *queryRouterService) CanInvoke(ctx context.Context, typeURL string) error {
if typeURL == "" {
return errors.New("missing type url")
}

typeURL = strings.TrimPrefix(typeURL, "/")
if exists := m.builder.HandlerExists(typeURL); exists {
return fmt.Errorf("unknown request: %s", typeURL)
}

return nil
func (m queryRouterService) CanInvoke(ctx context.Context, typeURL string) error {
return ctx.(*executionContext).queryRouter.CanInvoke(ctx, typeURL)
}

// InvokeTyped execute a message and fill-in a response.
// The response must be known and passed as a parameter.
// Use InvokeUntyped if the response type is not known.
func (m *queryRouterService) InvokeTyped(
func (m queryRouterService) InvokeTyped(
ctx context.Context,
req, resp protoiface.MessageV1,
) error {
// TODO lazy initialization is ugly and not thread safe. we don't want to check a mutex on every InvokeTyped either.
if m.handler == nil {
var err error
m.handler, err = m.builder.Build()
if err != nil {
return fmt.Errorf("cannot create queryRouter: %w", err)
}
}
// reflection is required, see https://github.com/cosmos/cosmos-sdk/pull/20349
res, err := m.handler(ctx, req)
if err != nil {
return err
}
reflect.Indirect(reflect.ValueOf(resp)).Set(reflect.Indirect(reflect.ValueOf(res)))
return nil
return ctx.(*executionContext).queryRouter.InvokeTyped(ctx, req, resp)
}

// InvokeUntyped execute a message and returns a response.
func (m *queryRouterService) InvokeUntyped(
func (m queryRouterService) InvokeUntyped(
ctx context.Context,
req protoiface.MessageV1,
) (protoiface.MessageV1, error) {
return m.handler(ctx, req)
return ctx.(*executionContext).queryRouter.InvokeUntyped(ctx, req)
}
57 changes: 38 additions & 19 deletions server/v2/stf/stf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"cosmossdk.io/core/gas"
"cosmossdk.io/core/header"
"cosmossdk.io/core/log"
"cosmossdk.io/core/router"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
stfgas "cosmossdk.io/server/v2/stf/gas"
Expand All @@ -23,9 +24,10 @@ var Identity = []byte("stf")

// STF is a struct that manages the state transition component of the app.
type STF[T transaction.Tx] struct {
logger log.Logger
handleMsg func(ctx context.Context, msg transaction.Msg) (transaction.Msg, error)
handleQuery func(ctx context.Context, req transaction.Msg) (transaction.Msg, error)
logger log.Logger

msgRouter Router
queryRouter Router

doPreBlock func(ctx context.Context, txs []T) error
doBeginBlock func(ctx context.Context) error
Expand All @@ -42,29 +44,40 @@ type STF[T transaction.Tx] struct {

// NewSTF returns a new STF instance.
func NewSTF[T transaction.Tx](
handleMsg func(ctx context.Context, msg transaction.Msg) (transaction.Msg, error),
handleQuery func(ctx context.Context, req transaction.Msg) (transaction.Msg, error),
logger log.Logger,
msgRouterBuilder *MsgRouterBuilder,
queryRouterBuilder *MsgRouterBuilder,
doPreBlock func(ctx context.Context, txs []T) error,
doBeginBlock func(ctx context.Context) error,
doEndBlock func(ctx context.Context) error,
doTxValidation func(ctx context.Context, tx T) error,
doValidatorUpdate func(ctx context.Context) ([]appmodulev2.ValidatorUpdate, error),
postTxExec func(ctx context.Context, tx T, success bool) error,
branch func(store store.ReaderMap) store.WriterMap,
) *STF[T] {
) (*STF[T], error) {
msgRouter, err := msgRouterBuilder.Build()
if err != nil {
return nil, fmt.Errorf("build msg router: %w", err)
}
queryRouter, err := queryRouterBuilder.Build()
if err != nil {
return nil, fmt.Errorf("build query router: %w", err)
}

return &STF[T]{
handleMsg: handleMsg,
handleQuery: handleQuery,
logger: logger,
msgRouter: msgRouter,
queryRouter: queryRouter,
doPreBlock: doPreBlock,
doBeginBlock: doBeginBlock,
doEndBlock: doEndBlock,
doTxValidation: doTxValidation,
doValidatorUpdate: doValidatorUpdate,
doTxValidation: doTxValidation,
postTxExec: postTxExec, // TODO
branchFn: branch,
makeGasMeter: stfgas.DefaultGasMeter,
makeGasMeteredState: stfgas.DefaultWrapWithGasMeter,
}
}, nil
}

// DeliverBlock is our state transition function.
Expand Down Expand Up @@ -310,7 +323,7 @@ func (s STF[T]) runTxMsgs(
execCtx.setGasLimit(gasLimit)
for i, msg := range msgs {
execCtx.sender = txSenders[i]
resp, err := s.handleMsg(execCtx, msg)
resp, err := s.msgRouter.InvokeUntyped(execCtx, msg)
if err != nil {
return nil, 0, nil, fmt.Errorf("message execution at index %d failed: %w", i, err)
}
Expand Down Expand Up @@ -346,7 +359,7 @@ func (s STF[T]) runConsensusMessages(
) ([]transaction.Msg, error) {
responses := make([]transaction.Msg, len(messages))
for i := range messages {
resp, err := s.handleMsg(ctx, messages[i])
resp, err := s.msgRouter.InvokeUntyped(ctx, messages[i])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -498,11 +511,7 @@ func (s STF[T]) Query(
queryCtx := s.makeContext(ctx, nil, queryState, internal.ExecModeSimulate)
queryCtx.setHeaderInfo(hi)
queryCtx.setGasLimit(gasLimit)
return s.handleQuery(queryCtx, req)
}

func (s STF[T]) Message(ctx context.Context, msg transaction.Msg) (response transaction.Msg, err error) {
return s.handleMsg(ctx, msg)
return s.queryRouter.InvokeUntyped(queryCtx, req)
}

// RunWithCtx is made to support genesis, if genesis was just the execution of messages instead
Expand All @@ -521,8 +530,9 @@ func (s STF[T]) RunWithCtx(
// clone clones STF.
func (s STF[T]) clone() STF[T] {
return STF[T]{
handleMsg: s.handleMsg,
handleQuery: s.handleQuery,
logger: s.logger,
msgRouter: s.msgRouter,
queryRouter: s.queryRouter,
doPreBlock: s.doPreBlock,
doBeginBlock: s.doBeginBlock,
doEndBlock: s.doEndBlock,
Expand Down Expand Up @@ -558,6 +568,9 @@ type executionContext struct {
branchFn branchFn
makeGasMeter makeGasMeterFn
makeGasMeteredStore makeGasMeteredStateFn

msgRouter router.Service
queryRouter router.Service
}

// setHeaderInfo sets the header info in the state to be used by queries in the future.
Expand Down Expand Up @@ -599,6 +612,8 @@ func (s STF[T]) makeContext(
sender,
store,
execMode,
s.msgRouter,
s.queryRouter,
)
}

Expand All @@ -610,6 +625,8 @@ func newExecutionContext(
sender transaction.Identity,
state store.WriterMap,
execMode transaction.ExecMode,
msgRouter Router,
queryRouter Router,
) *executionContext {
meter := makeGasMeterFn(gas.NoGasLimit)
meteredState := makeGasMeteredStoreFn(meter, state)
Expand All @@ -626,6 +643,8 @@ func newExecutionContext(
branchFn: branchFn,
makeGasMeter: makeGasMeterFn,
makeGasMeteredStore: makeGasMeteredStoreFn,
msgRouter: msgRouter,
queryRouter: queryRouter,
}
}

Expand Down
Loading

0 comments on commit 11b8f11

Please sign in to comment.