From 80657a51b5ac1f1ebc302d57eb0f6f30cace76f2 Mon Sep 17 00:00:00 2001 From: testinginprod Date: Wed, 2 Oct 2024 13:23:34 +0200 Subject: [PATCH 1/7] handlers continuation --- runtime/v2/app.go | 12 ++- runtime/v2/manager.go | 156 ++++++++++++++++++++--------------- runtime/v2/module.go | 14 ++-- server/v2/api/grpc/server.go | 9 +- server/v2/cometbft/abci.go | 9 +- server/v2/cometbft/server.go | 2 +- server/v2/server_test.go | 5 +- server/v2/types.go | 4 +- 8 files changed, 117 insertions(+), 94 deletions(-) diff --git a/runtime/v2/app.go b/runtime/v2/app.go index b7104f9e4774..caf55b92ce01 100644 --- a/runtime/v2/app.go +++ b/runtime/v2/app.go @@ -5,9 +5,8 @@ import ( "errors" "slices" - gogoproto "github.com/cosmos/gogoproto/proto" - runtimev2 "cosmossdk.io/api/cosmos/app/runtime/v2" + appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/registry" "cosmossdk.io/core/transaction" "cosmossdk.io/log" @@ -43,9 +42,8 @@ type App[T transaction.Tx] struct { amino registry.AminoRegistrar moduleManager *MM[T] - // GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request - // bytes into a gogoproto.Message, which then can be passed to appmanager. - GRPCMethodsToMessageMap map[string]func() gogoproto.Message + // QueryHandlers defines the query handlers + QueryHandlers map[string]appmodulev2.Handler storeLoader StoreLoader } @@ -120,6 +118,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] { return a.AppManager } -func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message { - return a.GRPCMethodsToMessageMap +func (a *App[T]) GetQueryHandlers() map[string]appmodulev2.Handler { + return a.QueryHandlers } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 02df9ac750f3..01b0c93971ab 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -615,46 +615,46 @@ func (m *MM[T]) assertNoForgottenModules( } func registerServices[T transaction.Tx](s appmodulev2.AppModule, app *App[T], registry *protoregistry.Files) error { - c := &configurator{ - grpcQueryDecoders: map[string]func() gogoproto.Message{}, - stfQueryRouter: app.queryRouterBuilder, - stfMsgRouter: app.msgRouterBuilder, - registry: registry, - err: nil, - } - + // case module with services if services, ok := s.(hasServicesV1); ok { + c := &configurator{ + queryHandlers: map[string]appmodulev2.Handler{}, + stfQueryRouter: app.queryRouterBuilder, + stfMsgRouter: app.msgRouterBuilder, + registry: registry, + err: nil, + } if err := services.RegisterServices(c); err != nil { return fmt.Errorf("unable to register services: %w", err) } - } else { - // If module not implement RegisterServices, register msg & query handler. - if module, ok := s.(appmodulev2.HasMsgHandlers); ok { - wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder} - module.RegisterMsgHandlers(&wrapper) - if wrapper.error != nil { - return fmt.Errorf("unable to register handlers: %w", wrapper.error) - } - } - if module, ok := s.(appmodulev2.HasQueryHandlers); ok { - wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder} - module.RegisterQueryHandlers(&wrapper) - - for path, decoder := range wrapper.decoders { - app.GRPCMethodsToMessageMap[path] = decoder - } + if c.err != nil { + app.logger.Warn("error registering services", "error", c.err) } + // merge maps + for path, decoder := range c.queryHandlers { + app.QueryHandlers[path] = decoder + } } - if c.err != nil { - app.logger.Warn("error registering services", "error", c.err) + // if module implements register msg handlers + if module, ok := s.(appmodulev2.HasMsgHandlers); ok { + wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder} + module.RegisterMsgHandlers(&wrapper) + if wrapper.error != nil { + return fmt.Errorf("unable to register handlers: %w", wrapper.error) + } } - // merge maps - for path, decoder := range c.grpcQueryDecoders { - app.GRPCMethodsToMessageMap[path] = decoder + // if module implements register query handlers + if module, ok := s.(appmodulev2.HasQueryHandlers); ok { + wrapper := stfRouterWrapper{stfRouter: app.queryRouterBuilder} + module.RegisterQueryHandlers(&wrapper) + + for path, handler := range wrapper.handlers { + app.QueryHandlers[path] = handler + } } return nil @@ -663,9 +663,7 @@ func registerServices[T transaction.Tx](s appmodulev2.AppModule, app *App[T], re var _ grpc.ServiceRegistrar = (*configurator)(nil) type configurator struct { - // grpcQueryDecoders is required because module expose queries through gRPC - // this provides a way to route to modules using gRPC. - grpcQueryDecoders map[string]func() gogoproto.Message + queryHandlers map[string]appmodulev2.Handler stfQueryRouter *stf.MsgRouterBuilder stfMsgRouter *stf.MsgRouterBuilder @@ -697,28 +695,31 @@ func (c *configurator) RegisterService(sd *grpc.ServiceDesc, ss interface{}) { func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{}) error { for _, md := range sd.Methods { // TODO(tip): what if a query is not deterministic? - requestFullName, err := registerMethod(c.stfQueryRouter, sd, md, ss) + + handler, err := grpcHandlerToAppModuleHandler(sd, md, ss) if err != nil { - return fmt.Errorf("unable to register query handler %s.%s: %w", sd.ServiceName, md.MethodName, err) + return fmt.Errorf("unable to make a appmodulev2.HandlerFunc from gRPC handler (%s, %s): %w", sd.ServiceName, md.MethodName, err) } - // register gRPC query method. - typ := gogoproto.MessageType(requestFullName) - if typ == nil { - return fmt.Errorf("unable to find message in gogotype registry: %w", err) - } - decoderFunc := func() gogoproto.Message { - return reflect.New(typ.Elem()).Interface().(gogoproto.Message) + // register to stf query router. + err = c.stfQueryRouter.RegisterHandler(gogoproto.MessageName(handler.MakeMsg()), handler.Func) + if err != nil { + return fmt.Errorf("unable to register handler to stf router (%s, %s): %w", sd.ServiceName, md.MethodName, err) } - methodName := fmt.Sprintf("/%s/%s", sd.ServiceName, md.MethodName) - c.grpcQueryDecoders[methodName] = decoderFunc + + // register query handler using the same mapping used in stf + c.queryHandlers[gogoproto.MessageName(handler.MakeMsg())] = handler } return nil } func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) error { for _, md := range sd.Methods { - _, err := registerMethod(c.stfMsgRouter, sd, md, ss) + handler, err := grpcHandlerToAppModuleHandler(sd, md, ss) + if err != nil { + return err + } + err = c.stfMsgRouter.RegisterHandler(gogoproto.MessageName(handler.MakeMsg()), handler.Func) if err != nil { return fmt.Errorf("unable to register msg handler %s.%s: %w", sd.ServiceName, md.MethodName, err) } @@ -726,32 +727,27 @@ func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) return nil } -// requestFullNameFromMethodDesc returns the fully-qualified name of the request message of the provided service's method. -func requestFullNameFromMethodDesc(sd *grpc.ServiceDesc, method grpc.MethodDesc) (protoreflect.FullName, error) { - methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName)) - desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName) - if err != nil { - return "", fmt.Errorf("cannot find method descriptor %s", methodFullName) - } - methodDesc, ok := desc.(protoreflect.MethodDescriptor) - if !ok { - return "", fmt.Errorf("invalid method descriptor %s", methodFullName) - } - return methodDesc.Input().FullName(), nil -} - -func registerMethod( - stfRouter *stf.MsgRouterBuilder, +// grpcHandlerToAppModuleHandler converts a gRPC handler into an appmodulev2.HandlerFunc. +func grpcHandlerToAppModuleHandler( sd *grpc.ServiceDesc, md grpc.MethodDesc, ss interface{}, -) (string, error) { - requestName, err := requestFullNameFromMethodDesc(sd, md) +) (appmodulev2.Handler, error) { + requestName, responseName, err := requestFullNameFromMethodDesc(sd, md) if err != nil { - return "", err + return appmodulev2.Handler{}, err } - return string(requestName), stfRouter.RegisterHandler(string(requestName), func( + requestTyp := gogoproto.MessageType(string(requestName)) + if requestTyp == nil { + return appmodulev2.Handler{}, fmt.Errorf("no proto message found for %s", requestName) + } + responseTyp := gogoproto.MessageType(string(responseName)) + if responseTyp == nil { + return appmodulev2.Handler{}, fmt.Errorf("no proto message found for %s", responseName) + } + + handlerFunc := func( ctx context.Context, msg transaction.Msg, ) (resp transaction.Msg, err error) { @@ -760,7 +756,17 @@ func registerMethod( return nil, err } return res.(transaction.Msg), nil - }) + } + + return appmodulev2.Handler{ + Func: handlerFunc, + MakeMsg: func() transaction.Msg { + return reflect.New(requestTyp.Elem()).Interface().(transaction.Msg) + }, + MakeMsgResp: func() transaction.Msg { + return reflect.New(responseTyp.Elem()).Interface().(transaction.Msg) + }, + }, nil } func noopDecoder(_ interface{}) error { return nil } @@ -776,6 +782,20 @@ func messagePassingInterceptor(msg transaction.Msg) grpc.UnaryServerInterceptor } } +// requestFullNameFromMethodDesc returns the fully-qualified name of the request message and response of the provided service's method. +func requestFullNameFromMethodDesc(sd *grpc.ServiceDesc, method grpc.MethodDesc) (protoreflect.FullName, protoreflect.FullName, error) { + methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName)) + desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName) + if err != nil { + return "", "", fmt.Errorf("cannot find method descriptor %s", methodFullName) + } + methodDesc, ok := desc.(protoreflect.MethodDescriptor) + if !ok { + return "", "", fmt.Errorf("invalid method descriptor %s", methodFullName) + } + return methodDesc.Input().FullName(), methodDesc.Output().FullName(), nil +} + // defaultMigrationsOrder returns a default migrations order: ascending alphabetical by module name, // except x/auth which will run last, see: // https://github.com/cosmos/cosmos-sdk/issues/10591 @@ -815,7 +835,7 @@ type stfRouterWrapper struct { error error - decoders map[string]func() gogoproto.Message + handlers map[string]appmodulev2.Handler } func (s *stfRouterWrapper) RegisterHandler(handler appmodulev2.Handler) { @@ -831,7 +851,7 @@ func (s *stfRouterWrapper) RegisterHandler(handler appmodulev2.Handler) { // also make the decoder if s.error == nil { - s.decoders = map[string]func() gogoproto.Message{} + s.handlers = map[string]appmodulev2.Handler{} } - s.decoders[requestName] = handler.MakeMsg + s.handlers[requestName] = handler } diff --git a/runtime/v2/module.go b/runtime/v2/module.go index 8e8b7e0ce832..a95e38f96ea5 100644 --- a/runtime/v2/module.go +++ b/runtime/v2/module.go @@ -127,13 +127,13 @@ func ProvideAppBuilder[T transaction.Tx]( msgRouterBuilder := stf.NewMsgRouterBuilder() app := &App[T]{ - storeKeys: nil, - interfaceRegistrar: interfaceRegistrar, - amino: amino, - msgRouterBuilder: msgRouterBuilder, - queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router - GRPCMethodsToMessageMap: map[string]func() proto.Message{}, - storeLoader: DefaultStoreLoader, + storeKeys: nil, + interfaceRegistrar: interfaceRegistrar, + amino: amino, + msgRouterBuilder: msgRouterBuilder, + queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router + QueryHandlers: map[string]appmodulev2.Handler{}, + storeLoader: DefaultStoreLoader, } appBuilder := &AppBuilder[T]{app: app} diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index 170343fd512c..f8d09b75a26f 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -10,6 +10,7 @@ import ( "slices" "strconv" + appmodulev2 "cosmossdk.io/core/appmodule/v2" "github.com/cosmos/gogoproto/proto" "github.com/spf13/pflag" "google.golang.org/grpc" @@ -53,7 +54,7 @@ func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.L return fmt.Errorf("failed to unmarshal config: %w", err) } } - methodsMap := appI.GetGPRCMethodsToMessageMap() + methodsMap := appI.GetQueryHandlers() grpcSrv := grpc.NewServer( grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()), @@ -80,7 +81,7 @@ func (s *Server[T]) StartCmdFlags() *pflag.FlagSet { return flags } -func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, querier interface { +func makeUnknownServiceHandler(handlers map[string]appmodulev2.Handler, querier interface { Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error) }, ) grpc.StreamHandler { @@ -89,12 +90,12 @@ func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, queri if !ok { return status.Error(codes.InvalidArgument, "unable to get method") } - makeMsg, exists := messageMap[method] + handler, exists := handlers[method] if !exists { return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method) } for { - req := makeMsg() + req := handler() err := stream.RecvMsg(req) if err != nil { if errors.Is(err, io.EOF) { diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 02966c874770..28bfcf89e160 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -7,6 +7,7 @@ import ( "fmt" "sync/atomic" + appmodulev2 "cosmossdk.io/core/appmodule/v2" abci "github.com/cometbft/cometbft/abci/types" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" gogoproto "github.com/cosmos/gogoproto/proto" @@ -61,7 +62,7 @@ type Consensus[T transaction.Tx] struct { addrPeerFilter types.PeerFilter // filter peers by address and port idPeerFilter types.PeerFilter // filter peers by node ID - grpcMethodsMap map[string]func() transaction.Msg // maps gRPC method to message creator func + queryHandlersMap map[string]appmodulev2.Handler } func NewConsensus[T transaction.Tx]( @@ -70,7 +71,7 @@ func NewConsensus[T transaction.Tx]( app *appmanager.AppManager[T], mp mempool.Mempool[T], indexedEvents map[string]struct{}, - gRPCMethodsMap map[string]func() transaction.Msg, + queryHandlersMap map[string]appmodulev2.Handler, store types.Store, cfg Config, txCodec transaction.Codec[T], @@ -79,7 +80,7 @@ func NewConsensus[T transaction.Tx]( return &Consensus[T]{ appName: appName, version: getCometBFTServerVersion(), - grpcMethodsMap: gRPCMethodsMap, + queryHandlersMap: queryHandlersMap, app: app, cfg: cfg, store: store, @@ -192,6 +193,8 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc // It is called by cometbft to query application state. func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { // check if it's a gRPC method + desc, err := gogoproto.MergedRegistry() + makeGRPCRequest, isGRPC := c.grpcMethodsMap[req.Path] if isGRPC { protoRequest := makeGRPCRequest() diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 21b6d59ddc72..90f2dcbf50c0 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -105,7 +105,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg appI.GetAppManager(), s.serverOptions.Mempool(cfg), indexEvents, - appI.GetGPRCMethodsToMessageMap(), + appI.GetQueryHandlers(), store, s.config, s.initTxCodec, diff --git a/server/v2/server_test.go b/server/v2/server_test.go index e84bcd598890..c23f2e79de88 100644 --- a/server/v2/server_test.go +++ b/server/v2/server_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + appmodulev2 "cosmossdk.io/core/appmodule/v2" gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/viper" "github.com/stretchr/testify/require" @@ -35,8 +36,8 @@ type mockApp[T transaction.Tx] struct { serverv2.AppI[T] } -func (*mockApp[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message { - return map[string]func() gogoproto.Message{} +func (*mockApp[T]) GetQueryHandlers() map[string]appmodulev2.Handler { + return map[string]appmodulev2.Handler{} } func (*mockApp[T]) GetAppManager() *appmanager.AppManager[T] { diff --git a/server/v2/types.go b/server/v2/types.go index afa82969131a..60e085f2d2f2 100644 --- a/server/v2/types.go +++ b/server/v2/types.go @@ -1,7 +1,7 @@ package serverv2 import ( - gogoproto "github.com/cosmos/gogoproto/proto" + appmodulev2 "cosmossdk.io/core/appmodule/v2" "github.com/spf13/viper" "cosmossdk.io/core/server" @@ -16,6 +16,6 @@ type AppI[T transaction.Tx] interface { Name() string InterfaceRegistry() server.InterfaceRegistry GetAppManager() *appmanager.AppManager[T] - GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message + GetQueryHandlers() map[string]appmodulev2.Handler GetStore() any } From afc9b067ede1b2fe3f8ea1c7dd2888b49f7003cb Mon Sep 17 00:00:00 2001 From: testinginprod Date: Wed, 2 Oct 2024 14:17:28 +0200 Subject: [PATCH 2/7] checkpoint --- server/v2/api/grpc/server.go | 33 ++++++++++++++++++++++++++---- server/v2/cometbft/abci.go | 39 ++++++++++++++++++++++++++---------- server/v2/go.mod | 2 +- server/v2/go.sum | 4 ++-- 4 files changed, 60 insertions(+), 18 deletions(-) diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index f8d09b75a26f..8ea2ea2f9fc3 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -9,14 +9,18 @@ import ( "net" "slices" "strconv" + "strings" + "sync" appmodulev2 "cosmossdk.io/core/appmodule/v2" - "github.com/cosmos/gogoproto/proto" + gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/pflag" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" "cosmossdk.io/core/transaction" "cosmossdk.io/log" @@ -82,20 +86,41 @@ func (s *Server[T]) StartCmdFlags() *pflag.FlagSet { } func makeUnknownServiceHandler(handlers map[string]appmodulev2.Handler, querier interface { - Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error) + Query(ctx context.Context, version uint64, msg gogoproto.Message) (gogoproto.Message, error) }, ) grpc.StreamHandler { + getRegistry := sync.OnceValues(func() (*protoregistry.Files, error) { + return gogoproto.MergedRegistry() + }) + return func(srv any, stream grpc.ServerStream) error { method, ok := grpc.MethodFromServerStream(stream) if !ok { return status.Error(codes.InvalidArgument, "unable to get method") } - handler, exists := handlers[method] + // if this fails we cannot serve queries anymore... + registry, err := getRegistry() + if err != nil { + return err + } + fullName := protoreflect.FullName(strings.ReplaceAll(method, "/", ".")) + // get descriptor from the invoke method + desc, err := registry.FindDescriptorByName(fullName) + if err != nil { + return fmt.Errorf("failed to find descriptor %s: %w", method, err) + } + md, ok := desc.(protoreflect.MethodDescriptor) + if !ok { + return fmt.Errorf("%s is not a method", method) + } + // find handler + handler, exists := handlers[string(md.Input().FullName())] if !exists { return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method) } + for { - req := handler() + req := handler.MakeMsg() err := stream.RecvMsg(req) if err != nil { if errors.Is(err, io.EOF) { diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 28bfcf89e160..aba85ab6621a 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -5,12 +5,16 @@ import ( "crypto/sha256" "errors" "fmt" + "strings" + "sync" "sync/atomic" appmodulev2 "cosmossdk.io/core/appmodule/v2" abci "github.com/cometbft/cometbft/abci/types" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" gogoproto "github.com/cosmos/gogoproto/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" "cosmossdk.io/collections" "cosmossdk.io/core/comet" @@ -63,6 +67,7 @@ type Consensus[T transaction.Tx] struct { idPeerFilter types.PeerFilter // filter peers by node ID queryHandlersMap map[string]appmodulev2.Handler + getProtoRegistry func() (*protoregistry.Files, error) } func NewConsensus[T transaction.Tx]( @@ -78,14 +83,12 @@ func NewConsensus[T transaction.Tx]( chainId string, ) *Consensus[T] { return &Consensus[T]{ + logger: logger, appName: appName, version: getCometBFTServerVersion(), - queryHandlersMap: queryHandlersMap, app: app, - cfg: cfg, - store: store, - logger: logger, txCodec: txCodec, + store: store, streaming: streaming.Manager{}, snapshotManager: nil, mempool: mp, @@ -94,9 +97,8 @@ func NewConsensus[T transaction.Tx]( processProposalHandler: nil, verifyVoteExt: nil, extendVote: nil, - chainID: chainId, - indexedEvents: indexedEvents, - initialHeight: 0, + queryHandlersMap: queryHandlersMap, + getProtoRegistry: sync.OnceValues(func() (*protoregistry.Files, error) { return gogoproto.MergedRegistry() }), } } @@ -192,12 +194,27 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc // Query implements types.Application. // It is called by cometbft to query application state. func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { - // check if it's a gRPC method - desc, err := gogoproto.MergedRegistry() + // if this fails then we cannot serve queries anymore + registry, err := c.getProtoRegistry() + if err != nil { + return nil, err + } + + fullName := protoreflect.FullName(strings.ReplaceAll(req.Path, "/", ".")) + + desc, err := registry.FindDescriptorByName(fullName) + if err != nil { + return nil, err + } + + md, ok := desc.(protoreflect.MethodDescriptor) + if !ok { + return nil, fmt.Errorf("proto descriptor %s not found in registry", req.Path) + } - makeGRPCRequest, isGRPC := c.grpcMethodsMap[req.Path] + handler, isGRPC := c.queryHandlersMap[string(md.Input().FullName())] if isGRPC { - protoRequest := makeGRPCRequest() + protoRequest := handler.MakeMsg() err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec if err != nil { return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) diff --git a/server/v2/go.mod b/server/v2/go.mod index 3885246d5b48..4bc824e1a164 100644 --- a/server/v2/go.mod +++ b/server/v2/go.mod @@ -13,7 +13,7 @@ replace ( require ( cosmossdk.io/api v0.7.6 - cosmossdk.io/core v1.0.0-alpha.3 + cosmossdk.io/core v1.0.0-alpha.4 cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 cosmossdk.io/log v1.4.1 cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000 diff --git a/server/v2/go.sum b/server/v2/go.sum index 4406baf611d3..e22bb8743eac 100644 --- a/server/v2/go.sum +++ b/server/v2/go.sum @@ -1,7 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cosmossdk.io/core v1.0.0-alpha.3 h1:pnxaYAas7llXgVz1lM7X6De74nWrhNKnB3yMKe4OUUA= -cosmossdk.io/core v1.0.0-alpha.3/go.mod h1:3u9cWq1FAVtiiCrDPpo4LhR+9V6k/ycSG4/Y/tREWCY= +cosmossdk.io/core v1.0.0-alpha.4 h1:9iuroT9ejDYETCsGkzkvs/wAY/5UFl7nCIINFRxyMJY= +cosmossdk.io/core v1.0.0-alpha.4/go.mod h1:3u9cWq1FAVtiiCrDPpo4LhR+9V6k/ycSG4/Y/tREWCY= cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 h1:NxxUo0GMJUbIuVg0R70e3cbn9eFTEuMr7ev1AFvypdY= cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29/go.mod h1:8s2tPeJtSiQuoyPmr2Ag7meikonISO4Fv4MoO8+ORrs= cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 h1:IQNdY2kB+k+1OM2DvqFG1+UgeU1JzZrWtwuWzI3ZfwA= From 316578bfd2960e71b237f1cc31bc4ada48e9c46f Mon Sep 17 00:00:00 2001 From: testinginprod Date: Wed, 2 Oct 2024 14:29:53 +0200 Subject: [PATCH 3/7] checkpoint --- server/v2/cometbft/abci.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index aba85ab6621a..7513e3b01dd5 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -83,12 +83,13 @@ func NewConsensus[T transaction.Tx]( chainId string, ) *Consensus[T] { return &Consensus[T]{ - logger: logger, appName: appName, version: getCometBFTServerVersion(), app: app, - txCodec: txCodec, + cfg: cfg, store: store, + logger: logger, + txCodec: txCodec, streaming: streaming.Manager{}, snapshotManager: nil, mempool: mp, @@ -97,6 +98,9 @@ func NewConsensus[T transaction.Tx]( processProposalHandler: nil, verifyVoteExt: nil, extendVote: nil, + chainID: chainId, + indexedEvents: indexedEvents, + initialHeight: 0, queryHandlersMap: queryHandlersMap, getProtoRegistry: sync.OnceValues(func() (*protoregistry.Files, error) { return gogoproto.MergedRegistry() }), } From 2e406ceeeb400ad0912372a33e0671bcbdb50654 Mon Sep 17 00:00:00 2001 From: testinginprod Date: Wed, 2 Oct 2024 14:33:00 +0200 Subject: [PATCH 4/7] lints --- server/v2/api/grpc/server.go | 14 +++++--------- server/v2/cometbft/abci.go | 2 +- server/v2/server_test.go | 2 +- server/v2/types.go | 2 +- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index 8ea2ea2f9fc3..ba14a52e8cf8 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -13,6 +13,10 @@ import ( "sync" appmodulev2 "cosmossdk.io/core/appmodule/v2" + "cosmossdk.io/core/transaction" + "cosmossdk.io/log" + serverv2 "cosmossdk.io/server/v2" + "cosmossdk.io/server/v2/api/grpc/gogoreflection" gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/pflag" "google.golang.org/grpc" @@ -20,12 +24,6 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/reflect/protoreflect" - "google.golang.org/protobuf/reflect/protoregistry" - - "cosmossdk.io/core/transaction" - "cosmossdk.io/log" - serverv2 "cosmossdk.io/server/v2" - "cosmossdk.io/server/v2/api/grpc/gogoreflection" ) const ( @@ -89,9 +87,7 @@ func makeUnknownServiceHandler(handlers map[string]appmodulev2.Handler, querier Query(ctx context.Context, version uint64, msg gogoproto.Message) (gogoproto.Message, error) }, ) grpc.StreamHandler { - getRegistry := sync.OnceValues(func() (*protoregistry.Files, error) { - return gogoproto.MergedRegistry() - }) + getRegistry := sync.OnceValues(gogoproto.MergedRegistry) return func(srv any, stream grpc.ServerStream) error { method, ok := grpc.MethodFromServerStream(stream) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 7513e3b01dd5..4757aa052e38 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -9,7 +9,6 @@ import ( "sync" "sync/atomic" - appmodulev2 "cosmossdk.io/core/appmodule/v2" abci "github.com/cometbft/cometbft/abci/types" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" gogoproto "github.com/cosmos/gogoproto/proto" @@ -17,6 +16,7 @@ import ( "google.golang.org/protobuf/reflect/protoregistry" "cosmossdk.io/collections" + appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/comet" corecontext "cosmossdk.io/core/context" "cosmossdk.io/core/event" diff --git a/server/v2/server_test.go b/server/v2/server_test.go index c23f2e79de88..97afa40fdaa9 100644 --- a/server/v2/server_test.go +++ b/server/v2/server_test.go @@ -7,11 +7,11 @@ import ( "testing" "time" - appmodulev2 "cosmossdk.io/core/appmodule/v2" gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/viper" "github.com/stretchr/testify/require" + appmodulev2 "cosmossdk.io/core/appmodule/v2" coreserver "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" "cosmossdk.io/log" diff --git a/server/v2/types.go b/server/v2/types.go index 60e085f2d2f2..7cd15fd10307 100644 --- a/server/v2/types.go +++ b/server/v2/types.go @@ -1,9 +1,9 @@ package serverv2 import ( - appmodulev2 "cosmossdk.io/core/appmodule/v2" "github.com/spf13/viper" + appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" "cosmossdk.io/log" From 4f082deb918c765199fecf64e0d3838cb4afe5d3 Mon Sep 17 00:00:00 2001 From: testinginprod Date: Wed, 2 Oct 2024 15:05:08 +0200 Subject: [PATCH 5/7] fix grpc query logic --- server/v2/api/grpc/server.go | 11 ++--- server/v2/cometbft/abci.go | 78 +++++++++++++++++++++--------------- 2 files changed, 51 insertions(+), 38 deletions(-) diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index ba14a52e8cf8..3a1c9cf6c13f 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -12,11 +12,6 @@ import ( "strings" "sync" - appmodulev2 "cosmossdk.io/core/appmodule/v2" - "cosmossdk.io/core/transaction" - "cosmossdk.io/log" - serverv2 "cosmossdk.io/server/v2" - "cosmossdk.io/server/v2/api/grpc/gogoreflection" gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/pflag" "google.golang.org/grpc" @@ -24,6 +19,12 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/reflect/protoreflect" + + appmodulev2 "cosmossdk.io/core/appmodule/v2" + "cosmossdk.io/core/transaction" + "cosmossdk.io/log" + serverv2 "cosmossdk.io/server/v2" + "cosmossdk.io/server/v2/api/grpc/gogoreflection" ) const ( diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 4757aa052e38..12b8b026183c 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -198,40 +198,9 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc // Query implements types.Application. // It is called by cometbft to query application state. func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { - // if this fails then we cannot serve queries anymore - registry, err := c.getProtoRegistry() - if err != nil { - return nil, err - } - - fullName := protoreflect.FullName(strings.ReplaceAll(req.Path, "/", ".")) - - desc, err := registry.FindDescriptorByName(fullName) - if err != nil { - return nil, err - } - - md, ok := desc.(protoreflect.MethodDescriptor) - if !ok { - return nil, fmt.Errorf("proto descriptor %s not found in registry", req.Path) - } - - handler, isGRPC := c.queryHandlersMap[string(md.Input().FullName())] + resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req) if isGRPC { - protoRequest := handler.MakeMsg() - err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec - if err != nil { - return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) - } - res, err := c.app.Query(ctx, uint64(req.Height), protoRequest) - if err != nil { - resp := QueryResult(err, c.cfg.AppTomlConfig.Trace) - resp.Height = req.Height - return resp, err - - } - - return queryResponse(res, req.Height) + return resp, err } // this error most probably means that we can't handle it with a proto message, so @@ -262,6 +231,49 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) ( return resp, nil } +func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) { + // if this fails then we cannot serve queries anymore + registry, err := c.getProtoRegistry() + if err != nil { + return nil, false, err + } + + // in order to check if it's a gRPC query we ensure that there's a descriptor + // for the path, if such descriptor exists, and it is a method descriptor + // then we assume this is a gRPC query. + fullName := protoreflect.FullName(strings.ReplaceAll(req.Path, "/", ".")) + + desc, err := registry.FindDescriptorByName(fullName) + if err != nil { + return nil, false, err + } + + md, isGRPC := desc.(protoreflect.MethodDescriptor) + if !isGRPC { + return nil, false, nil + } + + handler, found := c.queryHandlersMap[string(md.Input().FullName())] + if !found { + return nil, true, fmt.Errorf("no query handler found for %s", fullName) + } + protoRequest := handler.MakeMsg() + err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec + if err != nil { + return nil, true, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) + } + res, err := c.app.Query(ctx, uint64(req.Height), protoRequest) + if err != nil { + resp := QueryResult(err, c.cfg.AppTomlConfig.Trace) + resp.Height = req.Height + return resp, true, err + + } + + resp, err = queryResponse(res, req.Height) + return resp, isGRPC, err +} + // InitChain implements types.Application. func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) { c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId) From 39633c9defd802c13b2f1d776e0ecc394dedced9 Mon Sep 17 00:00:00 2001 From: testinginprod Date: Wed, 2 Oct 2024 16:55:34 +0200 Subject: [PATCH 6/7] tidy --- server/v2/cometbft/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/v2/cometbft/go.mod b/server/v2/cometbft/go.mod index 5670b3e108e2..4a173709f90f 100644 --- a/server/v2/cometbft/go.mod +++ b/server/v2/cometbft/go.mod @@ -35,6 +35,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 + google.golang.org/protobuf v1.34.2 sigs.k8s.io/yaml v1.4.0 ) @@ -179,7 +180,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f // indirect google.golang.org/grpc v1.67.1 // indirect - google.golang.org/protobuf v1.34.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools/v3 v3.5.1 // indirect From a79efec9a3b50a71e872273e0c9c207f44331a39 Mon Sep 17 00:00:00 2001 From: testinginprod <98415576+testinginprod@users.noreply.github.com> Date: Wed, 2 Oct 2024 18:49:34 +0200 Subject: [PATCH 7/7] Apply suggestions from code review Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- server/v2/api/grpc/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index 3a1c9cf6c13f..3fb531f2ec1b 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -98,7 +98,7 @@ func makeUnknownServiceHandler(handlers map[string]appmodulev2.Handler, querier // if this fails we cannot serve queries anymore... registry, err := getRegistry() if err != nil { - return err + return fmt.Errorf("failed to get registry: %w", err) } fullName := protoreflect.FullName(strings.ReplaceAll(method, "/", ".")) // get descriptor from the invoke method