Skip to content

Commit

Permalink
kvprober: special case node-is-decommissioned errors
Browse files Browse the repository at this point in the history
kvprober runs on decommissioned nodes. In CC, this is generally fine, since
automation fully takes down nodes once they reach the decommissioned state. But
there is a brief period where a node is running and in the decommissioned
state, and we see kvprober errors in metrics during this period, as in below.
This sometimes leads to false positive kvprober pages in CC production.

‹rpc error: code = PermissionDenied desc = n1 was permanently removed from...

To be clear, the errors are not wrong per se. They just are expected to
happen, once a node is decommissioned.

This commit adds special handling for errors of the kind above, by doing a
substring match on the error string. To be exact, kvprober now logs such errors
at warning level and does not increment any error counters. This way, an
operation like decommissioning a node does not cause false positive kvprober
pages in CC production.

Fixes #104367

Release note: None, since kvprober is not used by customers. (It is not
documented.)

Co-authored-by: Josh Carp <[email protected]>
  • Loading branch information
joshimhoff and Josh Carp committed Jul 11, 2023
1 parent a7c1494 commit 2611c74
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 30 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvprober/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down Expand Up @@ -64,5 +65,6 @@ go_test(
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
],
)
64 changes: 52 additions & 12 deletions pkg/kv/kvprober/kvprober.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -209,6 +210,27 @@ func (p *ProberOps) Write(key roachpb.Key) func(context.Context, *kv.Txn) error
}
}

// errorIsExpectedDuringNormalOperation reports whether err is an error that
// CRDB may return while operating normally, rather than one that signals a
// problem.
//
// The motivating case is the `was permanently removed from the cluster at`
// error handed to the kvclient of a decommissioned node. User traffic has
// already been drained off such a node by the time it becomes decommissioned,
// so the error does not affect users.
//
// Because errors of this kind do not point at a problem with CRDB, kvprober
// leaves them out of the error counts in its metrics.
func errorIsExpectedDuringNormalOperation(err error) bool {
	// Only decommissioned status errors, as recognized by
	// kvpb.IsDecommissionedStatusErr, are filtered out. The kvclient of a
	// decommissioned node does *occasionally* return other errors as well, such
	// as `use of closed network connection`, but the full set is not known
	// exactly and those errors mostly lack structure. They are rare, and
	// kvprober is intended to page on a sustained error rate rather than on a
	// single error, so they are deliberately not filtered.
	decommissioned := kvpb.IsDecommissionedStatusErr(err)
	return decommissioned
}

// validateKey returns an error if the key is not valid for use by the kvprober.
// This is a sanity check to ensure that the kvprober does not corrupt user data
// in the global keyspace or other system data in the local keyspace.
Expand Down Expand Up @@ -351,8 +373,12 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT
return
}
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
if errorIsExpectedDuringNormalOperation(err) {
log.Health.Warningf(ctx, "making a plan failed with expected error: %v", err)
} else {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
}
return
}

Expand Down Expand Up @@ -382,9 +408,13 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT
return txns.TxnRootKV(ctx, f)
})
if err != nil {
// TODO(josh): Write structured events with log.Structured.
log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.ReadProbeFailures.Inc(1)
if errorIsExpectedDuringNormalOperation(err) {
log.Health.Warningf(ctx, "kv.Get(%s), r=%v failed with expected error: %v", step.Key, step.RangeID, err)
} else {
// TODO(josh): Write structured events with log.Structured.
log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.ReadProbeFailures.Inc(1)
}
return
}

Expand Down Expand Up @@ -414,8 +444,12 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
return
}
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
if errorIsExpectedDuringNormalOperation(err) {
log.Health.Warningf(ctx, "making a plan failed with expected error: %v", err)
} else {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
}
return
}

Expand All @@ -434,11 +468,17 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
return txns.TxnRootKV(ctx, f)
})
if err != nil {
added := p.quarantineWritePool.maybeAdd(ctx, step)
log.Health.Errorf(
ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v [quarantined=%t]", step.Key, step.RangeID, err, added,
)
p.metrics.WriteProbeFailures.Inc(1)
if errorIsExpectedDuringNormalOperation(err) {
log.Health.Warningf(
ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with expected error: %v", step.Key, step.RangeID, err,
)
} else {
added := p.quarantineWritePool.maybeAdd(ctx, step)
log.Health.Errorf(
ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v [quarantined=%t]", step.Key, step.RangeID, err, added,
)
p.metrics.WriteProbeFailures.Inc(1)
}
return
}
// This will no-op if not in the quarantine pool.
Expand Down
30 changes: 12 additions & 18 deletions pkg/kv/kvprober/kvprober_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand All @@ -46,7 +45,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) {
ctx := context.Background()

t.Run("disabled by default", func(t *testing.T) {
s, _, p, cleanup := initTestProber(t, base.TestingKnobs{})
s, _, p, cleanup := initTestServer(t, base.TestingKnobs{})
defer cleanup()

kvprober.ReadInterval.Override(ctx, &s.ClusterSettings().SV, 5*time.Millisecond)
Expand All @@ -61,7 +60,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) {
})

t.Run("happy path", func(t *testing.T) {
s, _, p, cleanup := initTestProber(t, base.TestingKnobs{})
s, _, p, cleanup := initTestServer(t, base.TestingKnobs{})
defer cleanup()

kvprober.ReadEnabled.Override(ctx, &s.ClusterSettings().SV, true)
Expand All @@ -87,7 +86,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) {
})

t.Run("a single range is unavailable for all KV ops", func(t *testing.T) {
s, _, p, cleanup := initTestProber(t, base.TestingKnobs{
s, _, p, cleanup := initTestServer(t, base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(i context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
for _, ru := range ba.Requests {
Expand Down Expand Up @@ -129,7 +128,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) {
var dbIsAvailable syncutil.AtomicBool
dbIsAvailable.Set(true)

s, _, p, cleanup := initTestProber(t, base.TestingKnobs{
s, _, p, cleanup := initTestServer(t, base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(i context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
if !dbIsAvailable.Get() {
Expand Down Expand Up @@ -174,7 +173,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) {
var dbIsAvailable syncutil.AtomicBool
dbIsAvailable.Set(true)

s, _, p, cleanup := initTestProber(t, base.TestingKnobs{
s, _, p, cleanup := initTestServer(t, base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(i context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
if !dbIsAvailable.Get() {
Expand Down Expand Up @@ -225,7 +224,7 @@ func TestWriteProbeDoesNotLeaveLiveData(t *testing.T) {

ctx := context.Background()

s, _, p, cleanup := initTestProber(t, base.TestingKnobs{})
s, _, p, cleanup := initTestServer(t, base.TestingKnobs{})
defer cleanup()

kvprober.WriteEnabled.Override(ctx, &s.ClusterSettings().SV, true)
Expand Down Expand Up @@ -259,7 +258,7 @@ func TestPlannerMakesPlansCoveringAllRanges(t *testing.T) {

ctx := context.Background()
// Disable split and merge queue just in case.
_, sqlDB, p, cleanup := initTestProber(t, base.TestingKnobs{
_, sqlDB, p, cleanup := initTestServer(t, base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{DisableSplitQueue: true, DisableMergeQueue: true},
})
defer cleanup()
Expand Down Expand Up @@ -307,7 +306,7 @@ func TestProberOpsValidatesProbeKey(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, _, cleanup := initTestProber(t, base.TestingKnobs{})
s, _, _, cleanup := initTestServer(t, base.TestingKnobs{})
defer cleanup()

var ops kvprober.ProberOps
Expand Down Expand Up @@ -350,23 +349,18 @@ func TestProberOpsValidatesProbeKey(t *testing.T) {
}
}

func initTestProber(
func initTestServer(
t *testing.T, knobs base.TestingKnobs,
) (serverutils.TestServerInterface, *gosql.DB, *kvprober.Prober, func()) {

s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Settings: cluster.MakeClusterSettings(),
Knobs: knobs,
})
p := kvprober.NewProber(kvprober.Opts{
Tracer: s.TracerI().(*tracing.Tracer),
DB: kvDB,
HistogramWindowInterval: time.Minute, // actual value not important to test
Settings: s.ClusterSettings(),
})

// Given small test cluster, this better exercises the planning logic.
kvprober.NumStepsToPlanAtOnce.Override(context.Background(), &s.ClusterSettings().SV, 10)

p := s.KvProber()
// Want these tests to run as fast as possible; see planner_test.go for a
// unit test of the rate limiting.
p.SetPlanningRateLimits(0)
Expand Down
75 changes: 75 additions & 0 deletions pkg/kv/kvprober/kvprober_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
)

func TestReadProbe(t *testing.T) {
Expand Down Expand Up @@ -78,6 +80,31 @@ func TestReadProbe(t *testing.T) {
require.Zero(t, p.Metrics().ReadProbeFailures.Count())
})

// Once a node is fully decommissioned, neither kvclient nor kvprober work from
// the node. This does not indicate a service health issue; it is expected behavior.
//
// This is not tested with an integration test, since the kvclient of a decommissioned
// node will occasionally return other errors. We choose not to filter those out for
// reasons given at errorIsExpectedDuringNormalOperation. As a result, an integration test
// would be flaky. We believe a unit test is sufficient, largely because the main risk
// in only having a unit test is false positive pages on SRE, due to changes in what errors
// are returned from the kvclient of a decommissioned node. Though false positive pages add
// ops load, they do not directly affect the customer experience.
t.Run("planning fails due to decommissioning but not counted as error", func(t *testing.T) {
m := &mock{
t: t,
read: true,
planErr: kvpb.NewDecommissionedStatusErrorf(codes.PermissionDenied, "foobar"),
}
p := initTestProber(ctx, m)
p.readProbeImpl(ctx, m, m, m)

require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count())
require.Zero(t, p.Metrics().ReadProbeAttempts.Count())
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Zero(t, p.Metrics().ReadProbeFailures.Count())
})

t.Run("txn fails", func(t *testing.T) {
m := &mock{
t: t,
Expand Down Expand Up @@ -107,6 +134,22 @@ func TestReadProbe(t *testing.T) {
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Equal(t, int64(1), p.Metrics().ReadProbeFailures.Count())
})

// See comment above matching case in TestReadProbe regarding planning.
t.Run("read fails due to decommissioning but not counted as error", func(t *testing.T) {
m := &mock{
t: t,
read: true,
readErr: kvpb.NewDecommissionedStatusErrorf(codes.PermissionDenied, "foobar"),
}
p := initTestProber(ctx, m)
p.readProbeImpl(ctx, m, m, m)

require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count())
require.Equal(t, int64(1), p.Metrics().ReadProbeAttempts.Count())
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Zero(t, p.Metrics().ReadProbeFailures.Count())
})
}

func TestWriteProbe(t *testing.T) {
Expand Down Expand Up @@ -163,6 +206,22 @@ func TestWriteProbe(t *testing.T) {
require.Zero(t, p.Metrics().WriteProbeFailures.Count())
})

// See comment above matching case in TestReadProbe regarding planning.
t.Run("planning fails due to decommissioning but not counted as error", func(t *testing.T) {
m := &mock{
t: t,
write: true,
planErr: kvpb.NewDecommissionedStatusErrorf(codes.PermissionDenied, "foobar"),
}
p := initTestProber(ctx, m)
p.writeProbeImpl(ctx, m, m, m)

require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count())
require.Zero(t, p.Metrics().WriteProbeAttempts.Count())
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Zero(t, p.Metrics().WriteProbeFailures.Count())
})

t.Run("open txn fails", func(t *testing.T) {
m := &mock{
t: t,
Expand Down Expand Up @@ -192,6 +251,22 @@ func TestWriteProbe(t *testing.T) {
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Equal(t, int64(1), p.Metrics().WriteProbeFailures.Count())
})

// See comment above matching case in TestReadProbe regarding planning.
t.Run("write fails due to decommissioning but not counted as error", func(t *testing.T) {
m := &mock{
t: t,
write: true,
writeErr: kvpb.NewDecommissionedStatusErrorf(codes.PermissionDenied, "foobar"),
}
p := initTestProber(ctx, m)
p.writeProbeImpl(ctx, m, m, m)

require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count())
require.Equal(t, int64(1), p.Metrics().WriteProbeAttempts.Count())
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Zero(t, p.Metrics().WriteProbeFailures.Count())
})
}

func initTestProber(ctx context.Context, m *mock) *Prober {
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvprober"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
Expand Down Expand Up @@ -1830,6 +1831,11 @@ func (ts *TestServer) BinaryVersionOverride() roachpb.Version {
return knobs.(*TestingKnobs).BinaryVersionOverride
}

// KvProber is part of the TestServerInterface. It hands back the prober stored
// in the kvProber field of the underlying Server.
func (ts *TestServer) KvProber() *kvprober.Prober {
	prober := ts.Server.kvProber
	return prober
}

type testServerFactoryImpl struct{}

// TestServerFactory can be passed to serverutils.InitTestServerFactory
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/serverutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/config",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvprober",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/roachpb",
Expand Down
5 changes: 5 additions & 0 deletions pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvprober"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -317,6 +318,10 @@ type TestServerInterface interface {
// BinaryVersionOverride returns the value of an override if set using
// TestingKnobs.
BinaryVersionOverride() roachpb.Version

// KvProber returns a *kvprober.Prober, which is useful when asserting the
// correctness of the prober from integration tests.
KvProber() *kvprober.Prober
}

// TestServerFactory encompasses the actual implementation of the shim
Expand Down

0 comments on commit 2611c74

Please sign in to comment.