From 1e05a9bdb8e2daa67c7d875041e520a0a1485cef Mon Sep 17 00:00:00 2001 From: Vivek Menezes Date: Wed, 30 Jan 2019 12:51:37 -0500 Subject: [PATCH 1/2] sql: allow the InternalExecutor to work with a modified schema The internal executor uses its own TableCollection. This is fine normally but is a problem when it is called in the context of a transaction that has already modified schema. The parent transaction will leave an outstanding intent on the schema, and the InternalExecutor will block attempting to acquire a lease on the schema. The fix is to modify the TableCollection used by the InternalExecutor to house the modified schema so that a schema lookup made by the InternalExecutor will not attempt to acquire a lease. This change is only creating the infrastructure by which this is done but not fixing the underlying deadlock problem. related to #34304 Release note: None --- pkg/sql/conn_executor.go | 8 ++++++++ pkg/sql/internal.go | 12 +++++++++++- pkg/sql/table.go | 17 +++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 0e6f6a0a4d8e..c58a704912c1 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -607,6 +607,7 @@ func (s *Server) newConnExecutorWithTxn( memMetrics MemoryMetrics, srvMetrics *Metrics, txn *client.Txn, + tcModifier TableCollectionModifier, ) (*connExecutor, error) { ex, err := s.newConnExecutor(ctx, sargs, stmtBuf, clientComm, memMetrics, srvMetrics) if err != nil { @@ -633,6 +634,13 @@ func (s *Server) newConnExecutorWithTxn( tree.ReadWrite, txn, ex.transitionCtx) + + // Modify the TableCollection to match the parent executor's TableCollection. + // This allows the InternalExecutor to see schema changes made by the + // parent executor. 
+ if tcModifier != nil { + tcModifier.copyModifiedSchema(&ex.extraTxnState.tables) + } return ex, nil } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 6ebf07dd0595..952a46b756f1 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -83,6 +83,15 @@ type internalExecutorImpl struct { // parent session state if need be, but then we'd need to share a // sessionDataMutator. sessionData *sessiondata.SessionData + + // The internal executor uses its own TableCollection. A TableCollection + // is a schema cache for each transaction and contains data like the schema + // modified by a transaction. Occasionally an internal executor is called + // within the context of a transaction that has modified the schema, the + // internal executor should see the modified schema. This interface allows + // the internal executor to modify its TableCollection to match the + // TableCollection of the parent executor. + tcModifier TableCollectionModifier } // MakeInternalExecutor creates an InternalExecutor. @@ -192,7 +201,8 @@ func (ie *internalExecutorImpl) initConnEx( ie.mon, ie.memMetrics, &ie.s.InternalMetrics, - txn) + txn, + ie.tcModifier) } if err != nil { return nil, nil, err diff --git a/pkg/sql/table.go b/pkg/sql/table.go index 063936588fcf..76878e3557d7 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -658,6 +658,23 @@ func (tc *TableCollection) releaseAllDescriptors() { tc.allDescriptors = nil } +// Copy the modified schema to the table collection. Used when initializing +// an InternalExecutor. +func (tc *TableCollection) copyModifiedSchema(to *TableCollection) { + if tc == nil { + return + } + to.uncommittedTables = tc.uncommittedTables + to.uncommittedDatabases = tc.uncommittedDatabases + // Do not copy the leased descriptors because we do not want + // the leased descriptors to be released by the "to" TableCollection. + // The "to" TableCollection can re-lease the same descriptors. 
+} + +type TableCollectionModifier interface { + copyModifiedSchema(to *TableCollection) +} + // createOrUpdateSchemaChangeJob finalizes the current mutations in the table // descriptor. If a schema change job in the system.jobs table has not been // created for mutations in the current transaction, one is created. The From ef3270f998002e833e4c0922af0d23d8f9642344 Mon Sep 17 00:00:00 2001 From: Vivek Menezes Date: Wed, 30 Jan 2019 13:01:41 -0500 Subject: [PATCH 2/2] sql: verify index before declaring it public A new index verification step is added after an index backfill to verify that the index is indeed valid. This is good practice, but also necessary with the introduction of the new bulk index backfill which doesn't have the capability to detect duplicate index entries. Release note: None --- pkg/sql/backfill.go | 170 ++++++++++++++++++++++++++++++++- pkg/sql/backfill/backfill.go | 15 +-- pkg/sql/conn_executor.go | 2 +- pkg/sql/exec_util.go | 1 + pkg/sql/internal.go | 2 +- pkg/sql/schema_changer.go | 4 + pkg/sql/schema_changer_test.go | 69 +++++++++++++ pkg/sql/sqlbase/structured.go | 24 +++++ pkg/sql/table.go | 2 +- 9 files changed, 272 insertions(+), 17 deletions(-) diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index a2d9ac85f006..08306627b2de 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -16,6 +16,7 @@ package sql import ( "context" + "fmt" "sort" "time" @@ -26,11 +27,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/backfill" "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + 
"github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" ) @@ -467,6 +471,165 @@ func (sc *SchemaChanger) distBackfill( return nil } +// validate the new indexes being added +func (sc *SchemaChanger) validateIndexes( + ctx context.Context, + evalCtx *extendedEvalContext, + lease *sqlbase.TableDescriptor_SchemaChangeLease, +) error { + if testDisableTableLeases { + return nil + } + readAsOf := sc.clock.Now() + return sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + txn.SetFixedTimestamp(ctx, readAsOf) + tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, sc.tableID) + if err != nil { + return err + } + + if err := sc.ExtendLease(ctx, lease); err != nil { + return err + } + + grp := ctxgroup.WithContext(ctx) + + var tableRowCount int64 + // Close when table count is ready. + tableCountReady := make(chan struct{}) + // Notify of count completion even when an error is encountered. + // Provision such that a sender will never block. + countDone := make(chan struct{}, len(tableDesc.Mutations)+1) + numIndexCounts := 0 + // Compute the size of each index. + for _, m := range tableDesc.Mutations { + if sc.mutationID != m.MutationID { + break + } + idx := m.GetIndex() + // An inverted index doesn't matchup in length. + if idx == nil || + idx.Type == sqlbase.IndexDescriptor_INVERTED || + m.Direction == sqlbase.DescriptorMutation_DROP { + continue + } + + numIndexCounts++ + grp.GoCtx(func(ctx context.Context) error { + defer func() { countDone <- struct{}{} }() + start := timeutil.Now() + // Make the mutations public in a private copy of the descriptor + // and add it to the TableCollection, so that we can use SQL below to perform + // the validation. We wouldn't have needed to do this if we could have + // updated the descriptor and run validation in the same transaction. However, + // our current system is incapable of running long running schema changes + // (the validation can take many minutes). 
So we pretend that the schema + // has been updated and actually update it in a separate transaction that + // follows this one. + desc, err := sqlbase.NewImmutableTableDescriptor(*tableDesc).MakeFirstMutationPublic() + if err != nil { + return err + } + tc := &TableCollection{leaseMgr: sc.leaseMgr} + // pretend that the schema has been modified. + if err := tc.addUncommittedTable(*desc); err != nil { + return err + } + + // Create a new eval context only because the eval context cannot be shared across many + // goroutines. + newEvalCtx := createSchemaChangeEvalCtx(ctx, readAsOf, evalCtx.Tracing, sc.ieFactory) + // TODO(vivek): This is not a great API. Leaving #34304 open. + ie := newEvalCtx.InternalExecutor.(*SessionBoundInternalExecutor) + ie.impl.tcModifier = tc + defer func() { + ie.impl.tcModifier = nil + }() + + row, err := newEvalCtx.InternalExecutor.QueryRow(ctx, "verify-idx-count", txn, + fmt.Sprintf(`SELECT count(*) FROM [%d AS t]@[%d]`, tableDesc.ID, idx.ID)) + if err != nil { + return err + } + idxLen := int64(tree.MustBeDInt(row[0])) + + log.Infof(ctx, "index %s/%s row count = %d, took %s", + tableDesc.Name, idx.Name, idxLen, timeutil.Since(start)) + + select { + case <-tableCountReady: + if idxLen != tableRowCount { + // TODO(vivek): find the offending row and include it in the error. + return pgerror.NewErrorf( + pgerror.CodeUniqueViolationError, + "index %q uniqueness violation: %d entries, expected %d", + idx.Name, idxLen, tableRowCount, + ) + } + + case <-ctx.Done(): + return ctx.Err() + } + + return nil + }) + } + + if numIndexCounts > 0 { + grp.GoCtx(func(ctx context.Context) error { + defer close(tableCountReady) + defer func() { countDone <- struct{}{} }() + var tableRowCountTime time.Duration + start := timeutil.Now() + // Count the number of rows in the table. 
+ cnt, err := evalCtx.InternalExecutor.QueryRow(ctx, "VERIFY INDEX", txn, + fmt.Sprintf(`SELECT count(1) FROM [%d AS t]`, tableDesc.ID)) + if err != nil { + return err + } + tableRowCount = int64(tree.MustBeDInt(cnt[0])) + tableRowCountTime = timeutil.Since(start) + log.Infof(ctx, "table %s row count = %d, took %s", + tableDesc.Name, tableRowCount, tableRowCountTime) + return nil + }) + + // Periodic schema change lease extension. + grp.GoCtx(func(ctx context.Context) error { + count := numIndexCounts + 1 + refreshTimer := timeutil.NewTimer() + defer refreshTimer.Stop() + refreshTimer.Reset(checkpointInterval) + for { + select { + case <-countDone: + count-- + if count == 0 { + // Stop. + return nil + } + + case <-refreshTimer.C: + refreshTimer.Read = true + refreshTimer.Reset(checkpointInterval) + if err := sc.ExtendLease(ctx, lease); err != nil { + return err + } + + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + if err := grp.Wait(); err != nil { + return err + } + } + return nil + }) +} + func (sc *SchemaChanger) backfillIndexes( ctx context.Context, evalCtx *extendedEvalContext, @@ -483,9 +646,12 @@ func (sc *SchemaChanger) backfillIndexes( chunkSize = indexBulkBackfillChunkSize.Get(&sc.settings.SV) } - return sc.distBackfill( + if err := sc.distBackfill( ctx, evalCtx, lease, version, indexBackfill, chunkSize, - backfill.IndexMutationFilter) + backfill.IndexMutationFilter); err != nil { + return err + } + return sc.validateIndexes(ctx, evalCtx, lease) } func (sc *SchemaChanger) truncateAndBackfillColumns( diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 73fc7e544888..7937dfe5ab99 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -28,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/pkg/errors" ) @@ -279,17 +278,9 @@ 
func ConvertBackfillError( // information useful in printing a sensible error. However // ConvertBatchError() will only work correctly if the schema elements // are "live" in the tableDesc. - desc := sqlbase.NewMutableExistingTableDescriptor(*protoutil.Clone(tableDesc.TableDesc()).(*sqlbase.TableDescriptor)) - mutationID := desc.Mutations[0].MutationID - for _, mutation := range desc.Mutations { - if mutation.MutationID != mutationID { - // Mutations are applied in a FIFO order. Only apply the first set - // of mutations if they have the mutation ID we're looking for. - break - } - if err := desc.MakeMutationComplete(mutation); err != nil { - return errors.Wrap(err, "backfill error") - } + desc, err := tableDesc.MakeFirstMutationPublic() + if err != nil { + return err } return row.ConvertBatchError(ctx, sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()), b) } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index c58a704912c1..651386704677 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -607,7 +607,7 @@ func (s *Server) newConnExecutorWithTxn( memMetrics MemoryMetrics, srvMetrics *Metrics, txn *client.Txn, - tcModifier TableCollectionModifier, + tcModifier tableCollectionModifier, ) (*connExecutor, error) { ex, err := s.newConnExecutor(ctx, sargs, stmtBuf, clientComm, memMetrics, srvMetrics) if err != nil { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 364effe677f6..44242cc414e3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -958,6 +958,7 @@ func (scc *schemaChangerCollection) execSchemaChanges( sc.testingKnobs = cfg.SchemaChangerTestingKnobs sc.distSQLPlanner = cfg.DistSQLPlanner sc.settings = cfg.Settings + sc.ieFactory = ieFactory for r := retry.Start(base.DefaultRetryOptions()); r.Next(); { evalCtx := createSchemaChangeEvalCtx(ctx, cfg.Clock.Now(), tracing, ieFactory) if err := sc.exec(ctx, true /* inSession */, &evalCtx); err != nil { diff --git a/pkg/sql/internal.go 
b/pkg/sql/internal.go index 952a46b756f1..694969ca8de9 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -91,7 +91,7 @@ type internalExecutorImpl struct { // internal executor should see the modified schema. This interface allows // the internal executor to modify its TableCollection to match the // TableCollection of the parent executor. - tcModifier TableCollectionModifier + tcModifier tableCollectionModifier } // MakeInternalExecutor creates an InternalExecutor. diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 665782e6319c..b5ff3b988de9 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -115,6 +116,7 @@ type SchemaChanger struct { clock *hlc.Clock settings *cluster.Settings execCfg *ExecutorConfig + ieFactory sqlutil.SessionBoundInternalExecutorFactory } // NewSchemaChangerForTesting only for tests. 
@@ -1641,6 +1643,7 @@ func (s *SchemaChangeManager) Start(stopper *stop.Stopper) { rangeDescriptorCache: s.execCfg.RangeDescriptorCache, clock: s.execCfg.Clock, settings: s.execCfg.Settings, + ieFactory: s.ieFactory, } execAfter := timeutil.Now().Add(delay) @@ -1810,6 +1813,7 @@ func createSchemaChangeEvalCtx( DataConversion: sessiondata.DataConversionConfig{ Location: dummyLocation, }, + User: security.NodeUser, } evalCtx := extendedEvalContext{ diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 7ac1ecb0f597..01ac613a3119 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -3945,3 +3946,71 @@ func TestBlockedSchemaChange(t *testing.T) { wg.Wait() } + +// Tests index backfill validation step by purposely deleting an index +// value during the index backfill and checking that the validation +// fails. +func TestIndexBackfillValidation(t *testing.T) { + defer leaktest.AfterTest(t)() + params, _ := tests.CreateTestServerParams() + const maxValue = 1000 + backfillCount := int64(0) + var db *client.DB + var tableDesc *sqlbase.TableDescriptor + params.Knobs = base.TestingKnobs{ + DistSQL: &distsqlrun.TestingKnobs{ + RunAfterBackfillChunk: func() { + count := atomic.AddInt64(&backfillCount, 1) + if count == 2 { + // drop an index value before validation.
+ keyEnc := keys.MakeTablePrefix(uint32(tableDesc.ID)) + key := roachpb.Key(encoding.EncodeUvarintAscending(keyEnc, uint64(tableDesc.NextIndexID))) + kv, err := db.Scan(context.TODO(), key, key.PrefixEnd(), 1) + if err != nil { + t.Error(err) + } + if err := db.Del(context.TODO(), kv[0].Key); err != nil { + t.Error(err) + } + } + }, + }, + // Disable backfill migrations, we still need the jobs table migration. + SQLMigrationManager: &sqlmigrations.MigrationManagerTestingKnobs{ + DisableBackfillMigrations: true, + }, + } + server, sqlDB, kvDB := serverutils.StartServer(t, params) + db = kvDB + defer server.Stopper().Stop(context.TODO()) + + if _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k INT PRIMARY KEY, v INT); +`); err != nil { + t.Fatal(err) + } + + tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "test") + + // Bulk insert enough rows to exceed the chunk size. + inserts := make([]string, maxValue+1) + for i := 0; i < maxValue+1; i++ { + inserts[i] = fmt.Sprintf(`(%d, %d)`, i, i) + } + if _, err := sqlDB.Exec(`INSERT INTO t.test VALUES ` + strings.Join(inserts, ",")); err != nil { + t.Fatal(err) + } + + // Start schema change that eventually runs a backfill. 
+ if _, err := sqlDB.Exec(`CREATE UNIQUE INDEX foo ON t.test (v)`); !testutils.IsError( + err, fmt.Sprintf("uniqueness violation: %d entries, expected %d", maxValue, maxValue+1), + ) { + t.Fatal(err) + } + + tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "test") + if len(tableDesc.Indexes) > 0 || len(tableDesc.Mutations) > 0 { + t.Fatalf("descriptor broken %d, %d", len(tableDesc.Indexes), len(tableDesc.Mutations)) + } +} diff --git a/pkg/sql/sqlbase/structured.go b/pkg/sql/sqlbase/structured.go index ec8d781a6a89..52d700fb36a0 100644 --- a/pkg/sql/sqlbase/structured.go +++ b/pkg/sql/sqlbase/structured.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/interval" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/pkg/errors" ) @@ -2217,6 +2218,29 @@ func (desc *MutableTableDescriptor) addMutation(m DescriptorMutation) { desc.Mutations = append(desc.Mutations, m) } +// MakeFirstMutationPublic creates a MutableTableDescriptor from the +// ImmutableTableDescriptor by making the first mutation public. +// This is super valuable when trying to run SQL over data associated +// with a schema mutation that is still not yet public: Data validation, +// error reporting. +func (desc *ImmutableTableDescriptor) MakeFirstMutationPublic() (*MutableTableDescriptor, error) { + // Clone the ImmutableTableDescriptor because we want to create a Mutable one. + table := NewMutableExistingTableDescriptor(*protoutil.Clone(desc.TableDesc()).(*TableDescriptor)) + mutationID := desc.Mutations[0].MutationID + for _, mutation := range desc.Mutations { + if mutation.MutationID != mutationID { + // Mutations are applied in a FIFO order. Only apply the first set + // of mutations if they have the mutation ID we're looking for.
+ break + } + if err := table.MakeMutationComplete(mutation); err != nil { + return nil, err + } + } + table.Version++ + return table, nil +} + // ColumnNeedsBackfill returns true if adding the given column requires a // backfill (dropping a column always requires a backfill). func ColumnNeedsBackfill(desc *ColumnDescriptor) bool { diff --git a/pkg/sql/table.go b/pkg/sql/table.go index 76878e3557d7..ff55f4aaf3d7 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -671,7 +671,7 @@ func (tc *TableCollection) copyModifiedSchema(to *TableCollection) { // The "to" TableCollection can re-lease the same descriptors. } -type TableCollectionModifier interface { +type tableCollectionModifier interface { copyModifiedSchema(to *TableCollection) }