diff --git a/pkg/kv/kvprober/BUILD.bazel b/pkg/kv/kvprober/BUILD.bazel index 63d994a46628..83c099d35cc6 100644 --- a/pkg/kv/kvprober/BUILD.bazel +++ b/pkg/kv/kvprober/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvpb", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", @@ -64,5 +65,6 @@ go_test( "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//codes", ], ) diff --git a/pkg/kv/kvprober/kvprober.go b/pkg/kv/kvprober/kvprober.go index 17a8e0776500..3346f67df803 100644 --- a/pkg/kv/kvprober/kvprober.go +++ b/pkg/kv/kvprober/kvprober.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -209,6 +210,27 @@ func (p *ProberOps) Write(key roachpb.Key) func(context.Context, *kv.Txn) error } } +// errorIsExpectedDuringNormalOperation filters out errors that may be returned +// during normal operation of CRDB. +// +// One such example is the `was permanently removed from the cluster at` error +// that is returned to the kvclient of decommissioned nodes. This error does not +// affect user traffic, since such traffic is drained off the node by the time it +// becomes decommissioned. +// +// Since such errors do not indicate a problem with CRDB, kvprober does not report +// them as an error in its metrics. +func errorIsExpectedDuringNormalOperation(err error) bool { + // Note that errors *other* than decommissioned status errors, such as + // `use of closed network connection`, happen *occasionally* on the kvclient + // of a decommissioned node. The full set of other errors is not known exactly, + // and the errors mostly lack structure. Since they happen rarely, and since + // the intended use of kvprober is to page on a sustained error rate, not a + // single error, we choose to only filter out errors via the + // kvpb.IsDecommissionedStatusErr function. + return kvpb.IsDecommissionedStatusErr(err) +} + // validateKey returns an error if the key is not valid for use by the kvprober. // This is a sanity check to ensure that the kvprober does not corrupt user data // in the global keyspace or other system data in the local keyspace. @@ -351,8 +373,12 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT return } if err != nil { - log.Health.Errorf(ctx, "can't make a plan: %v", err) - p.metrics.ProbePlanFailures.Inc(1) + if errorIsExpectedDuringNormalOperation(err) { + log.Health.Warningf(ctx, "making a plan failed with expected error: %v", err) + } else { + log.Health.Errorf(ctx, "can't make a plan: %v", err) + p.metrics.ProbePlanFailures.Inc(1) + } return } @@ -382,9 +408,13 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT return txns.TxnRootKV(ctx, f) }) if err != nil { - // TODO(josh): Write structured events with log.Structured. - log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err) - p.metrics.ReadProbeFailures.Inc(1) + if errorIsExpectedDuringNormalOperation(err) { + log.Health.Warningf(ctx, "kv.Get(%s), r=%v failed with expected error: %v", step.Key, step.RangeID, err) + } else { + // TODO(josh): Write structured events with log.Structured. + log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err) + p.metrics.ReadProbeFailures.Inc(1) + } return } @@ -414,8 +444,12 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober return } if err != nil { - log.Health.Errorf(ctx, "can't make a plan: %v", err) - p.metrics.ProbePlanFailures.Inc(1) + if errorIsExpectedDuringNormalOperation(err) { + log.Health.Warningf(ctx, "making a plan failed with expected error: %v", err) + } else { + log.Health.Errorf(ctx, "can't make a plan: %v", err) + p.metrics.ProbePlanFailures.Inc(1) + } return } @@ -434,11 +468,17 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober return txns.TxnRootKV(ctx, f) }) if err != nil { - added := p.quarantineWritePool.maybeAdd(ctx, step) - log.Health.Errorf( - ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v [quarantined=%t]", step.Key, step.RangeID, err, added, - ) - p.metrics.WriteProbeFailures.Inc(1) + if errorIsExpectedDuringNormalOperation(err) { + log.Health.Warningf( + ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with expected error: %v", step.Key, step.RangeID, err, + ) + } else { + added := p.quarantineWritePool.maybeAdd(ctx, step) + log.Health.Errorf( + ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v [quarantined=%t]", step.Key, step.RangeID, err, added, + ) + p.metrics.WriteProbeFailures.Inc(1) + } return } // This will no-op if not in the quarantine pool. diff --git a/pkg/kv/kvprober/kvprober_integration_test.go b/pkg/kv/kvprober/kvprober_integration_test.go index 13b6433d9139..2f55ee77e006 100644 --- a/pkg/kv/kvprober/kvprober_integration_test.go +++ b/pkg/kv/kvprober/kvprober_integration_test.go @@ -32,7 +32,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -46,7 +45,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) { ctx := context.Background() t.Run("disabled by default", func(t *testing.T) { - s, _, p, cleanup := initTestProber(t, base.TestingKnobs{}) + s, _, p, cleanup := initTestServer(t, base.TestingKnobs{}) defer cleanup() kvprober.ReadInterval.Override(ctx, &s.ClusterSettings().SV, 5*time.Millisecond) @@ -61,7 +60,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) { }) t.Run("happy path", func(t *testing.T) { - s, _, p, cleanup := initTestProber(t, base.TestingKnobs{}) + s, _, p, cleanup := initTestServer(t, base.TestingKnobs{}) defer cleanup() kvprober.ReadEnabled.Override(ctx, &s.ClusterSettings().SV, true) @@ -87,7 +86,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) { }) t.Run("a single range is unavailable for all KV ops", func(t *testing.T) { - s, _, p, cleanup := initTestProber(t, base.TestingKnobs{ + s, _, p, cleanup := initTestServer(t, base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(i context.Context, ba *kvpb.BatchRequest) *kvpb.Error { for _, ru := range ba.Requests { @@ -129,7 +128,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) { var dbIsAvailable syncutil.AtomicBool dbIsAvailable.Set(true) - s, _, p, cleanup := initTestProber(t, base.TestingKnobs{ + s, _, p, cleanup := initTestServer(t, base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(i context.Context, ba *kvpb.BatchRequest) *kvpb.Error { if !dbIsAvailable.Get() { @@ -174,7 +173,7 @@ func TestProberDoesReadsAndWrites(t *testing.T) { var dbIsAvailable syncutil.AtomicBool dbIsAvailable.Set(true) - s, _, p, cleanup := initTestProber(t, base.TestingKnobs{ + s, _, p, cleanup := initTestServer(t, base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(i context.Context, ba *kvpb.BatchRequest) *kvpb.Error { if !dbIsAvailable.Get() { @@ -225,7 +224,7 @@ func TestWriteProbeDoesNotLeaveLiveData(t *testing.T) { ctx := context.Background() - s, _, p, cleanup := initTestProber(t, base.TestingKnobs{}) + s, _, p, cleanup := initTestServer(t, base.TestingKnobs{}) defer cleanup() kvprober.WriteEnabled.Override(ctx, &s.ClusterSettings().SV, true) @@ -259,7 +258,7 @@ func TestPlannerMakesPlansCoveringAllRanges(t *testing.T) { ctx := context.Background() // Disable split and merge queue just in case. - _, sqlDB, p, cleanup := initTestProber(t, base.TestingKnobs{ + _, sqlDB, p, cleanup := initTestServer(t, base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{DisableSplitQueue: true, DisableMergeQueue: true}, }) defer cleanup() @@ -307,7 +306,7 @@ func TestProberOpsValidatesProbeKey(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - s, _, _, cleanup := initTestProber(t, base.TestingKnobs{}) + s, _, _, cleanup := initTestServer(t, base.TestingKnobs{}) defer cleanup() var ops kvprober.ProberOps @@ -350,23 +349,18 @@ func TestProberOpsValidatesProbeKey(t *testing.T) { } } -func initTestProber( +func initTestServer( t *testing.T, knobs base.TestingKnobs, ) (serverutils.TestServerInterface, *gosql.DB, *kvprober.Prober, func()) { - - s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{ + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Settings: cluster.MakeClusterSettings(), Knobs: knobs, }) - p := kvprober.NewProber(kvprober.Opts{ - Tracer: s.TracerI().(*tracing.Tracer), - DB: kvDB, - HistogramWindowInterval: time.Minute, // actual value not important to test - Settings: s.ClusterSettings(), - }) // Given small test cluster, this better exercises the planning logic. kvprober.NumStepsToPlanAtOnce.Override(context.Background(), &s.ClusterSettings().SV, 10) + + p := s.KvProber() // Want these tests to run as fast as possible; see planner_test.go for a // unit test of the rate limiting. p.SetPlanningRateLimits(0) diff --git a/pkg/kv/kvprober/kvprober_test.go b/pkg/kv/kvprober/kvprober_test.go index 6668e41d0727..d3e5d2bd5219 100644 --- a/pkg/kv/kvprober/kvprober_test.go +++ b/pkg/kv/kvprober/kvprober_test.go @@ -18,10 +18,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" ) func TestReadProbe(t *testing.T) { @@ -78,6 +80,31 @@ func TestReadProbe(t *testing.T) { require.Zero(t, p.Metrics().ReadProbeFailures.Count()) }) + // Once a node is fully decommissioned, neither kvclient nor kvprober work from + // the node. This does not indicate a service health issue; it is expected behavior. + // + // This is not tested with an integration test, since the kvclient of a decommissioned + // node will occasionally return other errors. We choose not to filter those out for + // reasons given at errorIsExpectedDuringNormalOperation. As a result, an integration test + // would be flaky. We believe a unit test is sufficient, largely because the main risk + // in only having a unit test is false positive pages on SRE, due to changes in what errors + // are returned from the kvclient of a decommissioned node. Though false positive pages add + // ops load, they do not directly affect the customer experience. + t.Run("planning fails due to decommissioning but not counted as error", func(t *testing.T) { + m := &mock{ + t: t, + read: true, + planErr: kvpb.NewDecommissionedStatusErrorf(codes.PermissionDenied, "foobar"), + } + p := initTestProber(ctx, m) + p.readProbeImpl(ctx, m, m, m) + + require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count()) + require.Zero(t, p.Metrics().ReadProbeAttempts.Count()) + require.Zero(t, p.Metrics().ProbePlanFailures.Count()) + require.Zero(t, p.Metrics().ReadProbeFailures.Count()) + }) + t.Run("txn fails", func(t *testing.T) { m := &mock{ t: t, @@ -107,6 +134,22 @@ func TestReadProbe(t *testing.T) { require.Zero(t, p.Metrics().ProbePlanFailures.Count()) require.Equal(t, int64(1), p.Metrics().ReadProbeFailures.Count()) }) + + // See comment above matching case in TestReadProbe regarding planning. + t.Run("read fails due to decommissioning but not counted as error", func(t *testing.T) { + m := &mock{ + t: t, + read: true, + readErr: kvpb.NewDecommissionedStatusErrorf(codes.PermissionDenied, "foobar"), + } + p := initTestProber(ctx, m) + p.readProbeImpl(ctx, m, m, m) + + require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count()) + require.Equal(t, int64(1), p.Metrics().ReadProbeAttempts.Count()) + require.Zero(t, p.Metrics().ProbePlanFailures.Count()) + require.Zero(t, p.Metrics().ReadProbeFailures.Count()) + }) } func TestWriteProbe(t *testing.T) { @@ -163,6 +206,22 @@ func TestWriteProbe(t *testing.T) { require.Zero(t, p.Metrics().WriteProbeFailures.Count()) }) + // See comment above matching case in TestReadProbe regarding planning. + t.Run("planning fails due to decommissioning but not counted as error", func(t *testing.T) { + m := &mock{ + t: t, + write: true, + planErr: kvpb.NewDecommissionedStatusErrorf(codes.PermissionDenied, "foobar"), + } + p := initTestProber(ctx, m) + p.writeProbeImpl(ctx, m, m, m) + + require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count()) + require.Zero(t, p.Metrics().WriteProbeAttempts.Count()) + require.Zero(t, p.Metrics().ProbePlanFailures.Count()) + require.Zero(t, p.Metrics().WriteProbeFailures.Count()) + }) + t.Run("open txn fails", func(t *testing.T) { m := &mock{ t: t, @@ -192,6 +251,22 @@ func TestWriteProbe(t *testing.T) { require.Zero(t, p.Metrics().ProbePlanFailures.Count()) require.Equal(t, int64(1), p.Metrics().WriteProbeFailures.Count()) }) + + // See comment above matching case in TestReadProbe regarding planning. + t.Run("write fails due to decommissioning but not counted as error", func(t *testing.T) { + m := &mock{ + t: t, + write: true, + writeErr: kvpb.NewDecommissionedStatusErrorf(codes.PermissionDenied, "foobar"), + } + p := initTestProber(ctx, m) + p.writeProbeImpl(ctx, m, m, m) + + require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count()) + require.Equal(t, int64(1), p.Metrics().WriteProbeAttempts.Count()) + require.Zero(t, p.Metrics().ProbePlanFailures.Count()) + require.Zero(t, p.Metrics().WriteProbeFailures.Count()) + }) } func initTestProber(ctx context.Context, m *mock) *Prober { diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index ed36949aa630..dbdd8240a223 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvprober" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities" @@ -1836,6 +1837,11 @@ func (ts *TestServer) BinaryVersionOverride() roachpb.Version { return knobs.(*TestingKnobs).BinaryVersionOverride } +// KvProber is part of the TestServerInterface. +func (ts *TestServer) KvProber() *kvprober.Prober { + return ts.Server.kvProber +} + type testServerFactoryImpl struct{} // TestServerFactory can be passed to serverutils.InitTestServerFactory diff --git a/pkg/testutils/serverutils/BUILD.bazel b/pkg/testutils/serverutils/BUILD.bazel index 1af6f17415c5..0aa772f90d06 100644 --- a/pkg/testutils/serverutils/BUILD.bazel +++ b/pkg/testutils/serverutils/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/config", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvprober", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/multitenant/tenantcapabilities", "//pkg/roachpb", diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index f5c8e3dec47a..611f56520611 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvprober" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" @@ -302,6 +303,10 @@ type TestServerInterface interface { // BinaryVersionOverride returns the value of an override if set using // TestingKnobs. BinaryVersionOverride() roachpb.Version + + // KvProber returns a *kvprober.Prober, which is useful when asserting the + //correctness of the prober from integration tests. + KvProber() *kvprober.Prober } // TestServerFactory encompasses the actual implementation of the shim