Skip to content

Commit

Permalink
Get rid of legacy client/v3/naming API.
Browse files Browse the repository at this point in the history
Update grpcproxy to use the new abstractions.
  • Loading branch information
ptabor committed Feb 9, 2021
1 parent 3fddea9 commit 858bafe
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 328 deletions.
2 changes: 1 addition & 1 deletion client/v3/naming/endpoints/endpoints_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts)
ops := make([]clientv3.Op, 0, len(updates))
for _, update := range updates {
if !strings.HasPrefix(update.Key, m.target+"/") {
return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with %s/", m.target)
return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key)
}

switch update.Op {
Expand Down
133 changes: 0 additions & 133 deletions client/v3/naming/grpc.go

This file was deleted.

67 changes: 36 additions & 31 deletions server/proxy/grpcproxy/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import (
"sync"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"golang.org/x/time/rate"

"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)

// allow maximum 1 retry per second
Expand All @@ -38,39 +36,48 @@ type clusterProxy struct {
lg *zap.Logger
clus clientv3.Cluster
ctx context.Context
gr *naming.GRPCResolver

// advertise client URL
advaddr string
prefix string

em endpoints.Manager

umu sync.RWMutex
umap map[string]gnaming.Update
umap map[string]endpoints.Endpoint
}

// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
// The returned channel is closed when there is grpc-proxy endpoint registered
// and the client's context is canceled so the 'register' loop returns.
// TODO: Expand the API to report creation errors
func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
if lg == nil {
lg = zap.NewNop()
}

em, err := endpoints.NewManager(c, prefix)
if err != nil {
lg.Error("failed to provision endpointsManager", zap.String("prefix", prefix), zap.Error(err))
return nil, nil
}

cp := &clusterProxy{
lg: lg,
clus: c.Cluster,
ctx: c.Ctx(),
gr: &naming.GRPCResolver{Client: c},

advaddr: advaddr,
prefix: prefix,
umap: make(map[string]gnaming.Update),
umap: make(map[string]endpoints.Endpoint),
em: em,
}

donec := make(chan struct{})
if advaddr != "" && prefix != "" {
go func() {
defer close(donec)
cp.resolve(prefix)
cp.establishEndpointWatch(prefix)
}()
return cp, donec
}
Expand All @@ -79,38 +86,36 @@ func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix
return cp, donec
}

func (cp *clusterProxy) resolve(prefix string) {
func (cp *clusterProxy) establishEndpointWatch(prefix string) {
rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
for rm.Wait(cp.ctx) == nil {
wa, err := cp.gr.Resolve(prefix)
wc, err := cp.em.NewWatchChannel(cp.ctx)
if err != nil {
cp.lg.Warn("failed to resolve prefix", zap.String("prefix", prefix), zap.Error(err))
cp.lg.Warn("failed to establish endpoint watch", zap.String("prefix", prefix), zap.Error(err))
continue
}
cp.monitor(wa)
cp.monitor(wc)
}
}

func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
for cp.ctx.Err() == nil {
ups, err := wa.Next()
if err != nil {
cp.lg.Warn("clusterProxy watcher error", zap.Error(err))
if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
return
}
}

cp.umu.Lock()
for i := range ups {
switch ups[i].Op {
case gnaming.Add:
cp.umap[ups[i].Addr] = *ups[i]
case gnaming.Delete:
delete(cp.umap, ups[i].Addr)
func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) {
for {
select {
case <-cp.ctx.Done():
cp.lg.Info("watching endpoints interrupted", zap.Error(cp.ctx.Err()))
return
case updates := <-wa:
cp.umu.Lock()
for _, up := range updates {
switch up.Op {
case endpoints.Add:
cp.umap[up.Endpoint.Addr] = up.Endpoint
case endpoints.Delete:
delete(cp.umap, up.Endpoint.Addr)
}
}
cp.umu.Unlock()
}
cp.umu.Unlock()
}
}

Expand Down
11 changes: 7 additions & 4 deletions server/proxy/grpcproxy/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import (

"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/client/v3/naming/endpoints"

"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)

// allow maximum 1 retry per second
Expand Down Expand Up @@ -68,8 +67,12 @@ func registerSession(lg *zap.Logger, c *clientv3.Client, prefix string, addr str
return nil, err
}

gr := &naming.GRPCResolver{Client: c}
if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
em, err := endpoints.NewManager(c, prefix)
if err != nil {
return nil, err
}
endpoint := endpoints.Endpoint{Addr: addr, Metadata: getMeta()}
if err = em.AddEndpoint(c.Ctx(), prefix+"/"+addr, endpoint, clientv3.WithLease(ss.Lease())); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit 858bafe

Please sign in to comment.