Skip to content

Commit

Permalink
*: Set the batch delete/insert size by session variable (pingcap#5413) (
Browse files Browse the repository at this point in the history
pingcap#5437)

We hardcode it to 20k. But if the row size is large, it will still break
the transaction size limitation, so we need to provide a way to set it
dynamically.
  • Loading branch information
shenli authored and tiancaiamao committed Dec 18, 2017
1 parent a18f86c commit 51a446c
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 18 deletions.
14 changes: 4 additions & 10 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,9 @@ func (e *DeleteExec) deleteSingleTable() error {
}
// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
batchSize := e.ctx.GetSessionVars().DMLBatchSize
for {
if batchDelete && rowCount >= BatchDeleteSize {
if batchDelete && rowCount >= batchSize {
if err := e.ctx.NewTxn(); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.Gen("BatchDelete failed with error: %v", err)
Expand Down Expand Up @@ -722,14 +723,6 @@ func (e *InsertExec) Schema() *expression.Schema {
return expression.NewSchema()
}

// BatchInsertSize is the batch size of auto-splitted insert data.
// This will be used when tidb_batch_insert is set to ON.
var BatchInsertSize = 20000

// BatchDeleteSize is the batch size of auto-splitted delete data.
// This will be used when tidb_batch_delete is set to ON.
var BatchDeleteSize = 20000

// Next implements the Executor Next interface.
func (e *InsertExec) Next() (Row, error) {
if e.finished {
Expand All @@ -752,11 +745,12 @@ func (e *InsertExec) Next() (Row, error) {

// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
batchInsert := e.ctx.GetSessionVars().BatchInsert && !e.ctx.GetSessionVars().InTxn()
batchSize := e.ctx.GetSessionVars().DMLBatchSize

txn := e.ctx.Txn()
rowCount := 0
for _, row := range rows {
if batchInsert && rowCount >= BatchInsertSize {
if batchInsert && rowCount >= batchSize {
if err := e.ctx.NewTxn(); err != nil {
// We should return a special error for batch insert.
return nil, ErrBatchInsertFail.Gen("BatchInsert failed with error: %v", err)
Expand Down
25 changes: 17 additions & 8 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,17 +1061,11 @@ func makeLoadDataInfo(column int, specifiedColumns []string, ctx context.Context

func (s *testSuite) TestBatchInsertDelete(c *C) {
originLimit := atomic.LoadUint64(&kv.TxnEntryCountLimit)
originBatch := executor.BatchInsertSize
originDeleteBatch := executor.BatchDeleteSize
defer func() {
atomic.StoreUint64(&kv.TxnEntryCountLimit, originLimit)
executor.BatchInsertSize = originBatch
executor.BatchDeleteSize = originDeleteBatch
}()
// Set the limitation to a small value, make it easier to reach the limitation.
atomic.StoreUint64(&kv.TxnEntryCountLimit, 100)
executor.BatchInsertSize = 50
executor.BatchDeleteSize = 50

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down Expand Up @@ -1112,6 +1106,7 @@ func (s *testSuite) TestBatchInsertDelete(c *C) {
c.Assert(kv.ErrTxnTooLarge.Equal(err), IsTrue)
r = tk.MustQuery("select count(*) from batch_insert;")
r.Check(testkit.Rows("160"))

// for on duplicate key
_, err = tk.Exec(`insert into batch_insert_on_duplicate select * from batch_insert_on_duplicate as tt
on duplicate key update batch_insert_on_duplicate.id=batch_insert_on_duplicate.id+1000;`)
Expand All @@ -1120,11 +1115,24 @@ func (s *testSuite) TestBatchInsertDelete(c *C) {
r = tk.MustQuery("select count(*) from batch_insert;")
r.Check(testkit.Rows("160"))

// Change to batch inset mode.
// Change to batch insert mode and batch size to 50.
tk.MustExec("set @@session.tidb_batch_insert=1;")
tk.MustExec("set @@session.tidb_dml_batch_size=50;")
tk.MustExec("insert into batch_insert (c) select * from batch_insert;")
r = tk.MustQuery("select count(*) from batch_insert;")
r.Check(testkit.Rows("320"))

// Enlarge the batch size to 150 which is larger than the txn limitation (100).
// So the insert will meet error.
tk.MustExec("set @@session.tidb_dml_batch_size=150;")
_, err = tk.Exec("insert into batch_insert (c) select * from batch_insert;")
c.Assert(err, NotNil)
c.Assert(kv.ErrTxnTooLarge.Equal(err), IsTrue)
r = tk.MustQuery("select count(*) from batch_insert;")
r.Check(testkit.Rows("320"))
// Set it back to 50.
tk.MustExec("set @@session.tidb_dml_batch_size=50;")

// for on duplicate key
_, err = tk.Exec(`insert into batch_insert_on_duplicate select * from batch_insert_on_duplicate as tt
on duplicate key update batch_insert_on_duplicate.id=batch_insert_on_duplicate.id+1000;`)
Expand All @@ -1148,8 +1156,9 @@ func (s *testSuite) TestBatchInsertDelete(c *C) {
c.Assert(kv.ErrTxnTooLarge.Equal(err), IsTrue)
r = tk.MustQuery("select count(*) from batch_insert;")
r.Check(testkit.Rows("320"))
// Enable batch delete.
// Enable batch delete and set batch size to 50.
tk.MustExec("set @@session.tidb_batch_delete=on;")
tk.MustExec("set @@session.tidb_dml_batch_size=50;")
tk.MustExec("delete from batch_insert;")
// Make sure that all rows are gone.
r = tk.MustQuery("select count(*) from batch_insert;")
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ type SessionVars struct {
// BatchDelete indicates if we should split delete data into multiple batches.
BatchDelete bool

// DMLBatchSize indicates the size of batches for DML.
// It will be used when BatchInsert or BatchDelete is on.
DMLBatchSize int

// MaxRowCountForINLJ defines max row count that the outer table of index nested loop join could be without force hint.
MaxRowCountForINLJ int
}
Expand All @@ -246,6 +250,7 @@ func NewSessionVars() *SessionVars {
IndexSerialScanConcurrency: DefIndexSerialScanConcurrency,
DistSQLScanConcurrency: DefDistSQLScanConcurrency,
MaxRowCountForINLJ: DefMaxRowCountForINLJ,
DMLBatchSize: DefDMLBatchSize,
}
}

Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)},
{ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)},
{ScopeSession, TiDBBatchDelete, boolToIntStr(DefBatchDelete)},
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
}

Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ const (
// split data into multiple batches and use a single txn for each batch. This will be helpful when deleting large data.
TiDBBatchDelete = "tidb_batch_delete"

// tidb_dml_batch_size is used to split the insert/delete data into small batches.
// It only takes effect when tidb_batch_insert/tidb_batch_delete is on.
// Its default value is 20000. When the row size is large, 20k rows could be larger than 100MB.
// User could change it to a smaller one to avoid breaking the transaction size limitation.
TiDBDMLBatchSize = "tidb_dml_batch_size"

// tidb_max_row_count_for_inlj is used when do index nested loop join.
// It controls the max row count of outer table when do index nested loop join without hint.
// After the row count of the inner table is accurate, this variable will be removed.
Expand All @@ -117,4 +123,5 @@ const (
DefBatchInsert = false
DefBatchDelete = false
DefCurretTS = 0
DefDMLBatchSize = 20000
)
2 changes: 2 additions & 0 deletions sessionctx/varsutil/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ func SetSessionSystemVar(vars *variable.SessionVars, name string, value types.Da
vars.BatchInsert = tidbOptOn(sVal)
case variable.TiDBBatchDelete:
vars.BatchDelete = tidbOptOn(sVal)
case variable.TiDBDMLBatchSize:
vars.DMLBatchSize = tidbOptPositiveInt(sVal, variable.DefDMLBatchSize)
case variable.TiDBMaxRowCountForINLJ:
vars.MaxRowCountForINLJ = tidbOptPositiveInt(sVal, variable.DefMaxRowCountForINLJ)
case variable.TiDBCurrentTS:
Expand Down

0 comments on commit 51a446c

Please sign in to comment.