From b4320ee5b930bd53647745bb35df2439fccf8c83 Mon Sep 17 00:00:00 2001 From: testinginprod <98415576+testinginprod@users.noreply.github.com> Date: Tue, 30 Jul 2024 11:26:16 +0200 Subject: [PATCH 1/3] feat(serverv2): integrate gRPC (#21038) Co-authored-by: Marko Co-authored-by: marbar3778 (cherry picked from commit 8b471416559ff189181bec0cc5eb5923c8ba9571) # Conflicts: # schema/appdata/mux_test.go # server/v2/cometbft/server.go # server/v2/go.mod --- runtime/v2/app.go | 8 +- runtime/v2/manager.go | 17 ++- runtime/v2/module.go | 11 +- schema/appdata/mux_test.go | 133 ++++++++++++++++++ .../grpc/gogoreflection/serverreflection.go | 87 ++++++------ server/v2/api/grpc/server.go | 115 +++++++++++++-- server/v2/cometbft/abci.go | 12 +- server/v2/cometbft/server.go | 5 + server/v2/go.mod | 5 + server/v2/server_test.go | 9 ++ server/v2/types.go | 2 +- 11 files changed, 330 insertions(+), 74 deletions(-) create mode 100644 schema/appdata/mux_test.go diff --git a/runtime/v2/app.go b/runtime/v2/app.go index 06bff4a8d86c..0e28d8e3b5a5 100644 --- a/runtime/v2/app.go +++ b/runtime/v2/app.go @@ -44,9 +44,9 @@ type App[T transaction.Tx] struct { amino legacy.Amino moduleManager *MM[T] - // GRPCQueryDecoders maps gRPC method name to a function that decodes the request + // GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request // bytes into a gogoproto.Message, which then can be passed to appmanager. - GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) + GRPCMethodsToMessageMap map[string]func() gogoproto.Message } // Name returns the app name. @@ -118,6 +118,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] { return a.AppManager } -func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) { - return a.GRPCQueryDecoders +func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message { + return a.GRPCMethodsToMessageMap } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index ba73c4c60363..2b134bdb99db 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -556,7 +556,7 @@ func (m *MM[T]) assertNoForgottenModules( func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error { c := &configurator{ - grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){}, + grpcQueryDecoders: map[string]func() gogoproto.Message{}, stfQueryRouter: app.queryRouterBuilder, stfMsgRouter: app.msgRouterBuilder, registry: registry, @@ -567,7 +567,10 @@ func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], re if err != nil { return fmt.Errorf("unable to register services: %w", err) } - app.GRPCQueryDecoders = c.grpcQueryDecoders + // merge maps + for path, decoder := range c.grpcQueryDecoders { + app.GRPCMethodsToMessageMap[path] = decoder + } return nil } @@ -576,7 +579,7 @@ var _ grpc.ServiceRegistrar = (*configurator)(nil) type configurator struct { // grpcQueryDecoders is required because module expose queries through gRPC // this provides a way to route to modules using gRPC. - grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error) + grpcQueryDecoders map[string]func() gogoproto.Message stfQueryRouter *stf.MsgRouterBuilder stfMsgRouter *stf.MsgRouterBuilder @@ -618,11 +621,11 @@ func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{ if typ == nil { return fmt.Errorf("unable to find message in gogotype registry: %w", err) } - decoderFunc := func(bytes []byte) (gogoproto.Message, error) { - msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message) - return msg, gogoproto.Unmarshal(bytes, msg) + decoderFunc := func() gogoproto.Message { + return reflect.New(typ.Elem()).Interface().(gogoproto.Message) } - c.grpcQueryDecoders[md.MethodName] = decoderFunc + methodName := fmt.Sprintf("/%s/%s", sd.ServiceName, md.MethodName) + c.grpcQueryDecoders[methodName] = decoderFunc } return nil } diff --git a/runtime/v2/module.go b/runtime/v2/module.go index a88a4750543a..faa07a2b99dc 100644 --- a/runtime/v2/module.go +++ b/runtime/v2/module.go @@ -130,11 +130,12 @@ func ProvideAppBuilder[T transaction.Tx]( msgRouterBuilder := stf.NewMsgRouterBuilder() app := &App[T]{ - storeKeys: nil, - interfaceRegistrar: interfaceRegistrar, - amino: amino, - msgRouterBuilder: msgRouterBuilder, - queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router + storeKeys: nil, + interfaceRegistrar: interfaceRegistrar, + amino: amino, + msgRouterBuilder: msgRouterBuilder, + queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router + GRPCMethodsToMessageMap: map[string]func() proto.Message{}, } appBuilder := &AppBuilder[T]{app: app} diff --git a/schema/appdata/mux_test.go b/schema/appdata/mux_test.go new file mode 100644 index 000000000000..70787fada8a5 --- /dev/null +++ b/schema/appdata/mux_test.go @@ -0,0 +1,133 @@ +package appdata + +import ( + "fmt" + "testing" +) + +func TestListenerMux(t *testing.T) { + t.Run("empty", func(t *testing.T) { + listener := ListenerMux(Listener{}, Listener{}) + + if listener.InitializeModuleData != nil { + t.Error("expected nil") + } + if listener.StartBlock != nil { + t.Error("expected nil") + } + if listener.OnTx != nil { + t.Error("expected nil") + } + if listener.OnEvent != nil { + t.Error("expected nil") + } + if listener.OnKVPair != nil { + t.Error("expected nil") + } + if listener.OnObjectUpdate != nil { + t.Error("expected nil") + } + if listener.Commit != nil { + t.Error("expected nil") + } + }) + + t.Run("all called once", func(t *testing.T) { + var calls []string + onCall := func(name string, i int, _ Packet) { + calls = append(calls, fmt.Sprintf("%s %d", name, i)) + } + + res := ListenerMux(callCollector(1, onCall), callCollector(2, onCall)) + + callAllCallbacksOnces(t, res) + + checkExpectedCallOrder(t, calls, []string{ + "InitializeModuleData 1", + "InitializeModuleData 2", + "StartBlock 1", + "StartBlock 2", + "OnTx 1", + "OnTx 2", + "OnEvent 1", + "OnEvent 2", + "OnKVPair 1", + "OnKVPair 2", + "OnObjectUpdate 1", + "OnObjectUpdate 2", + "Commit 1", + "Commit 2", + }) + }) +} + +func callAllCallbacksOnces(t *testing.T, listener Listener) { + t.Helper() + if err := listener.InitializeModuleData(ModuleInitializationData{}); err != nil { + t.Error(err) + } + if err := listener.StartBlock(StartBlockData{}); err != nil { + t.Error(err) + } + if err := listener.OnTx(TxData{}); err != nil { + t.Error(err) + } + if err := listener.OnEvent(EventData{}); err != nil { + t.Error(err) + } + if err := listener.OnKVPair(KVPairData{}); err != nil { + t.Error(err) + } + if err := listener.OnObjectUpdate(ObjectUpdateData{}); err != nil { + t.Error(err) + } + if err := listener.Commit(CommitData{}); err != nil { + t.Error(err) + } +} + +func callCollector(i int, onCall func(string, int, Packet)) Listener { + return Listener{ + InitializeModuleData: func(ModuleInitializationData) error { + onCall("InitializeModuleData", i, nil) + return nil + }, + StartBlock: func(StartBlockData) error { + onCall("StartBlock", i, nil) + return nil + }, + OnTx: func(TxData) error { + onCall("OnTx", i, nil) + return nil + }, + OnEvent: func(EventData) error { + onCall("OnEvent", i, nil) + return nil + }, + OnKVPair: func(KVPairData) error { + onCall("OnKVPair", i, nil) + return nil + }, + OnObjectUpdate: func(ObjectUpdateData) error { + onCall("OnObjectUpdate", i, nil) + return nil + }, + Commit: func(CommitData) error { + onCall("Commit", i, nil) + return nil + }, + } +} + +func checkExpectedCallOrder(t *testing.T, actual, expected []string) { + t.Helper() + if len(actual) != len(expected) { + t.Fatalf("expected %d calls, got %d", len(expected), len(actual)) + } + + for i := range actual { + if actual[i] != expected[i] { + t.Errorf("expected %q, got %q", expected[i], actual[i]) + } + } +} diff --git a/server/v2/api/grpc/gogoreflection/serverreflection.go b/server/v2/api/grpc/gogoreflection/serverreflection.go index 077c15c3321a..79f520545a87 100644 --- a/server/v2/api/grpc/gogoreflection/serverreflection.go +++ b/server/v2/api/grpc/gogoreflection/serverreflection.go @@ -42,33 +42,41 @@ import ( "errors" "fmt" "io" - "log" "reflect" "sort" + "strings" "sync" - //nolint: staticcheck // keep this import for backward compatibility - "github.com/golang/protobuf/proto" + gogoproto "github.com/cosmos/gogoproto/proto" dpb "github.com/golang/protobuf/protoc-gen-go/descriptor" "google.golang.org/grpc" "google.golang.org/grpc/codes" rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" "google.golang.org/grpc/status" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + + "cosmossdk.io/core/log" ) type serverReflectionServer struct { rpb.UnimplementedServerReflectionServer s *grpc.Server + methods []string + initSymbols sync.Once serviceNames []string symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files + log log.Logger } // Register registers the server reflection service on the given gRPC server. -func Register(s *grpc.Server) { +func Register(s *grpc.Server, methods []string, logger log.Logger) { rpb.RegisterServerReflectionServer(s, &serverReflectionServer{ - s: s, + s: s, + methods: methods, + log: logger, }) } @@ -82,21 +90,12 @@ type protoMessage interface { func (s *serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) { s.initSymbols.Do(func() { - serviceInfo := s.s.GetServiceInfo() - s.symbols = map[string]*dpb.FileDescriptorProto{} - s.serviceNames = make([]string, 0, len(serviceInfo)) + services, fds := s.getServices(s.methods) + s.serviceNames = services + processed := map[string]struct{}{} - for svc, info := range serviceInfo { - s.serviceNames = append(s.serviceNames, svc) - fdenc, ok := parseMetadata(info.Metadata) - if !ok { - continue - } - fd, err := decodeFileDesc(fdenc) - if err != nil { - continue - } + for _, fd := range fds { s.processFile(fd, processed) } sort.Strings(s.serviceNames) @@ -207,7 +206,7 @@ func decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) { } fd := new(dpb.FileDescriptorProto) - if err := proto.Unmarshal(raw, fd); err != nil { + if err := gogoproto.Unmarshal(raw, fd); err != nil { return nil, fmt.Errorf("bad descriptor: %w", err) } return fd, nil @@ -237,7 +236,7 @@ func typeForName(name string) (reflect.Type, error) { } func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) { - m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message) + m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(gogoproto.Message) if !ok { return nil, fmt.Errorf("failed to create message from type: %v", st) } @@ -252,7 +251,7 @@ func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescripto } func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) { - m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message) + m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(gogoproto.Message) if !ok { return nil, fmt.Errorf("failed to create message from type: %v", st) } @@ -272,7 +271,7 @@ func fileDescWithDependencies(fd *dpb.FileDescriptorProto, sentFileDescriptors m queue = queue[1:] if sent := sentFileDescriptors[currentfd.GetName()]; len(r) == 0 || !sent { sentFileDescriptors[currentfd.GetName()] = true - currentfdEncoded, err := proto.Marshal(currentfd) + currentfdEncoded, err := gogoproto.Marshal(currentfd) if err != nil { return nil, err } @@ -305,24 +304,6 @@ func (s *serverReflectionServer) fileDescEncodingByFilename(name string, sentFil return fileDescWithDependencies(fd, sentFileDescriptors) } -// parseMetadata finds the file descriptor bytes specified meta. -// For SupportPackageIsVersion4, m is the name of the proto file, we -// call proto.FileDescriptor to get the byte slice. -// For SupportPackageIsVersion3, m is a byte slice itself. -func parseMetadata(meta interface{}) ([]byte, bool) { - // Check if meta is the file name. - if fileNameForMeta, ok := meta.(string); ok { - return getFileDescriptor(fileNameForMeta), true - } - - // Check if meta is the byte slice. - if enc, ok := meta.([]byte); ok { - return enc, true - } - - return nil, false -} - // fileDescEncodingContainingSymbol finds the file descriptor containing the // given symbol, finds all of its previously unsent transitive dependencies, // does marshaling on them, and returns the marshaled result. The given symbol @@ -446,7 +427,6 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio ErrorMessage: err.Error(), }, } - log.Printf("OH NO: %s", err) } else { out.MessageResponse = &rpb.ServerReflectionResponse_AllExtensionNumbersResponse{ AllExtensionNumbersResponse: &rpb.ExtensionNumberResponse{ //nolint:staticcheck // SA1019: we want to keep using v1alpha @@ -476,3 +456,28 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio } } } + +// getServices gets the unique list of services given a list of methods. +func (s *serverReflectionServer) getServices(methods []string) (svcs []string, fds []*dpb.FileDescriptorProto) { + registry, err := gogoproto.MergedRegistry() + if err != nil { + s.log.Error("unable to load merged registry", "err", err) + return nil, nil + } + seenSvc := map[protoreflect.FullName]struct{}{} + for _, methodName := range methods { + methodName = strings.Join(strings.Split(methodName[1:], "/"), ".") + md, err := registry.FindDescriptorByName(protoreflect.FullName(methodName)) + if err != nil { + s.log.Error("unable to load method descriptor", "method", methodName, "err", err) + continue + } + svc := md.(protoreflect.MethodDescriptor).Parent() + if _, seen := seenSvc[svc.FullName()]; !seen { + svcs = append(svcs, string(svc.FullName())) + file := svc.ParentFile() + fds = append(fds, protodesc.ToFileDescriptorProto(file)) + } + } + return +} diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index c5e31f5dfc71..e59bd99a83c0 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -2,11 +2,20 @@ package grpc import ( "context" + "errors" "fmt" + "io" "net" + "strconv" + "github.com/cosmos/gogoproto/proto" + "github.com/spf13/pflag" "github.com/spf13/viper" + "golang.org/x/exp/maps" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "cosmossdk.io/core/transaction" "cosmossdk.io/log" @@ -14,7 +23,12 @@ import ( "cosmossdk.io/server/v2/api/grpc/gogoreflection" ) -type GRPCServer[T transaction.Tx] struct { +const ( + BlockHeightHeader = "x-cosmos-block-height" + FlagAddress = "address" +) + +type Server[T transaction.Tx] struct { logger log.Logger config *Config cfgOptions []CfgOption @@ -23,32 +37,34 @@ type GRPCServer[T transaction.Tx] struct { } // New creates a new grpc server. -func New[T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[T] { - return &GRPCServer[T]{ +func New[T transaction.Tx](cfgOptions ...CfgOption) *Server[T] { + return &Server[T]{ cfgOptions: cfgOptions, } } // Init returns a correctly configured and initialized gRPC server. // Note, the caller is responsible for starting the server. -func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error { +func (s *Server[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error { cfg := s.Config().(*Config) if v != nil { if err := serverv2.UnmarshalSubConfig(v, s.Name(), &cfg); err != nil { return fmt.Errorf("failed to unmarshal config: %w", err) } } + methodsMap := appI.GetGPRCMethodsToMessageMap() grpcSrv := grpc.NewServer( grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()), grpc.MaxSendMsgSize(cfg.MaxSendMsgSize), grpc.MaxRecvMsgSize(cfg.MaxRecvMsgSize), + grpc.UnknownServiceHandler( + makeUnknownServiceHandler(methodsMap, appI.GetAppManager()), + ), ) - // appI.RegisterGRPCServer(grpcSrv) - // Reflection allows external clients to see what services and methods the gRPC server exposes. - gogoreflection.Register(grpcSrv) + gogoreflection.Register(grpcSrv, maps.Keys(methodsMap), logger.With("sub-module", "grpc-reflection")) s.grpcSrv = grpcSrv s.config = cfg @@ -57,11 +73,88 @@ func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.L return nil } -func (s *GRPCServer[T]) Name() string { +func (s *Server[T]) StartCmdFlags() *pflag.FlagSet { + flags := pflag.NewFlagSet("grpc", pflag.ExitOnError) + + // start flags are prefixed with the server name + // as the config in prefixed with the server name + // this allows viper to properly bind the flags + prefix := func(f string) string { + return fmt.Sprintf("%s.%s", s.Name(), f) + } + + flags.String(prefix(FlagAddress), "localhost:9090", "Listen address") + return flags +} + +func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, querier interface { + Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error) +}, +) grpc.StreamHandler { + return func(srv any, stream grpc.ServerStream) error { + method, ok := grpc.MethodFromServerStream(stream) + if !ok { + return status.Error(codes.InvalidArgument, "unable to get method") + } + makeMsg, exists := messageMap[method] + if !exists { + return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method) + } + for { + req := makeMsg() + err := stream.RecvMsg(req) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + + // extract height header + ctx := stream.Context() + height, err := getHeightFromCtx(ctx) + if err != nil { + return status.Errorf(codes.InvalidArgument, "invalid get height from context: %v", err) + } + resp, err := querier.Query(ctx, height, req) + if err != nil { + return err + } + err = stream.SendMsg(resp) + if err != nil { + return err + } + } + } +} + +func getHeightFromCtx(ctx context.Context) (uint64, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return 0, nil + } + values := md.Get(BlockHeightHeader) + if len(values) == 0 { + return 0, nil + } + if len(values) != 1 { + return 0, fmt.Errorf("gRPC height metadata must be of length 1, got: %d", len(values)) + } + + heightStr := values[0] + height, err := strconv.ParseUint(heightStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse height string from gRPC metadata %s: %w", heightStr, err) + } + + return height, nil +} + +func (s *Server[T]) Name() string { return "grpc" } -func (s *GRPCServer[T]) Config() any { +func (s *Server[T]) Config() any { if s.config == nil || s.config == (&Config{}) { cfg := DefaultConfig() // overwrite the default config with the provided options @@ -75,7 +168,7 @@ func (s *GRPCServer[T]) Config() any { return s.config } -func (s *GRPCServer[T]) Start(ctx context.Context) error { +func (s *Server[T]) Start(ctx context.Context) error { if !s.config.Enable { return nil } @@ -102,7 +195,7 @@ func (s *GRPCServer[T]) Start(ctx context.Context) error { return err } -func (s *GRPCServer[T]) Stop(ctx context.Context) error { +func (s *Server[T]) Stop(ctx context.Context) error { if !s.config.Enable { return nil } diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 380f1427da3a..65bbbbd15b3d 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -41,7 +41,6 @@ type Consensus[T transaction.Tx] struct { streaming streaming.Manager snapshotManager *snapshots.Manager mempool mempool.Mempool[T] - grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) // legacy support for gRPC cfg Config indexedEvents map[string]struct{} @@ -60,6 +59,8 @@ type Consensus[T transaction.Tx] struct { addrPeerFilter types.PeerFilter // filter peers by address and port idPeerFilter types.PeerFilter // filter peers by node ID + + grpcMethodsMap map[string]func() gogoproto.Message // maps gRPC method to message creator func } func NewConsensus[T transaction.Tx]( @@ -69,7 +70,7 @@ func NewConsensus[T transaction.Tx]( app *appmanager.AppManager[T], mp mempool.Mempool[T], indexedEvents map[string]struct{}, - grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error), + gRPCMethodsMap map[string]func() gogoproto.Message, store types.Store, cfg Config, txCodec transaction.Codec[T], @@ -78,7 +79,7 @@ func NewConsensus[T transaction.Tx]( appName: appName, version: getCometBFTServerVersion(), consensusAuthority: consensusAuthority, - grpcQueryDecoders: grpcQueryDecoders, + grpcMethodsMap: gRPCMethodsMap, app: app, cfg: cfg, store: store, @@ -173,9 +174,10 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc // It is called by cometbft to query application state. func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { // check if it's a gRPC method - grpcQueryDecoder, isGRPC := c.grpcQueryDecoders[req.Path] + makeGRPCRequest, isGRPC := c.grpcMethodsMap[req.Path] if isGRPC { - protoRequest, err := grpcQueryDecoder(req.Data) + protoRequest := makeGRPCRequest() + err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec if err != nil { return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) } diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index dfd04ae01d30..f418919f62ae 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -79,8 +79,13 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger l appI.GetAppManager(), s.serverOptions.Mempool, indexEvents, +<<<<<<< HEAD appI.GetGRPCQueryDecoders(), appI.GetStore().(types.Store), +======= + appI.GetGPRCMethodsToMessageMap(), + store, +>>>>>>> 8b4714165 (feat(serverv2): integrate gRPC (#21038)) s.config, s.initTxCodec, ) diff --git a/server/v2/go.mod b/server/v2/go.mod index 254dec7bad9a..027354571d9e 100644 --- a/server/v2/go.mod +++ b/server/v2/go.mod @@ -35,6 +35,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 + golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc golang.org/x/sync v0.7.0 google.golang.org/grpc v1.64.1 google.golang.org/protobuf v1.34.2 @@ -76,7 +77,11 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect github.com/tidwall/btree v1.7.0 // indirect go.uber.org/multierr v1.11.0 // indirect +<<<<<<< HEAD golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect +======= + golang.org/x/crypto v0.25.0 // indirect +>>>>>>> 8b4714165 (feat(serverv2): integrate gRPC (#21038)) golang.org/x/net v0.27.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect diff --git a/server/v2/server_test.go b/server/v2/server_test.go index 3faef417757b..e757e7ecd5ca 100644 --- a/server/v2/server_test.go +++ b/server/v2/server_test.go @@ -16,6 +16,7 @@ import ( "cosmossdk.io/log" serverv2 "cosmossdk.io/server/v2" grpc "cosmossdk.io/server/v2/api/grpc" + "cosmossdk.io/server/v2/appmanager" ) type mockInterfaceRegistry struct{} @@ -33,6 +34,14 @@ type mockApp[T transaction.Tx] struct { serverv2.AppI[T] } +func (*mockApp[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message { + return map[string]func() gogoproto.Message{} +} + +func (*mockApp[T]) GetAppManager() *appmanager.AppManager[T] { + return nil +} + func (*mockApp[T]) InterfaceRegistry() coreapp.InterfaceRegistry { return &mockInterfaceRegistry{} } diff --git a/server/v2/types.go b/server/v2/types.go index fc6caaaeb735..978b46b78810 100644 --- a/server/v2/types.go +++ b/server/v2/types.go @@ -17,6 +17,6 @@ type AppI[T transaction.Tx] interface { InterfaceRegistry() coreapp.InterfaceRegistry GetAppManager() *appmanager.AppManager[T] GetConsensusAuthority() string - GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) + GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message GetStore() any } From c966e82f1cb8d2c829f278222bfa51507a089df3 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 31 Jul 2024 23:16:16 +0200 Subject: [PATCH 2/3] conflicts --- schema/appdata/mux_test.go | 133 ----------------------------------- server/v2/cometbft/server.go | 7 +- server/v2/go.mod | 5 -- 3 files changed, 1 insertion(+), 144 deletions(-) delete mode 100644 schema/appdata/mux_test.go diff --git a/schema/appdata/mux_test.go b/schema/appdata/mux_test.go deleted file mode 100644 index 70787fada8a5..000000000000 --- a/schema/appdata/mux_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package appdata - -import ( - "fmt" - "testing" -) - -func TestListenerMux(t *testing.T) { - t.Run("empty", func(t *testing.T) { - listener := ListenerMux(Listener{}, Listener{}) - - if listener.InitializeModuleData != nil { - t.Error("expected nil") - } - if listener.StartBlock != nil { - t.Error("expected nil") - } - if listener.OnTx != nil { - t.Error("expected nil") - } - if listener.OnEvent != nil { - t.Error("expected nil") - } - if listener.OnKVPair != nil { - t.Error("expected nil") - } - if listener.OnObjectUpdate != nil { - t.Error("expected nil") - } - if listener.Commit != nil { - t.Error("expected nil") - } - }) - - t.Run("all called once", func(t *testing.T) { - var calls []string - onCall := func(name string, i int, _ Packet) { - calls = append(calls, fmt.Sprintf("%s %d", name, i)) - } - - res := ListenerMux(callCollector(1, onCall), callCollector(2, onCall)) - - callAllCallbacksOnces(t, res) - - checkExpectedCallOrder(t, calls, []string{ - "InitializeModuleData 1", - "InitializeModuleData 2", - "StartBlock 1", - "StartBlock 2", - "OnTx 1", - "OnTx 2", - "OnEvent 1", - "OnEvent 2", - "OnKVPair 1", - "OnKVPair 2", - "OnObjectUpdate 1", - "OnObjectUpdate 2", - "Commit 1", - "Commit 2", - }) - }) -} - -func callAllCallbacksOnces(t *testing.T, listener Listener) { - t.Helper() - if err := listener.InitializeModuleData(ModuleInitializationData{}); err != nil { - t.Error(err) - } - if err := listener.StartBlock(StartBlockData{}); err != nil { - t.Error(err) - } - if err := listener.OnTx(TxData{}); err != nil { - t.Error(err) - } - if err := listener.OnEvent(EventData{}); err != nil { - t.Error(err) - } - if err := listener.OnKVPair(KVPairData{}); err != nil { - t.Error(err) - } - if err := listener.OnObjectUpdate(ObjectUpdateData{}); err != nil { - t.Error(err) - } - if err := listener.Commit(CommitData{}); err != nil { - t.Error(err) - } -} - -func callCollector(i int, onCall func(string, int, Packet)) Listener { - return Listener{ - InitializeModuleData: func(ModuleInitializationData) error { - onCall("InitializeModuleData", i, nil) - return nil - }, - StartBlock: func(StartBlockData) error { - onCall("StartBlock", i, nil) - return nil - }, - OnTx: func(TxData) error { - onCall("OnTx", i, nil) - return nil - }, - OnEvent: func(EventData) error { - onCall("OnEvent", i, nil) - return nil - }, - OnKVPair: func(KVPairData) error { - onCall("OnKVPair", i, nil) - return nil - }, - OnObjectUpdate: func(ObjectUpdateData) error { - onCall("OnObjectUpdate", i, nil) - return nil - }, - Commit: func(CommitData) error { - onCall("Commit", i, nil) - return nil - }, - } -} - -func checkExpectedCallOrder(t *testing.T, actual, expected []string) { - t.Helper() - if len(actual) != len(expected) { - t.Fatalf("expected %d calls, got %d", len(expected), len(actual)) - } - - for i := range actual { - if actual[i] != expected[i] { - t.Errorf("expected %q, got %q", expected[i], actual[i]) - } - } -} diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index f418919f62ae..8a9817cadac8 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -79,13 +79,8 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger l appI.GetAppManager(), s.serverOptions.Mempool, indexEvents, -<<<<<<< HEAD - appI.GetGRPCQueryDecoders(), - appI.GetStore().(types.Store), -======= appI.GetGPRCMethodsToMessageMap(), - store, ->>>>>>> 8b4714165 (feat(serverv2): integrate gRPC (#21038)) + appI.GetStore().(types.Store), s.config, s.initTxCodec, ) diff --git a/server/v2/go.mod b/server/v2/go.mod index 027354571d9e..430030ba80a4 100644 --- a/server/v2/go.mod +++ b/server/v2/go.mod @@ -77,11 +77,6 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect github.com/tidwall/btree v1.7.0 // indirect go.uber.org/multierr v1.11.0 // indirect -<<<<<<< HEAD - golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect -======= - golang.org/x/crypto v0.25.0 // indirect ->>>>>>> 8b4714165 (feat(serverv2): integrate gRPC (#21038)) golang.org/x/net v0.27.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect From 5e4691bc5f448a61bdccb71a2128c22f69793c50 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 31 Jul 2024 23:17:14 +0200 Subject: [PATCH 3/3] build --- server/v2/store/server.go | 76 --------------------------------------- 1 file changed, 76 deletions(-) delete mode 100644 server/v2/store/server.go diff --git a/server/v2/store/server.go b/server/v2/store/server.go deleted file mode 100644 index c7a1f9035d10..000000000000 --- a/server/v2/store/server.go +++ /dev/null @@ -1,76 +0,0 @@ -package store - -import ( - "context" - "fmt" - - "github.com/spf13/cobra" - "github.com/spf13/viper" - - "cosmossdk.io/core/transaction" - "cosmossdk.io/log" - serverv2 "cosmossdk.io/server/v2" -) - -// StoreComponent manages store config -// and contains prune & snapshot commands -type StoreComponent[T transaction.Tx] struct { - config *Config -} - -func New[T transaction.Tx]() *StoreComponent[T] { - return &StoreComponent[T]{} -} - -func (s *StoreComponent[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error { - cfg := DefaultConfig() - if v != nil { - if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil { - return fmt.Errorf("failed to unmarshal config: %w", err) - } - } - s.config = cfg - return nil -} - -func (s *StoreComponent[T]) Name() string { - return "store" -} - -func (s *StoreComponent[T]) Start(ctx context.Context) error { - return nil -} - -func (s *StoreComponent[T]) Stop(ctx context.Context) error { - return nil -} - -func (s *StoreComponent[T]) GetCommands() []*cobra.Command { - return []*cobra.Command{ - s.PrunesCmd(), - } -} - -func (s *StoreComponent[T]) GetTxs() []*cobra.Command { - return nil -} - -func (s *StoreComponent[T]) GetQueries() []*cobra.Command { - return nil -} - -func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig { - return serverv2.CLIConfig{ - Commands: []*cobra.Command{ - s.PrunesCmd(), - }, - } -} - -func (g *StoreComponent[T]) Config() any { - if g.config == nil || g.config == (&Config{}) { - return DefaultConfig() - } - - return g.config -}