Skip to content

Commit

Permalink
Add health checks (lytics/lio#27939) (PR #181)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattayes authored Jun 14, 2022
1 parent 3cd38c9 commit 55d87c6
Show file tree
Hide file tree
Showing 13 changed files with 363 additions and 157 deletions.
40 changes: 40 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package grid

import (
"context"
"time"
)

type backoff struct {
timer *time.Timer
d time.Duration
}

func newBackoff() *backoff {
return &backoff{timer: time.NewTimer(0)}
}

func (b *backoff) Backoff(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-b.timer.C:
b.bumpd()
b.timer.Reset(b.d)
return nil
}
}

func (b *backoff) bumpd() {
switch b.d {
case 0:
b.d = 125 * time.Millisecond
case 64 * time.Second:
default:
b.d = b.d * 2
}
}

func (b *backoff) Stop() {
b.timer.Stop()
}
193 changes: 94 additions & 99 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grid
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"strconv"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/lytics/retry"
etcdv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// Register a message so it may be sent and received.
Expand All @@ -30,7 +32,7 @@ func Register(v interface{}) error {
return codec.Register(v)
}

//clientAndConnPool is a pool of clientAndConn
// clientAndConnPool is a pool of clientAndConn
type clientAndConnPool struct {
// The 'id' is used in a kind of CAS when
// deleting the client pool. This allows
Expand All @@ -46,7 +48,7 @@ type clientAndConnPool struct {
func (ccp *clientAndConnPool) next() (*clientAndConn, error) {
// Testing hook, used easily check
// a code path in the client.
if ccp == nil || len(ccp.clientConns) == 0 {
if ccp == nil {
return nil, fmt.Errorf("client and conn pool is nil")
}
if len(ccp.clientConns) == 0 {
Expand Down Expand Up @@ -79,7 +81,8 @@ func (ccp *clientAndConnPool) close() error {
// plus the actual gRPC client connection.
type clientAndConn struct {
conn *grpc.ClientConn
client WireClient
wire WireClient
health healthpb.HealthClient
}

// close the gRPC connection.
Expand Down Expand Up @@ -178,109 +181,43 @@ func (c *Client) RequestC(ctx context.Context, receiver string, msg interface{})
}

var res *Delivery
retry.X(3, 1*time.Second, func() bool {
retry.X(3, time.Second, func() bool {
var client WireClient
var clientID int64
client, clientID, err = c.getWireClient(ctx, nsReceiver)
if err != nil && strings.Contains(err.Error(), ErrUnregisteredMailbox.Error()) {
// Test hook.
c.cs.Inc(numErrUnregisteredMailbox)
// Receiver is currently unregistered, so
// clear them out of the cache and don't
// try finding them again.
c.deleteAddress(nsReceiver)
return false
}
if err != nil {
return false
}

res, err = client.Process(ctx, req)
if err != nil && strings.Contains(err.Error(), "Error while dialing") {
// Test hook.
c.cs.Inc(numErrWhileDialing)
// The request is via a client that cannot
// dial to the requested receiver.
c.deleteClientAndConn(nsReceiver, clientID)
select {
case <-ctx.Done():
return false
default:
return true
}
}
if err != nil && strings.Contains(err.Error(), "the client connection is closing") {
// Test hook.
c.cs.Inc(numErrClientConnectionClosing)
// The request is via a client that is
// closing and gRPC is reporting that
// a request is not a valid operation.
c.deleteClientAndConn(nsReceiver, clientID)
select {
case <-ctx.Done():
return false
default:
return true
}
}
if err != nil && strings.Contains(err.Error(), "the connection is unavailable") {
// Test hook.
c.cs.Inc(numErrConnectionUnavailable)
// Receiver is on a host that may have died.
// The error "connection is unavailable"
// comes from gRPC itself. In such a case
// it's best to try and replace the client.
c.deleteClientAndConn(nsReceiver, clientID)
select {
case <-ctx.Done():
return false
default:
return true
}
}
if err != nil && strings.Contains(err.Error(), "connection refused") {
// Test hook.
c.cs.Inc(numErrConnectionRefused)
// Receiver is on a host that may have died.
// The error "connection refused" comes from
// gRPC itself. In such a case it's best to
// try and replace the client.
c.deleteClientAndConn(nsReceiver, clientID)
select {
case <-ctx.Done():
return false
default:
return true
}
}
if err != nil && strings.Contains(err.Error(), ErrUnknownMailbox.Error()) {
// Test hook.
c.cs.Inc(numErrUnknownMailbox)
// Receiver possibly moved to different
// host for one reason or another. Get
// rid of old address and try discovering
// new host, and send again.
c.deleteAddress(nsReceiver)
select {
case <-ctx.Done():
return false
default:
return true
if err != nil {
switch {
case c.handleGRPCErrs(ctx, err, nsReceiver, clientID):
case strings.Contains(err.Error(), ErrUnknownMailbox.Error()):
// Test hook.
c.cs.Inc(numErrUnknownMailbox)
// Receiver possibly moved to different
// host for one reason or another. Get
// rid of old address and try discovering
// new host, and send again.
c.deleteAddress(nsReceiver)
case strings.Contains(err.Error(), ErrReceiverBusy.Error()):
// Test hook.
c.cs.Inc(numErrReceiverBusy)
// Receiver was busy, ie: the receiving channel
// was at capacity. Also, the reciever definitely
// did NOT get the message, so there is no risk
// of duplication if the request is tried again.
}
}
if err != nil && strings.Contains(err.Error(), ErrReceiverBusy.Error()) {
// Test hook.
c.cs.Inc(numErrReceiverBusy)
// Receiver was busy, ie: the receiving channel
// was at capacity. Also, the reciever definitely
// did NOT get the message, so there is no risk
// of duplication if the request is tried again.

select {
case <-ctx.Done():
return false
default:
return true
}
}

return false
})
if err != nil {
Expand All @@ -300,15 +237,29 @@ func (c *Client) getWireClient(ctx context.Context, nsReceiver string) (WireClie
c.mu.Lock()
defer c.mu.Unlock()

const noID = -1
cc, id, err := c.getCCLocked(ctx, nsReceiver)
if err != nil {
return nil, id, err
}

return cc.wire, id, nil
}

func (c *Client) getCCLocked(ctx context.Context, nsReceiver string) (*clientAndConn, int64, error) {
const noID = -1
// Test hook.
c.cs.Inc(numGetWireClient)

address, ok := c.addresses[nsReceiver]
if !ok {
reg, err := c.registry.FindRegistration(ctx, nsReceiver)
if err != nil && err == registry.ErrUnknownKey {
if errors.Is(err, registry.ErrUnknownKey) {
// Test hook.
c.cs.Inc(numErrUnregisteredMailbox)
// Receiver is currently unregistered, so
// clear them out of the cache and don't
// try finding them again.
delete(c.addresses, nsReceiver)
return nil, noID, ErrUnregisteredMailbox
}
if err != nil {
Expand All @@ -330,20 +281,24 @@ func (c *Client) getWireClient(ctx context.Context, nsReceiver string) (WireClie
if err != nil {
return nil, noID, err
}
client := NewWireClient(conn)
wclient := NewWireClient(conn)
hclient := healthpb.NewHealthClient(conn)
cc := &clientAndConn{
conn: conn,
client: client,
wire: wclient,
health: hclient,
}
ccpool.clientConns[i] = cc
}
c.clientsAndConns[address] = ccpool
}

cc, err := ccpool.next()
if err != nil {
return nil, noID, err
}
return cc.client, ccpool.id, nil

return cc, ccpool.id, nil
}

func (c *Client) deleteAddress(nsReceiver string) {
Expand Down Expand Up @@ -380,13 +335,53 @@ func (c *Client) deleteClientAndConn(nsReceiver string, clientID int64) {
if clientID != ccpool.id {
return
}
err := ccpool.close()
if err != nil {
if err := ccpool.close(); err != nil {
c.logf("error closing client and connection: %v", err)
}
delete(c.clientsAndConns, address)
}

func (c *Client) handleGRPCErrs(ctx context.Context, err error, nsReceiver string, clientID int64) bool {
switch {
case err == nil:
return false
case strings.Contains(err.Error(), "Error while dialing"):
// Test hook.
c.cs.Inc(numErrWhileDialing)
// The request is via a client that cannot
// dial to the requested receiver.
c.deleteClientAndConn(nsReceiver, clientID)
return true
case strings.Contains(err.Error(), "the client connection is closing"):
// Test hook.
c.cs.Inc(numErrClientConnectionClosing)
// The request is via a client that is
// closing and gRPC is reporting that
// a request is not a valid operation.
c.deleteClientAndConn(nsReceiver, clientID)
return true
case strings.Contains(err.Error(), "the connection is unavailable"):
// Test hook.
c.cs.Inc(numErrConnectionUnavailable)
// Receiver is on a host that may have died.
// The error "connection is unavailable"
// comes from gRPC itself. In such a case
// it's best to try and replace the client.
c.deleteClientAndConn(nsReceiver, clientID)
return true
case strings.Contains(err.Error(), "connection refused"):
// Test hook.
c.cs.Inc(numErrConnectionRefused)
// Receiver is on a host that may have died.
// The error "connection refused" comes from
// gRPC itself. In such a case it's best to
// try and replace the client.
c.deleteClientAndConn(nsReceiver, clientID)
return true
}
return false
}

func (c *Client) logf(format string, v ...interface{}) {
if c.cfg.Logger != nil {
c.cfg.Logger.Printf(format, v...)
Expand Down Expand Up @@ -458,7 +453,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr

// if the group is configured to Fastest, and we had at least one successful
// request, then don't return an error
if g.fastest && broadcastErr != nil && atomic.LoadInt32(&successes) > 0 {
if broadcastErr != nil && g.fastest && successes > 0 {
broadcastErr = nil
}
return res, broadcastErr
Expand Down
Loading

0 comments on commit 55d87c6

Please sign in to comment.