release-21.1: Backport changefeed observability PRs. #68106

Merged
62 changes: 41 additions & 21 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -851,8 +851,9 @@ type changeFrontier struct {
freqEmitResolved time.Duration
// lastEmitResolved is the last time a resolved timestamp was emitted.
lastEmitResolved time.Time
// lastSlowSpanLog is the last time a slow span from `sf` was logged.
lastSlowSpanLog time.Time

// slowLogEveryN rate-limits the logging of slow spans
slowLogEveryN log.EveryN

// schemaChangeBoundary represents an hlc timestamp at which a schema change
// event occurred to a target watched by this frontier. If the changefeed is
@@ -895,7 +896,10 @@ type changeFrontier struct {
metricsID int
}

const runStatusUpdateFrequency time.Duration = time.Minute
const (
runStatusUpdateFrequency time.Duration = time.Minute
slowSpanMaxFrequency = 10 * time.Second
)

type jobState struct {
job *jobs.Job
@@ -916,11 +920,12 @@ func newChangeFrontierProcessor(
ctx := flowCtx.EvalCtx.Ctx()
memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "changefntr-mem")
cf := &changeFrontier{
flowCtx: flowCtx,
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
input: input,
sf: span.MakeFrontier(spec.TrackedSpans...),
flowCtx: flowCtx,
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
input: input,
sf: span.MakeFrontier(spec.TrackedSpans...),
slowLogEveryN: log.Every(slowSpanMaxFrequency),
}
if err := cf.Init(
cf,
@@ -1231,9 +1236,14 @@ func (cf *changeFrontier) handleFrontierChanged(isBehind bool) error {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
}
cf.metrics.mu.Unlock()

checkpointStart := timeutil.Now()
if err := cf.checkpointResolvedTimestamp(newResolved, isBehind); err != nil {
return err
}
cf.metrics.CheckpointHistNanos.RecordValue(timeutil.Since(checkpointStart).Nanoseconds())
cf.metrics.FrontierUpdates.Inc(1)

if err := cf.maybeEmitResolved(newResolved); err != nil {
return err
}
@@ -1388,38 +1398,48 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
// returned boolean will be true if the resolved timestamp lags far behind the
// present as defined by the current configuration.
func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) (isBehind bool) {
// These two cluster setting values represent the target responsiveness of
// poller and range feed. The cluster setting for switching between poller and
// rangefeed is only checked at changefeed start/resume, so instead of
// switching on it here, just add them. Also add 1 second in case both these
// settings are set really low (as they are in unit tests).
pollInterval := changefeedbase.TableDescriptorPollInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Cfg.Settings.SV)
slownessThreshold := time.Second + 10*(pollInterval+closedtsInterval)
frontier := cf.sf.Frontier()
now := timeutil.Now()
resolvedBehind := now.Sub(frontier.GoTime())
if resolvedBehind <= slownessThreshold {
if resolvedBehind <= cf.slownessThreshold() {
return false
}

description := `sinkless feed`
description := "sinkless feed"
if !cf.isSinkless() {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged {
log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}
const slowSpanMaxFrequency = 10 * time.Second
if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency {
cf.lastSlowSpanLog = now

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.sf.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
}
return true
}

func (cf *changeFrontier) slownessThreshold() time.Duration {
clusterThreshold := changefeedbase.SlowSpanLogThreshold.Get(&cf.flowCtx.Cfg.Settings.SV)
if clusterThreshold > 0 {
return clusterThreshold
}

// These two cluster setting values represent the target
// responsiveness of schemafeed and rangefeed.
//
// We add 1 second in case both these settings are set really
// low (as they are in unit tests).
//
// TODO(ssd): We should probably take into account the flush
// frequency here.
pollInterval := changefeedbase.TableDescriptorPollInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Cfg.Settings.SV)
return time.Second + 10*(pollInterval+closedtsInterval)
}

// ConsumerClosed is part of the RowSource interface.
func (cf *changeFrontier) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
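
Editor's note: the hunks above replace the hand-rolled `lastSlowSpanLog` timestamp check with `log.EveryN`. A minimal, self-contained sketch of that rate-limiting pattern follows; `everyN` here is a hypothetical stand-in for CockroachDB's `log.EveryN`, not its real implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// everyN is a hypothetical stand-in for the log.EveryN helper the diff
// switches to: ShouldProcess reports true at most once per interval n.
type everyN struct {
	mu   sync.Mutex
	n    time.Duration
	last time.Time
}

func every(n time.Duration) *everyN { return &everyN{n: n} }

// ShouldProcess returns true when at least n has elapsed since the last
// time it returned true, which is how the change frontier throttles its
// "span ... is behind" log lines to one per slowSpanMaxFrequency window.
func (e *everyN) ShouldProcess(now time.Time) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	if now.Sub(e.last) >= e.n {
		e.last = now
		return true
	}
	return false
}

func main() {
	slowLog := every(10 * time.Second) // slowSpanMaxFrequency in the diff
	for i := 0; i < 3; i++ {
		if slowLog.ShouldProcess(time.Now()) {
			fmt.Println("span is behind; logged at most once per 10s")
		}
	}
}
```

Moving the interval into the slowSpanMaxFrequency constant and the throttling state into a field initialized once in newChangeFrontierProcessor keeps maybeLogBehindSpan free of that bookkeeping.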
17 changes: 13 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -437,6 +437,18 @@ func changefeedJobDescription(
return tree.AsStringWithFQNames(c, ann), nil
}

// validateNonNegativeDuration returns a nil error if optValue can be
// parsed as a duration and is non-negative; otherwise, an error is
// returned.
func validateNonNegativeDuration(optName string, optValue string) error {
if d, err := time.ParseDuration(optValue); err != nil {
return err
} else if d < 0 {
return errors.Errorf("negative durations are not accepted: %s='%s'", optName, optValue)
}
return nil
}

func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails, error) {
if details.Opts == nil {
// The proto MarshalTo method omits the Opts field if the map is empty.
@@ -447,11 +459,8 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
{
const opt = changefeedbase.OptResolvedTimestamps
if o, ok := details.Opts[opt]; ok && o != `` {
if d, err := time.ParseDuration(o); err != nil {
if err := validateNonNegativeDuration(opt, o); err != nil {
return jobspb.ChangefeedDetails{}, err
} else if d < 0 {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`negative durations are not accepted: %s='%s'`, opt, o)
}
}
}
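
For context, here is a self-contained sketch of what the new validateNonNegativeDuration helper checks; it uses fmt.Errorf in place of the errors package from the PR, but the error strings are meant to line up with the test expectations added in changefeed_test.go below.

```go
package main

import (
	"fmt"
	"time"
)

// validateNonNegativeDuration mirrors the helper added in this diff: the
// option value must parse as a duration and must not be negative.
func validateNonNegativeDuration(optName, optValue string) error {
	d, err := time.ParseDuration(optValue)
	if err != nil {
		return err
	}
	if d < 0 {
		return fmt.Errorf("negative durations are not accepted: %s='%s'", optName, optValue)
	}
	return nil
}

func main() {
	fmt.Println(validateNonNegativeDuration("resolved", "10s")) // <nil>
	fmt.Println(validateNonNegativeDuration("resolved", "-1s")) // negative durations are not accepted: resolved='-1s'
	fmt.Println(validateNonNegativeDuration("resolved", "bar")) // time: invalid duration "bar"
}
```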
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -2228,10 +2228,16 @@ func TestChangefeedErrors(t *testing.T) {
t, `unknown envelope: nope`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH envelope=nope`,
)

sqlDB.ExpectErr(
t, `time: invalid duration "bar"`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH resolved='bar'`,
)
sqlDB.ExpectErr(
t, `negative durations are not accepted: resolved='-1s'`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH resolved='-1s'`,
)

sqlDB.ExpectErr(
t, `cannot specify timestamp in the future`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH cursor=$1`, timeutil.Now().Add(time.Hour),
17 changes: 17 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -45,3 +45,20 @@ var PerChangefeedMemLimit = settings.RegisterByteSizeSetting(
"controls amount of data that can be buffered per changefeed",
1<<30,
)

// SlowSpanLogThreshold controls when we will log slow spans.
var SlowSpanLogThreshold = settings.RegisterDurationSetting(
"changefeed.slow_span_log_threshold",
"a changefeed will log spans with resolved timestamps this far behind the current wall-clock time; if 0, a default value is calculated based on other cluster settings",
0,
settings.NonNegativeDuration,
)

// ScanRequestLimit is the number of Scan requests that can run at once.
// Scan requests are issued when changefeed performs the backfill.
// If set to 0, a reasonable default will be chosen.
var ScanRequestLimit = settings.RegisterIntSetting(
"changefeed.backfill.concurrent_scan_requests",
"number of concurrent scan requests per node issued during a backfill",
0,
)
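
Both settings registered here follow the same convention: a value of 0 means "derive a default". A rough, standalone sketch of the fallback used for the slow-span threshold (the real code reads these values through the settings registry, as shown in slownessThreshold above):

```go
package main

import (
	"fmt"
	"time"
)

// effectiveSlowSpanThreshold sketches the fallback: an explicit cluster
// setting wins; otherwise the threshold is derived from the schemafeed
// poll interval and the closed-timestamp target, plus a second of slack.
func effectiveSlowSpanThreshold(clusterValue, pollInterval, closedTSTarget time.Duration) time.Duration {
	if clusterValue > 0 {
		return clusterValue
	}
	return time.Second + 10*(pollInterval+closedTSTarget)
}

func main() {
	// Example values only; not necessarily the cluster defaults.
	fmt.Println(effectiveSlowSpanThreshold(0, time.Second, 3*time.Second))           // 41s
	fmt.Println(effectiveSlowSpanThreshold(time.Minute, time.Second, 3*time.Second)) // 1m0s
}
```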
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
@@ -19,8 +19,8 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/lease",
42 changes: 28 additions & 14 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -13,13 +13,14 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -59,16 +60,7 @@ func (p *scanRequestScanner) Scan(
return err
}

// Export requests for the various watched spans are executed in parallel,
// with a semaphore-enforced limit based on a cluster setting.
// The spans here generally correspond with range boundaries.
approxNodeCount, err := clusterNodeCount(p.gossip)
if err != nil {
// can't count nodes in tenants
approxNodeCount = 1
}
maxConcurrentExports := approxNodeCount *
int(kvserver.ExportRequestsLimit.Get(&p.settings.SV))
maxConcurrentExports := maxConcurrentExportRequests(p.gossip, &p.settings.SV)
exportsSem := make(chan struct{}, maxConcurrentExports)
g := ctxgroup.WithContext(ctx)

@@ -258,15 +250,37 @@ func allRangeSpans(
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
func clusterNodeCount(gw gossip.OptionalGossip) int {
g, err := gw.OptionalErr(47971)
if err != nil {
return 0, err
// can't count nodes in tenants
return 1
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes, nil
return nodes
}

// maxConcurrentExportRequests returns the number of concurrent scan requests.
func maxConcurrentExportRequests(gw gossip.OptionalGossip, sv *settings.Values) int {
// If the user specified ScanRequestLimit -- use that value.
if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 {
return int(max)
}

// TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of
// the cluster only makes sense for core changefeeds. This configuration should
// be specified explicitly when creating the scanner.
nodes := clusterNodeCount(gw)
// This is all hand-wavy: 3 per node used to be the default for a very long time.
// However, this could get out of hand if the clusters are large.
// So cap the max at an arbitrary value of 100.
max := 3 * nodes
if max > 100 {
max = 100
}
return max
}
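
The limit computed by maxConcurrentExportRequests feeds the exportsSem buffered channel shown earlier in Scan. A minimal sketch (not the PR's code) of that bounded-concurrency pattern:

```go
package main

import (
	"fmt"
	"sync"
)

// scanSpans runs one goroutine per span but allows at most maxConcurrent
// of them to be in flight, using a buffered channel as a semaphore --
// the same shape as exportsSem in scanRequestScanner.Scan.
func scanSpans(spans []string, maxConcurrent int) {
	sem := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup
	for _, sp := range spans {
		sp := sp // capture loop variable (pre-Go 1.22)
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks once maxConcurrent are running
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			fmt.Println("scanning", sp)
		}()
	}
	wg.Wait()
}

func main() {
	// With changefeed.backfill.concurrent_scan_requests unset (0), the
	// limit would be min(3*nodeCount, 100) per the code above.
	scanSpans([]string{"/Table/53/1", "/Table/53/2", "/Table/53/3"}, 2)
}
```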