Skip to content

Commit

Permalink
Get rid of legacy client/v3/naming API.
Browse files Browse the repository at this point in the history
Update grpcproxy to use the new abstractions.
  • Loading branch information
ptabor committed Feb 8, 2021
1 parent 3fddea9 commit 7938563
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 324 deletions.
133 changes: 0 additions & 133 deletions client/v3/naming/grpc.go

This file was deleted.

63 changes: 32 additions & 31 deletions server/proxy/grpcproxy/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import (
"sync"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"golang.org/x/time/rate"

"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)

// allow maximum 1 retry per second
Expand All @@ -38,14 +36,13 @@ type clusterProxy struct {
lg *zap.Logger
clus clientv3.Cluster
ctx context.Context
gr *naming.GRPCResolver

// advertise client URL
advaddr string
prefix string

umu sync.RWMutex
umap map[string]gnaming.Update
umap map[string]endpoints.Endpoint
}

// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
Expand All @@ -55,22 +52,28 @@ func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix
if lg == nil {
lg = zap.NewNop()
}

cp := &clusterProxy{
lg: lg,
clus: c.Cluster,
ctx: c.Ctx(),
gr: &naming.GRPCResolver{Client: c},

advaddr: advaddr,
prefix: prefix,
umap: make(map[string]gnaming.Update),
umap: make(map[string]endpoints.Endpoint),
}

donec := make(chan struct{})
if advaddr != "" && prefix != "" {
em, err := endpoints.NewManager(c, prefix)
if err != nil {
cp.lg.Error("failed to provision endpointsManager", zap.String("prefix", prefix), zap.Error(err))
close(donec)
return cp, donec
}
go func() {
defer close(donec)
cp.resolve(prefix)
cp.establishEndpointWatch(em, prefix)
}()
return cp, donec
}
Expand All @@ -79,38 +82,36 @@ func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix
return cp, donec
}

func (cp *clusterProxy) resolve(prefix string) {
func (cp *clusterProxy) establishEndpointWatch(em endpoints.Manager, prefix string) {
rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
for rm.Wait(cp.ctx) == nil {
wa, err := cp.gr.Resolve(prefix)
wc, err := em.NewWatchChannel(cp.ctx)
if err != nil {
cp.lg.Warn("failed to resolve prefix", zap.String("prefix", prefix), zap.Error(err))
cp.lg.Warn("failed to establish endpoint watch", zap.String("prefix", prefix), zap.Error(err))
continue
}
cp.monitor(wa)
cp.monitor(wc)
}
}

func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
for cp.ctx.Err() == nil {
ups, err := wa.Next()
if err != nil {
cp.lg.Warn("clusterProxy watcher error", zap.Error(err))
if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
return
}
}

cp.umu.Lock()
for i := range ups {
switch ups[i].Op {
case gnaming.Add:
cp.umap[ups[i].Addr] = *ups[i]
case gnaming.Delete:
delete(cp.umap, ups[i].Addr)
func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) {
for {
select {
case <-cp.ctx.Done():
cp.lg.Info("watching endpoints interrupted", zap.Error(cp.ctx.Err()))
return
case updates := <-wa:
cp.umu.Lock()
for _, up := range updates {
switch up.Op {
case endpoints.Add:
cp.umap[up.Endpoint.Addr] = up.Endpoint
case endpoints.Delete:
delete(cp.umap, up.Endpoint.Addr)
}
}
cp.umu.Unlock()
}
cp.umu.Unlock()
}
}

Expand Down
11 changes: 7 additions & 4 deletions server/proxy/grpcproxy/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import (

"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/client/v3/naming/endpoints"

"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)

// allow maximum 1 retry per second
Expand Down Expand Up @@ -68,8 +67,12 @@ func registerSession(lg *zap.Logger, c *clientv3.Client, prefix string, addr str
return nil, err
}

gr := &naming.GRPCResolver{Client: c}
if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
em, err := endpoints.NewManager(c, prefix)
if err != nil {
return nil, err
}
endpoint := endpoints.Endpoint{Addr: addr, Metadata: getMeta()}
if err = em.AddEndpoint(c.Ctx(), prefix, endpoint, clientv3.WithLease(ss.Lease())); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit 7938563

Please sign in to comment.