diff --git a/Documentation/dev-guide/grpc_naming.md b/Documentation/dev-guide/grpc_naming.md new file mode 100644 index 00000000000..23769954c91 --- /dev/null +++ b/Documentation/dev-guide/grpc_naming.md @@ -0,0 +1,65 @@ +# gRPC naming and discovery + +etcd provides a gRPC resolver to support an alternative name system that fetches endpoints from etcd for discovering gRPC services. The underlying mechanism is based on watching updates to keys prefixed with the service name. + +## Using etcd discovery with go-grpc + +The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution: + +```go +import ( + "github.com/coreos/etcd/clientv3" + etcdnaming "github.com/coroes/etcd/clientv3/naming" + + "google.golang.org/grpc" +) + +... + +cli, cerr := clientv3.NewFromURL("http://localhost:2379") +r := &etcdnaming.GRPCResolver{Client: cli} +b := grpc.RoundRobin(r) +conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b)) +``` + +## Managing service endpoints + +The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys. + +### Adding an endpoint + +New endpoints can be added to the service through `etcdctl`: + +```sh +ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}' +``` + +The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`: + +```go +r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."}) +``` + +### Deleting an endpoint + +Hosts can be deleted from the service through `etcdctl`: + +```sh +ETCDCTL_API=3 etcdctl del my-service/1.2.3.4 +``` + +The etcd client's `GRPCResolver.Update` method also supports deleting endpoints: + +```go +r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"}) +``` + +### Registering an endpoint with a lease + +Registering an endpoint with a lease ensures that if the host can't maintain a keepalive heartbeat (e.g., its machine fails), it will be removed from the service: + +```sh +lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '` +ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}' +ETCDCTL_API=3 etcdctl lease keep-alive $lease +``` diff --git a/Documentation/docs.md b/Documentation/docs.md index 36dfd4c17e2..27be7bcd1ab 100644 --- a/Documentation/docs.md +++ b/Documentation/docs.md @@ -14,6 +14,7 @@ The easiest way to get started using etcd as a distributed key-value store is to - [Interacting with etcd][interacting] - [API references][api_ref] - [gRPC gateway][api_grpc_gateway] + - [gRPC naming and discovery][grpc_naming] - [Embedding etcd][embed_etcd] - [Experimental features and APIs][experimental] @@ -60,6 +61,7 @@ To learn more about the concepts and internals behind etcd, read the following p [demo]: demo.md [download_build]: dl_build.md [embed_etcd]: https://godoc.org/github.com/coreos/etcd/embed +[grpc_naming]: dev-guide/grpc_naming.md [failures]: op-guide/failures.md [gateway]: op-guide/gateway.md [glossary]: learning/glossary.md diff --git a/clientv3/naming/grpc.go b/clientv3/naming/grpc.go index 100899ea09f..0f9973a157d 100644 --- a/clientv3/naming/grpc.go +++ b/clientv3/naming/grpc.go @@ -16,99 +16,112 @@ package naming import ( "encoding/json" - "time" - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" + etcd "github.com/coreos/etcd/clientv3" "golang.org/x/net/context" - "google.golang.org/grpc/naming" -) -const ( - gRPCNamingPrefix = "/github.com/grpc/" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/naming" ) // GRPCResolver creates a grpc.Watcher for a target to track its resolution changes. type GRPCResolver struct { - // Client is an initialized etcd client - Client *clientv3.Client - // Timeout for update/delete request. - Timeout time.Duration + // Client is an initialized etcd client. + Client *etcd.Client } -func (gr *GRPCResolver) Add(target string, addr string, metadata interface{}) error { - update := naming.Update{ - Addr: addr, - Metadata: metadata, - } - val, err := json.Marshal(update) - if err != nil { - return err - } - - ctx := context.Background() - if gr.Timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout) - defer cancel() - } - - _, err = gr.Client.KV.Put(ctx, gRPCNamingPrefix+target, string(val)) - return err -} - -func (gr *GRPCResolver) Delete(target string) error { - ctx := context.Background() - if gr.Timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout) - defer cancel() +func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update) (err error) { + switch nm.Op { + case naming.Add: + var v []byte + if v, err = json.Marshal(nm); err != nil { + return grpc.Errorf(codes.InvalidArgument, err.Error()) + } + _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v)) + case naming.Delete: + _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr) + default: + return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op") } - - _, err := gr.Client.Delete(ctx, gRPCNamingPrefix+target) return err } func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { - cctx, cancel := context.WithCancel(context.Background()) - - wch := gr.Client.Watch(cctx, gRPCNamingPrefix+target) - - w := &gRPCWatcher{ - cancel: cancel, - wch: wch, - } - + ctx, cancel := context.WithCancel(context.Background()) + w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel} return w, nil } type gRPCWatcher struct { + c *etcd.Client + target string + ctx context.Context cancel context.CancelFunc - wch clientv3.WatchChan + wch etcd.WatchChan + err error } +// Next gets the next set of updates from the etcd resolver. +// Calls to Next should be serialized; concurrent calls are not safe since +// there is no way to reconcile the update ordering. func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { + if gw.wch == nil { + // first Next() returns all addresses + return gw.firstNext() + } + if gw.err != nil { + return nil, gw.err + } + + // process new events on target/* wr, ok := <-gw.wch if !ok { - return nil, wr.Err() + gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed") + return nil, gw.err + } + if gw.err = wr.Err(); gw.err != nil { + return nil, gw.err } updates := make([]*naming.Update, 0, len(wr.Events)) - for _, e := range wr.Events { + var jupdate naming.Update + var err error switch e.Type { - case mvccpb.PUT: - var jupdate naming.Update - err := json.Unmarshal(e.Kv.Value, &jupdate) - if err != nil { - continue - } + case etcd.EventTypePut: + err = json.Unmarshal(e.Kv.Value, &jupdate) + jupdate.Op = naming.Add + case etcd.EventTypeDelete: + err = json.Unmarshal(e.PrevKv.Value, &jupdate) + jupdate.Op = naming.Delete + } + if err == nil { updates = append(updates, &jupdate) - case mvccpb.DELETE: - updates = append(updates, &naming.Update{Op: naming.Delete}) } } + return updates, nil +} + +func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { + // Use serialized request so resolution still works if the target etcd + // server is partitioned away from the quorum. + resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) + if gw.err = err; err != nil { + return nil, err + } + + updates := make([]*naming.Update, 0, len(resp.Kvs)) + for _, kv := range resp.Kvs { + var jupdate naming.Update + if err := json.Unmarshal(kv.Value, &jupdate); err != nil { + continue + } + updates = append(updates, &jupdate) + } + opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} + gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) return updates, nil } diff --git a/clientv3/naming/grpc_test.go b/clientv3/naming/grpc_test.go index 8d0248376d0..ad2d206626b 100644 --- a/clientv3/naming/grpc_test.go +++ b/clientv3/naming/grpc_test.go @@ -15,11 +15,14 @@ package naming import ( + "encoding/json" "reflect" "testing" + "golang.org/x/net/context" "google.golang.org/grpc/naming" + etcd "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/pkg/testutil" ) @@ -40,7 +43,8 @@ func TestGRPCResolver(t *testing.T) { } defer w.Close() - err = r.Add("foo", "127.0.0.1", "metadata") + addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"} + err = r.Update(context.TODO(), "foo", addOp) if err != nil { t.Fatal("failed to add foo", err) } @@ -60,7 +64,8 @@ func TestGRPCResolver(t *testing.T) { t.Fatalf("up = %#v, want %#v", us[0], wu) } - err = r.Delete("foo") + delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"} + err = r.Update(context.TODO(), "foo", delOp) us, err = w.Next() if err != nil { @@ -68,10 +73,63 @@ func TestGRPCResolver(t *testing.T) { } wu = &naming.Update{ - Op: naming.Delete, + Op: naming.Delete, + Addr: "127.0.0.1", + Metadata: "metadata", } if !reflect.DeepEqual(us[0], wu) { t.Fatalf("up = %#v, want %#v", us[0], wu) } } + +// TestGRPCResolverMultiInit ensures the resolver will initialize +// correctly with multiple hosts and correctly receive multiple +// updates in a single revision. +func TestGRPCResolverMulti(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + c := clus.RandClient() + + v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"}) + if verr != nil { + t.Fatal(verr) + } + if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil { + t.Fatal(err) + } + if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil { + t.Fatal(err) + } + + r := GRPCResolver{c} + + w, err := r.Resolve("foo") + if err != nil { + t.Fatal("failed to resolve foo", err) + } + defer w.Close() + + updates, nerr := w.Next() + if nerr != nil { + t.Fatal(nerr) + } + if len(updates) != 2 { + t.Fatalf("expected two updates, got %+v", updates) + } + + _, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit() + if err != nil { + t.Fatal(err) + } + + updates, nerr = w.Next() + if nerr != nil { + t.Fatal(nerr) + } + if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) { + t.Fatalf("expected two updates, got %+v", updates) + } +}