From 1bf454ff8270542f2b6d6e503ad6fb57f49d2ab4 Mon Sep 17 00:00:00 2001
From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Date: Mon, 23 Oct 2023 13:57:15 -0400
Subject: [PATCH 1/5] roachtest: admission-control/intent-resolution assert on
 20 sublevels

This patch addresses a lingering TODO to start asserting on 20
sublevels, now that https://github.com/cockroachdb/cockroach/pull/109932
has been merged.

Release note: None
---
 .../roachtest/tests/admission_control_intent_resolution.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go b/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go
index 794bc757e194..1ceecb49d82e 100644
--- a/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go
+++ b/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go
@@ -124,10 +124,7 @@ func registerIntentResolutionOverload(r registry.Registry) {
         }
         // Loop for up to 20 minutes. Intents take ~10min to resolve, and
         // we're padding by another 10min.
-        //
-        // TODO(sumeer): change subLevelThreshold to 20 after intent
-        // resolution is subject to admission control.
-        const subLevelThreshold = 100
+        const subLevelThreshold = 20
         numErrors := 0
         numSuccesses := 0
         latestIntentCount := math.MaxInt

From 736174ede7cc695555811785bc419c9d835ec91d Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Tue, 24 Oct 2023 10:53:39 +0000
Subject: [PATCH 2/5] Revert "roachtest: initialize system before restricting
 lease transfers"

This reverts commit d22da89bb21eb4d11c10d88782e9bd827bda2b87.
---
 pkg/cmd/roachtest/tests/failover.go | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/failover.go b/pkg/cmd/roachtest/tests/failover.go
index 60bab8bd20d3..022da21a1405 100644
--- a/pkg/cmd/roachtest/tests/failover.go
+++ b/pkg/cmd/roachtest/tests/failover.go
@@ -529,6 +529,8 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
     opts := option.DefaultStartOpts()
     opts.RoachprodOpts.ScheduleBackups = false
     settings := install.MakeClusterSettings()
+    settings.Env = append(settings.Env, "COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER=true")
+    settings.Env = append(settings.Env, "COCKROACH_SCAN_MAX_IDLE_TIME=100ms") // speed up replication
 
     m := c.NewMonitor(ctx, c.Range(1, 6))
 
@@ -547,12 +549,18 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
     // initial RF of 5, so pass in exactlyReplicationFactor below.
     require.NoError(t, WaitForReplication(ctx, t, conn, 3, exactlyReplicationFactor))
 
+    // Disable the replicate queue. It can otherwise end up with stuck
+    // overreplicated ranges during rebalancing, because downreplication requires
+    // the Raft leader to be colocated with the leaseholder.
+    _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.replicate_queue.enabled = false`)
+    require.NoError(t, err)
+
     // Now that system ranges are properly placed on n1-n3, start n4-n6.
     c.Start(ctx, t.L(), opts, settings, c.Range(4, 6))
 
     // Create the kv database on n4-n6.
     t.L().Printf("creating workload database")
-    _, err := conn.ExecContext(ctx, `CREATE DATABASE kv`)
+    _, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
     require.NoError(t, err)
     configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{replicas: 3, onlyNodes: []int{4, 5, 6}})
 
@@ -564,15 +572,6 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
     relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})
     relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{4, 5, 6}, []int{1, 2, 3})
 
-    settings.Env = append(settings.Env, "COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER=true")
-
-    // Restart the nodes with the updated cluster setting.
-    c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Range(1, 6))
-    c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))
-
-    // Wait for the TimeAfterNodeSuspect interval.
-    sleepFor(ctx, t, 30*time.Second)
-
     // Check that we have a few split leaders/leaseholders on n4-n6. We give
     // it a few seconds, since metrics are updated every 10 seconds.
     for i := 0; ; i++ {

From 56fb5b13f87f7f9fddcf97637503ca975ab7c7ab Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Tue, 24 Oct 2023 11:24:23 +0000
Subject: [PATCH 3/5] kvserver: disable allocator checks with
 `COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER`

Otherwise, the replicate queue may be unable to relocate leaseholder
replicas. This can cause e.g. `failover/partial/lease-leader` to flake.

Epic: none
Release note: None
---
 pkg/cmd/roachtest/tests/failover.go                  | 9 ++-------
 pkg/kv/kvserver/allocator/allocatorimpl/allocator.go | 6 ++++--
 pkg/kv/kvserver/store.go                             | 6 ++++++
 3 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/failover.go b/pkg/cmd/roachtest/tests/failover.go
index 022da21a1405..c8e269ffbab0 100644
--- a/pkg/cmd/roachtest/tests/failover.go
+++ b/pkg/cmd/roachtest/tests/failover.go
@@ -545,22 +545,17 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
     // Place all ranges on n1-n3 to start with, and wait for upreplication.
     configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
+
     // NB: We want to ensure the system ranges are all down-replicated from their
     // initial RF of 5, so pass in exactlyReplicationFactor below.
     require.NoError(t, WaitForReplication(ctx, t, conn, 3, exactlyReplicationFactor))
 
-    // Disable the replicate queue. It can otherwise end up with stuck
-    // overreplicated ranges during rebalancing, because downreplication requires
-    // the Raft leader to be colocated with the leaseholder.
-    _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.replicate_queue.enabled = false`)
-    require.NoError(t, err)
-
     // Now that system ranges are properly placed on n1-n3, start n4-n6.
     c.Start(ctx, t.L(), opts, settings, c.Range(4, 6))
 
     // Create the kv database on n4-n6.
     t.L().Printf("creating workload database")
-    _, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
+    _, err := conn.ExecContext(ctx, `CREATE DATABASE kv`)
     require.NoError(t, err)
     configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{replicas: 3, onlyNodes: []int{4, 5, 6}})
 
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
index edbee99b5bf7..9c06611f0029 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2183,8 +2183,10 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
     // If there are any replicas that do match lease preferences, then we check if
     // the existing leaseholder is one of them.
     preferred := a.PreferredLeaseholders(storePool, conf, candidates)
-    preferred = excludeReplicasInNeedOfSnapshots(
-        ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred)
+    if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots {
+        preferred = excludeReplicasInNeedOfSnapshots(
+            ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred)
+    }
     if len(preferred) == 0 {
         return false
     }
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index d268f8970ef6..65c807ba810a 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -1333,6 +1333,12 @@ func (sc *StoreConfig) SetDefaults(numStores int) {
     if raftDisableLeaderFollowsLeaseholder {
         sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
         sc.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader = true // otherwise lease requests fail
+        // The allocator must skip snapshot checks, since these only work when the
+        // leader and leaseholder are colocated.
+        if sc.TestingKnobs.AllocatorKnobs == nil {
+            sc.TestingKnobs.AllocatorKnobs = &allocator.TestingKnobs{}
+        }
+        sc.TestingKnobs.AllocatorKnobs.AllowLeaseTransfersToReplicasNeedingSnapshots = true
     }
     if raftDisableQuiescence {
         sc.TestingKnobs.DisableQuiescence = true

From 8a31718db84c15f2379bb67eb910f3d817d614b4 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Mon, 9 Oct 2023 15:00:39 -0400
Subject: [PATCH 4/5] sql/row: add util.EveryN arg to row fetcher for TraceKV

This change adds a `util.EveryN` parameter to the `row.Fetcher` called
`TraceKVEvery`. When KV tracing is enabled, the row fetcher prints
every KV that it sees, which could result in millions of rows being
logged in certain cases. The `EveryN` limits the log frequency.

`util.EveryN` is used instead of `log.EveryN` because `log.EveryN` is a
noop (it always logs) when verbosity is 2 or above, and the row fetcher
logs KVs at verbosity level 2.

Release note: None
Epic: None
---
 pkg/sql/row/fetcher.go | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index 355d6120d51a..f777c7a250c9 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -37,6 +37,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
     "github.com/cockroachdb/cockroach/pkg/storage"
+    "github.com/cockroachdb/cockroach/pkg/util"
     "github.com/cockroachdb/cockroach/pkg/util/encoding"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/intsets"
@@ -287,6 +288,10 @@ func (rf *Fetcher) Close(ctx context.Context) {
     }
 }
 
+// TraceKVVerbosity is the verbosity level at which pretty printed KVs
+// are logged.
+const TraceKVVerbosity = 2
+
 // FetcherInitArgs contains arguments for Fetcher.Init.
 type FetcherInitArgs struct {
     // StreamingKVFetcher, if non-nil, contains the KVFetcher that uses the
@@ -321,7 +326,10 @@ type FetcherInitArgs struct {
     MemMonitor *mon.BytesMonitor
     Spec       *fetchpb.IndexFetchSpec
     // TraceKV indicates whether or not session tracing is enabled.
-    TraceKV bool
+    TraceKV bool
+    // TraceKVEvery controls how often KVs are sampled for logging with traceKV
+    // enabled.
+    TraceKVEvery *util.EveryN
     ForceProductionKVBatchSize bool
     // SpansCanOverlap indicates whether the spans in a given batch can overlap
     // with one another. If it is true, spans that correspond to the same row must
@@ -1129,8 +1137,12 @@ func (rf *Fetcher) NextRow(ctx context.Context) (row rowenc.EncDatumRow, spanID
     if err != nil {
         return nil, 0, err
     }
-    if rf.args.TraceKV {
-        log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal)
+    // TraceKVEvery is a util.EveryN and not a log.EveryN because
+    // log.EveryN will always print under verbosity level 2.
+    // The caller may choose to set it to avoid logging
+    // too many rows. If unset, we log every KV.
+    if rf.args.TraceKV && (rf.args.TraceKVEvery == nil || rf.args.TraceKVEvery.ShouldProcess(timeutil.Now())) {
+        log.VEventf(ctx, TraceKVVerbosity, "fetched: %s -> %s", prettyKey, prettyVal)
     }
 
     rowDone, spanID, err := rf.nextKey(ctx)

From 8d74d10e77f020dd0f5c98003f7976b63531de18 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Mon, 9 Oct 2023 15:03:50 -0400
Subject: [PATCH 5/5] changefeedccl: add observability to row fetchers

This change makes it possible for changefeed row fetchers to log the
KVs they are fetching. KV tracing is enabled when the log verbosity for
the row fetcher reaches `row.TraceKVVerbosity`, and the new cluster
setting `changefeed.cdcevent.trace_kv.log_frequency` limits the rate of
logging.
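
For illustration, here is a minimal sketch of how the rate limit is
wired into a fetcher. This is hypothetical caller code: `util.EveryN`
and the `TraceKV`/`TraceKVEvery` fields come from the previous commit,
while the surrounding assignments are assumed for the example:

    // Build a sampling guard that fires at most once per 500ms.
    every := util.EveryN{N: 500 * time.Millisecond}
    args := row.FetcherInitArgs{
        // ... remaining fetcher args elided ...
        TraceKV:      true,   // log fetched KVs
        TraceKVEvery: &every, // but at most one log line per 500ms
    }
    // Inside NextRow, a KV is logged only when TraceKVEvery is nil or
    // TraceKVEvery.ShouldProcess(timeutil.Now()) returns true.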
Release note: None
Epic: None
---
 pkg/ccl/changefeedccl/cdcevent/BUILD.bazel    |  3 ++
 pkg/ccl/changefeedccl/cdcevent/event.go       |  1 +
 .../cdcevent/rowfetcher_cache.go              | 31 +++++++++++++++++++
 .../changefeedccl/cdcevent/rowfetcher_test.go |  1 +
 4 files changed, 36 insertions(+)

diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
index 0c5a38cd0c93..f65bc1744afc 100644
--- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
@@ -17,6 +17,8 @@ go_library(
         "//pkg/keys",
         "//pkg/kv",
         "//pkg/roachpb",
+        "//pkg/settings",
+        "//pkg/settings/cluster",
         "//pkg/sql",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/colinfo",
@@ -31,6 +33,7 @@ go_library(
         "//pkg/sql/sem/eval",
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
+        "//pkg/util",
         "//pkg/util/cache",
         "//pkg/util/encoding",
         "//pkg/util/hlc",
diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go
index 4ee1c102e911..2772e43cc633 100644
--- a/pkg/ccl/changefeedccl/cdcevent/event.go
+++ b/pkg/ccl/changefeedccl/cdcevent/event.go
@@ -466,6 +466,7 @@ func NewEventDecoder(
         cfg.LeaseManager,
         cfg.CollectionFactory,
         cfg.DB,
+        cfg.Settings,
         targets,
     )
     if err != nil {
diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
index a9c2c192e179..959905244e1a 100644
--- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
+++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
@@ -10,11 +10,14 @@ package cdcevent
 
 import (
     "context"
+    "time"
 
     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
     "github.com/cockroachdb/cockroach/pkg/keys"
     "github.com/cockroachdb/cockroach/pkg/kv"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
+    "github.com/cockroachdb/cockroach/pkg/settings"
+    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -23,12 +26,24 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/row"
     "github.com/cockroachdb/cockroach/pkg/sql/rowenc"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+    "github.com/cockroachdb/cockroach/pkg/util"
     "github.com/cockroachdb/cockroach/pkg/util/cache"
     "github.com/cockroachdb/cockroach/pkg/util/encoding"
    "github.com/cockroachdb/cockroach/pkg/util/hlc"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/errors"
 )
 
+// traceKVLogFrequency controls how frequently KVs are logged when
+// KV tracing is enabled.
+var traceKVLogFrequency = settings.RegisterDurationSetting(
+    settings.ApplicationLevel,
+    "changefeed.cdcevent.trace_kv.log_frequency",
+    "controls how frequently KVs are logged when KV tracing is enabled",
+    500*time.Millisecond,
+    settings.NonNegativeDuration,
+)
+
 // rowFetcherCache maintains a cache of single table row.Fetchers. Given a key
 // with an mvcc timestamp, it retrieves the correct TableDescriptor for that key
 // and returns a row.Fetcher initialized with that table. This Fetcher's
@@ -43,9 +58,18 @@ type rowFetcherCache struct {
     collection *descs.Collection
     db         *kv.DB
 
+    rfArgs rowFetcherArgs
+
     a tree.DatumAlloc
 }
 
+// rowFetcherArgs contains arguments to pass to all row fetchers
+// created by this cache.
+type rowFetcherArgs struct {
+    traceKV             bool
+    traceKVLogFrequency time.Duration
+}
+
 type cachedFetcher struct {
     tableDesc catalog.TableDescriptor
     fetcher   row.Fetcher
@@ -65,6 +89,7 @@ func newRowFetcherCache(
     leaseMgr *lease.Manager,
     cf *descs.CollectionFactory,
     db *kv.DB,
+    s *cluster.Settings,
     targets changefeedbase.Targets,
 ) (*rowFetcherCache, error) {
     if targets.Size == 0 {
@@ -85,6 +110,10 @@ func newRowFetcherCache(
         db:              db,
         fetchers:        cache.NewUnorderedCache(DefaultCacheConfig),
         watchedFamilies: watchedFamilies,
+        rfArgs: rowFetcherArgs{
+            traceKV:             log.V(row.TraceKVVerbosity),
+            traceKVLogFrequency: traceKVLogFrequency.Get(&s.SV),
+        },
     }, err
 }
@@ -259,6 +288,8 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily(
             WillUseKVProvider: true,
             Alloc:             &c.a,
             Spec:              &spec,
+            TraceKV:           c.rfArgs.traceKV,
+            TraceKVEvery:      &util.EveryN{N: c.rfArgs.traceKVLogFrequency},
         },
     ); err != nil {
         return nil, nil, err
diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go
index 1e213d1e10a8..20a4266b5cb9 100644
--- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go
+++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go
@@ -74,6 +74,7 @@ func TestRowFetcherCache(t *testing.T) {
         serverCfg.LeaseManager.(*lease.Manager),
         serverCfg.CollectionFactory,
         serverCfg.DB.KV(),
+        serverCfg.Settings,
         targets)
     if err != nil {
         t.Fatal(err)
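
For readers unfamiliar with `util.EveryN`, the sketch below illustrates
the sampling semantics the fetcher relies on. It is a simplified,
self-contained stand-in written for illustration, not CockroachDB's
`util.EveryN` implementation; it assumes only the behavior visible in
the diffs above, namely that ShouldProcess(now) returns true at most
once per interval N:

    package main

    import (
        "fmt"
        "time"
    )

    // everyN is a simplified stand-in for util.EveryN: ShouldProcess
    // returns true at most once per interval N.
    type everyN struct {
        N           time.Duration
        lastProcess time.Time
    }

    func (e *everyN) ShouldProcess(now time.Time) bool {
        if now.Sub(e.lastProcess) >= e.N {
            e.lastProcess = now
            return true
        }
        return false
    }

    func main() {
        every := everyN{N: 500 * time.Millisecond}
        start := time.Now()
        logged := 0
        // Simulate a tight fetch loop emitting many KVs; only one KV
        // per 500ms interval passes the sampling guard.
        for time.Since(start) < 2*time.Second {
            if every.ShouldProcess(time.Now()) {
                logged++
                fmt.Printf("fetched: <key> -> <value> (sample %d)\n", logged)
            }
        }
        fmt.Println("total sampled log lines:", logged) // ~4 over 2s
    }

With these patches applied, the sampling interval for changefeed row
fetchers comes from the `changefeed.cdcevent.trace_kv.log_frequency`
cluster setting shown above, and tracing itself turns on once the log
verbosity for the row fetcher reaches `row.TraceKVVerbosity` (2).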