Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: remove grpc query routing through tendermint #10045

Merged
merged 21 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
29aa15f
Revert "Make gRPC requests go through tendermint Query (#8549)"
tac0turtle Sep 1, 2021
d2e2eae
update iavl
tac0turtle Sep 1, 2021
a75aefe
Merge branch 'master' into revert-tendermint-routing
tac0turtle Sep 2, 2021
3c31623
Merge branch 'master' into revert-tendermint-routing
tac0turtle Sep 13, 2021
5b573de
Merge branch 'master' into revert-tendermint-routing
tac0turtle Sep 16, 2021
8010317
address comments
tac0turtle Sep 16, 2021
8918090
Merge branch 'master' into revert-tendermint-routing
tac0turtle Sep 16, 2021
375267f
Merge branch 'master' into revert-tendermint-routing
tac0turtle Sep 17, 2021
3cbbebe
address comments
tac0turtle Sep 17, 2021
9e994b1
Merge branch 'master' into revert-tendermint-routing
tac0turtle Sep 21, 2021
b7e8422
fix build and add changelog
tac0turtle Sep 21, 2021
a3d068c
Merge branch 'master' into revert-tendermint-routing
amaury1093 Sep 22, 2021
8d97882
remove duplicate line
tac0turtle Sep 22, 2021
0ff40ba
remove deadcode
tac0turtle Sep 23, 2021
c087340
Merge branch 'master' into revert-tendermint-routing
amaury1093 Sep 23, 2021
737c36a
Merge branch 'master' into revert-tendermint-routing
tac0turtle Sep 23, 2021
bf713f7
Merge branch 'master' into revert-tendermint-routing
tac0turtle Sep 23, 2021
60fe1e3
Merge branch 'master' into revert-tendermint-routing
amaury1093 Sep 27, 2021
a2397c2
Merge branch 'master' into revert-tendermint-routing
robert-zaremba Sep 29, 2021
35b641b
Merge branch 'master' into revert-tendermint-routing
amaury1093 Sep 29, 2021
21d5424
Merge branch 'master' into revert-tendermint-routing
mergify[bot] Sep 30, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 2 additions & 32 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package baseapp

import (
"fmt"
"reflect"

"github.com/cosmos/cosmos-sdk/client/grpc/reflection"

Expand All @@ -14,19 +13,13 @@ import (

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

var protoCodec = encoding.GetCodec(proto.Name)

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
// returnTypes is a map of FQ method name => its return type. It is used
// for cache purposes: the first time a method handler is run, we save its
// return type in this map. Then, on subsequent method handler calls, we
// decode the ABCI response bytes using the cached return type.
returnTypes map[string]reflect.Type
routes map[string]GRPCQueryHandler
interfaceRegistry codectypes.InterfaceRegistry
serviceData []serviceData
}
Expand All @@ -42,8 +35,7 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
returnTypes: map[string]reflect.Type{},
routes: map[string]GRPCQueryHandler{},
routes: map[string]GRPCQueryHandler{},
}
}

Expand Down Expand Up @@ -98,17 +90,8 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
if qrt.interfaceRegistry != nil {
return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry)
}

return nil
}, nil)

// If it's the first time we call this handler, then we save
// the return type of the handler in the `returnTypes` map.
// The return type will be used for decoding subsequent requests.
if _, found := qrt.returnTypes[fqName]; !found {
qrt.returnTypes[fqName] = reflect.TypeOf(res)
}

if err != nil {
return abci.ResponseQuery{}, err
}
Expand Down Expand Up @@ -144,16 +127,3 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In
reflection.NewReflectionServiceServer(interfaceRegistry),
)
}

// returnTypeOf returns the return type of a gRPC method handler. With the way the
// `returnTypes` cache map is set up, the return type of a method handler is
// guaranteed to be found if it's retrieved **after** the method handler ran at
// least once. If not, then a logic error is return.
func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) {
returnType, found := qrt.returnTypes[method]
if !found {
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method)
}

return returnType, nil
}
83 changes: 36 additions & 47 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,67 @@ package baseapp

import (
"context"
"reflect"
"strconv"

gogogrpc "github.com/gogo/protobuf/grpc"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/tx"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
)

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// RegisterGRPCServer registers gRPC services directly with the gRPC server.
func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will route
// the query through the `clientCtx`, which itself queries Tendermint.
interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// Case 1. Broadcasting a Tx.
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if !ok {
return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
}

return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto)
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will create
// a new sdk.Context, and pass it into the query handler.
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// If there's some metadata in the context, retrieve it.
md, ok := metadata.FromIncomingContext(grpcCtx)
if !ok {
return nil, status.Error(codes.Internal, "unable to retrieve metadata")
}

// Case 2. Querying state.
inMd, _ := metadata.FromIncomingContext(grpcCtx)
abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd)
if err != nil {
return nil, err
// Get height header from the request context, if present.
var height int64
if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
if err != nil {
return nil, sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
}
if err := checkNegativeHeight(height); err != nil {
return nil, err
}
}

// We need to know the return type of the grpc method for
// unmarshalling abciRes.Value.
//
// When we call each method handler for the first time, we save its
// return type in the `returnTypes` map (see the method handler in
// `grpcrouter.go`). By this time, the method handler has already run
// at least once (in the RunGRPCQuery call), so we're sure the
// returnType maps is populated for this method. We're retrieving it
// for decoding.
returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod)
// Create the sdk.Context. Passing false as 2nd arg, as we can't
// actually support proofs with gRPC right now.
sdkCtx, err := app.createQueryContext(height, false)
if err != nil {
return nil, err
}

// returnType is a pointer to a struct. Here, we're creating res which
// is a new pointer to the underlying struct.
res := reflect.New(returnType.Elem()).Interface()
// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)

err = protoCodec.Unmarshal(abciRes.Value, res)
if err != nil {
return nil, err
}

// Send the metadata header back. The metadata currently includes:
// - block height.
err = grpc.SendHeader(grpcCtx, outMd)
if err != nil {
return nil, err
// Add relevant gRPC headers
if height == 0 {
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
}
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
grpc.SetHeader(grpcCtx, md)

return res, nil
return handler(grpcCtx, req)
}

// Loop through all services and methods, add the interceptor, and register
Expand Down
99 changes: 45 additions & 54 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,110 +24,101 @@ var _ gogogrpc.ClientConn = Context{}
var protoCodec = encoding.GetCodec(proto.Name)

// Invoke implements the grpc ClientConn.Invoke method
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
amaury1093 marked this conversation as resolved.
Show resolved Hide resolved
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// In both cases, we don't allow empty request req (it will panic unexpectedly).
if reflect.ValueOf(req).IsNil() {
// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(args).IsNil() {
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
}

// Case 1. Broadcasting a Tx.
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if isBroadcast(method) {
amaury1093 marked this conversation as resolved.
Show resolved Hide resolved
req, ok := args.(*tx.BroadcastTxRequest)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
}
resProto, ok := reply.(*tx.BroadcastTxResponse)
res, ok := reply.(*tx.BroadcastTxResponse)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args)
}

broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto)
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
if err != nil {
return err
}
*resProto = *broadcastRes
*res = *broadcastRes

return err
}

// Case 2. Querying state.
inMd, _ := metadata.FromOutgoingContext(grpcCtx)
abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd)
reqBz, err := protoCodec.Marshal(args)
if err != nil {
return err
}

err = protoCodec.Unmarshal(abciRes.Value, reply)
if err != nil {
return err
}

for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = outMd
}

if ctx.InterfaceRegistry != nil {
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
}

return nil
}

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}

// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary
// arguments for the gRPC method, and returns the ABCI response. It is used
// to factorize code between client (Invoke) and server (RegisterGRPCServer)
// gRPC handlers.
func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) {
reqBz, err := protoCodec.Marshal(req)
if err != nil {
return abci.ResponseQuery{}, nil, err
}

// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return abci.ResponseQuery{}, nil, err
return err
}
if height < 0 {
return abci.ResponseQuery{}, nil, sdkerrors.Wrapf(
return sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

abciReq := abci.RequestQuery{
req := abci.RequestQuery{
Path: method,
Data: reqBz,
Height: ctx.Height,
}

abciRes, err := ctx.QueryABCI(abciReq)
res, err := ctx.QueryABCI(req)
if err != nil {
return abci.ResponseQuery{}, nil, err
return err
}

err = protoCodec.Unmarshal(res.Value, reply)
if err != nil {
return err
}

// Create header metadata. For now the headers contain:
// - block height
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10))
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = md
}

if ctx.InterfaceRegistry != nil {
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
}

return nil
}

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}

return abciRes, md, nil
func isBroadcast(method string) bool {
return method == "/cosmos.tx.v1beta1.Service/BroadcastTx"
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/coinbase/rosetta-sdk-go v0.6.10
github.com/confio/ics23/go v0.6.6
github.com/cosmos/go-bip39 v1.0.0
github.com/cosmos/iavl v0.16.0
github.com/cosmos/iavl v0.17.1-0.20210901093355-89f6b77e9284
github.com/cosmos/ledger-cosmos-go v0.11.1
github.com/enigmampc/btcutil v1.0.3-0.20200723161021-e2fb6adb2a25
github.com/gogo/gateway v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ github.com/cosmos/iavl v0.15.0-rc5/go.mod h1:WqoPL9yPTQ85QBMT45OOUzPxG/U/JcJoN7u
github.com/cosmos/iavl v0.15.3/go.mod h1:OLjQiAQ4fGD2KDZooyJG9yz+p2ao2IAYSbke8mVvSA4=
github.com/cosmos/iavl v0.16.0 h1:ICIOB8xysirTX27GmVAaoeSpeozzgSu9d49w36xkVJA=
github.com/cosmos/iavl v0.16.0/go.mod h1:2A8O/Jz9YwtjqXMO0CjnnbTYEEaovE8jWcwrakH3PoE=
github.com/cosmos/iavl v0.17.1-0.20210901093355-89f6b77e9284 h1:K5I1Z3qKksdun+GbjxBmhJN/cZ1hDDsIQs3oejVeQ7c=
github.com/cosmos/iavl v0.17.1-0.20210901093355-89f6b77e9284/go.mod h1:bopHqfvADWWa2ngnLBVS79NeHV8qiNjTA2EK61bdbGs=
github.com/cosmos/keyring v1.1.7-0.20210622111912-ef00f8ac3d76 h1:DdzS1m6o/pCqeZ8VOAit/gyATedRgjvkVI+UCrLpyuU=
github.com/cosmos/keyring v1.1.7-0.20210622111912-ef00f8ac3d76/go.mod h1:0mkLWIoZuQ7uBoospo5Q9zIpqq6rYCPJDSUdeCJvPM8=
github.com/cosmos/ledger-cosmos-go v0.11.1 h1:9JIYsGnXP613pb2vPjFeMMjBI5lEDsEaF6oYorTy6J4=
Expand Down
2 changes: 1 addition & 1 deletion server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) {
grpcSrv := grpc.NewServer()
app.RegisterGRPCServer(clientCtx, grpcSrv)
app.RegisterGRPCServer(grpcSrv)
// reflection allows consumers to build dynamic clients that can write
// to any cosmos-sdk application without relying on application packages at compile time
err := reflection.Register(grpcSrv, reflection.Config{
Expand Down
Loading