Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
112342: changefeedccl: add observabilty to row fetcher rows  r=jayshrivastava a=jayshrivastava

### sql/row: add util.EveryN arg to row fetcher for TraceKV

This change adds a `util.EveryN` parameter to the `row.Fetcher`
called `TraceKVEveryN`. When KV tracing is enabled, the row fetcher
will print ALL KVs that it sees. This will could result in millions
of rows being logged in certain cases. This change adds an `EveryN`
to limit the log frequency. `util.EveryN` is used instead of `log.EveryN`
because `log.EveryN` is a noop when verbosity is 2 or above. The row
fetcher uses verbosity level 2 to log KVs.

Release note: None
Epic: None

### changefeedccl: add observabilty to row fetcher rows
This change adds some flags which can be enabled to make
row fetchers log the KVs they are fetching. The setting
`changefeed.cdcevent.trace_kv` is passed down to the row
fetcher to enable logging. The setting
`changefeed.cdcevent.trace_kv.log_frequency` is added as
well to limit the rate of logging.

Release note: None
Epic: None

112890: roachtest: admission-control/intent-resolution assert on 20 sublevels r=sumeerbhola a=aadityasondhi

This patch addresses a lingering TODO to start asserting on 20 sublevels since #109932 has been merged.

Release note: None

112958: kvserver: disable allocator checks with `COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER` r=erikgrinaker a=erikgrinaker

Otherwise, the replicate queue may be unable to relocate leaseholder replicas. This can cause e.g. `failover/partial/lease-leader` to flake.

Resolves #112241.
Epic: none
Release note: None

Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Aaditya Sondhi <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
4 people committed Oct 24, 2023
4 parents 3994981 + 8d74d10 + 1bf454f + 56fb5b1 commit 3e32c60
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 18 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
Expand All @@ -31,6 +33,7 @@ go_library(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/cache",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func NewEventDecoder(
cfg.LeaseManager,
cfg.CollectionFactory,
cfg.DB,
cfg.Settings,
targets,
)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ package cdcevent

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand All @@ -23,12 +26,24 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// RowFetcherTraceKVLogFrequency controls how frequently KVs are logged when
// KV tracing is enabled.
var traceKVLogFrequency = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.cdcevent.trace_kv.log_frequency",
"controls how frequently KVs are logged when KV tracing is enabled",
500*time.Millisecond,
settings.NonNegativeDuration,
)

// rowFetcherCache maintains a cache of single table row.Fetchers. Given a key
// with an mvcc timestamp, it retrieves the correct TableDescriptor for that key
// and returns a row.Fetcher initialized with that table. This Fetcher's
Expand All @@ -43,9 +58,18 @@ type rowFetcherCache struct {
collection *descs.Collection
db *kv.DB

rfArgs rowFetcherArgs

a tree.DatumAlloc
}

// rowFetcherArgs contains arguments to pass to all row fetchers
// created by this cache.
type rowFetcherArgs struct {
traceKV bool
traceKVLogFrequency time.Duration
}

type cachedFetcher struct {
tableDesc catalog.TableDescriptor
fetcher row.Fetcher
Expand All @@ -65,6 +89,7 @@ func newRowFetcherCache(
leaseMgr *lease.Manager,
cf *descs.CollectionFactory,
db *kv.DB,
s *cluster.Settings,
targets changefeedbase.Targets,
) (*rowFetcherCache, error) {
if targets.Size == 0 {
Expand All @@ -85,6 +110,10 @@ func newRowFetcherCache(
db: db,
fetchers: cache.NewUnorderedCache(DefaultCacheConfig),
watchedFamilies: watchedFamilies,
rfArgs: rowFetcherArgs{
traceKV: log.V(row.TraceKVVerbosity),
traceKVLogFrequency: traceKVLogFrequency.Get(&s.SV),
},
}, err
}

Expand Down Expand Up @@ -259,6 +288,8 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily(
WillUseKVProvider: true,
Alloc: &c.a,
Spec: &spec,
TraceKV: c.rfArgs.traceKV,
TraceKVEvery: &util.EveryN{N: c.rfArgs.traceKVLogFrequency},
},
); err != nil {
return nil, nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestRowFetcherCache(t *testing.T) {
serverCfg.LeaseManager.(*lease.Manager),
serverCfg.CollectionFactory,
serverCfg.DB.KV(),
serverCfg.Settings,
targets)
if err != nil {
t.Fatal(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ func registerIntentResolutionOverload(r registry.Registry) {
}
// Loop for up to 20 minutes. Intents take ~10min to resolve, and
// we're padding by another 10min.
//
// TODO(sumeer): change subLevelThreshold to 20 after intent
// resolution is subject to admission control.
const subLevelThreshold = 100
const subLevelThreshold = 20
numErrors := 0
numSuccesses := 0
latestIntentCount := math.MaxInt
Expand Down
12 changes: 3 additions & 9 deletions pkg/cmd/roachtest/tests/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
opts := option.DefaultStartOpts()
opts.RoachprodOpts.ScheduleBackups = false
settings := install.MakeClusterSettings()
settings.Env = append(settings.Env, "COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER=true")
settings.Env = append(settings.Env, "COCKROACH_SCAN_MAX_IDLE_TIME=100ms") // speed up replication

m := c.NewMonitor(ctx, c.Range(1, 6))

Expand All @@ -543,6 +545,7 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C

// Place all ranges on n1-n3 to start with, and wait for upreplication.
configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})

// NB: We want to ensure the system ranges are all down-replicated from their
// initial RF of 5, so pass in exactlyReplicationFactor below.
require.NoError(t, WaitForReplication(ctx, t, conn, 3, exactlyReplicationFactor))
Expand All @@ -564,15 +567,6 @@ func runFailoverPartialLeaseLeader(ctx context.Context, t test.Test, c cluster.C
relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})
relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{4, 5, 6}, []int{1, 2, 3})

settings.Env = append(settings.Env, "COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER=true")

// Restart the nodes with the updated cluster setting.
c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Range(1, 6))
c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))

// Wait for the TimeAfterNodeSuspect interval.
sleepFor(ctx, t, 30*time.Second)

// Check that we have a few split leaders/leaseholders on n4-n6. We give
// it a few seconds, since metrics are updated every 10 seconds.
for i := 0; ; i++ {
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2183,8 +2183,10 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
// If there are any replicas that do match lease preferences, then we check if
// the existing leaseholder is one of them.
preferred := a.PreferredLeaseholders(storePool, conf, candidates)
preferred = excludeReplicasInNeedOfSnapshots(
ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred)
if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots {
preferred = excludeReplicasInNeedOfSnapshots(
ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred)
}
if len(preferred) == 0 {
return false
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,12 @@ func (sc *StoreConfig) SetDefaults(numStores int) {
if raftDisableLeaderFollowsLeaseholder {
sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
sc.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader = true // otherwise lease requests fail
// The allocator must skip snapshot checks, since these only work when the
// leader and leaseholder are colocated.
if sc.TestingKnobs.AllocatorKnobs == nil {
sc.TestingKnobs.AllocatorKnobs = &allocator.TestingKnobs{}
}
sc.TestingKnobs.AllocatorKnobs.AllowLeaseTransfersToReplicasNeedingSnapshots = true
}
if raftDisableQuiescence {
sc.TestingKnobs.DisableQuiescence = true
Expand Down
18 changes: 15 additions & 3 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
Expand Down Expand Up @@ -287,6 +288,10 @@ func (rf *Fetcher) Close(ctx context.Context) {
}
}

// TraceKVVerbosity is the verbosity level at which pretty printed KVs
// are logged.
const TraceKVVerbosity = 2

// FetcherInitArgs contains arguments for Fetcher.Init.
type FetcherInitArgs struct {
// StreamingKVFetcher, if non-nil, contains the KVFetcher that uses the
Expand Down Expand Up @@ -321,7 +326,10 @@ type FetcherInitArgs struct {
MemMonitor *mon.BytesMonitor
Spec *fetchpb.IndexFetchSpec
// TraceKV indicates whether or not session tracing is enabled.
TraceKV bool
TraceKV bool
// TraceKVEvery controls how often KVs are sampled for logging with traceKV
// enabled.
TraceKVEvery *util.EveryN
ForceProductionKVBatchSize bool
// SpansCanOverlap indicates whether the spans in a given batch can overlap
// with one another. If it is true, spans that correspond to the same row must
Expand Down Expand Up @@ -1129,8 +1137,12 @@ func (rf *Fetcher) NextRow(ctx context.Context) (row rowenc.EncDatumRow, spanID
if err != nil {
return nil, 0, err
}
if rf.args.TraceKV {
log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal)
// TraceKVEvery is a util.EveryN and not a log.EveryN because
// log.EveryN will always print under verbosity level 2.
// The caller may choose to set it to avoid logging
// too many rows. If unset, we log every KV.
if rf.args.TraceKV && (rf.args.TraceKVEvery == nil || rf.args.TraceKVEvery.ShouldProcess(timeutil.Now())) {
log.VEventf(ctx, TraceKVVerbosity, "fetched: %s -> %s", prettyKey, prettyVal)
}

rowDone, spanID, err := rf.nextKey(ctx)
Expand Down

0 comments on commit 3e32c60

Please sign in to comment.