From 066ae1d4a3f8e7437d8138492652d42ae361cb02 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Mon, 25 Mar 2019 17:05:14 -0700 Subject: [PATCH] etcdserver: filter rpc request to learner Hardcoded allowed rpc for learner node. Added filtering in grpc interceptor to check if rpc is allowed for learner node. --- etcdserver/api/v3rpc/interceptor.go | 9 +++++++++ etcdserver/api/v3rpc/rpctypes/error.go | 1 + etcdserver/api/v3rpc/util.go | 13 +++++++++++++ 3 files changed, 23 insertions(+) diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 16865e22c75..586cf67a0e0 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -48,6 +48,11 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { return nil, rpctypes.ErrGRPCNotCapable } + // TODO: add test in clientv3/integration to verify behavior + if s.IsLearner() && !isRPCEnabledForLearner(req) { + return nil, rpctypes.ErrGPRCNotSupportedForLearner + } + md, ok := metadata.FromIncomingContext(ctx) if ok { if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { @@ -190,6 +195,10 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor return rpctypes.ErrGRPCNotCapable } + if s.IsLearner() { // learner does not support Watch and LeaseKeepAlive RPC + return rpctypes.ErrGPRCNotSupportedForLearner + } + md, ok := metadata.FromIncomingContext(ss.Context()) if ok { if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 9e45cea5b6e..3d1ee11b083 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -69,6 +69,7 @@ var ( ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err() ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err() ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err() + ErrGPRCNotSupportedForLearner = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err() errStringToError = map[string]error{ ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey, diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 5887dfeba44..0d8b980c80b 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" @@ -116,3 +117,15 @@ func isClientCtxErr(ctxErr error, err error) bool { } return false } + +// in v3.4, learner is allowed to serve serializable read and endpoint status +func isRPCEnabledForLearner(req interface{}) bool { + switch r := req.(type) { + case *pb.StatusRequest: + return true + case *pb.RangeRequest: + return r.Serializable + default: + return false + } +}