84555: sql: make sequence integer bound consistent with `default_int_size` r=ZhouXing19 a=ZhouXing19

This PR is based on #84034. 
Please just look at the 2nd commit. I'll rebase this PR once #84034 is merged.

Previously, the default bounds of a sequence were always `math.MaxInt64`
or `math.MinInt64` (depending on the sequence's order), which can be
inconsistent with the cluster setting `default_int_size`. This commit
fixes that.
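
As an illustration of the intended behavior (a hedged sketch, not part of this patch; the connection string, sequence name, and the exact `MAXVALUE` shown are assumptions), the bound of a newly created sequence should now track the configured default integer size:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // PostgreSQL-wire driver; CockroachDB speaks pgwire.
)

func main() {
	// Assumes a local insecure single-node cluster listening on 26257.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// With 4-byte integers as the default, the sequence bound should be
	// math.MaxInt32 (2147483647) rather than math.MaxInt64.
	for _, stmt := range []string{
		`SET default_int_size = 4`,
		`CREATE SEQUENCE seq_int4`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}

	var name, create string
	if err := db.QueryRow(`SHOW CREATE seq_int4`).Scan(&name, &create); err != nil {
		log.Fatal(err)
	}
	fmt.Println(create) // expected to contain "MAXVALUE 2147483647"
}
```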

fixes #84554

Release note (bug fix): make sequence integer bound consistent with the
cluster setting `default_int_size`.

Release justification: fixes a bug in sequence integer bounds.

86253: changefeedccl: make core changefeeds more resilient r=jayshrivastava a=jayshrivastava

This change updates core changefeeds to save checkpoints inside the
EvalCtx, so that a core changefeed can retry from the last checkpoint
instead of restarting from the beginning.

Fixes: #84511

Release note (general change): This change updates core/experimental
changefeeds to be more resilient to transient errors
(e.g., network blips) by adding checkpointing.

Previously, transient errors would cause a core changefeed to stop and
terminate the underlying SQL statement, which then had to be restarted
by the user. Furthermore, if the core changefeed was restarted during an
initial scan, the initial scan would start over from the beginning. For
large initial scans, transient errors are more likely, so restarting from
the beginning would likely hit further transient errors and restarts,
and the changefeed would never make progress.

Now, an experimental changefeed automatically takes frequent checkpoints
and retries from the last checkpoint when a transient error occurs.
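
To make the benefit concrete, here is a small self-contained Go simulation (not CockroachDB code; the row count and failure cadence are made-up assumptions) of why resuming from a checkpoint lets a long initial scan finish despite transient errors:

```go
package main

import (
	"errors"
	"fmt"
)

var errTransient = errors.New("transient error")

// scanFrom stands in for an initial scan: it processes rows starting at the
// last checkpoint and occasionally fails with a transient error, reporting
// how far it got.
func scanFrom(checkpoint, total, failEvery int) (int, error) {
	for i := checkpoint; i < total; i++ {
		if i > checkpoint && i%failEvery == 0 {
			return i, errTransient // rows [checkpoint, i) are already checkpointed
		}
	}
	return total, nil
}

func main() {
	const total, failEvery = 1000, 333
	checkpoint, attempts := 0, 0
	for {
		attempts++
		next, err := scanFrom(checkpoint, total, failEvery)
		checkpoint = next
		if err == nil {
			break
		}
		if !errors.Is(err, errTransient) {
			panic(err) // non-transient: give up, as the real changefeed would
		}
		// Transient: retry from the saved checkpoint. Restarting from row 0
		// instead would hit the same transient error again and never finish.
	}
	fmt.Printf("scan finished after %d attempts (checkpoint=%d)\n", attempts, checkpoint)
}
```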

Release justification: This change updates an experimental
feature.

86272: insights: execution_insights_capacity cluster setting r=matthewtodd a=matthewtodd

Fixes #79450.

Whereas we previously retained only the 10 most recent insights, we now
let the user choose how many they'd like to hang onto.

It may make sense to be more sophisticated than a simple LRU cache here:
perhaps we won't want a single fingerprint to dominate the list. That
work is captured in #86271.

Release justification: Category 2: Bug fixes and low-risk updates to new
functionality.

Release note (ops change): The
`sql.insights.execution_insights_capacity` cluster setting was
introduced, limiting the number of SQL execution insights retained in
memory per node.
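
For example (a hedged sketch; the value 2000 and the connection string are arbitrary assumptions, and the default of 1000 comes from the settings docs below), an operator could raise the retention limit like so:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Retain up to 2000 execution insights per node instead of the default 1000.
	if _, err := db.Exec(
		`SET CLUSTER SETTING sql.insights.execution_insights_capacity = 2000`,
	); err != nil {
		log.Fatal(err)
	}
}
```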

86329: ui/cluster-ui: show status as waiting when txn is waiting for lock r=xinhaoz a=xinhaoz

Now that we have surfaced contention information in the UI, we can
update the stmt / txn status field for active executions to be
'Waiting' when the stmt or txn is waiting to acquire a lock.

Release justification: low risk update to existing functionality
Release note (ui change): txns and stmts on the active execution pages
that are waiting for a lock will now have the status 'Waiting'.

Screenshot: https://user-images.githubusercontent.com/20136951/185226858-8c194582-d405-4c8b-aec9-7a21a4bc1c22.png


86348: sql/stats: remove NumRange-stealing behavior from histogram prediction r=yuzefovich,rytaft a=michae2

We should be able to handle NumEq=0 just fine everywhere that uses
histograms, so delete this NumRange-stealing code.

Fixes: #86344

Release justification: low-risk updates to new functionality.

Release note: None

86383: sql: move UDF execution tests to bottom of test file r=mgartner a=mgartner

This commit moves UDF execution logic tests to the bottom of the UDF
test file so that execution-related tests added in the future will not
change the output of schema-related tests.

Release justification: This is a test-only change.

Release note: None

86385: opt: clarify the return type of Index.Ordinal() r=mgartner a=mgartner

The return type of `Index.Ordinal()` is now the type alias
`cat.IndexOrdinal`, consistent with other functions that use an
index ordinal, such as `Table.Index(i cat.IndexOrdinal)`. It is now
clearer that `idx == Table().Index(idx.Ordinal())`.
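
A minimal sketch of that invariant with the clarified types (illustrative only; it assumes the `cat.Index`/`cat.Table` interfaces from `pkg/sql/opt/cat` and is not part of this change):

```go
package catutil // hypothetical package, for illustration only

import "github.com/cockroachdb/cockroach/pkg/sql/opt/cat"

// ordinalRoundTrips reports whether looking an index's ordinal back up on its
// table yields the same index, i.e. idx == idx.Table().Index(idx.Ordinal()).
func ordinalRoundTrips(idx cat.Index) bool {
	var ord cat.IndexOrdinal = idx.Ordinal() // return type is now the cat.IndexOrdinal alias
	return idx == idx.Table().Index(ord)
}
```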

Release justification: This is a very small, low-risk change.

Release note: None

Co-authored-by: Jane Xing <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Matthew Todd <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
7 people committed Aug 18, 2022
8 parents 2cdb9e2 + b6dc58b + 592ce26 + cf996d2 + db2cbd9 + 98d1e0c + 4a3a302 + c863dea commit 5789fac
Showing 31 changed files with 796 additions and 413 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -232,6 +232,7 @@ sql.distsql.max_running_flows integer -128 the value - when positive - used as i
sql.distsql.temp_storage.workmem byte size 64 MiB maximum amount of memory in bytes a processor can use before falling back to temp storage
sql.guardrails.max_row_size_err byte size 512 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable
sql.guardrails.max_row_size_log byte size 64 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable
sql.insights.execution_insights_capacity integer 1000 the size of the per-node store of execution insights
sql.insights.latency_threshold duration 100ms amount of time after which an executing statement is considered slow. Use 0 to disable.
sql.log.slow_query.experimental_full_table_scans.enabled boolean false when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.
sql.log.slow_query.internal_queries.enabled boolean false when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -163,6 +163,7 @@
<tr><td><code>sql.guardrails.max_row_size_err</code></td><td>byte size</td><td><code>512 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable</td></tr>
<tr><td><code>sql.guardrails.max_row_size_log</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable</td></tr>
<tr><td><code>sql.hash_sharded_range_pre_split.max</code></td><td>integer</td><td><code>16</code></td><td>max pre-split ranges to have when adding hash sharded index to an existing table</td></tr>
<tr><td><code>sql.insights.execution_insights_capacity</code></td><td>integer</td><td><code>1000</code></td><td>the size of the per-node store of execution insights</td></tr>
<tr><td><code>sql.insights.latency_threshold</code></td><td>duration</td><td><code>100ms</code></td><td>amount of time after which an executing statement is considered slow. Use 0 to disable.</td></tr>
<tr><td><code>sql.log.slow_query.experimental_full_table_scans.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.</td></tr>
<tr><td><code>sql.log.slow_query.internal_queries.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.</td></tr>
163 changes: 105 additions & 58 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -720,6 +720,8 @@ type changeFrontier struct {
// metricsID is used as the unique id of this changefeed in the
// metrics.MaxBehindNanos map.
metricsID int

knobs TestingKnobs
}

const (
@@ -729,7 +731,11 @@

// jobState encapsulates changefeed job state.
type jobState struct {
job *jobs.Job
// job is set for changefeeds other than core/sinkless changefeeds.
job *jobs.Job
// coreProgress is set for only core/sinkless changefeeds.
coreProgress *coreChangefeedProgress

settings *cluster.Settings
metrics *Metrics
ts timeutil.TimeSource
@@ -744,11 +750,36 @@ type jobState struct {
progressUpdatesSkipped bool
}

type coreChangefeedProgress struct {
progress jobspb.Progress
}

// SetHighwater implements the eval.ChangefeedState interface.
func (cp *coreChangefeedProgress) SetHighwater(frontier *hlc.Timestamp) {
cp.progress.Progress = &jobspb.Progress_HighWater{
HighWater: frontier,
}
}

// SetCheckpoint implements the eval.ChangefeedState interface.
func (cp *coreChangefeedProgress) SetCheckpoint(spans []roachpb.Span, timestamp hlc.Timestamp) {
changefeedProgress := cp.progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &jobspb.ChangefeedProgress_Checkpoint{
Spans: spans,
Timestamp: timestamp,
}
}

func newJobState(
j *jobs.Job, st *cluster.Settings, metrics *Metrics, ts timeutil.TimeSource,
j *jobs.Job,
coreProgress *coreChangefeedProgress,
st *cluster.Settings,
metrics *Metrics,
ts timeutil.TimeSource,
) *jobState {
return &jobState{
job: j,
coreProgress: coreProgress,
settings: st,
metrics: metrics,
ts: ts,
@@ -830,6 +861,7 @@ func newChangeFrontierProcessor(
if err != nil {
return nil, err
}

cf := &changeFrontier{
flowCtx: flowCtx,
spec: spec,
@@ -838,6 +870,11 @@
frontier: sf,
slowLogEveryN: log.Every(slowSpanMaxFrequency),
}

if cfKnobs, ok := flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
cf.knobs = *cfKnobs
}

if err := cf.Init(
cf,
post,
@@ -937,7 +974,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.MoveToDraining(err)
return
}
cf.js = newJobState(job, cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})
cf.js = newJobState(job, nil, cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})

if changefeedbase.FrontierCheckpointFrequency.Get(&cf.flowCtx.Cfg.Settings.SV) == 0 {
log.Warning(ctx,
@@ -968,6 +1005,10 @@ func (cf *changeFrontier) Start(ctx context.Context) {
// running status around for a while before we override it.
cf.js.lastRunStatusUpdate = timeutil.Now()
}
} else {
cf.js = newJobState(nil,
cf.EvalCtx.ChangefeedState.(*coreChangefeedProgress),
cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})
}

cf.metrics.mu.Lock()
@@ -1038,7 +1079,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
// Detect whether this boundary should be used to kill or restart the
// changefeed.
if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
err = changefeedbase.MarkRetryableErrorWithTimestamp(err, cf.frontier.boundaryTime)
err = changefeedbase.MarkRetryableError(err)
}
}

@@ -1139,22 +1180,16 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// If frontier changed, we emit resolved timestamp.
emitResolved := frontierChanged

// Checkpoint job record progress if needed.
// NB: Sinkless changefeeds will not have a job state (js). In fact, they
// have no distributed state whatsoever. Because of this they also do not
// use protected timestamps.
if cf.js != nil {
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
return err
}

// Emit resolved timestamp only if we have checkpointed the job.
// Usually, this happens every time frontier changes, but we can skip some updates
// if we update frontier too rapidly.
emitResolved = checkpointed
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
return err
}

// Emit resolved timestamp only if we have checkpointed the job.
// Usually, this happens every time frontier changes, but we can skip some updates
// if we update frontier too rapidly.
emitResolved = checkpointed

if emitResolved {
// Keeping this after the checkpointJobProgress call will avoid
// some duplicates if a restart happens.
@@ -1239,60 +1274,72 @@ func (cf *changeFrontier) checkpointJobProgress(
}
cf.metrics.FrontierUpdates.Inc(1)
var updateSkipped error
if err := cf.js.job.Update(cf.Ctx, nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
// pause-requested, simply skip the checkpoint
if err := md.CheckRunningOrReverting(); err != nil {
updateSkipped = err
return nil
}
if cf.js.job != nil {

if err := cf.js.job.Update(cf.Ctx, nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
// pause-requested, simply skip the checkpoint
if err := md.CheckRunningOrReverting(); err != nil {
updateSkipped = err
return nil
}

// Advance resolved timestamp.
progress := md.Progress
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &frontier,
}
// Advance resolved timestamp.
progress := md.Progress
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &frontier,
}

changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &checkpoint
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &checkpoint

timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional and the associated deprecated
// methods once we're confident in ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
return err
}
timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional and the associated deprecated
// methods once we're confident in ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
return err
}

if updateRunStatus {
md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
}
if updateRunStatus {
md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
}

ju.UpdateProgress(progress)
ju.UpdateProgress(progress)

// Reset RunStats.NumRuns to 1 since the changefeed is
// now running. By resetting the NumRuns, we avoid
// future job system level retries from having large
// backoffs because of past failures.
if md.RunStats != nil {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}
// Reset RunStats.NumRuns to 1 since the changefeed is
// now running. By resetting the NumRuns, we avoid
// future job system level retries from having large
// backoffs because of past failures.
if md.RunStats != nil {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}

return nil
}); err != nil {
return false, err
return nil
}); err != nil {
return false, err
}
} else {
cf.js.coreProgress.SetHighwater(&frontier)
cf.js.coreProgress.SetCheckpoint(checkpoint.Spans, checkpoint.Timestamp)
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

if cf.knobs.RaiseRetryableError != nil {
if err := cf.knobs.RaiseRetryableError(); err != nil {
return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError"))
}
}

return true, nil
}

18 changes: 6 additions & 12 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -191,6 +191,11 @@ func changefeedPlanHook(
}

if details.SinkURI == `` {

p.ExtendedEvalContext().ChangefeedState = &coreChangefeedProgress{
progress: progress,
}

// If this is a sinkless changefeed, then we should not hold on to the
// descriptor leases accessed to plan the changefeed. If changes happen
// to descriptors, they will be addressed during the execution.
@@ -220,18 +225,7 @@
return err
}

// Check for a schemachange boundary timestamp returned via a
// retryable error. Retrying without updating the changefeed progress
// will result in the changefeed performing the schema change again,
// causing an infinite loop.
if ts, ok := changefeedbase.MaybeGetRetryableErrorTimestamp(err); ok {
progress = jobspb.Progress{
Progress: &jobspb.Progress_HighWater{HighWater: &ts},
Details: &jobspb.Progress_Changefeed{
Changefeed: &jobspb.ChangefeedProgress{},
},
}
}
progress = p.ExtendedEvalContext().ChangefeedState.(*coreChangefeedProgress).progress
}
telemetry.Count(`changefeed.core.error`)
return changefeedbase.MaybeStripRetryableErrorMarker(err)
59 changes: 59 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -5752,6 +5752,64 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
}
}

// TestCoreChangefeedBackfillScanCheckpoint tests that a core changefeed
// successfully completes the initial scan of a table when transient errors occur.
// This test only succeeds if checkpoints are taken.
func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t)
skip.UnderShort(t)

rnd, _ := randutil.NewPseudoRand()

rowCount := 10000

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo(a INT PRIMARY KEY)`)
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO foo (a) SELECT * FROM generate_series(%d, %d)`, 0, rowCount))

knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)

// Ensure Scan Requests are always small enough that we receive multiple
// resolved events during a backfill. Also ensure that checkpoint frequency
// and size are large enough to induce several checkpoints when
// writing `rowCount` rows.
knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error {
b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25)
return nil
}
changefeedbase.FrontierCheckpointFrequency.Override(
context.Background(), &s.Server.ClusterSettings().SV, 1)
changefeedbase.FrontierCheckpointMaxBytes.Override(
context.Background(), &s.Server.ClusterSettings().SV, 100<<20)

emittedCount := 0
knobs.RaiseRetryableError = func() error {
emittedCount++
if emittedCount%200 == 0 {
return changefeedbase.MarkRetryableError(errors.New("test transient error"))
}
return nil
}

foo := feed(t, f, `CREATE CHANGEFEED FOR TABLE foo`)
defer closeFeed(t, foo)

payloads := make([]string, rowCount+1)
for i := 0; i < rowCount+1; i++ {
payloads[i] = fmt.Sprintf(`foo: [%d]->{"after": {"a": %d}}`, i, i)
}
assertPayloads(t, foo, payloads)
}

cdcTest(t, testFn, feedTestForceSink("sinkless"))
}

func TestCheckpointFrequency(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -5765,6 +5823,7 @@ func TestCheckpointFrequency(t *testing.T) {
ts := timeutil.NewManualTime(timeutil.Now())
js := newJobState(
nil, /* job */
nil, /* core progress */
cluster.MakeTestingClusterSettings(),
MakeMetrics(time.Second).(*Metrics), ts,
)
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
@@ -21,7 +21,6 @@ go_library(
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/flowinfra",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
)