Skip to content

Commit

Permalink
Merge pull request #6632 from heyitsanthony/grpc-naming
Browse files Browse the repository at this point in the history
clientv3/naming: support resolving to multiple hosts
  • Loading branch information
Anthony Romano authored Oct 12, 2016
2 parents a97866b + 3dbd30f commit 546873f
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 63 deletions.
65 changes: 65 additions & 0 deletions Documentation/dev-guide/grpc_naming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# gRPC naming and discovery

etcd provides a gRPC resolver to support an alternative name system that fetches endpoints from etcd for discovering gRPC services. The underlying mechanism is based on watching updates to keys prefixed with the service name.

## Using etcd discovery with go-grpc

The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:

```go
import (
"github.com/coreos/etcd/clientv3"
etcdnaming "github.com/coroes/etcd/clientv3/naming"

"google.golang.org/grpc"
)

...

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b))
```

## Managing service endpoints

The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.

### Adding an endpoint

New endpoints can be added to the service through `etcdctl`:

```sh
ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
```

The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:

```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})
```

### Deleting an endpoint

Hosts can be deleted from the service through `etcdctl`:

```sh
ETCDCTL_API=3 etcdctl del my-service/1.2.3.4
```

The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:

```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
```

### Registering an endpoint with a lease

Registering an endpoint with a lease ensures that if the host can't maintain a keepalive heartbeat (e.g., its machine fails), it will be removed from the service:

```sh
lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl lease keep-alive $lease
```
2 changes: 2 additions & 0 deletions Documentation/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The easiest way to get started using etcd as a distributed key-value store is to
- [Interacting with etcd][interacting]
- [API references][api_ref]
- [gRPC gateway][api_grpc_gateway]
- [gRPC naming and discovery][grpc_naming]
- [Embedding etcd][embed_etcd]
- [Experimental features and APIs][experimental]

Expand Down Expand Up @@ -60,6 +61,7 @@ To learn more about the concepts and internals behind etcd, read the following p
[demo]: demo.md
[download_build]: dl_build.md
[embed_etcd]: https://godoc.org/github.com/coreos/etcd/embed
[grpc_naming]: dev-guide/grpc_naming.md
[failures]: op-guide/failures.md
[gateway]: op-guide/gateway.md
[glossary]: learning/glossary.md
Expand Down
133 changes: 73 additions & 60 deletions clientv3/naming/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,99 +16,112 @@ package naming

import (
"encoding/json"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
etcd "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
)

const (
gRPCNamingPrefix = "/github.com/grpc/"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/naming"
)

// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
type GRPCResolver struct {
// Client is an initialized etcd client
Client *clientv3.Client
// Timeout for update/delete request.
Timeout time.Duration
// Client is an initialized etcd client.
Client *etcd.Client
}

func (gr *GRPCResolver) Add(target string, addr string, metadata interface{}) error {
update := naming.Update{
Addr: addr,
Metadata: metadata,
}
val, err := json.Marshal(update)
if err != nil {
return err
}

ctx := context.Background()
if gr.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout)
defer cancel()
}

_, err = gr.Client.KV.Put(ctx, gRPCNamingPrefix+target, string(val))
return err
}

func (gr *GRPCResolver) Delete(target string) error {
ctx := context.Background()
if gr.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout)
defer cancel()
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update) (err error) {
switch nm.Op {
case naming.Add:
var v []byte
if v, err = json.Marshal(nm); err != nil {
return grpc.Errorf(codes.InvalidArgument, err.Error())
}
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v))
case naming.Delete:
_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr)
default:
return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op")
}

_, err := gr.Client.Delete(ctx, gRPCNamingPrefix+target)
return err
}

func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
cctx, cancel := context.WithCancel(context.Background())

wch := gr.Client.Watch(cctx, gRPCNamingPrefix+target)

w := &gRPCWatcher{
cancel: cancel,
wch: wch,
}

ctx, cancel := context.WithCancel(context.Background())
w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
return w, nil
}

type gRPCWatcher struct {
c *etcd.Client
target string
ctx context.Context
cancel context.CancelFunc
wch clientv3.WatchChan
wch etcd.WatchChan
err error
}

// Next gets the next set of updates from the etcd resolver.
// Calls to Next should be serialized; concurrent calls are not safe since
// there is no way to reconcile the update ordering.
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
if gw.wch == nil {
// first Next() returns all addresses
return gw.firstNext()
}
if gw.err != nil {
return nil, gw.err
}

// process new events on target/*
wr, ok := <-gw.wch
if !ok {
return nil, wr.Err()
gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed")
return nil, gw.err
}
if gw.err = wr.Err(); gw.err != nil {
return nil, gw.err
}

updates := make([]*naming.Update, 0, len(wr.Events))

for _, e := range wr.Events {
var jupdate naming.Update
var err error
switch e.Type {
case mvccpb.PUT:
var jupdate naming.Update
err := json.Unmarshal(e.Kv.Value, &jupdate)
if err != nil {
continue
}
case etcd.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &jupdate)
jupdate.Op = naming.Add
case etcd.EventTypeDelete:
err = json.Unmarshal(e.PrevKv.Value, &jupdate)
jupdate.Op = naming.Delete
}
if err == nil {
updates = append(updates, &jupdate)
case mvccpb.DELETE:
updates = append(updates, &naming.Update{Op: naming.Delete})
}
}
return updates, nil
}

func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// Use serialized request so resolution still works if the target etcd
// server is partitioned away from the quorum.
resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
if gw.err = err; err != nil {
return nil, err
}

updates := make([]*naming.Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var jupdate naming.Update
if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
continue
}
updates = append(updates, &jupdate)
}

opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
return updates, nil
}

Expand Down
64 changes: 61 additions & 3 deletions clientv3/naming/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package naming

import (
"encoding/json"
"reflect"
"testing"

"golang.org/x/net/context"
"google.golang.org/grpc/naming"

etcd "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)
Expand All @@ -40,7 +43,8 @@ func TestGRPCResolver(t *testing.T) {
}
defer w.Close()

err = r.Add("foo", "127.0.0.1", "metadata")
addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"}
err = r.Update(context.TODO(), "foo", addOp)
if err != nil {
t.Fatal("failed to add foo", err)
}
Expand All @@ -60,18 +64,72 @@ func TestGRPCResolver(t *testing.T) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}

err = r.Delete("foo")
delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"}
err = r.Update(context.TODO(), "foo", delOp)

us, err = w.Next()
if err != nil {
t.Fatal("failed to get udpate", err)
}

wu = &naming.Update{
Op: naming.Delete,
Op: naming.Delete,
Addr: "127.0.0.1",
Metadata: "metadata",
}

if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}
}

// TestGRPCResolverMultiInit ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestGRPCResolverMulti(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
c := clus.RandClient()

v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"})
if verr != nil {
t.Fatal(verr)
}
if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil {
t.Fatal(err)
}
if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil {
t.Fatal(err)
}

r := GRPCResolver{c}

w, err := r.Resolve("foo")
if err != nil {
t.Fatal("failed to resolve foo", err)
}
defer w.Close()

updates, nerr := w.Next()
if nerr != nil {
t.Fatal(nerr)
}
if len(updates) != 2 {
t.Fatalf("expected two updates, got %+v", updates)
}

_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
if err != nil {
t.Fatal(err)
}

updates, nerr = w.Next()
if nerr != nil {
t.Fatal(nerr)
}
if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) {
t.Fatalf("expected two updates, got %+v", updates)
}
}

0 comments on commit 546873f

Please sign in to comment.