Skip to content

Commit

Permalink
sql: introduce MVCC-compliant index backfiller
Browse files Browse the repository at this point in the history
Previously, the index backfilling process depended upon non-MVCC
compliant AddSSTable calls which potentially rewrote previously read
historical values.

To support an MVCC-compliant AddSSTable that writes at the _current_
timestamp, this change implements a new backfilling process described
in the following RFC:

https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20211004_incremental_index_backfiller.md

In summary, the new index backfilling process depends on backfilling
the new index when it is in a BACKFILLING state (added in cockroachdb#72281). In
this state it receives no writes or deletes. Writes that occur during
the backfilling process are captured by a "temporary index."  This
temporary index uses the DeletePreservingEncoding to ensure it
captures deletes as well as writes.

After the bulk backfill using the MVCC-compliant AddSSTable, the
index is moved into a MERGING state
(added in cockroachdb#75663) in which it receives writes and deletes. Writes
previously captured by the temporary index are then transactionally
merged into the newly added index.

This feature is currently behind a new boolean cluster setting which
defaults to true. Schema changes that contain both old and new-style
backfills are rejected.

Reverting the default to false will require updating various tests
since many tests depend on the exact index IDs of newly added indexes.

Release note: None

Co-authored-by: Rui Hu <[email protected]>
  • Loading branch information
stevendanna and Rui Hu committed Feb 16, 2022
1 parent 13c5a25 commit 5a97a11
Show file tree
Hide file tree
Showing 48 changed files with 1,246 additions and 256 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-66 set the active cluster version in the format '<major>.<minor>'
version version 21.2-68 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-66</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-68</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,7 @@ func (r *restoreResumer) publishDescriptors(
}
newIdx := found.IndexDescDeepCopy()
mutTable.RemovePublicNonPrimaryIndex(found.Ordinal())
if err := mutTable.AddIndexMutation(&newIdx, descpb.DescriptorMutation_ADD); err != nil {
if err := mutTable.AddIndexMutation(ctx, &newIdx, descpb.DescriptorMutation_ADD, r.settings); err != nil {
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_test(
"//pkg/ccl/testutilsccl",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvbase",
Expand Down
31 changes: 24 additions & 7 deletions pkg/ccl/multiregionccl/regional_by_row_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/testutilsccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
Expand Down Expand Up @@ -568,6 +569,10 @@ func TestIndexCleanupAfterAlterFromRegionalByRow(t *testing.T) {
{locality: "REGIONAL BY ROW AS region_col"},
} {
t.Run(tc.locality, func(t *testing.T) {
// Don't allow gc jobs to complete so that we
// can validate that they were created.
blockGC := make(chan struct{})

knobs := base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Disable the merge queue because it makes this test flakey
Expand All @@ -583,6 +588,7 @@ func TestIndexCleanupAfterAlterFromRegionalByRow(t *testing.T) {
},
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ jobspb.JobID) error { <-blockGC; return nil }},
}

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
Expand Down Expand Up @@ -680,21 +686,32 @@ CREATE TABLE regional_by_row (
return nil
}

expectedGCJobsForDrops := 4
expectedGCJobsForTempIndexes := 4
// Now check that we have the right number of index GC jobs pending.
err = queryIndexGCJobsAndValidateCount(`running`, 4)
err = queryIndexGCJobsAndValidateCount(`running`, expectedGCJobsForDrops+expectedGCJobsForTempIndexes)
require.NoError(t, err)
err = queryIndexGCJobsAndValidateCount(`succeeded`, 0)
require.NoError(t, err)

queryAndEnsureThatIndexGCJobsSucceeded := func(count int) func() error {
return func() error { return queryIndexGCJobsAndValidateCount(`succeeded`, count) }
}

// Unblock GC jobs.
close(blockGC)
// The GC jobs for the temporary indexes should be cleaned up immediately.
testutils.SucceedsSoon(t, queryAndEnsureThatIndexGCJobsSucceeded(expectedGCJobsForTempIndexes))
// The GC jobs for the drops should still be waiting out the GC TTL.
err = queryIndexGCJobsAndValidateCount(`running`, expectedGCJobsForDrops)
require.NoError(t, err)

// Change gc.ttlseconds to speed up the cleanup.
_, err = sqlDB.Exec(`ALTER TABLE regional_by_row CONFIGURE ZONE USING gc.ttlseconds = 1`)
require.NoError(t, err)

// Validate that indexes are cleaned up.
queryAndEnsureThatFourIndexGCJobsSucceeded := func() error {
return queryIndexGCJobsAndValidateCount(`succeeded`, 4)
}
testutils.SucceedsSoon(t, queryAndEnsureThatFourIndexGCJobsSucceeded)
testutils.SucceedsSoon(t, queryAndEnsureThatIndexGCJobsSucceeded(expectedGCJobsForDrops+expectedGCJobsForTempIndexes))
err = queryIndexGCJobsAndValidateCount(`running`, 0)
require.NoError(t, err)
})
Expand Down Expand Up @@ -918,7 +935,7 @@ func TestIndexDescriptorUpdateForImplicitColumns(t *testing.T) {

t.Run("primary index", func(t *testing.T) {
tdb.Exec(t, `CREATE TABLE test.t1 (
a INT PRIMARY KEY,
a INT PRIMARY KEY,
b test.public.crdb_internal_region NOT NULL
) LOCALITY GLOBAL`)
indexes := fetchIndexes("t1")
Expand All @@ -944,7 +961,7 @@ func TestIndexDescriptorUpdateForImplicitColumns(t *testing.T) {

t.Run("secondary index", func(t *testing.T) {
tdb.Exec(t, `CREATE TABLE test.t2 (
a INT PRIMARY KEY,
a INT PRIMARY KEY,
b test.public.crdb_internal_region NOT NULL,
c INT NOT NULL,
d INT NOT NULL,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/partitionccl/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
partition string
}{
{1, ""},
{3, ""},
{3, "p2"},
{4, ""},
{4, "p2"},
}
for _, target := range subzones {
if exists := subzoneExists(cfg, target.index, target.partition); !exists {
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestDropIndexPartitionedByUserDefinedTypeCCL(t *testing.T) {
t.Helper()
var id int
tdb.QueryRow(t, `
SELECT job_id
SELECT job_id
FROM crdb_internal.jobs
WHERE description LIKE $1
`, description).Scan(&id)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/partitionccl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,12 +1462,12 @@ ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (y)

// Our subzones should be spans prefixed with dropped copy of i1,
// dropped copy of i2, new copy of i1, and new copy of i2.
// These have ID's 2, 3, 6 and 7 respectively.
// These have IDs 2, 3, 8 and 10 respectively.
expectedSpans := []roachpb.Key{
table.IndexSpan(keys.SystemSQLCodec, 2 /* indexID */).Key,
table.IndexSpan(keys.SystemSQLCodec, 3 /* indexID */).Key,
table.IndexSpan(keys.SystemSQLCodec, 6 /* indexID */).Key,
table.IndexSpan(keys.SystemSQLCodec, 7 /* indexID */).Key,
table.IndexSpan(keys.SystemSQLCodec, 8 /* indexID */).Key,
table.IndexSpan(keys.SystemSQLCodec, 10 /* indexID */).Key,
}
if len(zone.SubzoneSpans) != len(expectedSpans) {
t.Fatalf("expected subzones to have length %d", len(expectedSpans))
Expand Down
11 changes: 10 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ const (
// to disable the data propagation mechanism it and the entire spanconfig
// infrastructure obviates.
DisableSystemConfigGossipTrigger
// MVCCIndexBackfiller supports MVCC-compliant index
// backfillers via a new BACKFILLING index state, delete
// preserving temporary indexes, and a post-backfill merging
// process.
MVCCIndexBackfiller

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -450,7 +456,10 @@ var versionsSingleton = keyedVersions{
Key: DisableSystemConfigGossipTrigger,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 66},
},

{
Key: MVCCIndexBackfiller,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 68},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/jobs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func IsPermanentJobError(err error) bool {
return errors.Is(err, errJobPermanentSentinel)
}

// IsPauseSelfError reports whether err is marked with
// errPauseSelfSentinel, the sentinel used to mark errors returned
// from PauseRequestErr.
func IsPauseSelfError(err error) bool {
	pauseRequested := errors.Is(err, errPauseSelfSentinel)
	return pauseRequested
}

// errPauseSelfSentinel exists so the errors returned from PauseRequestErr can
// be marked with it.
var errPauseSelfSentinel = errors.New("job requested it be paused")
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (p *planner) addColumnImpl(

n.tableDesc.AddColumnMutation(col, descpb.DescriptorMutation_ADD)
if idx != nil {
if err := n.tableDesc.AddIndexMutation(idx, descpb.DescriptorMutation_ADD); err != nil {
if err := n.tableDesc.AddIndexMutation(params.ctx, idx, descpb.DescriptorMutation_ADD, params.p.ExecCfg().Settings); err != nil {
return err
}
}
Expand Down
27 changes: 20 additions & 7 deletions pkg/sql/alter_primary_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder"
Expand Down Expand Up @@ -238,7 +239,7 @@ func (p *planner) AlterPrimaryKey(
}
}

if err := tableDesc.AddIndexMutation(newPrimaryIndexDesc, descpb.DescriptorMutation_ADD); err != nil {
if err := tableDesc.AddIndexMutation(ctx, newPrimaryIndexDesc, descpb.DescriptorMutation_ADD, p.ExecCfg().Settings); err != nil {
return err
}
version := p.ExecCfg().Settings.Version.ActiveVersion(ctx)
Expand Down Expand Up @@ -357,7 +358,7 @@ func (p *planner) AlterPrimaryKey(
// Set correct version and encoding type.
newUniqueIdx.Version = descpb.PrimaryIndexWithStoredColumnsVersion
newUniqueIdx.EncodingType = descpb.SecondaryIndexEncoding
if err := addIndexMutationWithSpecificPrimaryKey(ctx, tableDesc, &newUniqueIdx, newPrimaryIndexDesc); err != nil {
if err := addIndexMutationWithSpecificPrimaryKey(ctx, tableDesc, &newUniqueIdx, newPrimaryIndexDesc, p.ExecCfg().Settings); err != nil {
return err
}
// Copy the old zone configuration into the newly created unique index for PARTITION ALL BY.
Expand Down Expand Up @@ -484,7 +485,7 @@ func (p *planner) AlterPrimaryKey(
newIndex.Name = tabledesc.GenerateUniqueName(basename, nameExists)
newIndex.Version = descpb.PrimaryIndexWithStoredColumnsVersion
newIndex.EncodingType = descpb.SecondaryIndexEncoding
if err := addIndexMutationWithSpecificPrimaryKey(ctx, tableDesc, &newIndex, newPrimaryIndexDesc); err != nil {
if err := addIndexMutationWithSpecificPrimaryKey(ctx, tableDesc, &newIndex, newPrimaryIndexDesc, p.ExecCfg().Settings); err != nil {
return err
}

Expand Down Expand Up @@ -683,17 +684,30 @@ func addIndexMutationWithSpecificPrimaryKey(
table *tabledesc.Mutable,
toAdd *descpb.IndexDescriptor,
primary *descpb.IndexDescriptor,
settings *cluster.Settings,
) error {
// Reset the ID so that a call to AllocateIDs will set up the index.
toAdd.ID = 0
if err := table.AddIndexMutation(toAdd, descpb.DescriptorMutation_ADD); err != nil {
if err := table.AddIndexMutation(ctx, toAdd, descpb.DescriptorMutation_ADD, settings); err != nil {
return err
}
if err := table.AllocateIDsWithoutValidation(ctx); err != nil {
return err
}
// Use the columns in the given primary index to construct this indexes
// KeySuffixColumnIDs list.

setKeySuffixColumnIDsFromPrimary(toAdd, primary)
if tempIdx := catalog.FindCorrespondingTemporaryIndexByID(table, toAdd.ID); tempIdx != nil {
setKeySuffixColumnIDsFromPrimary(tempIdx.IndexDesc(), primary)
}

return nil
}

// setKeySuffixColumnIDsFromPrimary uses the columns in the given
// primary index to construct toAdd's KeySuffixColumnIDs list.
func setKeySuffixColumnIDsFromPrimary(
toAdd *descpb.IndexDescriptor, primary *descpb.IndexDescriptor,
) {
presentColIDs := catalog.MakeTableColSet(toAdd.KeyColumnIDs...)
presentColIDs.UnionWith(catalog.MakeTableColSet(toAdd.StoreColumnIDs...))
toAdd.KeySuffixColumnIDs = nil
Expand All @@ -702,5 +716,4 @@ func addIndexMutationWithSpecificPrimaryKey(
toAdd.KeySuffixColumnIDs = append(toAdd.KeySuffixColumnIDs, colID)
}
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (n *alterTableNode) startExec(params runParams) error {
"index %q being dropped, try again later", d.Name)
}
}
if err := n.tableDesc.AddIndexMutation(&idx, descpb.DescriptorMutation_ADD); err != nil {
if err := n.tableDesc.AddIndexMutation(params.ctx, &idx, descpb.DescriptorMutation_ADD, params.p.ExecCfg().Settings); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 5a97a11

Please sign in to comment.