Merge pull request cockroachdb#68106 from miretskiy/backport21.1-63923-67268

release-21.1: Backport changefeed observability PRs.
miretskiy authored Jul 27, 2021
2 parents 6d0bcbd + a8bea98 commit eb059d2
Showing 10 changed files with 188 additions and 44 deletions.
62 changes: 41 additions & 21 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -851,8 +851,9 @@ type changeFrontier struct {
freqEmitResolved time.Duration
// lastEmitResolved is the last time a resolved timestamp was emitted.
lastEmitResolved time.Time
// lastSlowSpanLog is the last time a slow span from `sf` was logged.
lastSlowSpanLog time.Time

// slowLogEveryN rate-limits the logging of slow spans
slowLogEveryN log.EveryN

// schemaChangeBoundary represents an hlc timestamp at which a schema change
// event occurred to a target watched by this frontier. If the changefeed is
@@ -895,7 +896,10 @@ type changeFrontier struct {
metricsID int
}

const runStatusUpdateFrequency time.Duration = time.Minute
const (
runStatusUpdateFrequency time.Duration = time.Minute
slowSpanMaxFrequency = 10 * time.Second
)

type jobState struct {
job *jobs.Job
@@ -916,11 +920,12 @@ func newChangeFrontierProcessor(
ctx := flowCtx.EvalCtx.Ctx()
memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "changefntr-mem")
cf := &changeFrontier{
flowCtx: flowCtx,
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
input: input,
sf: span.MakeFrontier(spec.TrackedSpans...),
flowCtx: flowCtx,
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
input: input,
sf: span.MakeFrontier(spec.TrackedSpans...),
slowLogEveryN: log.Every(slowSpanMaxFrequency),
}
if err := cf.Init(
cf,
@@ -1231,9 +1236,14 @@ func (cf *changeFrontier) handleFrontierChanged(isBehind bool) error {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
}
cf.metrics.mu.Unlock()

checkpointStart := timeutil.Now()
if err := cf.checkpointResolvedTimestamp(newResolved, isBehind); err != nil {
return err
}
cf.metrics.CheckpointHistNanos.RecordValue(timeutil.Since(checkpointStart).Nanoseconds())
cf.metrics.FrontierUpdates.Inc(1)

if err := cf.maybeEmitResolved(newResolved); err != nil {
return err
}
@@ -1388,38 +1398,48 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
// returned boolean will be true if the resolved timestamp lags far behind the
// present as defined by the current configuration.
func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) (isBehind bool) {
// These two cluster setting values represent the target responsiveness of
// poller and range feed. The cluster setting for switching between poller and
// rangefeed is only checked at changefeed start/resume, so instead of
// switching on it here, just add them. Also add 1 second in case both these
// settings are set really low (as they are in unit tests).
pollInterval := changefeedbase.TableDescriptorPollInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Cfg.Settings.SV)
slownessThreshold := time.Second + 10*(pollInterval+closedtsInterval)
frontier := cf.sf.Frontier()
now := timeutil.Now()
resolvedBehind := now.Sub(frontier.GoTime())
if resolvedBehind <= slownessThreshold {
if resolvedBehind <= cf.slownessThreshold() {
return false
}

description := `sinkless feed`
description := "sinkless feed"
if !cf.isSinkless() {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged {
log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}
const slowSpanMaxFrequency = 10 * time.Second
if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency {
cf.lastSlowSpanLog = now

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.sf.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
}
return true
}

func (cf *changeFrontier) slownessThreshold() time.Duration {
clusterThreshold := changefeedbase.SlowSpanLogThreshold.Get(&cf.flowCtx.Cfg.Settings.SV)
if clusterThreshold > 0 {
return clusterThreshold
}

// These two cluster setting values represent the target
// responsiveness of schemafeed and rangefeed.
//
// We add 1 second in case both these settings are set really
// low (as they are in unit tests).
//
// TODO(ssd): We should probably take into account the flush
// frequency here.
pollInterval := changefeedbase.TableDescriptorPollInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Cfg.Settings.SV)
return time.Second + 10*(pollInterval+closedtsInterval)
}

// ConsumerClosed is part of the RowSource interface.
func (cf *changeFrontier) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
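The slow-span logging above swaps the hand-rolled lastSlowSpanLog timestamp for a log.EveryN rate limiter built from slowSpanMaxFrequency. A minimal standalone sketch of the same pattern (a plain-Go stand-in for illustration, not the actual pkg/util/log implementation):

package main

import (
	"fmt"
	"time"
)

// everyN lets an event through at most once per interval, mirroring the
// behavior changeFrontier relies on for its slow-span log messages.
type everyN struct {
	interval time.Duration
	lastSeen time.Time
}

// shouldProcess reports whether at least interval has elapsed since the last
// event that was let through, and records now if so.
func (e *everyN) shouldProcess(now time.Time) bool {
	if now.Sub(e.lastSeen) >= e.interval {
		e.lastSeen = now
		return true
	}
	return false
}

func main() {
	limiter := everyN{interval: 10 * time.Second} // slowSpanMaxFrequency in the diff
	for i := 0; i < 3; i++ {
		if limiter.shouldProcess(time.Now()) {
			fmt.Println("would log slow span, iteration", i)
		}
	}
	// Only the first iteration prints; later ones are suppressed for 10s.
}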
17 changes: 13 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -437,6 +437,18 @@ func changefeedJobDescription(
return tree.AsStringWithFQNames(c, ann), nil
}

// validateNonNegativeDuration returns a nil error if optValue can be
// parsed as a duration and is non-negative; otherwise, an error is
// returned.
func validateNonNegativeDuration(optName string, optValue string) error {
if d, err := time.ParseDuration(optValue); err != nil {
return err
} else if d < 0 {
return errors.Errorf("negative durations are not accepted: %s='%s'", optName, optValue)
}
return nil
}

func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails, error) {
if details.Opts == nil {
// The proto MarshalTo method omits the Opts field if the map is empty.
@@ -447,11 +459,8 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
{
const opt = changefeedbase.OptResolvedTimestamps
if o, ok := details.Opts[opt]; ok && o != `` {
if d, err := time.ParseDuration(o); err != nil {
if err := validateNonNegativeDuration(opt, o); err != nil {
return jobspb.ChangefeedDetails{}, err
} else if d < 0 {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`negative durations are not accepted: %s='%s'`, opt, o)
}
}
}
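validateNonNegativeDuration folds the parse-and-sign check that previously lived inline in validateDetails into a reusable helper. A self-contained approximation with an example caller, using fmt.Errorf in place of the errors package so it compiles on its own:

package main

import (
	"fmt"
	"time"
)

// validateNonNegativeDuration mirrors the helper added in changefeed_stmt.go.
func validateNonNegativeDuration(optName, optValue string) error {
	if d, err := time.ParseDuration(optValue); err != nil {
		return err
	} else if d < 0 {
		return fmt.Errorf("negative durations are not accepted: %s='%s'", optName, optValue)
	}
	return nil
}

func main() {
	fmt.Println(validateNonNegativeDuration("resolved", "10s")) // <nil>
	fmt.Println(validateNonNegativeDuration("resolved", "-1s")) // negative durations are not accepted: resolved='-1s'
	fmt.Println(validateNonNegativeDuration("resolved", "bar")) // error from time.ParseDuration
}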
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -2228,10 +2228,16 @@ func TestChangefeedErrors(t *testing.T) {
t, `unknown envelope: nope`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH envelope=nope`,
)

sqlDB.ExpectErr(
t, `time: invalid duration "bar"`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH resolved='bar'`,
)
sqlDB.ExpectErr(
t, `negative durations are not accepted: resolved='-1s'`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH resolved='-1s'`,
)

sqlDB.ExpectErr(
t, `cannot specify timestamp in the future`,
`EXPERIMENTAL CHANGEFEED FOR foo WITH cursor=$1`, timeutil.Now().Add(time.Hour),
17 changes: 17 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -45,3 +45,20 @@ var PerChangefeedMemLimit = settings.RegisterByteSizeSetting(
"controls amount of data that can be buffered per changefeed",
1<<30,
)

// SlowSpanLogThreshold controls when we will log slow spans.
var SlowSpanLogThreshold = settings.RegisterDurationSetting(
"changefeed.slow_span_log_threshold",
"a changefeed will log spans with resolved timestamps this far behind the current wall-clock time; if 0, a default value is calculated based on other cluster settings",
0,
settings.NonNegativeDuration,
)

// ScanRequestLimit is the number of Scan requests that can run at once.
// Scan requests are issued when changefeed performs the backfill.
// If set to 0, a reasonable default will be chosen.
var ScanRequestLimit = settings.RegisterIntSetting(
"changefeed.backfill.concurrent_scan_requests",
"number of concurrent scan requests per node issued during a backfill",
0,
)
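Both new settings treat 0 as "derive a default": SlowSpanLogThreshold falls back to the calculation in changeFrontier.slownessThreshold, and ScanRequestLimit falls back to a per-node heuristic in the kvfeed scanner. A minimal sketch of that fallback shape, with made-up interval values standing in for the real cluster settings:

package main

import (
	"fmt"
	"time"
)

// slownessThreshold mirrors the fallback logic in changeFrontier.slownessThreshold:
// an explicit override wins; otherwise derive a threshold from the poll and
// closed-timestamp intervals, with a one-second floor for tiny test settings.
func slownessThreshold(override, pollInterval, closedtsInterval time.Duration) time.Duration {
	if override > 0 {
		return override
	}
	return time.Second + 10*(pollInterval+closedtsInterval)
}

func main() {
	// changefeed.slow_span_log_threshold unset: derived from the other intervals.
	fmt.Println(slownessThreshold(0, time.Minute, 3*time.Second)) // 10m31s
	// Explicit override takes precedence.
	fmt.Println(slownessThreshold(30*time.Second, time.Minute, 3*time.Second)) // 30s
}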
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
@@ -19,8 +19,8 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/lease",
42 changes: 28 additions & 14 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -13,13 +13,14 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -59,16 +60,7 @@ func (p *scanRequestScanner) Scan(
return err
}

// Export requests for the various watched spans are executed in parallel,
// with a semaphore-enforced limit based on a cluster setting.
// The spans here generally correspond with range boundaries.
approxNodeCount, err := clusterNodeCount(p.gossip)
if err != nil {
// can't count nodes in tenants
approxNodeCount = 1
}
maxConcurrentExports := approxNodeCount *
int(kvserver.ExportRequestsLimit.Get(&p.settings.SV))
maxConcurrentExports := maxConcurrentExportRequests(p.gossip, &p.settings.SV)
exportsSem := make(chan struct{}, maxConcurrentExports)
g := ctxgroup.WithContext(ctx)

@@ -258,15 +250,37 @@ func allRangeSpans(
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
func clusterNodeCount(gw gossip.OptionalGossip) int {
g, err := gw.OptionalErr(47971)
if err != nil {
return 0, err
// can't count nodes in tenants
return 1
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes, nil
return nodes
}

// maxConcurrentExportRequests returns the number of concurrent scan requests.
func maxConcurrentExportRequests(gw gossip.OptionalGossip, sv *settings.Values) int {
// If the user specified ScanRequestLimit -- use that value.
if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 {
return int(max)
}

// TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of
// the cluster only makes sense for the core changefeeds. This configuration should
// be specified explicitly when creating the scanner.
nodes := clusterNodeCount(gw)
// This is all hand-wavy: 3 per node used to be the default for a very long time.
// However, this could get out of hand if the clusters are large.
// So cap the max at an arbitrary value of 100.
max := 3 * nodes
if max > 100 {
max = 100
}
return max
}
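The computed limit feeds the channel semaphore that Scan uses to bound concurrent scan requests (exportsSem := make(chan struct{}, maxConcurrentExports) above). A self-contained sketch of the limit calculation and the semaphore pattern, with a hard-coded node count in place of the gossip lookup:

package main

import (
	"fmt"
	"sync"
)

// maxConcurrentScans mirrors maxConcurrentExportRequests: an explicit
// changefeed.backfill.concurrent_scan_requests value wins; otherwise
// default to 3 per node, capped at 100.
func maxConcurrentScans(settingLimit int64, nodes int) int {
	if settingLimit > 0 {
		return int(settingLimit)
	}
	max := 3 * nodes
	if max > 100 {
		max = 100
	}
	return max
}

func main() {
	limit := maxConcurrentScans(0, 5) // 15 with the derived default
	sem := make(chan struct{}, limit) // semaphore bounding in-flight scans

	var wg sync.WaitGroup
	for span := 0; span < 40; span++ {
		span := span
		wg.Add(1)
		sem <- struct{}{} // acquire before launching the worker
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release when the scan finishes
			fmt.Println("scanning span", span)
		}()
	}
	wg.Wait()
}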
