Skip to content

Commit

Permalink
Merge pull request cockroachdb#35361 from ajwerner/backport2.0-33676
Browse files Browse the repository at this point in the history
release-2.0: rpc: adopt logging in circuitbreaker
  • Loading branch information
ajwerner authored Mar 8, 2019
2 parents c0f63e4 + 1e995af commit 43cc479
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 30 deletions.
19 changes: 12 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ ignored = [
name = "github.com/montanaflynn/stats"
branch = "master"

# https://github.com/rubyist/circuitbreaker/commit/af95830
[[constraint]]
name = "github.com/rubyist/circuitbreaker"
branch = "master"

# github.com/docker/docker depends on a few functions not included in the
# latest release: reference.{FamiliarName,ParseNormalizedNamed,TagNameOnly}.
#
Expand Down Expand Up @@ -93,3 +88,7 @@ ignored = [
[[prune.project]]
name = "github.com/knz/go-libedit"
unused-packages = false

[[constraint]]
name = "github.com/cockroachdb/circuitbreaker"
revision = "a614b14ccf63dd2311d4ff646c30c61b8ed34aa"
2 changes: 1 addition & 1 deletion pkg/gossip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"sync"
"time"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/pkg/errors"
circuit "github.com/rubyist/circuitbreaker"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down
4 changes: 2 additions & 2 deletions pkg/gossip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func gossipSucceedsSoon(
// If the client wasn't able to connect, restart it.
g := gossip[client]
g.mu.Lock()
client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker())
client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker(""))
g.mu.Unlock()
default:
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestClientNodeID(t *testing.T) {
case <-disconnected:
// The client hasn't been started or failed to start, loop and try again.
local.mu.Lock()
c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker())
c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker(""))
local.mu.Unlock()
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ import (

"google.golang.org/grpc"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
circuit "github.com/rubyist/circuitbreaker"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
Expand Down Expand Up @@ -1345,7 +1345,8 @@ func (g *Gossip) startClientLocked(addr net.Addr) {
defer g.clientsMu.Unlock()
breaker, ok := g.clientsMu.breakers[addr.String()]
if !ok {
breaker = g.rpcContext.NewBreaker()
name := fmt.Sprintf("gossip %v->%v", g.rpcContext.Addr, addr)
breaker = g.rpcContext.NewBreaker(name)
g.clientsMu.breakers[addr.String()] = breaker
}
ctx := g.AnnotateCtx(context.TODO())
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
localAddr := local.GetNodeAddr()
c := newClient(log.AmbientContext{Tracer: tracing.NewTracer()}, localAddr, makeMetrics())
peer.mu.Lock()
c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker())
c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker(""))
peer.mu.Unlock()

disconnectedClient := <-disconnectedCh
Expand Down
27 changes: 24 additions & 3 deletions pkg/rpc/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package rpc

import (
"context"
"time"

"github.com/cenk/backoff"
"github.com/cenkalti/backoff"
circuit "github.com/cockroachdb/circuitbreaker"
"github.com/facebookgo/clock"
"github.com/rubyist/circuitbreaker"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

const maxBackoff = time.Second
Expand Down Expand Up @@ -90,10 +92,29 @@ func newBackOff(clock backoff.Clock) backoff.BackOff {
return b
}

func newBreaker(clock clock.Clock) *circuit.Breaker {
func newBreaker(ctx context.Context, name string, clock clock.Clock) *circuit.Breaker {
return circuit.NewBreakerWithOptions(&circuit.Options{
Name: name,
BackOff: newBackOff(clock),
Clock: clock,
ShouldTrip: circuit.ThresholdTripFunc(1),
Logger: breakerLogger{ctx},
})
}

// breakerLogger implements circuit.Logger to expose logging from the
// circuitbreaker package. Debugf is logged with a vmodule level of 2 so to see
// the circuitbreaker debug messages set --vmodule=breaker=2
type breakerLogger struct {
ctx context.Context
}

func (r breakerLogger) Debugf(format string, v ...interface{}) {
if log.V(2) {
log.InfofDepth(r.ctx, 1, format, v...)
}
}

func (r breakerLogger) Infof(format string, v ...interface{}) {
log.InfofDepth(r.ctx, 1, format, v...)
}
9 changes: 5 additions & 4 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"sync/atomic"
"time"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/rubyist/circuitbreaker"
"golang.org/x/sync/syncmap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -578,12 +578,13 @@ func (ctx *Context) GRPCDial(target string) *Connection {
}

// NewBreaker creates a new circuit breaker properly configured for RPC
// connections.
func (ctx *Context) NewBreaker() *circuit.Breaker {
// connections. name is used internally for logging state changes of the
// returned breaker.
func (ctx *Context) NewBreaker(name string) *circuit.Breaker {
if ctx.BreakerFactory != nil {
return ctx.BreakerFactory()
}
return newBreaker(&ctx.breakerClock)
return newBreaker(ctx.masterCtx, name, &ctx.breakerClock)
}

// ErrNotConnected is returned by ConnHealth when there is no connection to the
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
"time"

"github.com/cenk/backoff"
circuit "github.com/cockroachdb/circuitbreaker"
"github.com/coreos/etcd/raft"
"github.com/kr/pretty"
"github.com/pkg/errors"
circuit "github.com/rubyist/circuitbreaker"
"google.golang.org/grpc"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"time"
"unsafe"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/coreos/etcd/raft/raftpb"
"github.com/pkg/errors"
"github.com/rubyist/circuitbreaker"
"google.golang.org/grpc"

"github.com/cockroachdb/cockroach/pkg/gossip"
Expand Down Expand Up @@ -415,7 +415,7 @@ func (t *RaftTransport) Stop(storeID roachpb.StoreID) {
func (t *RaftTransport) GetCircuitBreaker(nodeID roachpb.NodeID) *circuit.Breaker {
value, ok := t.breakers.Load(int64(nodeID))
if !ok {
breaker := t.rpcContext.NewBreaker()
breaker := t.rpcContext.NewBreaker(fmt.Sprintf("RaftTransport[n%d]", nodeID))
value, _ = t.breakers.LoadOrStore(int64(nodeID), unsafe.Pointer(breaker))
}
return (*circuit.Breaker)(value)
Expand Down Expand Up @@ -464,7 +464,7 @@ func (t *RaftTransport) connectAndProcess(
if consecFailures == 0 {
log.Warningf(ctx, "raft transport stream to node %d failed: %s", nodeID, err)
}
breaker.Fail()
breaker.Fail(err)
}
}

Expand Down

0 comments on commit 43cc479

Please sign in to comment.