64171: ccl/sqlproxyccl: directory proto and test server r=darinpp a=darinpp

* Defines a new interface between a tenant directory client and server
* Moves the tenant directory over from the CC repo
* Modifies the tenant directory to use the new interface
* Modifies the tenant directory to use stop.Stopper
* Modifies the pod watcher to use a streaming gRPC call (see the sketch below)
* Renames the package from directory to tenant, tenantID to ID, etc.
* Renames references from k8s and pods to server and endpoints, etc.
* Prevents test tenant servers from starting for non-existent/inactive tenants
* Adds the ability to shut down individual tenant servers within a test cluster
* Adds a test directory server that can start/stop tenants
* Adds tests of the directory running against the test server
* Allows insecure connections from the test tenant to the KV server
* Fixes a race in kvtenantccl
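
A minimal sketch of the streaming endpoint-watcher pattern described above, run
as a stop.Stopper task. DirectoryClient, EndpointStream, and EndpointEvent are
hypothetical stand-ins for the generated directory proto client, not the
identifiers this PR actually adds:

package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// EndpointEvent, EndpointStream, and DirectoryClient are hypothetical
// stand-ins for the generated proto client in pkg/ccl/sqlproxyccl/tenant.
type EndpointEvent struct {
	Addr    string
	Deleted bool
}

type EndpointStream interface {
	Recv() (*EndpointEvent, error)
}

type DirectoryClient interface {
	WatchEndpoints(ctx context.Context) (EndpointStream, error)
}

// startEndpointWatcher runs the watcher as a stopper-managed async task: it
// keeps (re)establishing the streaming RPC and applies each endpoint event
// until ctx is canceled (callers would typically derive ctx from the
// stopper's WithCancelOnQuiesce so the task exits on shutdown).
func startEndpointWatcher(
	ctx context.Context,
	stopper *stop.Stopper,
	client DirectoryClient,
	apply func(EndpointEvent),
) error {
	return stopper.RunAsyncTask(ctx, "tenant-endpoint-watcher", func(ctx context.Context) {
		for ctx.Err() == nil {
			stream, err := client.WatchEndpoints(ctx)
			if err != nil {
				continue // a real implementation would back off before retrying
			}
			for {
				ev, err := stream.Recv()
				if err != nil {
					break // stream broke; reconnect on the next loop iteration
				}
				apply(*ev)
			}
		}
	})
}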

Release note: None

64273: sql: Repartition tables before dropping regions r=arulajmani,otan,ajwerner a=ajstorm

Previously we could get into a situation where on dropping a region,
concurrent queries on REGIONAL BY ROW tables could fail. This was due to
the fact that when resolving the partition tuple in the optimizer, we'd
encounter a partition without a corresponding enum value. This issue was
timing dependent, and would only be hit if the query had a leased type
descriptor from after the drop region, along with a table descriptor
from before the drop region.

To get around this problem, we introduce a new transaction to the drop
region schema changer which performs a pre-drop action of repartitioning
all REGIONAL BY ROW tables and updating their leases. This ensures that
the table descriptors will be seen _before_ the modified type
descriptors.

Of note is the fact that this is only required on drop region. In the
add region case, having this mismatch occur and seeing an extra region
(with no corresponding partition) is not a problem for the query engine.
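
A minimal sketch of the ordering this change enforces, assuming hypothetical
helpers repartitionTables and dropRegionEnumMember for the two schema-changer
steps (the real logic lives in the type schema changer):

package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
)

// repartitionTables and dropRegionEnumMember are hypothetical stand-ins; they
// are not functions added by this commit.
func repartitionTables(ctx context.Context, txn *kv.Txn) error    { return nil }
func dropRegionEnumMember(ctx context.Context, txn *kv.Txn) error { return nil }

// dropRegion repartitions all REGIONAL BY ROW tables (publishing new table
// descriptor versions) in a transaction that commits before the transaction
// that removes the region from the multi-region enum.
func dropRegion(ctx context.Context, db *kv.DB) error {
	// Step 1: publish the repartitioned table descriptors first.
	if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		return repartitionTables(ctx, txn)
	}); err != nil {
		return err
	}
	// Step 2: only then drop the region's enum member.
	return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		return dropRegionEnumMember(ctx, txn)
	})
}

Because the repartitioning commits first, a query can no longer observe a
post-drop type descriptor paired with a pre-drop table descriptor.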

Release note (sql change): Fix a bug where queries on REGIONAL BY ROW tables
could fail in the brief window in which a DROP REGION operation is in
progress.

Resolves: #64223 

64341: changefeedccl: Add a large doc comment r=stevendanna a=stevendanna

I found drawing out this diagram useful when working on this system;
perhaps it'll be useful to others as well.

Release note: None

Co-authored-by: Darin Peshev <[email protected]>
Co-authored-by: Adam Storm <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
4 people committed Apr 29, 2021
4 parents 7dd9c89 + 65c30a3 + 4a5db31 + 78f8e0f commit 974b144
Showing 24 changed files with 4,102 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -24,6 +24,7 @@ ALL_TESTS = [
"//pkg/ccl/partitionccl:partitionccl_test",
"//pkg/ccl/serverccl:serverccl_test",
"//pkg/ccl/sqlproxyccl/cache:cache_test",
"//pkg/ccl/sqlproxyccl/tenant:tenant_test",
"//pkg/ccl/sqlproxyccl:sqlproxyccl_test",
"//pkg/ccl/storageccl/engineccl:engineccl_test",
"//pkg/ccl/storageccl:storageccl_test",
4 changes: 4 additions & 0 deletions pkg/base/test_server_args.go
@@ -239,4 +239,8 @@ type TestTenantArgs struct {

// TestingKnobs for the test server.
TestingKnobs TestingKnobs

// Test server starts in secure mode by default. When this is set to true,
// it will switch to insecure mode.
ForceInsecure bool
}
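
A hypothetical usage sketch of the new ForceInsecure knob (the StartTenant
call shape matches the backup_test.go change below); this is not a test added
by the commit:

package sketch

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/stretchr/testify/require"
)

// TestInsecureTenantSketch starts a SQL tenant that connects to the KV layer
// without TLS by setting ForceInsecure.
func TestInsecureTenantSketch(t *testing.T) {
	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	_, err := tc.Servers[0].StartTenant(ctx, base.TestTenantArgs{
		TenantID:      roachpb.MakeTenantID(10),
		ForceInsecure: true, // run this test tenant in insecure mode
	})
	require.NoError(t, err)
}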
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
@@ -3594,7 +3594,7 @@ func TestBackupTenantsWithRevisionHistory(t *testing.T) {
ctx, tc, sqlDB, _, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
defer cleanupFn()

_, err := tc.Servers[0].StartTenant(base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10)})
_, err := tc.Servers[0].StartTenant(ctx, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10)})
require.NoError(t, err)

const msg = "can not backup tenants with revision history"
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
"changefeed_dist.go",
"changefeed_processors.go",
"changefeed_stmt.go",
"doc.go",
"encoder.go",
"errors.go",
"metrics.go",
91 changes: 91 additions & 0 deletions pkg/ccl/changefeedccl/doc.go
@@ -0,0 +1,91 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

/*
Package changefeedccl is the internal implementation behind changefeeds.

Changefeeds emit KV events on user-specified tables to user-specified
sinks.

Changefeeds are built on top of rangefeeds, which provide a stream of
KV events for a given keyspan as well as periodic "resolved
timestamps" for those spans. For more information on rangefeeds see
docs/RFCS/20170613_range_feeds_storage_primitive.md.

The changefeed machinery encodes and delivers both the KV events
and resolved timestamps to the sinks. It further uses the resolved
timestamps to periodically checkpoint a changefeed's progress such
that it can be resumed in the case of a failure.

To ensure that we can correctly encode every KV returned by the
rangefeed, changefeeds also monitor for schema changes.

"Enterprise" changefeeds are all changefeeds with a sink. These
feeds emit KV events to external systems and are run via the job
system.

"Sinkless" or "Experimental" changefeeds are changefeeds without a
sink; they emit rows back to the original SQL node that issued the
CREATE CHANGEFEED request.

The major components of this system are:

changefeedAggregator: Reads events from a kvfeed, encodes and emits
KV events to the sink, and forwards resolved timestamps to the
changeFrontier.

changeFrontier: Keeps track of the high-watermark of resolved
timestamps seen across the spans we are tracking. Periodically, it
emits resolved timestamps to the sink and checkpoints the
changefeed progress in the job system.

kvfeed: Coordinates the consumption of the rangefeed with the
schemafeed. It starts a set of goroutines that consume the
rangefeed events and forward events to the changefeedAggregator
once the schema for the event is known.

schemafeed: Periodically polls the table descriptors
table. Rangefeed events are held until it is sure it knows the
schema for the relevant table at the event's timestamp.

+-----------------+
+------+ | | +-----+
| sink |<------+ changeFrontier +------>| job |
+------+ | | +-----+
+--------+--------+
^
|
+-------+--------+
+------+ | |
| sink +<-------+ changefeedAgg |<------------+
+------+ | | |
+--+-------------+ chanBuffer
| |
v +------+------+
+--------------+ | |
| +------>| copyFromTo +--+
| kvfeed | | | |
| | +------+------+ |
+--------+---+-+ ^ |
| | memBuffer |
| | | |
| | +-----+------+ | +-----------+
| | | | | | |
| +--------> |physical +----->| rangefeed |
| | feed | | | |
| +------------+ | +-----------+
| |
| |
| +------------+ |
+------------> | schemafeed |<-|
| (polls) |
+------------+
*/
package changefeedccl
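
As a toy illustration of the kvfeed/schemafeed coordination described in the
comment above (not code from this commit; the types are simplified stand-ins),
events are held until the schema at their timestamp is known:

package sketch

// kvEvent and schemaFeed are illustrative stand-ins for the package's real
// types; timestamps are simplified to int64s.
type kvEvent struct {
	key []byte
	ts  int64
}

type schemaFeed interface {
	// waitForTS blocks until table schemas are known up to and including ts.
	waitForTS(ts int64)
}

// forwardWhenSchemaKnown mirrors the kvfeed's role: hold each rangefeed event
// until the schemafeed has caught up to the event's timestamp, then hand it
// to the aggregator for encoding and emission.
func forwardWhenSchemaKnown(in <-chan kvEvent, sf schemaFeed, emit func(kvEvent)) {
	for ev := range in {
		sf.waitForTS(ev.ts) // ensures ev can be decoded with the correct schema version
		emit(ev)
	}
}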
24 changes: 14 additions & 10 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
@@ -365,12 +365,20 @@ func (c *Connector) getClient(ctx context.Context) (roachpb.InternalClient, erro
dialCtx := c.AnnotateCtx(context.Background())
dialCtx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(dialCtx)
defer cancel()
err := c.rpcContext.Stopper.RunTaskWithErr(dialCtx, "kvtenant.Connector: dial", c.dialAddrs)
var client roachpb.InternalClient
err := c.rpcContext.Stopper.RunTaskWithErr(dialCtx, "kvtenant.Connector: dial",
func(ctx context.Context) error {
var err error
client, err = c.dialAddrs(ctx)
return err
})
if err != nil {
return nil, err
}
// NB: read lock not needed.
return c.mu.client, nil
c.mu.Lock()
defer c.mu.Unlock()
c.mu.client = client
return client, nil
})
c.mu.RUnlock()

@@ -387,7 +395,7 @@ func (c *Connector) getClient(ctx context.Context) (roachpb.InternalClient, erro

// dialAddrs attempts to dial each of the configured addresses in a retry loop.
// The method will only return a non-nil error on context cancellation.
func (c *Connector) dialAddrs(ctx context.Context) error {
func (c *Connector) dialAddrs(ctx context.Context) (roachpb.InternalClient, error) {
for r := retry.StartWithCtx(ctx, c.rpcRetryOptions); r.Next(); {
// Try each address on each retry iteration.
randStart := rand.Intn(len(c.addrs))
@@ -398,14 +406,10 @@ func (c *Connector) dialAddrs(ctx context.Context) error {
log.Warningf(ctx, "error dialing tenant KV address %s: %v", addr, err)
continue
}
client := roachpb.NewInternalClient(conn)
c.mu.Lock()
c.mu.client = client
c.mu.Unlock()
return nil
return roachpb.NewInternalClient(conn), nil
}
}
return ctx.Err()
return nil, ctx.Err()
}

func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) {
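
The connector change above fixes the kvtenantccl race by having dialAddrs
return the client it creates and having getClient publish it under the mutex,
rather than dialAddrs writing c.mu.client and getClient reading it without
the lock. A minimal sketch of that pattern, with stand-in types:

package sketch

import "sync"

// tenantClient is a stand-in for the dialed roachpb.InternalClient.
type tenantClient struct{}

type connector struct {
	mu struct {
		sync.Mutex
		client *tenantClient
	}
}

// dialAddrs stands in for the retry loop: it returns the client it created
// instead of writing the shared field itself.
func (c *connector) dialAddrs() (*tenantClient, error) {
	return &tenantClient{}, nil
}

// getClient publishes the dialed client while holding the mutex, so no
// goroutine ever reads or writes c.mu.client without the lock.
func (c *connector) getClient() (*tenantClient, error) {
	client, err := c.dialAddrs()
	if err != nil {
		return nil, err
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.mu.client = client
	return client, nil
}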
209 changes: 209 additions & 0 deletions pkg/ccl/multiregionccl/region_test.go
@@ -576,3 +576,212 @@ func TestRollbackDuringAddDropRegionAsyncJobFailure(t *testing.T) {
})
}
}

// TestRegionAddDropWithConcurrentBackupOps tests adding/dropping regions
// (which may or may not succeed) with a concurrent backup operation.
// The sketch of the test is as follows:
// - Client 1 performs an ALTER ADD / DROP REGION. Let the user txn commit.
// - Block in the type schema changer.
// - Client 2 performs a backup operation.
// - Resume blocked schema change job.
// - Start up a new cluster.
// - Restore the database, block in the schema changer.
// - Fail or succeed the schema change job.
// - Validate that the database and its tables look as expected.
func TestRegionAddDropWithConcurrentBackupOps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "times out under race")

// Decrease the adopt loop interval so that retries happen quickly.
defer sqltestutils.SetTestJobsAdoptInterval()()

regionAlterCmds := []struct {
name string
cmd string
shouldSucceed bool
expectedPartitions []string
}{
{
name: "drop-region-fail",
cmd: `ALTER DATABASE db DROP REGION "us-east3"`,
shouldSucceed: false,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3"},
},
{
name: "drop-region-succeed",
cmd: `ALTER DATABASE db DROP REGION "us-east3"`,
shouldSucceed: true,
expectedPartitions: []string{"us-east1", "us-east2"},
},
{
name: "add-region-fail",
cmd: `ALTER DATABASE db ADD REGION "us-east4"`,
shouldSucceed: false,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3"},
},
{
name: "add-region-succeed",
cmd: `ALTER DATABASE db ADD REGION "us-east4"`,
shouldSucceed: true,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3", "us-east4"},
},
}

testCases := []struct {
name string
backupOp string
restoreOp string
}{
{
name: "backup-database",
backupOp: `BACKUP DATABASE db TO 'nodelocal://0/db_backup'`,
restoreOp: `RESTORE DATABASE db FROM 'nodelocal://0/db_backup'`,
},
}

for _, tc := range testCases {
for _, regionAlterCmd := range regionAlterCmds {
t.Run(regionAlterCmd.name+"-"+tc.name, func(t *testing.T) {
var mu syncutil.Mutex
typeChangeStarted := make(chan struct{})
typeChangeFinished := make(chan struct{})
backupOpFinished := make(chan struct{})
waitInTypeSchemaChangerDuringBackup := true

backupKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
defer mu.Unlock()
if waitInTypeSchemaChangerDuringBackup {
waitInTypeSchemaChangerDuringBackup = false
close(typeChangeStarted)
<-backupOpFinished
}
// Always return success here. The goal of this test isn't to
// fail during the backup, but to do so during the restore.
return nil
},
},
}

tempExternalIODir, tempDirCleanup := testutils.TempDir(t)
defer tempDirCleanup()

_, sqlDBBackup, cleanupBackup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 4 /* numServers */, backupKnobs, &tempExternalIODir,
)
defer cleanupBackup()

_, err := sqlDBBackup.Exec(`
DROP DATABASE IF EXISTS db;
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3";
USE db;
CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW;
INSERT INTO db.rbr VALUES (1,1),(2,2),(3,3);
`)
require.NoError(t, err)

go func() {
defer func() {
close(typeChangeFinished)
}()
_, err := sqlDBBackup.Exec(regionAlterCmd.cmd)
if err != nil {
t.Errorf("expected success, got %v when executing %s", err, regionAlterCmd.cmd)
}
}()

<-typeChangeStarted

_, err = sqlDBBackup.Exec(tc.backupOp)
close(backupOpFinished)
require.NoError(t, err)

<-typeChangeFinished

restoreKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
defer mu.Unlock()
if !regionAlterCmd.shouldSucceed {
// Trigger a roll-back.
return errors.New("nope")
}
// Trod on.
return nil
},
},
}

// Start a new cluster (with new testing knobs) for restore.
_, sqlDBRestore, cleanupRestore := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 4 /* numServers */, restoreKnobs, &tempExternalIODir,
)
defer cleanupRestore()

_, err = sqlDBRestore.Exec(tc.restoreOp)
require.NoError(t, err)

// First ensure that the data was restored correctly.
numRows := sqlDBRestore.QueryRow(`SELECT count(*) from db.rbr`)
require.NoError(t, numRows.Err())
var count int
err = numRows.Scan(&count)
require.NoError(t, err)
if count != 3 {
t.Errorf("unexpected number of rows after restore: expected 3, found %d", count)
}

// Now validate that the background job has completed and the
// regions are in the expected state.
testutils.SucceedsSoon(t, func() error {
dbRegions := make([]string, 0, len(regionAlterCmd.expectedPartitions))
rowsRegions, err := sqlDBRestore.Query("SELECT region FROM [SHOW REGIONS FROM DATABASE db]")
require.NoError(t, err)
defer rowsRegions.Close()
for {
done := rowsRegions.Next()
if !done {
require.NoError(t, rowsRegions.Err())
break
}
var region string
err := rowsRegions.Scan(&region)
require.NoError(t, err)
dbRegions = append(dbRegions, region)
}
if len(dbRegions) != len(regionAlterCmd.expectedPartitions) {
return errors.Newf("unexpected number of regions, expected: %v found %v",
regionAlterCmd.expectedPartitions,
dbRegions,
)
}
for i, expectedRegion := range regionAlterCmd.expectedPartitions {
if expectedRegion != dbRegions[i] {
return errors.Newf("unexpected regions, expected: %v found %v",
regionAlterCmd.expectedPartitions,
dbRegions,
)
}
}
return nil
})

// Finally, confirm that all of the tables were repartitioned
// correctly by the above ADD/DROP region job.
testutils.SucceedsSoon(t, func() error {
return multiregionccltestutils.TestingEnsureCorrectPartitioning(
sqlDBRestore,
"db",
"rbr",
[]string{"rbr@primary"},
)
})
})
}
}
}