Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#61371 cockroachdb#61374 cockroachdb#61424 cockroachdb#61428

59507: kvserver, kvclient: allow Rangefeeds to run over non-voting replicas r=aayushshah15 a=aayushshah15

Fixes cockroachdb#59454.

Release justification: low risk high benefit change to be able to fuel CDC streams using non-voting replicas

Release note: None

60823: docs: Add ui ARCHITECTURE.md doc r=dhartunian a=dhartunian

Add an ARCHITECTURE doc to /pkg/ui to help developers onboard to the
project.

The doc contains pointers on general React and Redux knowledge and links
on where best to acquire it as well as a dataflow diagram and some FAQs
on how data flows through the application, which is likely the part
that's hardest to currently understand.

Release note: None

61345: sql: alter primary key is not idempotent r=fqazi a=fqazi

Fixes: cockroachdb#59307

Previously, issuing an alter primary key with the exact same
definition as the current primary key would cause expensive
indexes to be recreated even when there was no logical change.
There was overhead involved in creating these indexes and as
a side effect existing indexes for the primary key would also
get needlessly renamed. To address this, this patch uses the
AST node for alter primary key to determine if the requested
change is logically the same before any operation is executed.
If its logically the same then it becomes a no-op.

Release justification: This is a low risk change with good benefit
to the user base by reducing extra indexes getting made and rename
operations.

Release note (bug fix): Alter primary key was not idempotent, so
logical equivalent changes to primary keys would unnecessarily
create new indexes.

61371: Update CONTRIBUTING.md r=RaduBerinde a=RaduBerinde

Minor typo fix.

Release justification: non-production code change.

Release note: None

61374: backupccl: skip more TestProtectedTimestampSpanSelectionDuringBackup subtests r=RaduBerinde a=RaduBerinde

Informs cockroachdb#57546.

Release justification: non-production code change.

Release note: None

61424: roachtest: install GEOS libraries for activerecord tests r=rafiss a=otan

Release justification: non-production code change

Release note: None

61428: sql: use correct FuncExpr when encoding sequences r=the-ericwang35 a=the-ericwang35

Previously, when encoding sequences by swapping sequence
names for IDs, we were always wrapping the sequence in
a nextval func. This is incorrect, and instead
we should wrap the sequence in whatever function
it came in before this encoding. This patch makes
this change.

Release justification: bug fix for new functionality
Release note (bug fix): use correct FuncExpr when encoding sequences.

Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: RaduBerinde <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Eric Wang <[email protected]>
  • Loading branch information
7 people committed Mar 3, 2021
8 parents 99b543e + 839de54 + eb177d3 + 03319af + 745c4c1 + 514f2ab + 69d4b67 + c5df5ec commit 44ea98b
Show file tree
Hide file tree
Showing 12 changed files with 533 additions and 28 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ For tech writers and docs enthusiasts | Help improve CockroachDB docs: [List of

## Contributor Guidelines

Our contributor guidelines are available on [the public wiki at **wiki.crdb.io**(https://wiki.crdb.io/wiki/spaces/CRDB/pages/73204033/Contributing+to+CockroachDB).
Our contributor guidelines are available on [the public wiki at **wiki.crdb.io**](https://wiki.crdb.io/wiki/spaces/CRDB/pages/73204033/Contributing+to+CockroachDB).

At this location, we share our team guidelines and knowledge base
regarding:
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,6 @@ func TestBackupRestoreNegativePrimaryKey(t *testing.T) {
backupAndRestore(ctx, t, tc, []string{LocalFoo}, []string{LocalFoo}, numAccounts)

sqlDB.Exec(t, `CREATE UNIQUE INDEX id2 ON data.bank (id)`)
sqlDB.Exec(t, `ALTER TABLE data.bank ALTER PRIMARY KEY USING COLUMNS(id)`)

var unused string
var exportedRows, exportedIndexEntries int
Expand All @@ -838,7 +837,7 @@ func TestBackupRestoreNegativePrimaryKey(t *testing.T) {
if exportedRows != numAccounts {
t.Fatalf("expected %d rows, got %d", numAccounts, exportedRows)
}
expectedIndexEntries := numAccounts * 3 // old PK, new and old secondary idx
expectedIndexEntries := numAccounts * 2 // Indexes id2 and balance_idx
if exportedIndexEntries != expectedIndexEntries {
t.Fatalf("expected %d index entries, got %d", expectedIndexEntries, exportedIndexEntries)
}
Expand Down Expand Up @@ -5868,6 +5867,7 @@ func TestProtectedTimestampSpanSelectionDuringBackup(t *testing.T) {
})

t.Run("interleaved-spans", func(t *testing.T) {
skip.WithIssue(t, 57546, "flaky test")
runner.Exec(t, "CREATE DATABASE test; USE test;")
runner.Exec(t, "CREATE TABLE grandparent (a INT PRIMARY KEY, v BYTES, INDEX gpindex (v))")
runner.Exec(t, "CREATE TABLE parent (a INT, b INT, v BYTES, "+
Expand All @@ -5886,6 +5886,7 @@ func TestProtectedTimestampSpanSelectionDuringBackup(t *testing.T) {
})

t.Run("revs-span-merge", func(t *testing.T) {
skip.WithIssue(t, 57546, "flaky test")
runner.Exec(t, "CREATE DATABASE test; USE test;")
runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES, name STRING, "+
"INDEX baz(name), INDEX bar (v))")
Expand Down Expand Up @@ -5919,6 +5920,7 @@ func TestProtectedTimestampSpanSelectionDuringBackup(t *testing.T) {
})

t.Run("last-index-dropped", func(t *testing.T) {
skip.WithIssue(t, 57546, "flaky test")
runner.Exec(t, "CREATE DATABASE test; USE test;")
runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES, name STRING, INDEX baz(name))")
runner.Exec(t, "CREATE TABLE foo2 (k INT PRIMARY KEY, v BYTES, name STRING, INDEX baz(name))")
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/roachtest/activerecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func registerActiveRecord(r *testRegistry) {
node := c.Node(1)
t.Status("setting up cockroach")
c.Put(ctx, cockroach, "./cockroach", c.All())
if err := c.PutLibraries(ctx, "./lib"); err != nil {
t.Fatal(err)
}
c.Start(ctx, t, c.All())

version, err := fetchCockroachVersion(ctx, c, node[0])
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ func (ds *DistSender) singleRangeFeed(
if ds.rpcContext != nil {
latencyFn = ds.rpcContext.RemoteClocks.Latency
}
// TODO(aayush): We should enable creating RangeFeeds on non-voting replicas.
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, OnlyPotentialLeaseholders)
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas)
if err != nil {
return args.Timestamp, err
}
Expand All @@ -256,7 +255,7 @@ func (ds *DistSender) singleRangeFeed(
log.VErrEventf(ctx, 2, "RPC error: %s", err)
continue
}

log.VEventf(ctx, 3, "attempting to create a RangeFeed over replica %s", args.Replica)
stream, err := client.RangeFeed(clientCtx, &args)
if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
Expand Down
60 changes: 60 additions & 0 deletions pkg/kv/kvserver/client_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver_test
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -182,3 +184,61 @@ func TestMergeOfRangeEventTableWhileRunningRangefeed(t *testing.T) {
cancel()
require.Regexp(t, context.Canceled.Error(), <-rangefeedErrChan)
}

func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
clusterArgs := aggressiveResolvedTimestampClusterArgs
// We want to manually add a non-voter to a range in this test, so disable
// the replicateQueue to prevent it from disrupting the test.
clusterArgs.ReplicationMode = base.ReplicationManual
// NB: setupClusterForClosedTSTesting sets a low closed timestamp target
// duration.
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration,
testingCloseFraction, clusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1))

db := tc.Server(1).DB()
ds := tc.Server(1).DistSenderI().(*kvcoord.DistSender)
_, err := tc.ServerConn(1).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
require.NoError(t, err)

startTS := db.Clock().Now()
rangefeedCtx, rangefeedCancel := context.WithCancel(ctx)
rangefeedCtx, getRec, cancel := tracing.ContextWithRecordingSpan(rangefeedCtx,
tracing.NewTracer(),
"rangefeed over non-voter")
defer cancel()

// Do a read on the range to make sure that the dist sender learns about the
// latest state of the range (with the new non-voter).
_, err = db.Get(ctx, desc.StartKey.AsRawKey())
require.NoError(t, err)

rangefeedErrChan := make(chan error, 1)
eventCh := make(chan *roachpb.RangeFeedEvent, 1000)
go func() {
rangefeedErrChan <- ds.RangeFeed(
rangefeedCtx,
desc.RSpan().AsRawSpanWithNoLocals(),
startTS,
false, /* withDiff */
eventCh,
)
}()

// Wait for an event to ensure that the rangefeed is set up.
select {
case <-eventCh:
case err := <-rangefeedErrChan:
t.Fatalf("rangefeed failed with %s", err)
case <-time.After(60 * time.Second):
t.Fatalf("rangefeed initialization took too long")
}
rangefeedCancel()
require.Regexp(t, "context canceled", <-rangefeedErrChan)
require.Regexp(t, "attempting to create a RangeFeed over replica.*2NON_VOTER", getRec().String())
}
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestReplicaRangefeed(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
const numNodes = 5
args := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: make(map[int]base.TestServerArgs, numNodes),
Expand Down Expand Up @@ -116,6 +116,7 @@ func TestReplicaRangefeed(t *testing.T) {
startKey := []byte("a")
tc.SplitRangeOrFatal(t, startKey)
tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
tc.AddNonVotersOrFatal(t, startKey, tc.Target(3), tc.Target(4))
if pErr := tc.WaitForVoters(startKey, tc.Target(1), tc.Target(2)); pErr != nil {
t.Fatalf("Unexpected error waiting for replication: %v", pErr)
}
Expand All @@ -128,13 +129,12 @@ func TestReplicaRangefeed(t *testing.T) {
if _, pErr := kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts1}, incArgs); pErr != nil {
t.Fatal(pErr)
}
tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9})
tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9, 9, 9})

replNum := 3
streams := make([]*testStream, replNum)
streamErrC := make(chan *roachpb.Error, replNum)
streams := make([]*testStream, numNodes)
streamErrC := make(chan *roachpb.Error, numNodes)
rangefeedSpan := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}
for i := 0; i < replNum; i++ {
for i := 0; i < numNodes; i++ {
stream := newTestStream()
streams[i] = stream
ts := tc.Servers[i]
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestReplicaRangefeed(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
for i := 0; i < replNum; i++ {
for i := 0; i < numNodes; i++ {
ts := tc.Servers[i]
store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
if pErr != nil {
Expand Down
109 changes: 109 additions & 0 deletions pkg/sql/alter_primary_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -148,6 +149,19 @@ func (p *planner) AlterPrimaryKey(
)
}

// Validate if the end result is the same as the current
// primary index, which would mean nothing needs to be modified
// here.
{
requiresIndexChange, err := p.shouldCreateIndexes(ctx, tableDesc, alterPKNode, alterPrimaryKeyLocalitySwap)
if err != nil {
return err
}
if !requiresIndexChange {
return nil
}
}

nameExists := func(name string) bool {
_, err := tableDesc.FindIndexWithName(name)
return err == nil
Expand Down Expand Up @@ -511,6 +525,101 @@ func (p *planner) AlterPrimaryKey(
return nil
}

// Given the current table descriptor and the new primary keys
// index descriptor this function determines if the two are
// equivalent and if any index creation operations are needed
// by comparing properties.
func (p *planner) shouldCreateIndexes(
ctx context.Context,
desc *tabledesc.Mutable,
alterPKNode *tree.AlterTableAlterPrimaryKey,
alterPrimaryKeyLocalitySwap *alterPrimaryKeyLocalitySwap,
) (requiresIndexChange bool, err error) {
oldPK := desc.GetPrimaryIndex()

// Validate if basic properties between the two match.
if len(oldPK.IndexDesc().ColumnIDs) != len(alterPKNode.Columns) ||
oldPK.IsSharded() != (alterPKNode.Sharded != nil) ||
oldPK.IsInterleaved() != (alterPKNode.Interleave != nil) {
return true, nil
}

// Validate if sharding properties are the same.
if alterPKNode.Sharded != nil {
shardBuckets, err := tabledesc.EvalShardBucketCount(ctx, &p.semaCtx, p.EvalContext(), alterPKNode.Sharded.ShardBuckets)
if err != nil {
return true, err
}
if oldPK.IndexDesc().Sharded.ShardBuckets != shardBuckets {
return true, nil
}
}

// Validate if interleaving properties match,
// specifically the parent table, and the index
// involved.
if alterPKNode.Interleave != nil {
parentTable, err := resolver.ResolveExistingTableObject(
ctx, p, &alterPKNode.Interleave.Parent, tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc),
)
if err != nil {
return true, err
}

ancestors := oldPK.IndexDesc().Interleave.Ancestors
if len(ancestors) == 0 {
return true, nil
}
if ancestors[len(ancestors)-1].TableID !=
parentTable.GetID() {
return true, nil
}
if ancestors[len(ancestors)-1].IndexID !=
parentTable.GetPrimaryIndexID() {
return true, nil
}
}

// If the old primary key is dropped, then recreation
// is required.
if oldPK.IndexDesc().Disabled {
return true, nil
}

// Validate the columns on the indexes
for idx, elem := range alterPKNode.Columns {
col, err := desc.FindColumnWithName(elem.Column)
if err != nil {
return true, err
}

if col.GetID() != oldPK.IndexDesc().ColumnIDs[idx] {
return true, nil
}
if (elem.Direction == tree.Ascending &&
oldPK.IndexDesc().ColumnDirections[idx] != descpb.IndexDescriptor_ASC) ||
(elem.Direction == tree.Descending &&
oldPK.IndexDesc().ColumnDirections[idx] != descpb.IndexDescriptor_DESC) {
return true, nil
}
}

// Check partitioning changes based on primary key locality,
// either the config changes, or the region column is changed
// then recreate indexes.
if alterPrimaryKeyLocalitySwap != nil {
localitySwapConfig := alterPrimaryKeyLocalitySwap.localityConfigSwap
if !localitySwapConfig.NewLocalityConfig.Equal(localitySwapConfig.OldLocalityConfig) {
return true, nil
}
if localitySwapConfig.NewRegionalByRowColumnID != nil &&
*localitySwapConfig.NewRegionalByRowColumnID != oldPK.IndexDesc().ColumnIDs[0] {
return true, nil
}
}
return false, nil
}

// We only recreate the old primary key of the table as a unique secondary
// index if:
// * The table has a primary key (no DROP PRIMARY KEY statements have
Expand Down
Loading

0 comments on commit 44ea98b

Please sign in to comment.