From c184de0a48eda2c58de6329d398dd1382c9bb129 Mon Sep 17 00:00:00 2001 From: winkyao Date: Mon, 6 Aug 2018 20:39:31 +0800 Subject: [PATCH] ddl: batch check the constrains when we add a unique-index. (#7132) --- ddl/ddl_db_test.go | 1 - ddl/index.go | 93 ++++++++++++++++++++++++++++++++++++++--- ddl/integration_test.go | 16 +++++++ 3 files changed, 103 insertions(+), 7 deletions(-) diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index 9aafaf4312449..bff4d4bedc588 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -322,7 +322,6 @@ LOOP: s.mustExec(c, "delete from t1 where c1 = ?", i+10) } sessionExec(c, s.store, "create index c3_index on t1 (c3)") - s.mustExec(c, "drop table t1") } diff --git a/ddl/index.go b/ddl/index.go index adb768b00867d..7da0357559059 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -406,6 +406,7 @@ type indexRecord struct { handle int64 key []byte // It's used to lock a record. Record it to reduce the encoding time. vals []types.Datum // It's the index values. + skip bool // skip indicates that the index key is already exists, we should not add it. } type addIndexWorker struct { @@ -420,9 +421,13 @@ type addIndexWorker struct { colFieldMap map[int64]*types.FieldType closed bool - defaultVals []types.Datum // It's used to reduce the number of new slice. - idxRecords []*indexRecord // It's used to reduce the number of new slice. - rowMap map[int64]types.Datum // It's the index column values map. It is used to reduce the number of making map. + // The following attributes are used to reduce memory allocation. + defaultVals []types.Datum + idxRecords []*indexRecord + rowMap map[int64]types.Datum + idxKeyBufs [][]byte + batchCheckKeys []kv.Key + distinctCheckFlags []bool } type reorgIndexTask struct { @@ -452,7 +457,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, d *ddl, id int, t table.Table index: index, table: t, colFieldMap: colFieldMap, - defaultVals: make([]types.Datum, len(t.Cols())), rowMap: make(map[int64]types.Datum, len(colFieldMap)), } @@ -582,6 +586,71 @@ func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string } } +func (w *addIndexWorker) initBatchCheckBufs(batchCount int) { + if len(w.idxKeyBufs) < batchCount { + w.idxKeyBufs = make([][]byte, batchCount) + } + + w.batchCheckKeys = w.batchCheckKeys[:0] + w.distinctCheckFlags = w.distinctCheckFlags[:0] +} + +func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*indexRecord) error { + idxInfo := w.index.Meta() + if !idxInfo.Unique { + // non-unique key need not to check, just overwrite it, + // because in most case, backfilling indices is not exists. + return nil + } + + w.initBatchCheckBufs(len(idxRecords)) + stmtCtx := w.sessCtx.GetSessionVars().StmtCtx + for i, record := range idxRecords { + idxKey, distinct, err := w.index.GenIndexKey(stmtCtx, record.vals, record.handle, w.idxKeyBufs[i]) + if err != nil { + return errors.Trace(err) + } + // save the buffer to reduce memory allocations. + w.idxKeyBufs[i] = idxKey + + w.batchCheckKeys = append(w.batchCheckKeys, idxKey) + w.distinctCheckFlags = append(w.distinctCheckFlags, distinct) + } + + batchVals, err := kv.BatchGetValues(txn, w.batchCheckKeys) + if err != nil { + return errors.Trace(err) + } + + // 1. unique-key is duplicate and the handle is equal, skip it. + // 2. unique-key is duplicate and the handle is not equal, return duplicate error. + // 3. non-unique-key is duplicate, skip it. + for i, key := range w.batchCheckKeys { + if val, found := batchVals[string(key)]; found { + if w.distinctCheckFlags[i] { + handle, err1 := tables.DecodeHandle(val) + if err1 != nil { + return errors.Trace(err1) + } + + if handle != idxRecords[i].handle { + return errors.Trace(kv.ErrKeyExists) + } + } + idxRecords[i].skip = true + } else { + // The keys in w.batchCheckKeys also maybe duplicate, + // so we need to backfill the not found key into `batchVals` map. + if w.distinctCheckFlags[i] { + batchVals[string(key)] = tables.EncodeHandle(idxRecords[i].handle) + } + } + } + // Constrains is already checked. + stmtCtx.BatchCheck = true + return nil +} + // backfillIndexInTxn will backfill table index in a transaction, lock corresponding rowKey, if the value of rowKey is changed, // indicate that index columns values may changed, index is not allowed to be added, so the txn will rollback and retry. // backfillIndexInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128. @@ -601,15 +670,27 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHan return errors.Trace(err) } + err = w.batchCheckUniqueKey(txn, idxRecords) + if err != nil { + return errors.Trace(err) + } + for _, idxRecord := range idxRecords { + scanCount++ + // The index is already exists, we skip it, no needs to backfill it. + // The following update, delete, insert on these rows, TiDB can handle it correctly. + if idxRecord.skip { + continue + } + + // Lock the row key to notify us that someone delete or update the row, + // then we should not backfill the index of it, otherwise the adding index is redundant. err := txn.LockKeys(idxRecord.key) if err != nil { return errors.Trace(err) } - scanCount++ // Create the index. - // TODO: backfill unique-key will check constraint every row, we can speed up this case by using batch check. handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle) if err != nil { if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle { diff --git a/ddl/integration_test.go b/ddl/integration_test.go index 2cacebb284be9..33814371f7185 100644 --- a/ddl/integration_test.go +++ b/ddl/integration_test.go @@ -89,6 +89,22 @@ func (s *testIntegrationSuite) TestCreateTableIfNotExists(c *C) { c.Assert(terror.ErrorEqual(infoschema.ErrTableExists, lastWarn.Err), IsTrue) } +func (s *testIntegrationSuite) TestUniquekeyNullValue(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("USE test") + + tk.MustExec("create table t(a int primary key, b varchar(255))") + + tk.MustExec("insert into t values(1, NULL)") + tk.MustExec("insert into t values(2, NULL)") + tk.MustExec("alter table t add unique index b(b);") + res := tk.MustQuery("select count(*) from t use index(b);") + res.Check(testkit.Rows("2")) + tk.MustExec("admin check table t") + tk.MustExec("admin check index t b") +} + func (s *testIntegrationSuite) TestEndIncluded(c *C) { tk := testkit.NewTestKit(c, s.store)