diff --git a/executor/write.go b/executor/write.go
index e1ba79ff37c70..c2297b8e8b140 100644
--- a/executor/write.go
+++ b/executor/write.go
@@ -301,8 +301,9 @@ func (e *DeleteExec) deleteSingleTable() error {
 	}
 	// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
 	batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
+	batchSize := e.ctx.GetSessionVars().DMLBatchSize
 	for {
-		if batchDelete && rowCount >= BatchDeleteSize {
+		if batchDelete && rowCount >= batchSize {
 			if err := e.ctx.NewTxn(); err != nil {
 				// We should return a special error for batch insert.
 				return ErrBatchInsertFail.Gen("BatchDelete failed with error: %v", err)
@@ -722,14 +723,6 @@ func (e *InsertExec) Schema() *expression.Schema {
 	return expression.NewSchema()
 }
 
-// BatchInsertSize is the batch size of auto-splitted insert data.
-// This will be used when tidb_batch_insert is set to ON.
-var BatchInsertSize = 20000
-
-// BatchDeleteSize is the batch size of auto-splitted delete data.
-// This will be used when tidb_batch_delete is set to ON.
-var BatchDeleteSize = 20000
-
 // Next implements the Executor Next interface.
 func (e *InsertExec) Next() (Row, error) {
 	if e.finished {
@@ -752,11 +745,12 @@
 
 	// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
 	batchInsert := e.ctx.GetSessionVars().BatchInsert && !e.ctx.GetSessionVars().InTxn()
+	batchSize := e.ctx.GetSessionVars().DMLBatchSize
 
 	txn := e.ctx.Txn()
 	rowCount := 0
 	for _, row := range rows {
-		if batchInsert && rowCount >= BatchInsertSize {
+		if batchInsert && rowCount >= batchSize {
 			if err := e.ctx.NewTxn(); err != nil {
 				// We should return a special error for batch insert.
 				return nil, ErrBatchInsertFail.Gen("BatchInsert failed with error: %v", err)
diff --git a/executor/write_test.go b/executor/write_test.go
index 4d641d13e20f4..36e2286549c48 100644
--- a/executor/write_test.go
+++ b/executor/write_test.go
@@ -1061,17 +1061,11 @@ func makeLoadDataInfo(column int, specifiedColumns []string, ctx context.Context
 
 func (s *testSuite) TestBatchInsertDelete(c *C) {
 	originLimit := atomic.LoadUint64(&kv.TxnEntryCountLimit)
-	originBatch := executor.BatchInsertSize
-	originDeleteBatch := executor.BatchDeleteSize
 	defer func() {
 		atomic.StoreUint64(&kv.TxnEntryCountLimit, originLimit)
-		executor.BatchInsertSize = originBatch
-		executor.BatchDeleteSize = originDeleteBatch
 	}()
 	// Set the limitation to a small value, make it easier to reach the limitation.
 	atomic.StoreUint64(&kv.TxnEntryCountLimit, 100)
-	executor.BatchInsertSize = 50
-	executor.BatchDeleteSize = 50
 
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
@@ -1112,6 +1106,7 @@ func (s *testSuite) TestBatchInsertDelete(c *C) {
 	c.Assert(kv.ErrTxnTooLarge.Equal(err), IsTrue)
 	r = tk.MustQuery("select count(*) from batch_insert;")
 	r.Check(testkit.Rows("160"))
+
 	// for on duplicate key
 	_, err = tk.Exec(`insert into batch_insert_on_duplicate select * from batch_insert_on_duplicate as tt
 		on duplicate key update batch_insert_on_duplicate.id=batch_insert_on_duplicate.id+1000;`)
@@ -1120,11 +1115,24 @@ func (s *testSuite) TestBatchInsertDelete(c *C) {
 	c.Assert(kv.ErrTxnTooLarge.Equal(err), IsTrue)
 	r = tk.MustQuery("select count(*) from batch_insert;")
 	r.Check(testkit.Rows("160"))
-	// Change to batch inset mode.
+	// Change to batch insert mode and set the batch size to 50.
tk.MustExec("set @@session.tidb_batch_insert=1;") + tk.MustExec("set @@session.tidb_dml_batch_size=50;") tk.MustExec("insert into batch_insert (c) select * from batch_insert;") r = tk.MustQuery("select count(*) from batch_insert;") r.Check(testkit.Rows("320")) + + // Enlarge the batch size to 150 which is larger than the txn limitation (100). + // So the insert will meet error. + tk.MustExec("set @@session.tidb_dml_batch_size=150;") + _, err = tk.Exec("insert into batch_insert (c) select * from batch_insert;") + c.Assert(err, NotNil) + c.Assert(kv.ErrTxnTooLarge.Equal(err), IsTrue) + r = tk.MustQuery("select count(*) from batch_insert;") + r.Check(testkit.Rows("320")) + // Set it back to 50. + tk.MustExec("set @@session.tidb_dml_batch_size=50;") + // for on duplicate key _, err = tk.Exec(`insert into batch_insert_on_duplicate select * from batch_insert_on_duplicate as tt on duplicate key update batch_insert_on_duplicate.id=batch_insert_on_duplicate.id+1000;`) @@ -1148,8 +1156,9 @@ func (s *testSuite) TestBatchInsertDelete(c *C) { c.Assert(kv.ErrTxnTooLarge.Equal(err), IsTrue) r = tk.MustQuery("select count(*) from batch_insert;") r.Check(testkit.Rows("320")) - // Enable batch delete. + // Enable batch delete and set batch size to 50. tk.MustExec("set @@session.tidb_batch_delete=on;") + tk.MustExec("set @@session.tidb_dml_batch_size=50;") tk.MustExec("delete from batch_insert;") // Make sure that all rows are gone. r = tk.MustQuery("select count(*) from batch_insert;") diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 5afc6343447ef..a5d12d2a4ebba 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -222,6 +222,10 @@ type SessionVars struct { // BatchDelete indicates if we should split delete data into multiple batches. BatchDelete bool + // DMLBatchSize indicates the size of batches for DML. + // It will be used when BatchInsert or BatchDelete is on. + DMLBatchSize int + // MaxRowCountForINLJ defines max row count that the outer table of index nested loop join could be without force hint. MaxRowCountForINLJ int } @@ -246,6 +250,7 @@ func NewSessionVars() *SessionVars { IndexSerialScanConcurrency: DefIndexSerialScanConcurrency, DistSQLScanConcurrency: DefDistSQLScanConcurrency, MaxRowCountForINLJ: DefMaxRowCountForINLJ, + DMLBatchSize: DefDMLBatchSize, } } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 1555f6299bc04..7292211b0c3fb 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -616,6 +616,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)}, {ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)}, {ScopeSession, TiDBBatchDelete, boolToIntStr(DefBatchDelete)}, + {ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)}, {ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)}, } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 6d6e3a6325cd4..8e26bf10a10d9 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -96,6 +96,12 @@ const ( // split data into multiple batches and use a single txn for each batch. This will be helpful when deleting large data. TiDBBatchDelete = "tidb_batch_delete" + // tidb_dml_batch_size is used to split the insert/delete data into small batches. + // It only takes effort when tidb_batch_insert/tidb_batch_delete is on. + // Its default value is 20000. 
When the row size is large, 20k rows could be larger than 100MB. + // User could change it to a smaller one to avoid breaking the transaction size limitation. + TiDBDMLBatchSize = "tidb_dml_batch_size" + // tidb_max_row_count_for_inlj is used when do index nested loop join. // It controls the max row count of outer table when do index nested loop join without hint. // After the row count of the inner table is accurate, this variable will be removed. @@ -117,4 +123,5 @@ const ( DefBatchInsert = false DefBatchDelete = false DefCurretTS = 0 + DefDMLBatchSize = 20000 ) diff --git a/sessionctx/varsutil/varsutil.go b/sessionctx/varsutil/varsutil.go index 71fa08f2fb20d..38caf2b288120 100644 --- a/sessionctx/varsutil/varsutil.go +++ b/sessionctx/varsutil/varsutil.go @@ -161,6 +161,8 @@ func SetSessionSystemVar(vars *variable.SessionVars, name string, value types.Da vars.BatchInsert = tidbOptOn(sVal) case variable.TiDBBatchDelete: vars.BatchDelete = tidbOptOn(sVal) + case variable.TiDBDMLBatchSize: + vars.DMLBatchSize = tidbOptPositiveInt(sVal, variable.DefDMLBatchSize) case variable.TiDBMaxRowCountForINLJ: vars.MaxRowCountForINLJ = tidbOptPositiveInt(sVal, variable.DefMaxRowCountForINLJ) case variable.TiDBCurrentTS:
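
Reviewer note: for readers unfamiliar with the auto-split pattern the write.go hunks implement, below is a minimal, self-contained Go sketch of the control flow. All names here (fakeTxn, batchApply) are illustrative stand-ins, not TiDB's API; the real code commits and reopens the transaction via e.ctx.NewTxn() and reads the threshold from the new DMLBatchSize session variable.

```go
package main

import "fmt"

// fakeTxn stands in for kv.Transaction; it only counts buffered writes.
type fakeTxn struct{ pending int }

func (t *fakeTxn) commit() { fmt.Printf("committed %d rows\n", t.pending) }

// batchApply mirrors the loop shape in InsertExec.Next / DeleteExec.deleteSingleTable:
// once rowCount reaches batchSize (the tidb_dml_batch_size session variable),
// the current transaction is committed and a fresh one is started, so no single
// transaction grows past the engine's size limit.
func batchApply(rows, batchSize int, batchMode bool) {
	txn := &fakeTxn{}
	rowCount := 0
	for i := 0; i < rows; i++ {
		if batchMode && rowCount >= batchSize {
			txn.commit() // corresponds to e.ctx.NewTxn(): commit the old txn, open a new one
			txn = &fakeTxn{}
			rowCount = 0
		}
		txn.pending++ // stands in for writing one row into the txn buffer
		rowCount++
	}
	txn.commit() // final partial batch
}

func main() {
	// 120 rows with batch size 50 -> three txns of 50, 50, and 20 rows.
	batchApply(120, 50, true)
}
```

The trade-off behind this design: splitting one statement across several transactions keeps each transaction under the size limit but gives up atomicity for the statement as a whole, which is why both batch modes only engage when the session is not already inside an explicit transaction (the !InTxn() check above).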