From 1e995af262113a5d543c28bc1ed8cda51450b109 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 8 Jan 2019 10:34:27 -0500 Subject: [PATCH] rpc: adopt logger in circuit breaker Adopts changes to the circuitbreaker package to enable logging. Release note: None --- Gopkg.lock | 19 ++++++++++++------- Gopkg.toml | 9 ++++----- pkg/gossip/client.go | 2 +- pkg/gossip/client_test.go | 4 ++-- pkg/gossip/gossip.go | 5 +++-- pkg/gossip/gossip_test.go | 2 +- pkg/rpc/breaker.go | 27 ++++++++++++++++++++++++--- pkg/rpc/context.go | 9 +++++---- pkg/storage/client_test.go | 2 +- pkg/storage/raft_transport.go | 6 +++--- vendor | 2 +- 11 files changed, 57 insertions(+), 30 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 55109aad2219..dba310c0babd 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -186,6 +186,12 @@ revision = "61153c768f31ee5f130071d08fc82b85208528de" version = "v1.1.0" +[[projects]] + name = "github.com/cenkalti/backoff" + packages = ["."] + revision = "1e4cf3da559842a91afcb6ea6141451e6c30c618" + version = "v2.1.1" + [[projects]] name = "github.com/certifi/gocertifi" packages = ["."] @@ -207,6 +213,11 @@ revision = "b1ce49cb2a474f4416531e7395373eaafaa4fbe2" version = "v1.0.0" +[[projects]] + name = "github.com/cockroachdb/circuitbreaker" + packages = ["."] + revision = "a614b14ccf63dd2311d4ff646c30c61b8ed34aa" + [[projects]] branch = "master" name = "github.com/cockroachdb/cmux" @@ -937,12 +948,6 @@ ] revision = "1f30fe9094a513ce4c700b9a54458bbb0c96996c" -[[projects]] - branch = "master" - name = "github.com/rubyist/circuitbreaker" - packages = ["."] - revision = "2074adba5ddc7d5f7559448a9c3066573521c5bf" - [[projects]] name = "github.com/russross/blackfriday" packages = ["."] @@ -1235,6 +1240,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "3f17f79398b9d0ebd6aaaaf0f901bc5b0f8818abe58f95211356691e746ff3ff" + inputs-digest = "8cf1fa12e173a7194b9af15d74137b8163c2d7ba9a19b424321bb684467574e2" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 26aacfa03163..f5ab206eddf3 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -61,11 +61,6 @@ ignored = [ name = "github.com/montanaflynn/stats" branch = "master" -# https://github.com/rubyist/circuitbreaker/commit/af95830 -[[constraint]] - name = "github.com/rubyist/circuitbreaker" - branch = "master" - # github.com/docker/docker depends on a few functions not included in the # latest release: reference.{FamiliarName,ParseNormalizedNamed,TagNameOnly}. # @@ -93,3 +88,7 @@ ignored = [ [[prune.project]] name = "github.com/knz/go-libedit" unused-packages = false + +[[constraint]] + name = "github.com/cockroachdb/circuitbreaker" + revision = "a614b14ccf63dd2311d4ff646c30c61b8ed34aa" diff --git a/pkg/gossip/client.go b/pkg/gossip/client.go index 0c97cae0edd2..84f72107dc4a 100644 --- a/pkg/gossip/client.go +++ b/pkg/gossip/client.go @@ -21,8 +21,8 @@ import ( "sync" "time" + circuit "github.com/cockroachdb/circuitbreaker" "github.com/pkg/errors" - circuit "github.com/rubyist/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" diff --git a/pkg/gossip/client_test.go b/pkg/gossip/client_test.go index 693d68ded6eb..c59d8573196b 100644 --- a/pkg/gossip/client_test.go +++ b/pkg/gossip/client_test.go @@ -168,7 +168,7 @@ func gossipSucceedsSoon( // If the client wasn't able to connect, restart it. g := gossip[client] g.mu.Lock() - client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker()) + client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker("")) g.mu.Unlock() default: } @@ -306,7 +306,7 @@ func TestClientNodeID(t *testing.T) { case <-disconnected: // The client hasn't been started or failed to start, loop and try again. local.mu.Lock() - c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker()) + c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker("")) local.mu.Unlock() } } diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go index 2ae885ebec2a..26b6fbf8823e 100644 --- a/pkg/gossip/gossip.go +++ b/pkg/gossip/gossip.go @@ -63,9 +63,9 @@ import ( "google.golang.org/grpc" + circuit "github.com/cockroachdb/circuitbreaker" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" - circuit "github.com/rubyist/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" @@ -1345,7 +1345,8 @@ func (g *Gossip) startClientLocked(addr net.Addr) { defer g.clientsMu.Unlock() breaker, ok := g.clientsMu.breakers[addr.String()] if !ok { - breaker = g.rpcContext.NewBreaker() + name := fmt.Sprintf("gossip %v->%v", g.rpcContext.Addr, addr) + breaker = g.rpcContext.NewBreaker(name) g.clientsMu.breakers[addr.String()] = breaker } ctx := g.AnnotateCtx(context.TODO()) diff --git a/pkg/gossip/gossip_test.go b/pkg/gossip/gossip_test.go index 8d1e0c13af9b..c9985eb92301 100644 --- a/pkg/gossip/gossip_test.go +++ b/pkg/gossip/gossip_test.go @@ -383,7 +383,7 @@ func TestGossipNoForwardSelf(t *testing.T) { localAddr := local.GetNodeAddr() c := newClient(log.AmbientContext{Tracer: tracing.NewTracer()}, localAddr, makeMetrics()) peer.mu.Lock() - c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker()) + c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker("")) peer.mu.Unlock() disconnectedClient := <-disconnectedCh diff --git a/pkg/rpc/breaker.go b/pkg/rpc/breaker.go index db9ca015a6f6..09dffb04c991 100644 --- a/pkg/rpc/breaker.go +++ b/pkg/rpc/breaker.go @@ -15,13 +15,15 @@ package rpc import ( + "context" "time" - "github.com/cenk/backoff" + "github.com/cenkalti/backoff" + circuit "github.com/cockroachdb/circuitbreaker" "github.com/facebookgo/clock" - "github.com/rubyist/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" ) const maxBackoff = time.Second @@ -90,10 +92,29 @@ func newBackOff(clock backoff.Clock) backoff.BackOff { return b } -func newBreaker(clock clock.Clock) *circuit.Breaker { +func newBreaker(ctx context.Context, name string, clock clock.Clock) *circuit.Breaker { return circuit.NewBreakerWithOptions(&circuit.Options{ + Name: name, BackOff: newBackOff(clock), Clock: clock, ShouldTrip: circuit.ThresholdTripFunc(1), + Logger: breakerLogger{ctx}, }) } + +// breakerLogger implements circuit.Logger to expose logging from the +// circuitbreaker package. Debugf is logged with a vmodule level of 2 so to see +// the circuitbreaker debug messages set --vmodule=breaker=2 +type breakerLogger struct { + ctx context.Context +} + +func (r breakerLogger) Debugf(format string, v ...interface{}) { + if log.V(2) { + log.InfofDepth(r.ctx, 1, format, v...) + } +} + +func (r breakerLogger) Infof(format string, v ...interface{}) { + log.InfofDepth(r.ctx, 1, format, v...) +} diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index ef2f623fc5a1..5b908bf6b27f 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -23,10 +23,10 @@ import ( "sync/atomic" "time" + circuit "github.com/cockroachdb/circuitbreaker" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/rubyist/circuitbreaker" "golang.org/x/sync/syncmap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -578,12 +578,13 @@ func (ctx *Context) GRPCDial(target string) *Connection { } // NewBreaker creates a new circuit breaker properly configured for RPC -// connections. -func (ctx *Context) NewBreaker() *circuit.Breaker { +// connections. name is used internally for logging state changes of the +// returned breaker. +func (ctx *Context) NewBreaker(name string) *circuit.Breaker { if ctx.BreakerFactory != nil { return ctx.BreakerFactory() } - return newBreaker(&ctx.breakerClock) + return newBreaker(ctx.masterCtx, name, &ctx.breakerClock) } // ErrNotConnected is returned by ConnHealth when there is no connection to the diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 7184af2d8c74..0ddd548083ca 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -34,10 +34,10 @@ import ( "time" "github.com/cenk/backoff" + circuit "github.com/cockroachdb/circuitbreaker" "github.com/coreos/etcd/raft" "github.com/kr/pretty" "github.com/pkg/errors" - circuit "github.com/rubyist/circuitbreaker" "google.golang.org/grpc" "github.com/cockroachdb/cockroach/pkg/base" diff --git a/pkg/storage/raft_transport.go b/pkg/storage/raft_transport.go index c01ca8a4e41c..593f731d93f7 100644 --- a/pkg/storage/raft_transport.go +++ b/pkg/storage/raft_transport.go @@ -25,9 +25,9 @@ import ( "time" "unsafe" + circuit "github.com/cockroachdb/circuitbreaker" "github.com/coreos/etcd/raft/raftpb" "github.com/pkg/errors" - "github.com/rubyist/circuitbreaker" "google.golang.org/grpc" "github.com/cockroachdb/cockroach/pkg/gossip" @@ -415,7 +415,7 @@ func (t *RaftTransport) Stop(storeID roachpb.StoreID) { func (t *RaftTransport) GetCircuitBreaker(nodeID roachpb.NodeID) *circuit.Breaker { value, ok := t.breakers.Load(int64(nodeID)) if !ok { - breaker := t.rpcContext.NewBreaker() + breaker := t.rpcContext.NewBreaker(fmt.Sprintf("RaftTransport[n%d]", nodeID)) value, _ = t.breakers.LoadOrStore(int64(nodeID), unsafe.Pointer(breaker)) } return (*circuit.Breaker)(value) @@ -464,7 +464,7 @@ func (t *RaftTransport) connectAndProcess( if consecFailures == 0 { log.Warningf(ctx, "raft transport stream to node %d failed: %s", nodeID, err) } - breaker.Fail() + breaker.Fail(err) } } diff --git a/vendor b/vendor index cd01c2fb8fb3..1bc089eac7ba 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit cd01c2fb8fb3ed45fb71b0332238b182692164d2 +Subproject commit 1bc089eac7ba19bb141a65350258799fa9687e38