diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
index 0c5a38cd0c93..f65bc1744afc 100644
--- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
@@ -17,6 +17,8 @@ go_library(
         "//pkg/keys",
         "//pkg/kv",
         "//pkg/roachpb",
+        "//pkg/settings",
+        "//pkg/settings/cluster",
         "//pkg/sql",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/colinfo",
@@ -31,6 +33,7 @@ go_library(
         "//pkg/sql/sem/eval",
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
+        "//pkg/util",
         "//pkg/util/cache",
         "//pkg/util/encoding",
         "//pkg/util/hlc",
diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go
index 4ee1c102e911..2772e43cc633 100644
--- a/pkg/ccl/changefeedccl/cdcevent/event.go
+++ b/pkg/ccl/changefeedccl/cdcevent/event.go
@@ -466,6 +466,7 @@ func NewEventDecoder(
 		cfg.LeaseManager,
 		cfg.CollectionFactory,
 		cfg.DB,
+		cfg.Settings,
 		targets,
 	)
 	if err != nil {
diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
index a9c2c192e179..959905244e1a 100644
--- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
+++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
@@ -10,11 +10,14 @@ package cdcevent
 
 import (
 	"context"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -23,12 +26,24 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/cache"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
 )
 
+// traceKVLogFrequency controls how frequently KVs are logged when
+// KV tracing is enabled.
+var traceKVLogFrequency = settings.RegisterDurationSetting(
+	settings.ApplicationLevel,
+	"changefeed.cdcevent.trace_kv.log_frequency",
+	"controls how frequently KVs are logged when KV tracing is enabled",
+	500*time.Millisecond,
+	settings.NonNegativeDuration,
+)
+
 // rowFetcherCache maintains a cache of single table row.Fetchers. Given a key
 // with an mvcc timestamp, it retrieves the correct TableDescriptor for that key
 // and returns a row.Fetcher initialized with that table. This Fetcher's
@@ -43,9 +58,18 @@ type rowFetcherCache struct {
 	collection *descs.Collection
 	db         *kv.DB
 
+	rfArgs rowFetcherArgs
+
 	a tree.DatumAlloc
 }
 
+// rowFetcherArgs contains arguments to pass to all row fetchers
+// created by this cache.
+type rowFetcherArgs struct {
+	traceKV             bool
+	traceKVLogFrequency time.Duration
+}
+
 type cachedFetcher struct {
 	tableDesc catalog.TableDescriptor
 	fetcher   row.Fetcher
@@ -65,6 +89,7 @@ func newRowFetcherCache(
 	leaseMgr *lease.Manager,
 	cf *descs.CollectionFactory,
 	db *kv.DB,
+	s *cluster.Settings,
 	targets changefeedbase.Targets,
 ) (*rowFetcherCache, error) {
 	if targets.Size == 0 {
@@ -85,6 +110,10 @@ func newRowFetcherCache(
 		db:              db,
 		fetchers:        cache.NewUnorderedCache(DefaultCacheConfig),
 		watchedFamilies: watchedFamilies,
+		rfArgs: rowFetcherArgs{
+			traceKV:             log.V(row.TraceKVVerbosity),
+			traceKVLogFrequency: traceKVLogFrequency.Get(&s.SV),
+		},
 	}, err
 }
 
@@ -259,6 +288,8 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily(
 			WillUseKVProvider: true,
 			Alloc:             &c.a,
 			Spec:              &spec,
+			TraceKV:           c.rfArgs.traceKV,
+			TraceKVEvery:      &util.EveryN{N: c.rfArgs.traceKVLogFrequency},
 		},
 	); err != nil {
 		return nil, nil, err
diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go
index 1e213d1e10a8..20a4266b5cb9 100644
--- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go
+++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go
@@ -74,6 +74,7 @@ func TestRowFetcherCache(t *testing.T) {
 		serverCfg.LeaseManager.(*lease.Manager),
 		serverCfg.CollectionFactory,
 		serverCfg.DB.KV(),
+		serverCfg.Settings,
 		targets)
 	if err != nil {
 		t.Fatal(err)
diff --git a/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go b/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go
index 794bc757e194..1ceecb49d82e 100644
--- a/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go
+++ b/pkg/cmd/roachtest/tests/admission_control_intent_resolution.go
@@ -124,10 +124,7 @@ func registerIntentResolutionOverload(r registry.Registry) {
 			}
 			// Loop for up to 20 minutes. Intents take ~10min to resolve, and
 			// we're padding by another 10min.
-			//
-			// TODO(sumeer): change subLevelThreshold to 20 after intent
-			// resolution is subject to admission control.
-			const subLevelThreshold = 100
+			const subLevelThreshold = 20
 			numErrors := 0
 			numSuccesses := 0
 			latestIntentCount := math.MaxInt
diff --git a/pkg/cmd/roachtest/tests/failover.go b/pkg/cmd/roachtest/tests/failover.go
index 60bab8bd20d3..c8e269ffbab0 100644
--- a/pkg/cmd/roachtest/tests/failover.go
+++ b/pkg/cmd/roachtest/tests/failover.go
@@ -529,6 +529,8 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
 	opts := option.DefaultStartOpts()
 	opts.RoachprodOpts.ScheduleBackups = false
 	settings := install.MakeClusterSettings()
+	settings.Env = append(settings.Env, "COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER=true")
+	settings.Env = append(settings.Env, "COCKROACH_SCAN_MAX_IDLE_TIME=100ms") // speed up replication
 
 	m := c.NewMonitor(ctx, c.Range(1, 6))
 
@@ -543,6 +545,7 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
 
 	// Place all ranges on n1-n3 to start with, and wait for upreplication.
 	configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
+
 	// NB: We want to ensure the system ranges are all down-replicated from their
 	// initial RF of 5, so pass in exactlyReplicationFactor below.
 	require.NoError(t, WaitForReplication(ctx, t, conn, 3, exactlyReplicationFactor))
@@ -564,15 +567,6 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
 	relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})
 	relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{4, 5, 6}, []int{1, 2, 3})
 
-	settings.Env = append(settings.Env, "COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER=true")
-
-	// Restart the nodes with the updated cluster setting.
-	c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Range(1, 6))
-	c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))
-
-	// Wait for the TimeAfterNodeSuspect interval.
-	sleepFor(ctx, t, 30*time.Second)
-
 	// Check that we have a few split leaders/leaseholders on n4-n6. We give
 	// it a few seconds, since metrics are updated every 10 seconds.
 	for i := 0; ; i++ {
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
index edbee99b5bf7..9c06611f0029 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2183,8 +2183,10 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
 	// If there are any replicas that do match lease preferences, then we check if
 	// the existing leaseholder is one of them.
 	preferred := a.PreferredLeaseholders(storePool, conf, candidates)
-	preferred = excludeReplicasInNeedOfSnapshots(
-		ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred)
+	if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots {
+		preferred = excludeReplicasInNeedOfSnapshots(
+			ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred)
+	}
 	if len(preferred) == 0 {
 		return false
 	}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index d268f8970ef6..65c807ba810a 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -1333,6 +1333,12 @@ func (sc *StoreConfig) SetDefaults(numStores int) {
 	if raftDisableLeaderFollowsLeaseholder {
 		sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
 		sc.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader = true // otherwise lease requests fail
+		// The allocator must skip snapshot checks, since these only work when the
+		// leader and leaseholder are colocated.
+		if sc.TestingKnobs.AllocatorKnobs == nil {
+			sc.TestingKnobs.AllocatorKnobs = &allocator.TestingKnobs{}
+		}
+		sc.TestingKnobs.AllocatorKnobs.AllowLeaseTransfersToReplicasNeedingSnapshots = true
 	}
 	if raftDisableQuiescence {
 		sc.TestingKnobs.DisableQuiescence = true
diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index 355d6120d51a..f777c7a250c9 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -37,6 +37,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/intsets"
@@ -287,6 +288,10 @@ func (rf *Fetcher) Close(ctx context.Context) {
 	}
 }
 
+// TraceKVVerbosity is the verbosity level at which pretty printed KVs
+// are logged.
+const TraceKVVerbosity = 2
+
 // FetcherInitArgs contains arguments for Fetcher.Init.
 type FetcherInitArgs struct {
 	// StreamingKVFetcher, if non-nil, contains the KVFetcher that uses the
@@ -321,7 +326,10 @@ type FetcherInitArgs struct {
 	MemMonitor *mon.BytesMonitor
 	Spec *fetchpb.IndexFetchSpec
 	// TraceKV indicates whether or not session tracing is enabled.
-	TraceKV bool
+	TraceKV bool
+	// TraceKVEvery controls how often KVs are sampled for logging when TraceKV
+	// is enabled.
+	TraceKVEvery *util.EveryN
 	ForceProductionKVBatchSize bool
 	// SpansCanOverlap indicates whether the spans in a given batch can overlap
 	// with one another. If it is true, spans that correspond to the same row must
@@ -1129,8 +1137,12 @@ func (rf *Fetcher) NextRow(ctx context.Context) (row rowenc.EncDatumRow, spanID
 	if err != nil {
 		return nil, 0, err
 	}
-	if rf.args.TraceKV {
-		log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal)
+	// TraceKVEvery is a util.EveryN rather than a log.EveryN because
+	// log.EveryN always logs when verbosity level 2 is enabled, which would
+	// defeat the sampling. Callers may set it to avoid logging too many KVs;
+	// if unset, every KV is logged.
+	if rf.args.TraceKV && (rf.args.TraceKVEvery == nil || rf.args.TraceKVEvery.ShouldProcess(timeutil.Now())) {
+		log.VEventf(ctx, TraceKVVerbosity, "fetched: %s -> %s", prettyKey, prettyVal)
 	}
 
 	rowDone, spanID, err := rf.nextKey(ctx)
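
As a side note on the fetcher change above: the new gate in NextRow only logs a KV when TraceKVEvery reports that enough time has passed. The standalone Go sketch below illustrates that time-based sampling pattern; the everyN type is a hypothetical, simplified stand-in for util.EveryN (which additionally takes a mutex), not the CockroachDB implementation, and the 500ms interval simply mirrors the default of the new cluster setting.

package main

import (
	"fmt"
	"time"
)

// everyN is a simplified stand-in for util.EveryN: it reports true at most
// once per interval n, which is the behavior the fetcher relies on to sample
// "fetched: ..." trace messages instead of logging every KV.
type everyN struct {
	n    time.Duration
	last time.Time
}

// shouldProcess returns true if at least n has elapsed since the last time it
// returned true, mirroring util.EveryN.ShouldProcess.
func (e *everyN) shouldProcess(now time.Time) bool {
	if now.Sub(e.last) >= e.n {
		e.last = now
		return true
	}
	return false
}

func main() {
	// With a 500ms interval, only a few of these 10 iterations log, even
	// though every iteration "fetches" a row.
	sampler := &everyN{n: 500 * time.Millisecond}
	for i := 0; i < 10; i++ {
		if sampler.shouldProcess(time.Now()) {
			fmt.Printf("fetched: row %d\n", i)
		}
		time.Sleep(200 * time.Millisecond)
	}
}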