From 8a746b1c38028c4d8581476d77f6f1800c91ce49 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Mon, 4 Mar 2019 11:43:20 -0500 Subject: [PATCH 1/3] README: remove links to gitter We're no longer providing support via this channel and want to direct users to the forum instead. --- README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/README.md b/README.md index d794d8758634..3b45dd73f5df 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,6 @@ CockroachDB is a cloud-native SQL database for building global, scalable cloud s [![TeamCity CI](https://teamcity.cockroachdb.com/guestAuth/app/rest/builds/buildType:(id:Cockroach_UnitTests)/statusIcon.svg)](https://teamcity.cockroachdb.com/viewLog.html?buildTypeId=Cockroach_UnitTests&buildId=lastFinished&guest=1) [![GoDoc](https://godoc.org/github.com/cockroachdb/cockroach?status.svg)](https://godoc.org/github.com/cockroachdb/cockroach) -[![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/cockroachdb/cockroach?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge) - [What is CockroachDB?](#what-is-cockroachdb) - [Docs](#docs) @@ -85,9 +84,6 @@ CockroachDB supports the PostgreSQL wire protocol, so you can use any available [Stack Overflow](https://stackoverflow.com/questions/tagged/cockroachdb) - Ask questions, find answers, and help other users. -- [Join us on Gitter](https://gitter.im/cockroachdb/cockroach) - This is the most immediate - way to connect with CockroachDB engineers. - - For filing bugs, suggesting improvements, or requesting new features, help us out by [opening an issue](https://github.com/cockroachdb/cockroach/issues/new). From 3c72ce9b3b78f9a3235d69855197c5b8a94999a8 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 5 Mar 2019 16:46:26 -0500 Subject: [PATCH 2/3] rpc/nodedialer: avoid tripping breaker on context errors In #34026 we added logic to ensure that the context was not canclled before calling in to GRPCDial. This change extends that to also avoid calling breaker.Fail if the returned error was context.Canceled. The motivation for this is an observation that breakers can trip due to context cancellation which race with calls to Dial (from say DistSQL). This behavior was observed after a node died due to an unrelated corruption bug. It appears that this node failure triggered a context cancellation which then tripped a breaker which then lead to a different flow to fail which then lead to another cancellation which seems to have then tripped another breaker. The evidence for this exact serioes of events is somewhat scant but we do know for certain that we saw breakers tripped due to context cancelled which seems wrong. ``` ip-172-31-44-174> I190305 14:53:47.387863 150672 vendor/github.com/cockroachdb/circuitbreaker/circuitbreaker.go:322 [n6] circuitbreaker: rpc [::]:26257->2 tripped: failed to grpc dial n2 at ip-172-31-34-81:26257: context canceled ``` This change also cosmetically refactors DialInternalClient and Dial to share some copy-pasted code which was becoming burdensome. Lastly this change adds a bunch of unit testing to the nodedialer package. Release note: None --- pkg/rpc/nodedialer/nodedialer.go | 67 +++--- pkg/rpc/nodedialer/nodedialer_test.go | 318 ++++++++++++++++++++++++++ pkg/testutils/lint/lint_test.go | 1 + 3 files changed, 354 insertions(+), 32 deletions(-) create mode 100644 pkg/rpc/nodedialer/nodedialer_test.go diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 6c9092e36078..cb0048e95bd8 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -83,33 +83,13 @@ func (n *Dialer) Dial(ctx context.Context, nodeID roachpb.NodeID) (_ *grpc.Clien return nil, ctxErr } breaker := n.getBreaker(nodeID) - - if !breaker.Ready() { - err := errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID) - return nil, err - } - - defer func() { - // Enforce a minimum interval between warnings for failed connections. - if err != nil && breaker.ShouldLog() { - log.Infof(ctx, "unable to connect to n%d: %s", nodeID, err) - } - }() - addr, err := n.resolver(nodeID) if err != nil { err = errors.Wrapf(err, "failed to resolve n%d", nodeID) breaker.Fail(err) return nil, err } - conn, err := n.rpcContext.GRPCDial(addr.String()).Connect(ctx) - if err != nil { - err = errors.Wrapf(err, "failed to grpc dial n%d at %v", nodeID, addr) - breaker.Fail(err) - return nil, err - } - breaker.Success() - return conn, nil + return n.dial(ctx, nodeID, addr, breaker) } // DialInternalClient is a specialization of Dial for callers that @@ -124,10 +104,6 @@ func (n *Dialer) DialInternalClient( if n == nil || n.resolver == nil { return nil, nil, errors.New("no node dialer configured") } - // Don't trip the breaker if we're already canceled. - if ctxErr := ctx.Err(); ctxErr != nil { - return nil, nil, ctxErr - } addr, err := n.resolver(nodeID) if err != nil { return nil, nil, err @@ -141,24 +117,51 @@ func (n *Dialer) DialInternalClient( return localCtx, localClient, nil } - - breaker := n.getBreaker(nodeID) - log.VEventf(ctx, 2, "sending request to %s", addr) + conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID)) + if err != nil { + return nil, nil, err + } + return ctx, roachpb.NewInternalClient(conn), err +} + +// dial performs the dialing of the remove connection. +func (n *Dialer) dial( + ctx context.Context, nodeID roachpb.NodeID, addr net.Addr, breaker *wrappedBreaker, +) (_ *grpc.ClientConn, err error) { + // Don't trip the breaker if we're already canceled. + if ctxErr := ctx.Err(); ctxErr != nil { + return nil, ctxErr + } + if !breaker.Ready() { + err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID) + return nil, err + } + defer func() { + // Enforce a minimum interval between warnings for failed connections. + if err != nil && ctx.Err() == nil && breaker.ShouldLog() { + log.Infof(ctx, "unable to connect to n%d: %s", nodeID, err) + } + }() conn, err := n.rpcContext.GRPCDial(addr.String()).Connect(ctx) if err != nil { + // If we were canceled during the dial, don't trip the breaker. + if ctxErr := ctx.Err(); ctxErr != nil { + return nil, ctxErr + } err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr) breaker.Fail(err) - return nil, nil, err + return nil, err } // Check to see if the connection is in the transient failure state. This can // happen if the connection already existed, but a recent heartbeat has // failed and we haven't yet torn down the connection. if err := grpcutil.ConnectionReady(conn); err != nil { - err = errors.Wrapf(err, "failed to check for connection ready to n%d at %v", nodeID, addr) + err = errors.Wrapf(err, "failed to check for ready connection to n%d at %v", nodeID, addr) breaker.Fail(err) - return nil, nil, err + return nil, err } + // TODO(bdarnell): Reconcile the different health checks and circuit breaker // behavior in this file. Note that this different behavior causes problems // for higher-levels in the system. For example, DistSQL checks for @@ -166,7 +169,7 @@ func (n *Dialer) DialInternalClient( // RPCs fail when dial fails due to an open breaker. Reset the breaker here // as a stop-gap before the reconciliation occurs. breaker.Success() - return ctx, roachpb.NewInternalClient(conn), nil + return conn, nil } // ConnHealth returns nil if we have an open connection to the given node diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go new file mode 100644 index 000000000000..a8a17a6579c6 --- /dev/null +++ b/pkg/rpc/nodedialer/nodedialer_test.go @@ -0,0 +1,318 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package nodedialer + +import ( + "context" + "fmt" + "math/rand" + "net" + "sync" + "testing" + "time" + + circuit "github.com/cockroachdb/circuitbreaker" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +func TestNodedialerPositive(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper, rpcCtx, ln, _ := setUpNodedialerTest(t) + defer stopper.Stop(context.TODO()) + nd := New(rpcCtx, newSingleNodeResolver(1, ln.Addr())) + // Ensure that dialing works. + breaker := nd.GetCircuitBreaker(1) + assert.True(t, breaker.Ready()) + ctx := context.Background() + _, err := nd.Dial(ctx, 1) + assert.Nil(t, err, "failed to dial") + assert.True(t, breaker.Ready()) + assert.Equal(t, breaker.Failures(), int64(0)) +} + +func TestConcurrentCancellationAndTimeout(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper, rpcCtx, ln, _ := setUpNodedialerTest(t) + defer stopper.Stop(context.TODO()) + nd := New(rpcCtx, newSingleNodeResolver(1, ln.Addr())) + ctx := context.Background() + breaker := nd.GetCircuitBreaker(1) + // Test that when a context is canceled during dialing we always return that + // error but we never trip the breaker. + const N = 1000 + var wg sync.WaitGroup + for i := 0; i < N; i++ { + wg.Add(2) + // Jiggle when we cancel relative to when we dial to try to hit cases where + // cancellation happens during the call to GRPCDial. + iCtx, cancel := context.WithTimeout(ctx, randDuration(time.Millisecond)) + go func() { + time.Sleep(randDuration(time.Millisecond)) + cancel() + wg.Done() + }() + go func() { + time.Sleep(randDuration(time.Millisecond)) + _, err := nd.Dial(iCtx, 1) + if err != nil && + err != context.Canceled && + err != context.DeadlineExceeded { + t.Errorf("got an unexpected error from Dial: %v", err) + } + wg.Done() + }() + } + wg.Wait() + assert.Equal(t, breaker.Failures(), int64(0)) +} + +func TestResolverErrorsTrip(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper, rpcCtx, _, _ := setUpNodedialerTest(t) + defer stopper.Stop(context.TODO()) + boom := fmt.Errorf("boom") + nd := New(rpcCtx, func(id roachpb.NodeID) (net.Addr, error) { + return nil, boom + }) + _, err := nd.Dial(context.Background(), 1) + assert.Equal(t, errors.Cause(err), boom) + breaker := nd.GetCircuitBreaker(1) + assert.False(t, breaker.Ready()) +} + +func TestDisconnectsTrip(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper, rpcCtx, ln, hb := setUpNodedialerTest(t) + defer stopper.Stop(context.TODO()) + nd := New(rpcCtx, newSingleNodeResolver(1, ln.Addr())) + ctx := context.Background() + breaker := nd.GetCircuitBreaker(1) + + // Now close the underlying connection from the server side and set the + // heartbeat service to return errors. This will eventually lead to the client + // connection being removed and Dial attempts to return an error. + // While this is going on there will be many clients attempting to + // connect. These connecting clients will send interesting errors they observe + // on the errChan. Once an error from Dial is observed the test re-enables the + // heartbeat service. The test will confirm that the only errors they record + // in to the breaker are interesting ones as determined by shouldTrip. + hb.setErr(fmt.Errorf("boom")) + underlyingNetConn := ln.popConn() + assert.Nil(t, underlyingNetConn.Close()) + const N = 1000 + breakerEventChan := make(chan circuit.ListenerEvent, N) + breaker.AddListener(breakerEventChan) + errChan := make(chan error, N) + shouldTrip := func(err error) bool { + return err != nil && + err != context.DeadlineExceeded && + err != context.Canceled && + errors.Cause(err) != circuit.ErrBreakerOpen + } + var wg sync.WaitGroup + for i := 0; i < N; i++ { + wg.Add(2) + iCtx, cancel := context.WithTimeout(ctx, randDuration(time.Millisecond)) + go func() { + time.Sleep(randDuration(time.Millisecond)) + cancel() + wg.Done() + }() + go func() { + time.Sleep(randDuration(time.Millisecond)) + _, err := nd.Dial(iCtx, 1) + if shouldTrip(err) { + errChan <- err + } + wg.Done() + }() + } + go func() { wg.Wait(); close(errChan) }() + var errorsSeen int + for range errChan { + if errorsSeen == 0 { + hb.setErr(nil) + } + errorsSeen++ + } + breaker.RemoveListener(breakerEventChan) + close(breakerEventChan) + var failsSeen int + for ev := range breakerEventChan { + if ev.Event == circuit.BreakerFail { + failsSeen++ + } + } + // Ensure that all of the interesting errors were seen by the breaker. + assert.Equal(t, errorsSeen, failsSeen) + + // Ensure that the connection becomes healthy soon now that the heartbeat + // service is not returning errors. + hb.setErr(nil) // reset in case there were no errors + testutils.SucceedsSoon(t, func() error { + return rpcCtx.ConnHealth(ln.Addr().String()) + }) +} + +func setUpNodedialerTest( + t *testing.T, +) (stopper *stop.Stopper, rpcCtx *rpc.Context, ln *interceptingListener, hb *heartbeatService) { + stopper = stop.NewStopper() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + // Create an rpc Context and then + rpcCtx = newTestContext(clock, stopper) + _, ln, hb = newTestServer(t, clock, stopper) + testutils.SucceedsSoon(t, func() error { + return rpcCtx.ConnHealth(ln.Addr().String()) + }) + return stopper, rpcCtx, ln, hb +} + +// randDuration returns a uniform random duration between 0 and max. +func randDuration(max time.Duration) time.Duration { + return time.Duration(rand.Intn(int(max))) +} + +func newTestServer( + t testing.TB, clock *hlc.Clock, stopper *stop.Stopper, +) (*grpc.Server, *interceptingListener, *heartbeatService) { + ctx := context.Background() + localAddr := "127.0.0.1:0" + ln, err := net.Listen("tcp", localAddr) + if err != nil { + t.Fatalf("failed to listed on %v: %v", localAddr, err) + } + il := &interceptingListener{Listener: ln} + s := grpc.NewServer() + serverVersion := cluster.MakeTestingClusterSettings().Version.ServerVersion + hb := &heartbeatService{ + clock: clock, + serverVersion: serverVersion, + } + rpc.RegisterHeartbeatServer(s, hb) + if err := stopper.RunAsyncTask(ctx, "localServer", func(ctx context.Context) { + if err := s.Serve(il); err != nil { + log.Infof(ctx, "server stopped: %v", err) + } + }); err != nil { + t.Fatalf("failed to run test server: %v", err) + } + go func() { <-stopper.ShouldQuiesce(); s.Stop() }() + return s, il, hb +} + +func newTestContext(clock *hlc.Clock, stopper *stop.Stopper) *rpc.Context { + cfg := testutils.NewNodeTestBaseContext() + cfg.Insecure = true + return rpc.NewContext( + log.AmbientContext{Tracer: tracing.NewTracer()}, + cfg, + clock, + stopper, + &cluster.MakeTestingClusterSettings().Version, + ) +} + +// interceptingListener wraps a net.Listener and provides access to the +// underlying net.Conn objects which that listener Accepts. +type interceptingListener struct { + net.Listener + mu struct { + syncutil.Mutex + conns []net.Conn + } +} + +// newSingleNodeResolver returns a Resolver that resolve a single node id +func newSingleNodeResolver(id roachpb.NodeID, addr net.Addr) AddressResolver { + return func(toResolve roachpb.NodeID) (net.Addr, error) { + if id == toResolve { + return addr, nil + } + return nil, fmt.Errorf("unknown node id %d", toResolve) + } +} + +func (il *interceptingListener) Accept() (c net.Conn, err error) { + defer func() { + if err == nil { + il.mu.Lock() + il.mu.conns = append(il.mu.conns, c) + il.mu.Unlock() + } + }() + return il.Listener.Accept() +} + +func (il *interceptingListener) popConn() net.Conn { + il.mu.Lock() + defer il.mu.Unlock() + if len(il.mu.conns) == 0 { + return nil + } + c := il.mu.conns[0] + il.mu.conns = il.mu.conns[1:] + return c +} + +type errContainer struct { + syncutil.RWMutex + err error +} + +func (ec *errContainer) getErr() error { + ec.RLock() + defer ec.RUnlock() + return ec.err +} + +func (ec *errContainer) setErr(err error) { + ec.Lock() + defer ec.Unlock() + ec.err = err +} + +// heartbeatService is a dummy rpc.HeartbeatService which provides a mechanism +// to inject errors. +type heartbeatService struct { + errContainer + clock *hlc.Clock + serverVersion roachpb.Version +} + +func (hb *heartbeatService) Ping( + ctx context.Context, args *rpc.PingRequest, +) (*rpc.PingResponse, error) { + if err := hb.getErr(); err != nil { + return nil, err + } + return &rpc.PingResponse{ + Pong: args.Ping, + ServerTime: hb.clock.PhysicalNow(), + ServerVersion: hb.serverVersion, + }, nil +} diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index e4c8e73399a0..065a27469b89 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -480,6 +480,7 @@ func TestLint(t *testing.T) { "*.go", ":!rpc/context_test.go", ":!rpc/context.go", + ":!rpc/nodedialer/nodedialer_test.go", ":!util/grpcutil/grpc_util_test.go", ":!cli/systembench/network_test_server.go", ) From 2d0c3bf53cbd899f6e88e771433f16d932d7acf0 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 7 Mar 2019 16:58:20 -0500 Subject: [PATCH 3/3] rpc,base: move HeartbeatInterval from const into Config This enables dramatically shortening the runtime of the nodedialer tests from over 3s to less than 50ms. Release note: None --- pkg/base/config.go | 10 ++++++++++ pkg/rpc/context.go | 7 +++---- pkg/rpc/nodedialer/nodedialer_test.go | 1 + 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/pkg/base/config.go b/pkg/base/config.go index fedf799bdf6c..b937d2b867bc 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -61,6 +61,10 @@ const ( // leader lease active duration should be of the raft election timeout. defaultRangeLeaseRaftElectionTimeoutMultiplier = 3 + // defaultHeartbeatInterval is the default value of HeartbeatInterval used + // by the rpc context. + defaultHeartbeatInterval = 3 * time.Second + // rangeLeaseRenewalFraction specifies what fraction the range lease // renewal duration should be of the range lease active time. For example, // with a value of 0.2 and a lease duration of 10 seconds, leases would be @@ -178,6 +182,11 @@ type Config struct { // it is set to the arbitrary length of six times the Metrics sample interval. // See the comment in server.Config for more details. HistogramWindowInterval time.Duration + + // HeartbeatInterval controls how often a Ping request is sent on peer + // connections to determine connection health and update the local view + // of remote clocks. + HeartbeatInterval time.Duration } func wrapError(err error) error { @@ -200,6 +209,7 @@ func (cfg *Config) InitDefaults() { cfg.HTTPAddr = defaultHTTPAddr cfg.SSLCertsDir = DefaultCertsDirectory cfg.certificateManager = lazyCertificateManager{} + cfg.HeartbeatInterval = defaultHeartbeatInterval } // HTTPRequestScheme returns "http" or "https" based on the value of Insecure. diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 82d20f866f33..4e06bb8d7efe 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -59,7 +59,6 @@ func init() { } const ( - defaultHeartbeatInterval = 3 * time.Second // The coefficient by which the maximum offset is multiplied to determine the // maximum acceptable measurement latency. maximumPingDurationMult = 2 @@ -404,10 +403,10 @@ func NewContext( var cancel context.CancelFunc ctx.masterCtx, cancel = context.WithCancel(ambient.AnnotateCtx(context.Background())) ctx.Stopper = stopper + ctx.heartbeatInterval = baseCtx.HeartbeatInterval ctx.RemoteClocks = newRemoteClockMonitor( - ctx.LocalClock, 10*defaultHeartbeatInterval, baseCtx.HistogramWindowInterval) - ctx.heartbeatInterval = defaultHeartbeatInterval - ctx.heartbeatTimeout = 2 * defaultHeartbeatInterval + ctx.LocalClock, 10*ctx.heartbeatInterval, baseCtx.HistogramWindowInterval) + ctx.heartbeatTimeout = 2 * ctx.heartbeatInterval stopper.RunWorker(ctx.masterCtx, func(context.Context) { <-stopper.ShouldQuiesce() diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go index a8a17a6579c6..c6df9dbff7ca 100644 --- a/pkg/rpc/nodedialer/nodedialer_test.go +++ b/pkg/rpc/nodedialer/nodedialer_test.go @@ -228,6 +228,7 @@ func newTestServer( func newTestContext(clock *hlc.Clock, stopper *stop.Stopper) *rpc.Context { cfg := testutils.NewNodeTestBaseContext() cfg.Insecure = true + cfg.HeartbeatInterval = 10 * time.Millisecond return rpc.NewContext( log.AmbientContext{Tracer: tracing.NewTracer()}, cfg,