Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
81782: sql: use declarative schemachange for add column sequence exprs r=fqazi a=fqazi

Fixes: #81781

Previously, the declarative schema changer was disabled for
add column expressions with sequence references (i.e. default,
on update, computed) because we were missing telemetry
from the legacy schema changer for during backfill related failures.
This was inadequate because the lack of telemetry from these
areas could be seen as a usability issue. To address this,
this patch adds in support for missing telemetry and enables
supports for add column operations with sequence operations.

Release note: None

82388: sql: remove date/intervalstyle_enabled from code r=otan,mgartner a=rafiss

fixes #81529
 
No release note is needed, since v22.1 already included a release note
about how these are both hardcoded to true and cannot be changed.

The session variables and cluster settings remain in the code, but are
marked as retired.

Release note: None

82451: kvserver/rangefeed: fix off-by-one in `NewCatchUpIterator()` r=miretskiy a=erikgrinaker

**kvserver/rangefeed: fix off-by-one in NewCatchUpIterator()**

`NewCatchUpIterator` created the `MVCCIncrementalIterator` with
`RangeFeedRequest.Header.Timestamp.Prev()`, because it assumed the given
timestamp was inclusive. However, the rest of the rangefeed code treats
this timestamp as exclusive.

This patch uses the correct, exclusive timestamp when creating the
`MVCCIncrementalIterator`. This change has no externally visible effect:
even though the previous code would cause `MVCCIncrementalIterator` to emit
keys at `Timestamp`, the `CatchUpScan()` method would discard those
events if they were at or below `Timestamp`.

An integration test is also added to verify that the start timestamp of
a rangefeed catchup scan is exclusive. The test passes both with the new
and the old code, as expected.

Release note: None

**rangefeed: emphasize that start time is exclusive**

This patch adds comments for rangefeed-related APIs and components
emphasizing that the start timestamp of rangefeeds is exclusive. In
other words, the first possible emitted event (including catchup scans)
will be at `RangeFeedRequest.Header.Timestamp.Next()`. Several
parameters are also renamed to emphasize this.

There are no functional or semantic changes.

Touches #82488.

Release note: None

82466: dev: have `doctor` advise to set a particular `tmpdir` r=rail a=rickystewart

Bazel's default behavior of rooting the `tmpdir` to an "in-sandbox"
directory has been a point of confusion for CRL developers. The sandbox
directory does not exist after the test is run (unless `--sandbox_debug`
is provided), which is sometimes confusing for folks who expect their
test's temp files to be present where the logs suggest they should be
(see #82413). Furthermore, the long `tmpdir` used in these cases breaks
tests that create Unix sockets on OS's where Unix sockets have a maximum
path length.

Avoid these problems by having `doctor` just tell you to manually set a
`test_tmpdir`. We add `/tmp` to `gitignore` in case people want to root
it at the `tmp` directory in their checkout.

Closes #72918.
Closes #82413.

Release note: None

82476: update cluster-ui to v22.2.0-prerelease-1 r=maryliag a=maryliag

Update cluster-ui to latest published version

Release note: None

82519: stats: fix flaky test TestDefaultColumns r=rytaft a=rytaft

TestDefaultColumns creates statistics on a table with 110 columns
using the command `CREATE STATISTICS s FROM t.a`. It then checks
that there are exactly 101 column statistics on table t.a with
statistics_name = 's' (one stat for the primary index, plus 100
other column stats). However, this test may be flaky if automatic
statistics are running, since each new automatic stat will cause
other stats to be deleted.

Although the test disables automatic stats at the beginning, it seems
that some sort of race condition may cause it to be reenabled. This
commit fixes the problem by disabling automatic stats using the new table
level settings, ensuring that the 101 column stats are not deleted after
they have been created.

Fixes #81513

Release note: None

Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Marylia Gutierrez <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
  • Loading branch information
7 people committed Jun 7, 2022
7 parents 6098b1f + 9a9fe00 + 1fa2c77 + a24a7bb + 538a186 + 486dcc3 + a8451fc commit 02cc393
Show file tree
Hide file tree
Showing 75 changed files with 2,062 additions and 1,243 deletions.
1 change: 1 addition & 0 deletions .bazelignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ bin
build/builder_home
lib
pkg/ui/node_modules
tmp
vendor
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ artifacts
/bin.*
/lib
/lib.*
/tmp
.buildinfo
# cockroach-data, cockroach{,.race}-{darwin,linux,windows}-*
/cockroach*
Expand Down
2 changes: 1 addition & 1 deletion dev
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -euo pipefail

# Bump this counter to force rebuilding `dev` on all machines.
DEV_VERSION=34
DEV_VERSION=35

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
BINARY_DIR=$THIS_DIR/bin/dev-versions
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ func createImportingDescriptors(
) (*restorationDataBase, *mainRestorationData, error) {
details := r.job.Details().(jobspb.RestoreDetails)

var allMutableDescs []catalog.MutableDescriptor
var databases []catalog.DatabaseDescriptor
var writtenTypes []catalog.TypeDescriptor
var schemas []*schemadesc.Mutable
Expand Down Expand Up @@ -706,19 +707,23 @@ func createImportingDescriptors(
}
tables = append(tables, mut)
mutableTables = append(mutableTables, mut)
allMutableDescs = append(allMutableDescs, mut)
oldTableIDs = append(oldTableIDs, mut.GetID())
case catalog.DatabaseDescriptor:
if _, ok := details.DescriptorRewrites[desc.GetID()]; ok {
mut := dbdesc.NewBuilder(desc.DatabaseDesc()).BuildCreatedMutableDatabase()
databases = append(databases, mut)
mutableDatabases = append(mutableDatabases, mut)
allMutableDescs = append(allMutableDescs, mut)
}
case catalog.SchemaDescriptor:
mut := schemadesc.NewBuilder(desc.SchemaDesc()).BuildCreatedMutableSchema()
schemas = append(schemas, mut)
allMutableDescs = append(allMutableDescs, mut)
case catalog.TypeDescriptor:
mut := typedesc.NewBuilder(desc.TypeDesc()).BuildCreatedMutableType()
types = append(types, mut)
allMutableDescs = append(allMutableDescs, mut)
}
}

Expand Down Expand Up @@ -814,6 +819,12 @@ func createImportingDescriptors(
return nil, nil, err
}

// Finally, clean up / update any schema changer state inside descriptors
// globally.
if err := rewrite.MaybeClearSchemaChangerStateInDescs(allMutableDescs); err != nil {
return nil, nil, err
}

// Set the new descriptors' states to offline.
for _, desc := range mutableTables {
desc.SetOffline("restoring")
Expand Down
12 changes: 5 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
// Imported to allow multi-tenant tests
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
// Imported to allow locality-related table mutations
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // multi-tenant tests
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" // locality-related table mutations
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // registers cloud storage providers
Expand Down Expand Up @@ -1292,16 +1290,16 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
setErr := func(stp kvcoord.SpanTimePair, expectedTS hlc.Timestamp) {
incorrectCheckpointErr = errors.Newf(
"rangefeed for span %s expected to start @%s, started @%s instead",
stp.Span, expectedTS, stp.TS)
stp.Span, expectedTS, stp.StartAfter)
}

for _, sp := range spans {
if laggingSpans.Encloses(sp.Span) {
if !sp.TS.Equal(cursor) {
if !sp.StartAfter.Equal(cursor) {
setErr(sp, cursor)
}
} else {
if !sp.TS.Equal(checkpointTS) {
if !sp.StartAfter.Equal(checkpointTS) {
setErr(sp, checkpointTS)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (f *kvFeed) runUntilTableEvent(

var stps []kvcoord.SpanTimePair
resumeFrontier.Entries(func(s roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
stps = append(stps, kvcoord.SpanTimePair{Span: s, TS: ts})
stps = append(stps, kvcoord.SpanTimePair{Span: s, StartAfter: ts})
return span.ContinueMatch
})

Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,10 @@ func (f rawEventFeed) run(
withDiff bool,
eventC chan<- *roachpb.RangeFeedEvent,
) error {
var startFrom hlc.Timestamp
var startAfter hlc.Timestamp
for _, s := range spans {
if startFrom.IsEmpty() || s.TS.Less(startFrom) {
startFrom = s.TS
if startAfter.IsEmpty() || s.StartAfter.Less(startAfter) {
startAfter = s.StartAfter
}
}

Expand All @@ -405,8 +405,8 @@ func (f rawEventFeed) run(
var i int
for i = range f {
ev := f[i]
if ev.Val != nil && startFrom.LessEq(ev.Val.Value.Timestamp) ||
ev.Checkpoint != nil && startFrom.LessEq(ev.Checkpoint.ResolvedTS) {
if ev.Val != nil && startAfter.LessEq(ev.Val.Value.Timestamp) ||
ev.Checkpoint != nil && startAfter.LessEq(ev.Checkpoint.ResolvedTS) {
break
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/cli/interactive_tests/test_timing.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ end_test

start_test "Check that server times also work if IntervalStyle is different"
# regression test for issue #67618.
send "set intervalstyle_enabled = 'on';\r"
eexpect "SET"
eexpect root@
send "set IntervalStyle = 'iso_8601';\r"
eexpect "SET"
eexpect root@
Expand Down
27 changes: 26 additions & 1 deletion pkg/cmd/dev/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
// doctorStatusVersion is the current "version" of the status checks performed
// by `dev doctor``. Increasing it will force doctor to be re-run before other
// dev commands can be run.
doctorStatusVersion = 4
doctorStatusVersion = 5

noCacheFlag = "no-cache"
)
Expand Down Expand Up @@ -287,6 +287,15 @@ slightly slower and introduce a noticeable delay in first-time build setup.`
log.Println(failedNogoTestMsg)
}

// Check whether the user has configured a custom tmpdir.
present := d.checkLinePresenceInBazelRcUser(workspace, "test --test_tmpdir=")
if !present {
failures = append(failures, "You haven't configured a tmpdir for your tests.\n"+
"Please add a `test --test_tmpdir=/PATH/TO/TMPDIR` line to your .bazelrc.user:\n"+
fmt.Sprintf(" echo \"test --test_tmpdir=%s\" >> .bazelrc.user\n", filepath.Join(workspace, "tmp"))+
"(You can choose any directory as a tmpdir.)")
}

// We want to make sure there are no other failures before trying to
// set up the cache.
if !noCache && len(failures) == 0 {
Expand Down Expand Up @@ -359,3 +368,19 @@ func (d *dev) checkPresenceInBazelRc(expectedBazelRcLine string) (string, error)
}
return errString, nil
}

// checkLinePresenceInBazelRcUser checks whether the .bazelrc.user file
// contains a line starting with the given prefix. Returns true iff a matching
// line is in the file. Failures to find the file are ignored.
func (d *dev) checkLinePresenceInBazelRcUser(workspace, expectedSubstr string) bool {
contents, err := d.os.ReadFile(filepath.Join(workspace, ".bazelrc.user"))
if err != nil {
return false
}
for _, line := range strings.Split(contents, "\n") {
if strings.HasPrefix(line, expectedSubstr) {
return true
}
}
return false
}
55 changes: 30 additions & 25 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import (
)

type singleRangeInfo struct {
rs roachpb.RSpan
startFrom hlc.Timestamp
token rangecache.EvictionToken
rs roachpb.RSpan
startAfter hlc.Timestamp
token rangecache.EvictionToken
}

var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting(
Expand Down Expand Up @@ -75,27 +75,32 @@ func maxConcurrentCatchupScans(sv *settings.Values) int {
//
// Note that the timestamps in RangeFeedCheckpoint events that are streamed back
// may be lower than the timestamp given here.
//
// NB: the given startAfter timestamp is exclusive, i.e. the first possible
// emitted event (including catchup scans) will be at startAfter.Next().
func (ds *DistSender) RangeFeed(
ctx context.Context,
spans []roachpb.Span,
startFrom hlc.Timestamp,
startAfter hlc.Timestamp, // exclusive
withDiff bool,
eventCh chan<- *roachpb.RangeFeedEvent,
) error {
timedSpans := make([]SpanTimePair, 0, len(spans))
for _, sp := range spans {
timedSpans = append(timedSpans, SpanTimePair{
Span: sp,
TS: startFrom,
Span: sp,
StartAfter: startAfter,
})
}
return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh)
}

// SpanTimePair is a pair of span along with its starting time.
// SpanTimePair is a pair of span along with its starting time. The starting
// time is exclusive, i.e. the first possible emitted event (including catchup
// scans) will be at startAfter.Next().
type SpanTimePair struct {
Span roachpb.Span
TS hlc.Timestamp
Span roachpb.Span
StartAfter hlc.Timestamp // exclusive
}

// RangeFeedSpans is similar to RangeFeed but allows specification of different
Expand Down Expand Up @@ -128,7 +133,7 @@ func (ds *DistSender) RangeFeedSpans(
case sri := <-rangeCh:
// Spawn a child goroutine to process this feed.
g.GoCtx(func(ctx context.Context) error {
return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startFrom, sri.token, withDiff, &catchupSem, rangeCh, eventCh)
return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, sri.token, withDiff, &catchupSem, rangeCh, eventCh)
})
case <-ctx.Done():
return ctx.Err()
Expand All @@ -144,7 +149,7 @@ func (ds *DistSender) RangeFeedSpans(
if err != nil {
return err
}
return ds.divideAndSendRangeFeedToRanges(ctx, rs, stp.TS, rangeCh)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, stp.StartAfter, rangeCh)
})
}(s)
}
Expand All @@ -164,7 +169,7 @@ type RangeFeedContext struct {
// PartialRangeFeed structure describes the state of currently executing partial range feed.
type PartialRangeFeed struct {
Span roachpb.Span
StartTS hlc.Timestamp
StartAfter hlc.Timestamp // exclusive
NodeID roachpb.NodeID
RangeID roachpb.RangeID
CreatedTime time.Time
Expand Down Expand Up @@ -249,7 +254,7 @@ func newRangeFeedRegistry(ctx context.Context, withDiff bool) *rangeFeedRegistry
}

func (ds *DistSender) divideAndSendRangeFeedToRanges(
ctx context.Context, rs roachpb.RSpan, startFrom hlc.Timestamp, rangeCh chan<- singleRangeInfo,
ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, rangeCh chan<- singleRangeInfo,
) error {
// As RangeIterator iterates, it can return overlapping descriptors (and
// during splits, this happens frequently), but divideAndSendRangeFeedToRanges
Expand All @@ -267,9 +272,9 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges(
nextRS.Key = partialRS.EndKey
select {
case rangeCh <- singleRangeInfo{
rs: partialRS,
startFrom: startFrom,
token: ri.Token(),
rs: partialRS,
startAfter: startAfter,
token: ri.Token(),
}:
case <-ctx.Done():
return ctx.Err()
Expand All @@ -289,7 +294,7 @@ func (ds *DistSender) partialRangeFeed(
ctx context.Context,
rr *rangeFeedRegistry,
rs roachpb.RSpan,
startFrom hlc.Timestamp,
startAfter hlc.Timestamp,
token rangecache.EvictionToken,
withDiff bool,
catchupSem *limit.ConcurrentRequestLimiter,
Expand All @@ -303,7 +308,7 @@ func (ds *DistSender) partialRangeFeed(
active := &activeRangeFeed{
PartialRangeFeed: PartialRangeFeed{
Span: span,
StartTS: startFrom,
StartAfter: startAfter,
CreatedTime: timeutil.Now(),
},
}
Expand All @@ -329,16 +334,16 @@ func (ds *DistSender) partialRangeFeed(
}

// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(ctx, span, startFrom, withDiff, token.Desc(),
maxTS, err := ds.singleRangeFeed(ctx, span, startAfter, withDiff, token.Desc(),
catchupSem, eventCh, active.onRangeEvent)

// Forward the timestamp in case we end up sending it again.
startFrom.Forward(maxTS)
startAfter.Forward(maxTS)

if err != nil {
if log.V(1) {
log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v",
span, timeutil.Since(startFrom.GoTime()), err)
span, timeutil.Since(startAfter.GoTime()), err)
}
switch {
case errors.HasType(err, (*roachpb.StoreNotFoundError)(nil)) ||
Expand All @@ -354,7 +359,7 @@ func (ds *DistSender) partialRangeFeed(
case errors.HasType(err, (*roachpb.RangeKeyMismatchError)(nil)):
// Evict the descriptor from the cache.
token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, startAfter, rangeCh)
case errors.HasType(err, (*roachpb.RangeFeedRetryError)(nil)):
var t *roachpb.RangeFeedRetryError
if ok := errors.As(err, &t); !ok {
Expand All @@ -373,7 +378,7 @@ func (ds *DistSender) partialRangeFeed(
roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER:
// Evict the descriptor from the cache.
token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, startAfter, rangeCh)
default:
return errors.AssertionFailedf("unrecognized retryable error type: %T", err)
}
Expand All @@ -398,7 +403,7 @@ type onRangeEventCb func(nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *
func (ds *DistSender) singleRangeFeed(
ctx context.Context,
span roachpb.Span,
startFrom hlc.Timestamp,
startAfter hlc.Timestamp,
withDiff bool,
desc *roachpb.RangeDescriptor,
catchupSem *limit.ConcurrentRequestLimiter,
Expand All @@ -412,7 +417,7 @@ func (ds *DistSender) singleRangeFeed(
args := roachpb.RangeFeedRequest{
Span: span,
Header: roachpb.Header{
Timestamp: startFrom,
Timestamp: startAfter,
RangeID: desc.RangeID,
},
WithDiff: withDiff,
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func newFactory(stopper *stop.Stopper, client DB, knobs *TestingKnobs) *Factory
//
// The only error which can be returned will indicate that the server is being
// shut down.
//
// NB: for the rangefeed itself, initialTimestamp is exclusive, i.e. the first
// possible event emitted by the server (including the catchup scan) is at
// initialTimestamp.Next(). This follows from the gRPC API semantics. However,
// the initial scan (if any) is run at initialTimestamp.
func (f *Factory) RangeFeed(
ctx context.Context,
name string,
Expand Down
Loading

0 comments on commit 02cc393

Please sign in to comment.