Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce log spam on gossip and raft transport #8529

Merged
merged 1 commit into from
Aug 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion GLOCKFILE
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ golang.org/x/sys a646d33e2ee3172a661fc09bca23bb4889a41bc8
golang.org/x/text 2910a502d2bf9e43193af9d68ca516529614eed3
golang.org/x/tools 0e9f43fcb67267967af8c15d7dc54b373e341d20
google.golang.org/appengine e951d3868b377b14f4e60efa3a301532ee3c1ebf
google.golang.org/grpc c2c110d5cf950aef5e2af86b2f5d0a53400ff90b
google.golang.org/grpc 8a81ddda27e5fdd3b56f3ce841b37dbf7387dd26
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This picks up the DialContext method.

gopkg.in/check.v1 4f90aeace3a26ad7021961c297b22c42160c7b25
gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
gopkg.in/yaml.v1 9f9df34309c04878acc86042b16630b0f696e1de
Expand Down
70 changes: 38 additions & 32 deletions gossip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pkg/errors"
circuit "github.com/rubyist/circuitbreaker"
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/roachpb"
Expand Down Expand Up @@ -69,27 +70,46 @@ func newClient(addr net.Addr, nodeMetrics Metrics) *client {
// start dials the remote addr and commences gossip once connected. Upon exit,
// the client is sent on the disconnected channel. This method starts client
// processing in a goroutine and returns immediately.
func (c *client) start(g *Gossip, disconnected chan *client, rpcCtx *rpc.Context, stopper *stop.Stopper, nodeID roachpb.NodeID) {
ctx := context.TODO()
log.Infof(ctx, "node %d: starting client to %s", nodeID, c.addr)

func (c *client) start(
g *Gossip,
disconnected chan *client,
rpcCtx *rpc.Context,
stopper *stop.Stopper,
nodeID roachpb.NodeID,
breaker *circuit.Breaker,
) {
stopper.RunWorker(func() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
defer func() {
disconnected <- c
}()

// Note: avoid using `grpc.WithBlock` here. This code is already
// asynchronous from the caller's perspective, so the only effect of
// `WithBlock` here is blocking shutdown - at the time of this writing,
// that ends up making `kv` tests take twice as long.
conn, err := rpcCtx.GRPCDial(c.addr.String())
if err != nil {
log.Errorf(ctx, "node %d: failed to dial: %s", nodeID, err)
consecFailures := breaker.ConsecFailures()
var stream Gossip_GossipClient
if err := breaker.Call(func() error {
// Note: avoid using `grpc.WithBlock` here. This code is already
// asynchronous from the caller's perspective, so the only effect of
// `WithBlock` here is blocking shutdown - at the time of this writing,
// that ends up making `kv` tests take twice as long.
conn, err := rpcCtx.GRPCDial(c.addr.String())
if err != nil {
return err
}
if stream, err = NewGossipClient(conn).Gossip(ctx); err != nil {
return err
}
return c.requestGossip(g, stream)
}, 0); err != nil {
if consecFailures == 0 {
log.Warningf(ctx, "node %d: failed to start gossip client: %s", nodeID, err)
}
return
}

// Start gossiping.
if err := c.gossip(ctx, g, NewGossipClient(conn), stopper); err != nil {
log.Infof(ctx, "node %d: started gossip client to %s", nodeID, c.addr)
if err := c.gossip(ctx, g, stream, stopper); err != nil {
if !grpcutil.IsClosedConnection(err) {
g.mu.Lock()
peerID := c.peerID
Expand All @@ -116,11 +136,11 @@ func (c *client) close() {
// requestGossip requests the latest gossip from the remote server by
// supplying a map of this node's knowledge of other nodes' high water
// timestamps.
func (c *client) requestGossip(g *Gossip, addr util.UnresolvedAddr, stream Gossip_GossipClient) error {
func (c *client) requestGossip(g *Gossip, stream Gossip_GossipClient) error {
g.mu.Lock()
args := &Request{
NodeID: g.mu.is.NodeID,
Addr: addr,
Addr: g.mu.is.NodeAddr,
HighWaterStamps: g.mu.is.getHighWaterStamps(),
}
g.mu.Unlock()
Expand All @@ -134,12 +154,12 @@ func (c *client) requestGossip(g *Gossip, addr util.UnresolvedAddr, stream Gossi

// sendGossip sends the latest gossip to the remote server, based on
// the remote server's notion of other nodes' high water timestamps.
func (c *client) sendGossip(g *Gossip, addr util.UnresolvedAddr, stream Gossip_GossipClient) error {
func (c *client) sendGossip(g *Gossip, stream Gossip_GossipClient) error {
g.mu.Lock()
if delta := g.mu.is.delta(c.remoteHighWaterStamps); len(delta) > 0 {
args := Request{
NodeID: g.mu.is.NodeID,
Addr: addr,
Addr: g.mu.is.NodeAddr,
Delta: delta,
HighWaterStamps: g.mu.is.getHighWaterStamps(),
}
Expand Down Expand Up @@ -228,21 +248,7 @@ func (c *client) handleResponse(g *Gossip, reply *Response) error {
// gossip loops, sending deltas of the infostore and receiving deltas
// in turn. If an alternate is proposed on response, the client addr
// is modified and the method returns for forwarding by the caller.
func (c *client) gossip(ctx context.Context, g *Gossip, gossipClient GossipClient, stopper *stop.Stopper) error {
// For un-bootstrapped node, g.is.NodeID is 0 when client start gossip,
// so it's better to get nodeID from g.is every time.
addr := g.GetNodeAddr()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := gossipClient.Gossip(ctx)
if err != nil {
return err
}

if err := c.requestGossip(g, *addr, stream); err != nil {
return err
}

func (c *client) gossip(ctx context.Context, g *Gossip, stream Gossip_GossipClient, stopper *stop.Stopper) error {
sendGossipChan := make(chan struct{}, 1)

// Register a callback for gossip updates.
Expand Down Expand Up @@ -279,7 +285,7 @@ func (c *client) gossip(ctx context.Context, g *Gossip, gossipClient GossipClien
case err := <-errCh:
return err
case <-sendGossipChan:
if err := c.sendGossip(g, *addr, stream); err != nil {
if err := c.sendGossip(g, stream); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions gossip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func gossipSucceedsSoon(t *testing.T, stopper *stop.Stopper, disconnected chan *
select {
case client := <-disconnected:
// If the client wasn't able to connect, restart it.
client.start(gossip[client], disconnected, rpcContext, stopper, gossip[client].GetNodeID())
client.start(gossip[client], disconnected, rpcContext, stopper, gossip[client].GetNodeID(), rpcContext.NewBreaker())
default:
}

Expand Down Expand Up @@ -267,7 +267,7 @@ func TestClientNodeID(t *testing.T) {
return
case <-disconnected:
// The client hasn't been started or failed to start, loop and try again.
c.start(local, disconnected, rpcContext, stopper, local.GetNodeID())
c.start(local, disconnected, rpcContext, stopper, local.GetNodeID(), rpcContext.NewBreaker())
}
}
}
Expand Down
15 changes: 12 additions & 3 deletions gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
circuit "github.com/rubyist/circuitbreaker"

"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/gossip/resolver"
Expand Down Expand Up @@ -159,6 +160,8 @@ type Gossip struct {
clientsMu struct {
syncutil.Mutex
clients []*client
// One breaker per client for the life of the process.
breakers map[string]*circuit.Breaker
}

disconnected chan *client // Channel of disconnected clients
Expand Down Expand Up @@ -209,6 +212,7 @@ func New(rpcContext *rpc.Context, grpcServer *grpc.Server, resolvers []resolver.
bootstrapAddrs: map[util.UnresolvedAddr]struct{}{},
}
registry.AddMetric(g.outgoing.gauge)
g.clientsMu.breakers = map[string]*circuit.Breaker{}
g.SetResolvers(resolvers)

g.mu.Lock()
Expand Down Expand Up @@ -1058,11 +1062,16 @@ func (g *Gossip) checkHasConnectedLocked() {
// The client is added to the outgoing address set and launched in
// a goroutine.
func (g *Gossip) startClient(addr net.Addr, nodeID roachpb.NodeID) {
c := newClient(addr, g.serverMetrics)
g.clientsMu.Lock()
defer g.clientsMu.Unlock()
breaker, ok := g.clientsMu.breakers[addr.String()]
if !ok {
breaker = g.rpcContext.NewBreaker()
g.clientsMu.breakers[addr.String()] = breaker
}
c := newClient(addr, g.serverMetrics)
g.clientsMu.clients = append(g.clientsMu.clients, c)
c.start(g, g.disconnected, g.rpcContext, g.server.stopper, nodeID)
g.clientsMu.Unlock()
c.start(g, g.disconnected, g.rpcContext, g.server.stopper, nodeID, breaker)
}

// removeClient removes the specified client. Called when a client
Expand Down
4 changes: 2 additions & 2 deletions gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
return err
}

if err := c.requestGossip(peer, *peer.GetNodeAddr(), stream); err != nil {
if err := c.requestGossip(peer, stream); err != nil {
return err
}

Expand All @@ -201,7 +201,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
for {
localAddr := local.GetNodeAddr()
c := newClient(localAddr, makeMetrics())
c.start(peer, disconnectedCh, peer.rpcContext, stopper, peer.GetNodeID())
c.start(peer, disconnectedCh, peer.rpcContext, stopper, peer.GetNodeID(), peer.rpcContext.NewBreaker())

disconnectedClient := <-disconnectedCh
if disconnectedClient != c {
Expand Down
6 changes: 3 additions & 3 deletions internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func SendWrappedWith(
return unwrappedReply, nil
}

// SendWrapped is identical to SendWrappedAt with a zero header.
// TODO(tschottdorf): should move this to testutils and merge with other helpers
// which are used, for example, in `storage`.
// SendWrapped behaves exactly like SendWrappedWith invoked with a
// zero-valued roachpb.Header.
// TODO(tschottdorf): should move this to testutils and merge with
// other helpers which are used, for example, in `storage`.
func SendWrapped(sender Sender, ctx context.Context, args roachpb.Request) (roachpb.Response, *roachpb.Error) {
	// The zero value of roachpb.Header is passed as the header argument.
	var emptyHeader roachpb.Header
	return SendWrappedWith(sender, ctx, emptyHeader, args)
}
Expand Down
5 changes: 0 additions & 5 deletions kv/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"github.com/cockroachdb/cockroach/util/envutil"
"github.com/cockroachdb/cockroach/util/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/rubyist/circuitbreaker"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -120,9 +118,6 @@ func grpcTransportFactory(
for _, replica := range replicas {
conn, err := rpcContext.GRPCDial(replica.NodeDesc.Address.String())
if err != nil {
if errors.Cause(err) == circuit.ErrBreakerOpen {
continue
}
return nil, err
}
argsCopy := args
Expand Down
Loading