Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
81310: sql: Remove GRANT privilege r=RichardJCai a=ecwall

The GRANT privilege has been replaced by the more granular WITH GRANT OPTION
that gives control over which privileges are allowed to be granted.

Release note (sql change): Remove deprecated GRANT privilege.

81844: opt: use expr and input fds to remap index join columns r=rharding6373 a=rharding6373

Before this change index joins would use their own FDs to remap input to
output columns if there was a required ordering. This could miss some
cases where the index joins' FDs were generated from another expression
in the memo group and did not include FDs from its own input, such as
equivalent columns only found the index joins' input. This change
modifies the FDs used for remapping columns to include both the index
join and input FDs.

Fixes: #81649

Release note (bug fix): Index joins now consider functional dependencies
from their input when determining equivalent columns instead of
returning an internal error.



82452: kvserver/rangefeed: move `CatchUpScan()` params to constructor r=stevendanna,miretskiy a=erikgrinaker

`CatchUpIterator` took keyspan and start time parameters both in the
constructor and when calling `CatchUpScan()`. This wasn't safe, because
the iterator could have been constructed with bounds that would not
satisfy the parameters passed to the scan -- for example, if passing a
wider key span or lower start time -- in which case the scan would omit
values.

This patch removes the keyspan and start time parameters to
`CatchUpScan()`, such that the caller must provide them during
construction

Release note: None

Co-authored-by: Evan Wall <[email protected]>
Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
4 people committed Jun 7, 2022
4 parents abc8afd + 76d79b0 + f9b2f5d + 590da0d commit 1fd6c45
Show file tree
Hide file tree
Showing 72 changed files with 939 additions and 2,001 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-10 set the active cluster version in the format '<major>.<minor>'
version version 22.1-12 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-12</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4595,7 +4595,7 @@ func TestRestoredPrivileges(t *testing.T) {
// Explicitly don't restore grants when just restoring a database since we
// cannot ensure that the same users exist in the restoring cluster.
data2Grants := sqlDB.QueryStr(t, `SHOW GRANTS ON DATABASE data2`)
sqlDB.Exec(t, `GRANT CONNECT, CREATE, DROP, GRANT, ZONECONFIG ON DATABASE data2 TO someone`)
sqlDB.Exec(t, `GRANT CONNECT, CREATE, DROP, ZONECONFIG ON DATABASE data2 TO someone`)

withGrants := sqlDB.QueryStr(t, `SHOW GRANTS ON data.bank`)

Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/restore_old_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,23 +439,23 @@ func restoreV201ZoneconfigPrivilegeTest(exportDir string) func(t *testing.T) {
require.NoError(t, err)
sqlDB.Exec(t, `RESTORE FROM $1`, localFoo)
testDBGrants := [][]string{
{"test", "admin", "ALL", "true"},
{"test", "root", "ALL", "true"},
{"test", "admin", "ALL", "false"},
{"test", "root", "ALL", "false"},
{"test", "testuser", "ZONECONFIG", "false"},
}
sqlDB.CheckQueryResults(t, `show grants on database test`, testDBGrants)

testTableGrants := [][]string{
{"test", "public", "test_table", "admin", "ALL", "true"},
{"test", "public", "test_table", "root", "ALL", "true"},
{"test", "public", "test_table", "admin", "ALL", "false"},
{"test", "public", "test_table", "root", "ALL", "false"},
{"test", "public", "test_table", "testuser", "ZONECONFIG", "false"},
}
sqlDB.CheckQueryResults(t, `show grants on test.test_table`, testTableGrants)

testTable2Grants := [][]string{
{"test", "public", "test_table2", "admin", "ALL", "true"},
{"test", "public", "test_table2", "root", "ALL", "true"},
{"test", "public", "test_table2", "testuser", "ALL", "true"},
{"test", "public", "test_table2", "admin", "ALL", "false"},
{"test", "public", "test_table2", "root", "ALL", "false"},
{"test", "public", "test_table2", "testuser", "ALL", "false"},
}
sqlDB.CheckQueryResults(t, `show grants on test.test_table2`, testTable2Grants)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,9 @@ GRANT UPDATE ON top_secret TO agent_bond;
{`mi5`, `database`, `GRANT ALL ON mi5 TO admin; GRANT ALL ` +
`ON mi5 TO agents; GRANT CONNECT ON mi5 TO public; GRANT ALL ON mi5 TO root; `, `root`},
{`public`, `schema`, `GRANT ALL ON public TO admin; GRANT CREATE, USAGE ON public TO public; GRANT ALL ON public TO root; `, `admin`},
{`locator`, `schema`, `GRANT ALL ON locator TO admin; GRANT CREATE, GRANT ON locator TO agent_bond; GRANT ALL ON locator TO m; ` +
{`locator`, `schema`, `GRANT ALL ON locator TO admin; GRANT CREATE ON locator TO agent_bond; GRANT ALL ON locator TO m; ` +
`GRANT ALL ON locator TO root; `, `root`},
{`continent`, `type`, `GRANT ALL ON continent TO admin; GRANT GRANT ON continent TO agent_bond; GRANT ALL ON continent TO m; GRANT USAGE ON continent TO public; GRANT ALL ON continent TO root; `, `root`},
{`continent`, `type`, `GRANT ALL ON continent TO admin; GRANT ALL ON continent TO m; GRANT USAGE ON continent TO public; GRANT ALL ON continent TO root; `, `root`},
{`_continent`, `type`, `GRANT ALL ON _continent TO admin; GRANT USAGE ON _continent TO public; GRANT ALL ON _continent TO root; `, `root`},
{`agent_locations`, `table`, `GRANT ALL ON agent_locations TO admin; ` +
`GRANT SELECT ON agent_locations TO agent_bond; GRANT UPDATE ON agent_locations TO agents; ` +
Expand Down
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ const (
// on strings.
TrigramInvertedIndexes

// RemoveGrantPrivilege is the last step to migrate from the GRANT privilege to WITH GRANT OPTION.
RemoveGrantPrivilege

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -646,6 +649,10 @@ var versionsSingleton = keyedVersions{
Key: TrigramInvertedIndexes,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 10},
},
{
Key: RemoveGrantPrivilege,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 12},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 23 additions & 24 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import (
"github.com/cockroachdb/errors"
)

// A CatchUpIterator is an iterator for catchUp-scans.
type CatchUpIterator struct {
simpleCatchupIter
close func()
}

// simpleCatchupIter is an extension of SimpleMVCCIterator that allows for the
// primary iterator to be implemented using a regular MVCCIterator or a
// (often) more efficient MVCCIncrementalIterator. When the caller wants to
Expand All @@ -52,19 +46,28 @@ func (i simpleCatchupIterAdapter) NextIgnoringTime() {

var _ simpleCatchupIter = simpleCatchupIterAdapter{}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader.
// CatchUpIterator is an iterator for catchup-scans.
type CatchUpIterator struct {
simpleCatchupIter
close func()
span roachpb.Span
startTime hlc.Timestamp // exclusive
}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader over the
// given key/time span. startTime is exclusive.
//
// NB: The start timestamp given in args.Header.Timestamp is exclusive, i.e. the
// first possible event will be emitted at Timestamp.Next().
// NB: startTime is exclusive, i.e. the first possible event will be emitted at
// Timestamp.Next().
func NewCatchUpIterator(
reader storage.Reader, args *roachpb.RangeFeedRequest, closer func(),
reader storage.Reader, span roachpb.Span, startTime hlc.Timestamp, closer func(),
) *CatchUpIterator {
return &CatchUpIterator{
simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader,
storage.MVCCIncrementalIterOptions{
EnableTimeBoundIteratorOptimization: true,
EndKey: args.Span.EndKey,
StartTime: args.Timestamp,
EndKey: span.EndKey,
StartTime: startTime,
EndTime: hlc.MaxTimestamp,
// We want to emit intents rather than error
// (the default behavior) so that we can skip
Expand All @@ -78,7 +81,9 @@ func NewCatchUpIterator(
// still needed (#69357).
InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit,
}),
close: closer,
close: closer,
span: span,
startTime: startTime,
}
}

Expand All @@ -96,15 +101,9 @@ func (i *CatchUpIterator) Close() {
// returns. However, we may revist this in #69596.
type outputEventFn func(e *roachpb.RangeFeedEvent) error

// CatchUpScan iterates over all changes for the given span of keys,
// starting at catchUpTimestamp. Keys and Values are emitted as
// RangeFeedEvents passed to the given outputFn. catchUpTimestamp is exclusive.
func (i *CatchUpIterator) CatchUpScan(
startKey, endKey storage.MVCCKey,
catchUpTimestamp hlc.Timestamp,
withDiff bool,
outputFn outputEventFn,
) error {
// CatchUpScan iterates over all changes in the configured key/time span, and
// emits them as RangeFeedEvents via outputFn in chronological order.
func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error {
var a bufalloc.ByteAllocator
// MVCCIterator will encounter historical values for each key in
// reverse-chronological order. To output in chronological order, store
Expand Down Expand Up @@ -139,7 +138,7 @@ func (i *CatchUpIterator) CatchUpScan(
// versions of each key that are after the registration's startTS, so we
// can't use NextKey.
var meta enginepb.MVCCMetadata
i.SeekGE(startKey)
i.SeekGE(storage.MVCCKey{Key: i.span.Key})
for {
if ok, err := i.Valid(); err != nil {
return err
Expand Down Expand Up @@ -198,7 +197,7 @@ func (i *CatchUpIterator) CatchUpScan(
// Ignore the version if it's not inline and its timestamp is at
// or before the registration's (exclusive) starting timestamp.
ts := unsafeKey.Timestamp
ignore := !(ts.IsEmpty() || catchUpTimestamp.Less(ts))
ignore := !(ts.IsEmpty() || i.startTime.Less(ts))
if ignore && !withDiff {
// Skip all the way to the next key.
// NB: fast-path to avoid value copy when !r.withDiff.
Expand Down
12 changes: 3 additions & 9 deletions pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,13 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
func() {
iter := rangefeed.NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
Header: roachpb.Header{
Timestamp: opts.ts,
},
WithDiff: opts.withDiff,
Span: span,
}, func() {})
iter := rangefeed.NewCatchUpIterator(eng, span, opts.ts, nil)
defer iter.Close()
counter := 0
err := iter.CatchUpScan(storage.MakeMVCCMetadataKey(startKey), storage.MakeMVCCMetadataKey(endKey), opts.ts, opts.withDiff, func(*roachpb.RangeFeedEvent) error {
err := iter.CatchUpScan(func(*roachpb.RangeFeedEvent) error {
counter++
return nil
})
}, opts.withDiff)
if err != nil {
b.Fatalf("failed catchUp scan: %+v", err)
}
Expand Down
21 changes: 6 additions & 15 deletions pkg/kv/kvserver/rangefeed/catchup_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,15 @@ func TestCatchupScan(t *testing.T) {
t.Fatal(err)
}
testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
Header: roachpb.Header{
Timestamp: ts1, // exclusive
},
Span: roachpb.Span{
EndKey: roachpb.KeyMax,
},
WithDiff: withDiff,
}, nil)
span := roachpb.Span{Key: testKey1, EndKey: roachpb.KeyMax}
iter := NewCatchUpIterator(eng, span, ts1, nil)
defer iter.Close()
var events []roachpb.RangeFeedValue
// ts1 here is exclusive, so we do not want the versions at ts1.
require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1),
storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff,
func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}))
require.NoError(t, iter.CatchUpScan(func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}, withDiff))
require.Equal(t, 4, len(events))
checkEquality := func(
kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) {
Expand Down
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,11 @@ func NewProcessor(cfg Config) *Processor {
// engine has not been closed.
type IntentScannerConstructor func() IntentScanner

// CatchUpIteratorConstructor is used to construct an iterator that
// can be used for catchup-scans. It should be called from underneath
// a stopper task to ensure that the engine has not been closed.
//
// The constructed iterator must have an UpperBound set.
type CatchUpIteratorConstructor func() *CatchUpIterator
// CatchUpIteratorConstructor is used to construct an iterator that can be used
// for catchup-scans. Takes the key span and exclusive start time to run the
// catchup scan for. It should be called from underneath a stopper task to
// ensure that the engine has not been closed.
type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) *CatchUpIterator

// Start launches a goroutine to process rangefeed events and send them to
// registrations.
Expand Down
8 changes: 2 additions & 6 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -371,10 +370,7 @@ func (r *registration) maybeRunCatchUpScan() error {
r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

startKey := storage.MakeMVCCMetadataKey(r.span.Key)
endKey := storage.MakeMVCCMetadataKey(r.span.EndKey)

return catchUpIter.CatchUpScan(startKey, endKey, r.catchUpTimestamp, r.withDiff, r.stream.Send)
return catchUpIter.CatchUpScan(r.stream.Send, r.withDiff)
}

// ID implements interval.Interface.
Expand Down Expand Up @@ -560,7 +556,7 @@ func (r *registration) maybeConstructCatchUpIter() {
return
}

catchUpIter := r.catchUpIterConstructor()
catchUpIter := r.catchUpIterConstructor(r.span, r.catchUpTimestamp)
r.catchUpIterConstructor = nil

r.mu.Lock()
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIter
if iter == nil {
return nil
}
return func() *CatchUpIterator {
return &CatchUpIterator{simpleCatchupIter: simpleCatchupIterAdapter{iter}}
return func(span roachpb.Span, startTime hlc.Timestamp) *CatchUpIterator {
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,11 @@ func (r *Replica) rangeFeedWithRangeID(
// Register the stream with a catch-up iterator.
var catchUpIterFunc rangefeed.CatchUpIteratorConstructor
if usingCatchUpIter {
catchUpIterFunc = func() *rangefeed.CatchUpIterator {
catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) *rangefeed.CatchUpIterator {
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
return rangefeed.NewCatchUpIterator(r.Engine(), args, iterSemRelease)
return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease)
}
}
p := r.registerWithRangefeedRaftMuLocked(
Expand Down
Loading

0 comments on commit 1fd6c45

Please sign in to comment.