From 9c83f3f1767a2e01b3a95ffb8dab2b73b2298cce Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 13 Dec 2016 18:31:13 +0800 Subject: [PATCH] executor: 'INSERT IGNORE' should only ignore KeyExists error. (#2241) --- executor/executor_write.go | 16 +++--- executor/executor_write_test.go | 10 +++- kv/fault_injection.go | 94 +++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 8 deletions(-) create mode 100644 kv/fault_injection.go diff --git a/executor/executor_write.go b/executor/executor_write.go index 02acd34f0aa9d..7b665bbcbb892 100644 --- a/executor/executor_write.go +++ b/executor/executor_write.go @@ -608,19 +608,21 @@ func (e *InsertExec) Next() (*Row, error) { continue } - if len(e.OnDuplicate) == 0 || !terror.ErrorEqual(err, kv.ErrKeyExists) { - // If you use the IGNORE keyword, errors that occur while executing the INSERT statement are ignored. + if terror.ErrorEqual(err, kv.ErrKeyExists) { + // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. // For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in // the table causes a duplicate-key error and the statement is aborted. With IGNORE, the row is discarded and no error occurs. if e.Ignore { continue } - return nil, errors.Trace(err) - } - - if err = e.onDuplicateUpdate(row, h, toUpdateColumns); err != nil { - return nil, errors.Trace(err) + if len(e.OnDuplicate) > 0 { + if err = e.onDuplicateUpdate(row, h, toUpdateColumns); err != nil { + return nil, errors.Trace(err) + } + continue + } } + return nil, errors.Trace(err) } if e.lastInsertID != 0 { diff --git a/executor/executor_write_test.go b/executor/executor_write_test.go index ab24315e9d5d9..abd742220188c 100644 --- a/executor/executor_write_test.go +++ b/executor/executor_write_test.go @@ -14,12 +14,14 @@ package executor_test import ( + "errors" "fmt" . "github.com/pingcap/check" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/testkit" @@ -217,7 +219,8 @@ func (s *testSuite) TestInsertIgnore(c *C) { s.cleanEnv(c) testleak.AfterTest(c)() }() - tk := testkit.NewTestKit(c, s.store) + var cfg kv.InjectionConfig + tk := testkit.NewTestKit(c, kv.NewInjectedStore(s.store, &cfg)) tk.MustExec("use test") testSQL := `drop table if exists t; create table t (id int PRIMARY KEY AUTO_INCREMENT, c1 int);` @@ -235,6 +238,11 @@ func (s *testSuite) TestInsertIgnore(c *C) { rowStr = fmt.Sprintf("%v %v", "1", "2") rowStr1 := fmt.Sprintf("%v %v", "2", "3") r.Check(testkit.Rows(rowStr, rowStr1)) + + cfg.SetGetError(errors.New("foo")) + _, err := tk.Exec("insert ignore into t values (1, 3)") + c.Assert(err, NotNil) + cfg.SetGetError(nil) } func (s *testSuite) TestReplace(c *C) { diff --git a/kv/fault_injection.go b/kv/fault_injection.go new file mode 100644 index 0000000000000..bc1d2f4a05f84 --- /dev/null +++ b/kv/fault_injection.go @@ -0,0 +1,94 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import "sync" + +// InjectionConfig is used for fault injections for KV components. +type InjectionConfig struct { + sync.RWMutex + getError error // kv.Get() always return this error. +} + +// SetGetError injects an error for all kv.Get() methods. +func (c *InjectionConfig) SetGetError(err error) { + c.Lock() + defer c.Unlock() + + c.getError = err +} + +// InjectedStore wraps a Storage with injections. +type InjectedStore struct { + Storage + cfg *InjectionConfig +} + +// NewInjectedStore creates a InjectedStore with config. +func NewInjectedStore(store Storage, cfg *InjectionConfig) Storage { + return &InjectedStore{ + Storage: store, + cfg: cfg, + } +} + +// Begin creates an injected Transaction. +func (s *InjectedStore) Begin() (Transaction, error) { + txn, err := s.Storage.Begin() + return &InjectedTransaction{ + Transaction: txn, + cfg: s.cfg, + }, err +} + +// GetSnapshot creates an injected Snapshot. +func (s *InjectedStore) GetSnapshot(ver Version) (Snapshot, error) { + snapshot, err := s.Storage.GetSnapshot(ver) + return &InjectedSnapshot{ + Snapshot: snapshot, + cfg: s.cfg, + }, err +} + +// InjectedTransaction wraps a Transaction with injections. +type InjectedTransaction struct { + Transaction + cfg *InjectionConfig +} + +// Get returns an error if cfg.getError is set. +func (t *InjectedTransaction) Get(k Key) ([]byte, error) { + t.cfg.RLock() + defer t.cfg.RUnlock() + if t.cfg.getError != nil { + return nil, t.cfg.getError + } + return t.Transaction.Get(k) +} + +// InjectedSnapshot wraps a Snapshot with injections. +type InjectedSnapshot struct { + Snapshot + cfg *InjectionConfig +} + +// Get returns an error if cfg.getError is set. +func (t *InjectedSnapshot) Get(k Key) ([]byte, error) { + t.cfg.RLock() + defer t.cfg.RUnlock() + if t.cfg.getError != nil { + return nil, t.cfg.getError + } + return t.Snapshot.Get(k) +}