From 55d87c6dad1d02d29e5aea1bbff805a7eb1a27b2 Mon Sep 17 00:00:00 2001
From: Matt Braymer-Hayes
Date: Tue, 14 Jun 2022 11:29:10 -0400
Subject: [PATCH] Add health checks (lytics/lio#27939) (PR #181)

---
 backoff.go            |  40 +++++++++
 client.go             | 193 ++++++++++++++++++++----------------------
 client_health.go      | 118 ++++++++++++++++++++++++++
 client_health_test.go |  23 +++++
 client_test.go        |  32 ++++---
 context_test.go       |   2 +-
 go.mod                |   2 +-
 mailbox_test.go       |  10 +--
 namespace_test.go     |   6 +-
 query_test.go         |   4 +-
 registry/registry.go  |  21 ++++-
 server.go             |  58 ++++++++-----
 server_test.go        |  11 ++-
 13 files changed, 363 insertions(+), 157 deletions(-)
 create mode 100644 backoff.go
 create mode 100644 client_health.go
 create mode 100644 client_health_test.go

diff --git a/backoff.go b/backoff.go
new file mode 100644
index 0000000..1b2afdc
--- /dev/null
+++ b/backoff.go
@@ -0,0 +1,40 @@
+package grid
+
+import (
+	"context"
+	"time"
+)
+
+type backoff struct {
+	timer *time.Timer
+	d     time.Duration
+}
+
+func newBackoff() *backoff {
+	return &backoff{timer: time.NewTimer(0)}
+}
+
+func (b *backoff) Backoff(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-b.timer.C:
+		b.bumpd()
+		b.timer.Reset(b.d)
+		return nil
+	}
+}
+
+func (b *backoff) bumpd() {
+	switch b.d {
+	case 0:
+		b.d = 125 * time.Millisecond
+	case 64 * time.Second:
+	default:
+		b.d = b.d * 2
+	}
+}
+
+func (b *backoff) Stop() {
+	b.timer.Stop()
+}
diff --git a/client.go b/client.go
index ac68fc2..fcdbbcc 100644
--- a/client.go
+++ b/client.go
@@ -3,6 +3,7 @@ package grid
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"math/rand"
 	"strconv"
@@ -16,6 +17,7 @@ import (
 	"github.com/lytics/retry"
 	etcdv3 "go.etcd.io/etcd/client/v3"
 	"google.golang.org/grpc"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 )
 
 // Register a message so it may be sent and received.
@@ -30,7 +32,7 @@ func Register(v interface{}) error {
 	return codec.Register(v)
 }
 
-//clientAndConnPool is a pool of clientAndConn
+// clientAndConnPool is a pool of clientAndConn
 type clientAndConnPool struct {
 	// The 'id' is used in a kind of CAS when
 	// deleting the client pool. This allows
@@ -46,7 +48,7 @@ type clientAndConnPool struct {
 func (ccp *clientAndConnPool) next() (*clientAndConn, error) {
 	// Testing hook, used easily check
 	// a code path in the client.
-	if ccp == nil || len(ccp.clientConns) == 0 {
+	if ccp == nil {
 		return nil, fmt.Errorf("client and conn pool is nil")
 	}
 	if len(ccp.clientConns) == 0 {
@@ -79,7 +81,8 @@
 // plus the actual gRPC client connection.
 type clientAndConn struct {
 	conn   *grpc.ClientConn
-	client WireClient
+	wire   WireClient
+	health healthpb.HealthClient
 }
 
 // close the gRPC connection.
@@ -178,102 +181,35 @@ func (c *Client) RequestC(ctx context.Context, receiver string, msg interface{})
 	}
 
 	var res *Delivery
-	retry.X(3, 1*time.Second, func() bool {
+	retry.X(3, time.Second, func() bool {
 		var client WireClient
 		var clientID int64
 		client, clientID, err = c.getWireClient(ctx, nsReceiver)
-		if err != nil && strings.Contains(err.Error(), ErrUnregisteredMailbox.Error()) {
-			// Test hook.
-			c.cs.Inc(numErrUnregisteredMailbox)
-			// Receiver is currently unregistered, so
-			// clear them out of the cache and don't
-			// try finding them again.
-			c.deleteAddress(nsReceiver)
-			return false
-		}
 		if err != nil {
 			return false
 		}
+
 		res, err = client.Process(ctx, req)
-		if err != nil && strings.Contains(err.Error(), "Error while dialing") {
-			// Test hook.
-			c.cs.Inc(numErrWhileDialing)
-			// The request is via a client that cannot
-			// dial to the requested receiver.
-			c.deleteClientAndConn(nsReceiver, clientID)
-			select {
-			case <-ctx.Done():
-				return false
-			default:
-				return true
-			}
-		}
-		if err != nil && strings.Contains(err.Error(), "the client connection is closing") {
-			// Test hook.
-			c.cs.Inc(numErrClientConnectionClosing)
-			// The request is via a client that is
-			// closing and gRPC is reporting that
-			// a request is not a valid operation.
-			c.deleteClientAndConn(nsReceiver, clientID)
-			select {
-			case <-ctx.Done():
-				return false
-			default:
-				return true
-			}
-		}
-		if err != nil && strings.Contains(err.Error(), "the connection is unavailable") {
-			// Test hook.
-			c.cs.Inc(numErrConnectionUnavailable)
-			// Receiver is on a host that may have died.
-			// The error "connection is unavailable"
-			// comes from gRPC itself. In such a case
-			// it's best to try and replace the client.
-			c.deleteClientAndConn(nsReceiver, clientID)
-			select {
-			case <-ctx.Done():
-				return false
-			default:
-				return true
-			}
-		}
-		if err != nil && strings.Contains(err.Error(), "connection refused") {
-			// Test hook.
-			c.cs.Inc(numErrConnectionRefused)
-			// Receiver is on a host that may have died.
-			// The error "connection refused" comes from
-			// gRPC itself. In such a case it's best to
-			// try and replace the client.
-			c.deleteClientAndConn(nsReceiver, clientID)
-			select {
-			case <-ctx.Done():
-				return false
-			default:
-				return true
-			}
-		}
-		if err != nil && strings.Contains(err.Error(), ErrUnknownMailbox.Error()) {
-			// Test hook.
-			c.cs.Inc(numErrUnknownMailbox)
-			// Receiver possibly moved to different
-			// host for one reason or another. Get
-			// rid of old address and try discovering
-			// new host, and send again.
-			c.deleteAddress(nsReceiver)
-			select {
-			case <-ctx.Done():
-				return false
-			default:
-				return true
+		if err != nil {
+			switch {
+			case c.handleGRPCErrs(ctx, err, nsReceiver, clientID):
+			case strings.Contains(err.Error(), ErrUnknownMailbox.Error()):
+				// Test hook.
+				c.cs.Inc(numErrUnknownMailbox)
+				// Receiver possibly moved to different
+				// host for one reason or another. Get
+				// rid of old address and try discovering
+				// new host, and send again.
+				c.deleteAddress(nsReceiver)
+			case strings.Contains(err.Error(), ErrReceiverBusy.Error()):
+				// Test hook.
+				c.cs.Inc(numErrReceiverBusy)
+				// Receiver was busy, i.e. the receiving channel
+				// was at capacity. Also, the receiver definitely
+				// did NOT get the message, so there is no risk
+				// of duplication if the request is tried again.
 			}
-		}
-		if err != nil && strings.Contains(err.Error(), ErrReceiverBusy.Error()) {
-			// Test hook.
-			c.cs.Inc(numErrReceiverBusy)
-			// Receiver was busy, ie: the receiving channel
-			// was at capacity. Also, the reciever definitely
-			// did NOT get the message, so there is no risk
-			// of duplication if the request is tried again.
+
 			select {
 			case <-ctx.Done():
 				return false
@@ -281,6 +217,7 @@ func (c *Client) RequestC(ctx context.Context, receiver string, msg interface{})
 				return true
 			}
 		}
+
 		return false
 	})
 	if err != nil {
@@ -300,15 +237,29 @@ func (c *Client) getWireClient(ctx context.Context, nsReceiver string) (WireClie
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	const noID = -1
+	cc, id, err := c.getCCLocked(ctx, nsReceiver)
+	if err != nil {
+		return nil, id, err
+	}
+
+	return cc.wire, id, nil
+}
+
+func (c *Client) getCCLocked(ctx context.Context, nsReceiver string) (*clientAndConn, int64, error) {
+	const noID = -1
 	// Test hook.
 	c.cs.Inc(numGetWireClient)
 
 	address, ok := c.addresses[nsReceiver]
 	if !ok {
 		reg, err := c.registry.FindRegistration(ctx, nsReceiver)
-		if err != nil && err == registry.ErrUnknownKey {
+		if errors.Is(err, registry.ErrUnknownKey) {
+			// Test hook.
+			c.cs.Inc(numErrUnregisteredMailbox)
+			// Receiver is currently unregistered, so
+			// clear them out of the cache and don't
+			// try finding them again.
+			delete(c.addresses, nsReceiver)
 			return nil, noID, ErrUnregisteredMailbox
 		}
 		if err != nil {
@@ -330,20 +281,24 @@ func (c *Client) getWireClient(ctx context.Context, nsReceiver string) (WireClie
 		if err != nil {
 			return nil, noID, err
 		}
-		client := NewWireClient(conn)
+		wclient := NewWireClient(conn)
+		hclient := healthpb.NewHealthClient(conn)
 		cc := &clientAndConn{
 			conn:   conn,
-			client: client,
+			wire:   wclient,
+			health: hclient,
 		}
 		ccpool.clientConns[i] = cc
 	}
 	c.clientsAndConns[address] = ccpool
 	}
+
 	cc, err := ccpool.next()
 	if err != nil {
 		return nil, noID, err
 	}
-	return cc.client, ccpool.id, nil
+
+	return cc, ccpool.id, nil
 }
 
 func (c *Client) deleteAddress(nsReceiver string) {
@@ -380,13 +335,53 @@ func (c *Client) deleteClientAndConn(nsReceiver string, clientID int64) {
 	if clientID != ccpool.id {
 		return
 	}
-	err := ccpool.close()
-	if err != nil {
+	if err := ccpool.close(); err != nil {
 		c.logf("error closing client and connection: %v", err)
 	}
 	delete(c.clientsAndConns, address)
 }
 
+func (c *Client) handleGRPCErrs(ctx context.Context, err error, nsReceiver string, clientID int64) bool {
+	switch {
+	case err == nil:
+		return false
+	case strings.Contains(err.Error(), "Error while dialing"):
+		// Test hook.
+		c.cs.Inc(numErrWhileDialing)
+		// The request is via a client that cannot
+		// dial to the requested receiver.
+		c.deleteClientAndConn(nsReceiver, clientID)
+		return true
+	case strings.Contains(err.Error(), "the client connection is closing"):
+		// Test hook.
+		c.cs.Inc(numErrClientConnectionClosing)
+		// The request is via a client that is
+		// closing and gRPC is reporting that
+		// a request is not a valid operation.
+		c.deleteClientAndConn(nsReceiver, clientID)
+		return true
+	case strings.Contains(err.Error(), "the connection is unavailable"):
+		// Test hook.
+		c.cs.Inc(numErrConnectionUnavailable)
+		// Receiver is on a host that may have died.
+		// The error "connection is unavailable"
+		// comes from gRPC itself. In such a case
+		// it's best to try and replace the client.
+		c.deleteClientAndConn(nsReceiver, clientID)
+		return true
+	case strings.Contains(err.Error(), "connection refused"):
+		// Test hook.
+		c.cs.Inc(numErrConnectionRefused)
+		// Receiver is on a host that may have died.
+		// The error "connection refused" comes from
+		// gRPC itself. In such a case it's best to
+		// try and replace the client.
+		c.deleteClientAndConn(nsReceiver, clientID)
+		return true
+	}
+	return false
+}
+
 func (c *Client) logf(format string, v ...interface{}) {
 	if c.cfg.Logger != nil {
 		c.cfg.Logger.Printf(format, v...)
@@ -458,7 +453,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr
 
 	// if the group is configured to Fastest, and we had at least one successful
 	// request, then don't return an error
-	if g.fastest && broadcastErr != nil && atomic.LoadInt32(&successes) > 0 {
+	if broadcastErr != nil && g.fastest && successes > 0 {
 		broadcastErr = nil
 	}
 	return res, broadcastErr
diff --git a/client_health.go b/client_health.go
new file mode 100644
index 0000000..fb5b2dc
--- /dev/null
+++ b/client_health.go
@@ -0,0 +1,118 @@
+package grid
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	"github.com/lytics/retry"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+func (c *Client) Check(ctx context.Context, peer string) (*healthpb.HealthCheckResponse, error) {
+	nsReceiver, err := namespaceName(Peers, c.cfg.Namespace, peer)
+	if err != nil {
+		return nil, fmt.Errorf("namespacing name: %w", err)
+	}
+
+	var resp *healthpb.HealthCheckResponse
+	retry.X(3, time.Second, func() bool {
+		var client healthpb.HealthClient
+		client, _, err = c.getHealthClient(ctx, nsReceiver)
+		if err != nil {
+			return false
+		}
+
+		resp, err = client.Check(ctx, &healthpb.HealthCheckRequest{})
+		if err != nil {
+			if ctx.Err() != nil {
+				return false
+			}
+			return true
+		}
+
+		return false
+	})
+	if err != nil {
+		return nil, fmt.Errorf("checking health: %w", err)
+	}
+
+	return resp, nil
+}
+
+func (c *Client) Watch(ctx context.Context, peer string) (healthpb.Health_WatchClient, error) {
+	nsReceiver, err := namespaceName(Peers, c.cfg.Namespace, peer)
+	if err != nil {
+		return nil, fmt.Errorf("namespacing name: %w", err)
+	}
+
+	var recv healthpb.Health_WatchClient
+	retry.X(3, time.Second, func() bool {
+		var client healthpb.HealthClient
+		client, _, err = c.getHealthClient(ctx, nsReceiver)
+		if err != nil {
+			return false
+		}
+
+		recv, err = client.Watch(ctx, &healthpb.HealthCheckRequest{})
+		if err != nil {
+			if ctx.Err() != nil {
+				return false
+			}
+			return true
+		}
+
+		return false
+	})
+	if err != nil {
+		return nil, fmt.Errorf("watching health: %w", err)
+	}
+
+	return recv, nil
+}
+
+// WaitUntilServing blocks until the peer is serving or the context is done.
+// Will retry with exponential backoff.
+func (c *Client) WaitUntilServing(ctx context.Context, peer string) error {
+	b := newBackoff()
+	defer b.Stop()
+
+LOOP:
+	for {
+		if err := b.Backoff(ctx); err != nil {
+			return fmt.Errorf("backing off: %w", err)
+		}
+
+		stream, err := c.Watch(ctx, peer)
+		if err != nil {
+			c.logf("watching peer: %v", err)
+			continue
+		}
+
+		resp := new(healthpb.HealthCheckResponse)
+		for resp.Status != healthpb.HealthCheckResponse_SERVING {
+			resp, err = stream.Recv()
+			if errors.Is(err, io.EOF) {
+				c.logf("stream ended, restarting")
+				continue LOOP
+			}
+			if err != nil {
+				return fmt.Errorf("receiving: %w", err)
+			}
+		}
+
+		return nil
+	}
+}
+
+func (c *Client) getHealthClient(ctx context.Context, nsReceiver string) (healthpb.HealthClient, int64, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	cc, id, err := c.getCCLocked(ctx, nsReceiver)
+	if err != nil {
+		return nil, id, err
+	}
+	return cc.health, id, nil
+}
diff --git a/client_health_test.go b/client_health_test.go
new file mode 100644
index 0000000..9b42ec2
--- /dev/null
+++ b/client_health_test.go
@@ -0,0 +1,23 @@
+package grid
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+func TestClientCheck(t *testing.T) {
+	t.Parallel()
+	_, server, client := bootstrapClientTest(t)
+
+	peer := server.registry.Registry()
+	resp, err := client.Check(context.Background(), peer)
+	require.NoError(t, err)
+	assert.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status)
+}
+
+// NOTE (2022-06) (mh): WaitUntilServing() is tested via bootstrapClientTest().
+// func TestClientWaitUntilServing(t *testing.T) {}
diff --git a/client_test.go b/client_test.go
index 31feb7b..d957ae1 100644
--- a/client_test.go
+++ b/client_test.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/lytics/grid/v3/testetcd"
+	"github.com/stretchr/testify/require"
 	clientv3 "go.etcd.io/etcd/client/v3"
 )
 
@@ -87,7 +88,7 @@ func TestNewClient(t *testing.T) {
 	embed := testetcd.NewEmbedded(t)
 	etcd := testetcd.StartAndConnect(t, embed.Endpoints())
 
-	client, err := NewClient(etcd, ClientCfg{Namespace: newNamespace()})
+	client, err := NewClient(etcd, ClientCfg{Namespace: newNamespace(t)})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -99,7 +100,7 @@ func TestNewClient(t *testing.T) {
 func TestNewClientWithNilEtcd(t *testing.T) {
 	t.Parallel()
 
-	_, err := NewClient(nil, ClientCfg{Namespace: newNamespace()})
+	_, err := NewClient(nil, ClientCfg{Namespace: newNamespace(t)})
 	if err == nil {
 		t.Fatal("expected error")
 	}
@@ -112,7 +113,7 @@ func TestClientClose(t *testing.T) {
 	etcd := testetcd.StartAndConnect(t, embed.Endpoints())
 
 	// Create client.
-	client, err := NewClient(etcd, ClientCfg{Namespace: newNamespace()})
+	client, err := NewClient(etcd, ClientCfg{Namespace: newNamespace(t)})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -531,7 +532,7 @@ func TestNilClientStats(t *testing.T) {
 func bootstrapClientTest(t testing.TB) (*clientv3.Client, *Server, *Client) {
 	t.Helper()
 	// Namespace for test.
-	namespace := newNamespace()
+	namespace := newNamespace(t)
 
 	// Start etcd.
 	embed := testetcd.NewEmbedded(t)
@@ -542,15 +543,11 @@ func bootstrapClientTest(t testing.TB) (*clientv3.Client, *Server, *Client) {
 
 	// Create the server.
 	server, err := NewServer(etcd, ServerCfg{Namespace: namespace, Logger: logger})
-	if err != nil {
-		t.Fatalf("starting server: %v", err)
-	}
+	require.NoError(t, err)
 
 	// Create the listener on a random port.
 	lis, err := net.Listen("tcp", "localhost:0")
-	if err != nil {
-		t.Fatalf("starting listener: %v", err)
-	}
+	require.NoError(t, err)
 
 	// Start the server in the background.
 	done := make(chan struct{})
@@ -562,18 +559,19 @@ func bootstrapClientTest(t testing.TB) (*clientv3.Client, *Server, *Client) {
 	}()
 	t.Cleanup(func() { <-done })
 	t.Cleanup(server.Stop)
-	time.Sleep(3 * time.Second)
+	err = server.WaitUntilStarted(context.Background())
+	require.NoError(t, err)
 
 	// Create a grid client.
 	client, err := NewClient(etcd, ClientCfg{Namespace: namespace, Logger: logger})
-	if err != nil {
-		t.Fatalf("creating test client: %v", err)
-	}
+	require.NoError(t, err)
 	t.Cleanup(func() {
-		if err := client.Close(); err != nil {
-			t.Fatalf("closing client: %v", err)
-		}
+		err := client.Close()
+		require.NoError(t, err)
 	})
+	err = client.WaitUntilServing(context.Background(), server.Name())
+	require.NoError(t, err)
+
 	return etcd, server, client
 }
diff --git a/context_test.go b/context_test.go
index ce7b278..a674642 100644
--- a/context_test.go
+++ b/context_test.go
@@ -71,7 +71,7 @@ func TestValidContext(t *testing.T) {
 
 	a := &contextActor{started: make(chan bool)}
 
-	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace()})
+	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace(t)})
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/go.mod b/go.mod
index b1594d6..8dc7444 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,6 @@ require (
 	github.com/stretchr/testify v1.7.0
 	go.etcd.io/etcd/client/v3 v3.5.0
 	go.etcd.io/etcd/server/v3 v3.5.0
-	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
 	google.golang.org/grpc v1.38.0
 	google.golang.org/protobuf v1.26.0
 )
@@ -62,6 +61,7 @@ require (
 	go.uber.org/multierr v1.6.0 // indirect
 	go.uber.org/zap v1.17.0 // indirect
 	golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
+	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
 	golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect
 	golang.org/x/text v0.3.5 // indirect
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
diff --git a/mailbox_test.go b/mailbox_test.go
index 7855d2f..3ed047e 100644
--- a/mailbox_test.go
+++ b/mailbox_test.go
@@ -1,12 +1,12 @@
 package grid
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"strconv"
 	"sync"
 	"testing"
-	"time"
 
 	"github.com/lytics/grid/v3/testetcd"
 	"github.com/stretchr/testify/assert"
@@ -198,7 +198,7 @@ func TestMailboxClose(t *testing.T) {
 	embed := testetcd.NewEmbedded(t)
 	etcd := testetcd.StartAndConnect(t, embed.Endpoints())
 
-	s, err := NewServer(etcd, ServerCfg{Namespace: newNamespace()})
+	s, err := NewServer(etcd, ServerCfg{Namespace: newNamespace(t)})
 	require.NoError(t, err)
 
 	lis, err := net.Listen("tcp", "localhost:0")
@@ -213,10 +213,8 @@ func TestMailboxClose(t *testing.T) {
 	}()
 	t.Cleanup(func() { <-done })
 	t.Cleanup(s.Stop)
-
-	for s.isServing() != nil {
-		time.Sleep(time.Second)
-	}
+	err = s.WaitUntilStarted(context.Background())
+	require.NoError(t, err)
 
 	m, err := s.NewMailbox("name", 1)
 	require.NoError(t, err)
diff --git a/namespace_test.go b/namespace_test.go
index 18e5624..3bc366a 100644
--- a/namespace_test.go
+++ b/namespace_test.go
@@ -2,9 +2,9 @@ package grid
 
 import (
 	"fmt"
-	"math/rand"
+
+	"testing"
 )
 
-func newNamespace() string {
-	return fmt.Sprintf("test-namespace-%d", rand.Int63())
+func newNamespace(t testing.TB) string {
+	return fmt.Sprintf("test-namespace-%v", t.Name())
 }
diff --git a/query_test.go b/query_test.go
index 659f863..671781e 100644
--- a/query_test.go
+++ b/query_test.go
@@ -18,7 +18,7 @@ func TestQuery(t *testing.T) {
 		timeout = 1 * time.Second
 	)
 
-	namespace := newNamespace()
+	namespace := newNamespace(t)
 
 	embed := testetcd.NewEmbedded(t)
 	etcd := testetcd.StartAndConnect(t, embed.Endpoints())
@@ -78,7 +78,7 @@ func TestQueryWatch(t *testing.T) {
 		timeout = 1 * time.Second
 	)
 
-	namespace := newNamespace()
+	namespace := newNamespace(t)
 
 	embed := testetcd.NewEmbedded(t)
 	etcd := testetcd.StartAndConnect(t, embed.Endpoints())
diff --git a/registry/registry.go b/registry/registry.go
index d303859..d03ecd9 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -93,6 +93,7 @@ func (we *WatchEvent) String() string {
 // Registry for discovery.
 type Registry struct {
 	mu      sync.RWMutex
+	started bool
 	done    chan bool
 	exited  chan bool
 	kv      etcdv3.KV
@@ -259,6 +260,7 @@ func (rr *Registry) Start(addr net.Addr) error {
 		}
 	}()
 
+	rr.started = true
 	return nil
 }
 
@@ -277,6 +279,12 @@ func (rr *Registry) Registry() string {
 	return rr.name
 }
 
+func (rr *Registry) Started() bool {
+	rr.mu.RLock()
+	defer rr.mu.RUnlock()
+	return rr.started
+}
+
 // Stop Registry.
 func (rr *Registry) Stop() error {
 	rr.mu.Lock()
@@ -304,9 +312,16 @@ func (rr *Registry) Stop() error {
 	// all keys associated with this registry
 	// from etcd.
 	timeout, cancel := context.WithTimeout(context.Background(), rr.Timeout)
-	_, err := rr.lease.Revoke(timeout, rr.leaseID)
-	cancel()
-	return err
+	defer cancel()
+	if _, err := rr.lease.Revoke(timeout, rr.leaseID); err != nil {
+		return err
+	}
+
+	if err := rr.lease.Close(); err != nil {
+		return err
+	}
+
+	return nil
 }
 
 // Watch a prefix in the registry.
diff --git a/server.go b/server.go
index 6f57f90..3c6dd45 100644
--- a/server.go
+++ b/server.go
@@ -15,6 +15,8 @@ import (
 	"github.com/lytics/retry"
 	etcdv3 "go.etcd.io/etcd/client/v3"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/health"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 )
 
 var (
@@ -35,13 +37,13 @@ type Server struct {
 	// mu protects the follow fields, use accessors
 	mu       sync.RWMutex
 	finalErr error
-	serving  bool
 	ctx      context.Context
 	cancel   func()
 
 	cfg      ServerCfg
 	etcd     *etcdv3.Client
 	grpc     *grpc.Server
+	health   *health.Server
 	stop     sync.Once
 	fatalErr chan error
 	actors   *makeActorRegistry
@@ -80,6 +82,7 @@ func NewServer(etcd *etcdv3.Client, cfg ServerCfg) (*Server, error) {
 		cfg:      cfg,
 		etcd:     etcd,
 		grpc:     grpc.NewServer(),
+		health:   health.NewServer(),
 		actors:   newMakeActorRegistry(),
 		fatalErr: make(chan error, 1),
 		registry: r,
@@ -142,8 +145,8 @@ func (s *Server) NewMailbox(name string, size int) (Mailbox, error) {
 }
 
 func (s *Server) newMailbox(name, nsName string, size int) (*GRPCMailbox, error) {
-	if err := s.isServing(); err != nil {
-		return nil, err
+	if !s.registry.Started() {
+		return nil, ErrServerNotRunning
 	}
 
 	_, ok := s.mailboxes.Get(nsName)
@@ -223,6 +226,13 @@ func (s *Server) Context() context.Context {
 	return s.ctx
 }
 
+// Name of the server. Only valid after Serve() is called and
+// the registry has started (server's name is the registry's name).
+// Use WaitUntilStarted to wait for the registry to start.
+func (s *Server) Name() string {
+	return s.registry.Registry()
+}
+
 // Serve the grid on the listener. The listener address type must be
 // net.TCPAddr, otherwise an error will be returned.
 func (s *Server) Serve(lis net.Listener) error {
@@ -230,11 +240,8 @@ func (s *Server) Serve(lis net.Listener) error {
 		return err
 	}
 
-	// Peer's name is the registry's name.
-	name := s.registry.Registry()
-
 	// Namespaced name, which just includes the namespace.
-	nsName, err := namespaceName(Peers, s.cfg.Namespace, name)
+	nsName, err := namespaceName(Peers, s.cfg.Namespace, s.Name())
 	if err != nil {
 		return err
 	}
@@ -248,15 +255,11 @@ func (s *Server) Serve(lis net.Listener) error {
 		return err
 	}
 
-	s.mu.Lock()
-	s.serving = true
-	s.mu.Unlock()
-
 	// Start a mailbox, this is critical because starting
 	// actors in a grid is just done via a normal request
 	// sending the message ActorDef to a listening peer's
 	// mailbox.
-	mailbox, err := s.NewMailbox(name, 100)
+	mailbox, err := s.NewMailbox(s.Name(), 100)
 	if err != nil {
 		return err
 	}
@@ -274,6 +277,8 @@ func (s *Server) Serve(lis net.Listener) error {
 	// gRPC dance to start the gRPC server. The Serve
 	// method blocks still stopped via a call to Stop.
 	RegisterWireServer(s.grpc, s)
+	healthpb.RegisterHealthServer(s.grpc, s.health)
+
 	err = s.grpc.Serve(lis)
 	// Something in gRPC returns the "use of..." error
 	// message even though it stopped fine. Catch that
@@ -287,6 +292,26 @@ func (s *Server) Serve(lis net.Listener) error {
 	return s.getFinalErr()
 }
 
+// WaitUntilStarted waits until the registry has started or until the context is done.
+// This allows users to safely access some runtime-specific parameters (e.g., Name()).
+// There is no guarantee that the gRPC server is serving: use Client.WaitUntilServing()
+// for that.
+func (s *Server) WaitUntilStarted(ctx context.Context) error {
+	b := newBackoff()
+	defer b.Stop()
+
+	for {
+		if err := b.Backoff(ctx); err != nil {
+			return fmt.Errorf("backing off: %w", err)
+		}
+		if !s.registry.Started() {
+			s.logf("not yet started")
+			continue
+		}
+		return nil
+	}
+}
+
 // Stop the server, blocking until all mailboxes registered with
 // this server have called their close method.
 func (s *Server) Stop() {
@@ -576,12 +601,3 @@ func (s *Server) logf(format string, v ...interface{}) {
 		s.cfg.Logger.Printf(format, v...)
 	}
 }
-
-func (s *Server) isServing() error {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-	if !s.serving {
-		return ErrServerNotRunning
-	}
-	return nil
-}
diff --git a/server_test.go b/server_test.go
index 3de2d83..ae521f0 100644
--- a/server_test.go
+++ b/server_test.go
@@ -35,7 +35,7 @@ func TestServerStartStop(t *testing.T) {
 		stopped: make(chan bool),
 	}
 
-	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace()})
+	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace(t)})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -98,7 +98,7 @@ func TestServerWithFatalError(t *testing.T) {
 		stopped: make(chan bool),
 	}
 
-	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace()})
+	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace(t)})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -146,7 +146,7 @@ func TestServerStartNoEtcdRunning(t *testing.T) {
 	etcd := testetcd.StartAndConnect(t, embed.Endpoints())
 	etcd.Close()
 
-	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace()})
+	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace(t)})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -174,7 +174,7 @@ func TestServerStartThenEtcdStop(t *testing.T) {
 	embed := testetcd.NewEmbedded(t)
 	etcd := testetcd.StartAndConnect(t, embed.Endpoints())
 
-	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace()})
+	server, err := NewServer(etcd, ServerCfg{Namespace: newNamespace(t)})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -210,3 +210,6 @@ func TestServerStartThenEtcdStop(t *testing.T) {
 		}
 	}
 }
+
+// NOTE (2022-06) (mh): WaitUntilStarted() is tested via bootstrapClientTest().
+// func TestServerWaitUntilStarted(t *testing.T) {}
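
Example usage (not part of the patch): a minimal sketch of how the new pieces fit together, mirroring what bootstrapClientTest() now does — start a Server, wait for its registry with WaitUntilStarted(), gate the Client on the peer's health with WaitUntilServing(), then issue a one-shot Check(). The etcd endpoint ("localhost:2379") and namespace ("example") are placeholder values, and error handling is abbreviated.

package main

import (
	"context"
	"log"
	"net"

	"github.com/lytics/grid/v3"
	etcdv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Placeholder etcd endpoint; adjust for your environment.
	etcd, err := etcdv3.New(etcdv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatalf("connecting to etcd: %v", err)
	}
	defer etcd.Close()

	server, err := grid.NewServer(etcd, grid.ServerCfg{Namespace: "example"})
	if err != nil {
		log.Fatalf("creating server: %v", err)
	}

	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		log.Fatalf("listening: %v", err)
	}

	// Serve in the background; Serve blocks until Stop is called.
	go func() {
		if err := server.Serve(lis); err != nil {
			log.Printf("serving: %v", err)
		}
	}()
	defer server.Stop()

	ctx := context.Background()

	// Block (with backoff) until the registry has started, so that
	// server.Name() is valid.
	if err := server.WaitUntilStarted(ctx); err != nil {
		log.Fatalf("waiting for server start: %v", err)
	}

	client, err := grid.NewClient(etcd, grid.ClientCfg{Namespace: "example"})
	if err != nil {
		log.Fatalf("creating client: %v", err)
	}
	defer client.Close()

	// Block until the peer's gRPC health service reports SERVING,
	// then issue a one-shot check.
	if err := client.WaitUntilServing(ctx, server.Name()); err != nil {
		log.Fatalf("waiting for peer: %v", err)
	}
	resp, err := client.Check(ctx, server.Name())
	if err != nil {
		log.Fatalf("checking health: %v", err)
	}
	log.Printf("peer %q status: %v", server.Name(), resp.Status)
}

Because the server registers the stock grpc_health_v1 service, generic tooling such as grpc_health_probe should also be able to query peer health directly.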