Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(runtime/v2): untie runtimev2 from the legacy usage of gRPC #22043

Merged
merged 8 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"errors"
"slices"

gogoproto "github.com/cosmos/gogoproto/proto"

runtimev2 "cosmossdk.io/api/cosmos/app/runtime/v2"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/registry"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
Expand Down Expand Up @@ -43,9 +42,8 @@ type App[T transaction.Tx] struct {
amino registry.AminoRegistrar
moduleManager *MM[T]

// GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request
// bytes into a gogoproto.Message, which then can be passed to appmanager.
GRPCMethodsToMessageMap map[string]func() gogoproto.Message
// QueryHandlers defines the query handlers
QueryHandlers map[string]appmodulev2.Handler

storeLoader StoreLoader
}
Expand Down Expand Up @@ -120,6 +118,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
return a.AppManager
}

func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message {
return a.GRPCMethodsToMessageMap
func (a *App[T]) GetQueryHandlers() map[string]appmodulev2.Handler {
return a.QueryHandlers
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}
156 changes: 88 additions & 68 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,46 +615,46 @@
}

func registerServices[T transaction.Tx](s appmodulev2.AppModule, app *App[T], registry *protoregistry.Files) error {
c := &configurator{
grpcQueryDecoders: map[string]func() gogoproto.Message{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
}

// case module with services
if services, ok := s.(hasServicesV1); ok {
c := &configurator{
queryHandlers: map[string]appmodulev2.Handler{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
}
Comment on lines +620 to +626
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify struct initialization and avoid field alignment

Per the Uber Go style guide, aligning struct fields using extra spaces is discouraged. Additionally, explicitly initializing the err field to nil is unnecessary since the zero value of an error is nil.

Apply this diff to simplify the struct initialization:

 c := &configurator{
-    queryHandlers:  map[string]appmodulev2.Handler{},
-    stfQueryRouter: app.queryRouterBuilder,
-    stfMsgRouter:   app.msgRouterBuilder,
-    registry:       registry,
-    err:            nil,
+    queryHandlers: map[string]appmodulev2.Handler{},
+    stfQueryRouter: app.queryRouterBuilder,
+    stfMsgRouter: app.msgRouterBuilder,
+    registry: registry,
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
c := &configurator{
queryHandlers: map[string]appmodulev2.Handler{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
}
c := &configurator{
queryHandlers: map[string]appmodulev2.Handler{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
}

if err := services.RegisterServices(c); err != nil {
return fmt.Errorf("unable to register services: %w", err)
}
} else {
// If module not implement RegisterServices, register msg & query handler.
if module, ok := s.(appmodulev2.HasMsgHandlers); ok {
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
module.RegisterMsgHandlers(&wrapper)
if wrapper.error != nil {
return fmt.Errorf("unable to register handlers: %w", wrapper.error)
}
}

if module, ok := s.(appmodulev2.HasQueryHandlers); ok {
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
module.RegisterQueryHandlers(&wrapper)

for path, decoder := range wrapper.decoders {
app.GRPCMethodsToMessageMap[path] = decoder
}
if c.err != nil {
app.logger.Warn("error registering services", "error", c.err)
}

// merge maps
for path, decoder := range c.queryHandlers {
app.QueryHandlers[path] = decoder
}
Comment on lines +636 to +638

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
}

if c.err != nil {
app.logger.Warn("error registering services", "error", c.err)
// if module implements register msg handlers
if module, ok := s.(appmodulev2.HasMsgHandlers); ok {
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
module.RegisterMsgHandlers(&wrapper)
if wrapper.error != nil {
return fmt.Errorf("unable to register handlers: %w", wrapper.error)
}
}

// merge maps
for path, decoder := range c.grpcQueryDecoders {
app.GRPCMethodsToMessageMap[path] = decoder
// if module implements register query handlers
if module, ok := s.(appmodulev2.HasQueryHandlers); ok {
wrapper := stfRouterWrapper{stfRouter: app.queryRouterBuilder}
module.RegisterQueryHandlers(&wrapper)

for path, handler := range wrapper.handlers {
app.QueryHandlers[path] = handler
}
Comment on lines +655 to +657

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
}

return nil
Expand All @@ -663,9 +663,7 @@
var _ grpc.ServiceRegistrar = (*configurator)(nil)

type configurator struct {
// grpcQueryDecoders is required because module expose queries through gRPC
// this provides a way to route to modules using gRPC.
grpcQueryDecoders map[string]func() gogoproto.Message
queryHandlers map[string]appmodulev2.Handler

stfQueryRouter *stf.MsgRouterBuilder
stfMsgRouter *stf.MsgRouterBuilder
Expand Down Expand Up @@ -697,61 +695,59 @@
func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
for _, md := range sd.Methods {
// TODO(tip): what if a query is not deterministic?
requestFullName, err := registerMethod(c.stfQueryRouter, sd, md, ss)

handler, err := grpcHandlerToAppModuleHandler(sd, md, ss)
if err != nil {
return fmt.Errorf("unable to register query handler %s.%s: %w", sd.ServiceName, md.MethodName, err)
return fmt.Errorf("unable to make a appmodulev2.HandlerFunc from gRPC handler (%s, %s): %w", sd.ServiceName, md.MethodName, err)
}

// register gRPC query method.
typ := gogoproto.MessageType(requestFullName)
if typ == nil {
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
}
decoderFunc := func() gogoproto.Message {
return reflect.New(typ.Elem()).Interface().(gogoproto.Message)
// register to stf query router.
err = c.stfQueryRouter.RegisterHandler(gogoproto.MessageName(handler.MakeMsg()), handler.Func)
if err != nil {
return fmt.Errorf("unable to register handler to stf router (%s, %s): %w", sd.ServiceName, md.MethodName, err)
}
methodName := fmt.Sprintf("/%s/%s", sd.ServiceName, md.MethodName)
c.grpcQueryDecoders[methodName] = decoderFunc

// register query handler using the same mapping used in stf
c.queryHandlers[gogoproto.MessageName(handler.MakeMsg())] = handler
}
return nil
}

func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
for _, md := range sd.Methods {
_, err := registerMethod(c.stfMsgRouter, sd, md, ss)
handler, err := grpcHandlerToAppModuleHandler(sd, md, ss)
if err != nil {
return err
}
err = c.stfMsgRouter.RegisterHandler(gogoproto.MessageName(handler.MakeMsg()), handler.Func)
if err != nil {
return fmt.Errorf("unable to register msg handler %s.%s: %w", sd.ServiceName, md.MethodName, err)
}
}
return nil
}

// requestFullNameFromMethodDesc returns the fully-qualified name of the request message of the provided service's method.
func requestFullNameFromMethodDesc(sd *grpc.ServiceDesc, method grpc.MethodDesc) (protoreflect.FullName, error) {
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName)
if err != nil {
return "", fmt.Errorf("cannot find method descriptor %s", methodFullName)
}
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return "", fmt.Errorf("invalid method descriptor %s", methodFullName)
}
return methodDesc.Input().FullName(), nil
}

func registerMethod(
stfRouter *stf.MsgRouterBuilder,
// grpcHandlerToAppModuleHandler converts a gRPC handler into an appmodulev2.HandlerFunc.
func grpcHandlerToAppModuleHandler(
sd *grpc.ServiceDesc,
md grpc.MethodDesc,
ss interface{},
) (string, error) {
requestName, err := requestFullNameFromMethodDesc(sd, md)
) (appmodulev2.Handler, error) {
requestName, responseName, err := requestFullNameFromMethodDesc(sd, md)
if err != nil {
return "", err
return appmodulev2.Handler{}, err
}

return string(requestName), stfRouter.RegisterHandler(string(requestName), func(
requestTyp := gogoproto.MessageType(string(requestName))
if requestTyp == nil {
return appmodulev2.Handler{}, fmt.Errorf("no proto message found for %s", requestName)
}
responseTyp := gogoproto.MessageType(string(responseName))
if responseTyp == nil {
return appmodulev2.Handler{}, fmt.Errorf("no proto message found for %s", responseName)
}

handlerFunc := func(
ctx context.Context,
msg transaction.Msg,
) (resp transaction.Msg, err error) {
Expand All @@ -760,7 +756,17 @@
return nil, err
}
return res.(transaction.Msg), nil
})
}

return appmodulev2.Handler{
Func: handlerFunc,
MakeMsg: func() transaction.Msg {
return reflect.New(requestTyp.Elem()).Interface().(transaction.Msg)
},
MakeMsgResp: func() transaction.Msg {
return reflect.New(responseTyp.Elem()).Interface().(transaction.Msg)
},
Comment on lines +763 to +768
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add checks for type assertions in MakeMsg and MakeMsgResp

The type assertions in MakeMsg and MakeMsgResp may cause panics if the created instances do not implement transaction.Msg. Consider adding checks to handle potential type assertion failures.

Apply this diff to safely handle the type assertions:

 MakeMsg: func() transaction.Msg {
-	return reflect.New(requestTyp.Elem()).Interface().(transaction.Msg)
+	msg, ok := reflect.New(requestTyp.Elem()).Interface().(transaction.Msg)
+	if !ok {
+		return nil
+	}
+	return msg
},
MakeMsgResp: func() transaction.Msg {
-	return reflect.New(responseTyp.Elem()).Interface().(transaction.Msg)
+	msgResp, ok := reflect.New(responseTyp.Elem()).Interface().(transaction.Msg)
+	if !ok {
+		return nil
+	}
+	return msgResp
},
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
MakeMsg: func() transaction.Msg {
return reflect.New(requestTyp.Elem()).Interface().(transaction.Msg)
},
MakeMsgResp: func() transaction.Msg {
return reflect.New(responseTyp.Elem()).Interface().(transaction.Msg)
},
MakeMsg: func() transaction.Msg {
msg, ok := reflect.New(requestTyp.Elem()).Interface().(transaction.Msg)
if !ok {
return nil
}
return msg
},
MakeMsgResp: func() transaction.Msg {
msgResp, ok := reflect.New(responseTyp.Elem()).Interface().(transaction.Msg)
if !ok {
return nil
}
return msgResp
},

}, nil
}

func noopDecoder(_ interface{}) error { return nil }
Expand All @@ -776,6 +782,20 @@
}
}

// requestFullNameFromMethodDesc returns the fully-qualified name of the request message and response of the provided service's method.
func requestFullNameFromMethodDesc(sd *grpc.ServiceDesc, method grpc.MethodDesc) (protoreflect.FullName, protoreflect.FullName, error) {
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName)
if err != nil {
return "", "", fmt.Errorf("cannot find method descriptor %s", methodFullName)
}
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return "", "", fmt.Errorf("invalid method descriptor %s", methodFullName)
}
return methodDesc.Input().FullName(), methodDesc.Output().FullName(), nil
}

// defaultMigrationsOrder returns a default migrations order: ascending alphabetical by module name,
// except x/auth which will run last, see:
// https://github.com/cosmos/cosmos-sdk/issues/10591
Expand Down Expand Up @@ -815,7 +835,7 @@

error error

decoders map[string]func() gogoproto.Message
handlers map[string]appmodulev2.Handler
}

func (s *stfRouterWrapper) RegisterHandler(handler appmodulev2.Handler) {
Expand All @@ -831,7 +851,7 @@

// also make the decoder
if s.error == nil {
s.decoders = map[string]func() gogoproto.Message{}
s.handlers = map[string]appmodulev2.Handler{}
}
s.decoders[requestName] = handler.MakeMsg
s.handlers[requestName] = handler
Comment on lines +854 to +856
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid re-initializing the handlers map to prevent overwriting

The handlers map is being re-initialized inside the RegisterHandler method whenever s.error == nil, which can cause previously registered handlers to be lost. Initialize the handlers map only once, preferably when the stfRouterWrapper is created, or check if it is nil before initializing.

Apply this diff to fix the issue:

 if s.error == nil {
-	s.handlers = map[string]appmodulev2.Handler{}
+	if s.handlers == nil {
+		s.handlers = map[string]appmodulev2.Handler{}
+	}
 }
 s.handlers[requestName] = handler
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
s.handlers = map[string]appmodulev2.Handler{}
}
s.decoders[requestName] = handler.MakeMsg
s.handlers[requestName] = handler
if s.error == nil {
if s.handlers == nil {
s.handlers = map[string]appmodulev2.Handler{}
}
}
s.handlers[requestName] = handler

}
14 changes: 7 additions & 7 deletions runtime/v2/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func ProvideAppBuilder[T transaction.Tx](

msgRouterBuilder := stf.NewMsgRouterBuilder()
app := &App[T]{
storeKeys: nil,
interfaceRegistrar: interfaceRegistrar,
amino: amino,
msgRouterBuilder: msgRouterBuilder,
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
GRPCMethodsToMessageMap: map[string]func() proto.Message{},
storeLoader: DefaultStoreLoader,
storeKeys: nil,
interfaceRegistrar: interfaceRegistrar,
amino: amino,
msgRouterBuilder: msgRouterBuilder,
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
QueryHandlers: map[string]appmodulev2.Handler{},
storeLoader: DefaultStoreLoader,
}
appBuilder := &AppBuilder[T]{app: app}

Expand Down
9 changes: 5 additions & 4 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"
"strconv"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"github.com/cosmos/gogoproto/proto"
"github.com/spf13/pflag"
"google.golang.org/grpc"
Expand Down Expand Up @@ -53,7 +54,7 @@ func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.L
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
methodsMap := appI.GetGPRCMethodsToMessageMap()
methodsMap := appI.GetQueryHandlers()

grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()),
Expand All @@ -80,7 +81,7 @@ func (s *Server[T]) StartCmdFlags() *pflag.FlagSet {
return flags
}

func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, querier interface {
func makeUnknownServiceHandler(handlers map[string]appmodulev2.Handler, querier interface {
Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error)
},
) grpc.StreamHandler {
Expand All @@ -89,12 +90,12 @@ func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, queri
if !ok {
return status.Error(codes.InvalidArgument, "unable to get method")
}
makeMsg, exists := messageMap[method]
handler, exists := handlers[method]
if !exists {
return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method)
}
for {
req := makeMsg()
req := handler()
err := stream.RecvMsg(req)
if err != nil {
if errors.Is(err, io.EOF) {
Expand Down
9 changes: 6 additions & 3 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"fmt"
"sync/atomic"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
gogoproto "github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -61,7 +62,7 @@
addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID

grpcMethodsMap map[string]func() transaction.Msg // maps gRPC method to message creator func
queryHandlersMap map[string]appmodulev2.Handler
}

func NewConsensus[T transaction.Tx](
Expand All @@ -70,7 +71,7 @@
app *appmanager.AppManager[T],
mp mempool.Mempool[T],
indexedEvents map[string]struct{},
gRPCMethodsMap map[string]func() transaction.Msg,
queryHandlersMap map[string]appmodulev2.Handler,
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
Expand All @@ -79,7 +80,7 @@
return &Consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
grpcMethodsMap: gRPCMethodsMap,
queryHandlersMap: queryHandlersMap,
app: app,
cfg: cfg,
store: store,
Expand Down Expand Up @@ -192,6 +193,8 @@
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
// check if it's a gRPC method
desc, err := gogoproto.MergedRegistry()
Fixed Show fixed Hide fixed

makeGRPCRequest, isGRPC := c.grpcMethodsMap[req.Path]
if isGRPC {
protoRequest := makeGRPCRequest()
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg
appI.GetAppManager(),
s.serverOptions.Mempool(cfg),
indexEvents,
appI.GetGPRCMethodsToMessageMap(),
appI.GetQueryHandlers(),
store,
s.config,
s.initTxCodec,
Expand Down
Loading
Loading