Merge branch 'master' into issue-33764
hawkingrei authored Nov 2, 2022
2 parents 179fd35 + e6f020a commit aacf44c
Showing 31 changed files with 183 additions and 48 deletions.
1 change: 1 addition & 0 deletions bindinfo/main_test.go
@@ -26,6 +26,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
5 changes: 4 additions & 1 deletion ddl/index.go
@@ -1293,7 +1293,10 @@ func (w *baseIndexWorker) getNextKey(taskRange reorgBackfillTask, taskDone bool)
recordKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), lastHandle)
return recordKey.Next()
}
return taskRange.endKey.Next()
if taskRange.endInclude {
return taskRange.endKey.Next()
}
return taskRange.endKey
}

func (w *baseIndexWorker) updateRowDecoder(handle kv.Handle, rawRecord []byte) error {
14 changes: 14 additions & 0 deletions ddl/index_modify_test.go
@@ -1067,3 +1067,17 @@ func TestAddIndexWithDupIndex(t *testing.T) {
err = tk.ExecToErr("alter table test_add_index_with_dup add index idx (a)")
require.ErrorIs(t, err, errors.Cause(err2))
}

func TestAddIndexUniqueFailOnDuplicate(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a bigint primary key clustered, b int);")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 2;")
for i := 1; i <= 12; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
tk.MustExec("insert into t values (0, 1);") // Insert a duplicate key.
tk.MustQuery("split table t by (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12);").Check(testkit.Rows("13 1"))
tk.MustGetErrCode("alter table t add unique index idx (b);", errno.ErrDupEntry)
}
1 change: 1 addition & 0 deletions ddl/main_test.go
@@ -62,6 +62,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}

goleak.VerifyTestMain(m, opts...)
66 changes: 66 additions & 0 deletions docs/design/2022-09-22-global-memory-control.md
@@ -0,0 +1,66 @@
# Proposal: Global Memory Control

* Authors: [wshwsh12](https://github.com/wshwsh12), [Xuhuaiyu](https://github.com/Xuhuaiyu)
* Tracking issue: [#37816](https://github.com/pingcap/tidb/issues/37816)

## Abstract

This document proposes a design for controlling the global memory usage of a TiDB instance.

## Background

Currently, TiDB has a query-level memory control strategy, `mem-quota-query`, which cancels a SQL statement when its memory usage exceeds `mem-quota-query`. However, there is no global (instance-level) memory control strategy.

When TiDB runs multiple SQL statements whose individual memory usage stays below `mem-quota-query`, or when memory tracking is inaccurate, the instance can still reach high memory usage or even OOM.

Therefore, we need an observer that checks whether the system's current memory usage is normal. When a problem is detected, it tries to stop TiDB's memory from growing further, reducing the risk of a process crash.

## Goal

- Keep TiDB's execution memory within the limit set by the system variable `tidb_server_memory_limit`.

## Design

New system variables (a hedged usage sketch follows this list):
- `tidb_server_memory_limit`: TiDB keeps its overall memory usage within `tidb_server_memory_limit`.
- `tidb_server_memory_limit_gc_trigger`: When TiDB's memory usage reaches this fraction of `tidb_server_memory_limit`, TiDB proactively triggers a Golang GC to release memory.
- `tidb_server_memory_limit_sess_min_size`: The minimum memory usage a session must reach before TiDB may kill it.
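As an illustration only (not part of the proposal), the new variables could be exercised from a testkit-based test in the style of the other tests in this commit; the test name, value formats, and concrete values below are assumptions:

```go
package memorycontrol

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

// TestSetGlobalMemoryControlVars is a hedged sketch; the values and formats
// used here are assumptions, not documented defaults.
func TestSetGlobalMemoryControlVars(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	// Cap the whole TiDB instance at roughly 32 GiB (assumed value format).
	tk.MustExec("set @@global.tidb_server_memory_limit = '32GB'")
	// Proactively trigger GC once usage reaches 70% of the instance limit.
	tk.MustExec("set @@global.tidb_server_memory_limit_gc_trigger = 0.7")
	// Only sessions using at least 128 MiB may be killed (128 * 1024 * 1024 bytes).
	tk.MustExec("set @@global.tidb_server_memory_limit_sess_min_size = 134217728")
}
```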

We need to implement the following three mechanisms to control the memory usage of TiDB:
1. Kill the SQL statement with the highest memory usage in the current system when `HeapInuse` is larger than `tidb_server_memory_limit`.
2. Proactively trigger `runtime.GC()` when `HeapInuse` is larger than `tidb_server_memory_limit` * `tidb_server_memory_limit_gc_trigger`.
3. Introduce memory tables to observe the memory status of the current system.

### Kill the SQL with the max memory usage

New variables (a structural sketch follows this list):

1. A global variable `MemUsageTop1Tracker atomic.Pointer[Tracker]`: points to the Tracker with the largest memory usage.
2. A flag `NeedKill atomic.Bool` in the `Tracker` structure: indicates whether the SQL owning the current Tracker needs to be killed.
3. A field `SessionID int64` in the `Tracker` structure: the session ID corresponding to the current Tracker.
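A minimal structural sketch of these additions, assuming Go 1.19's generic `atomic.Pointer`; the package name and field layout are illustrative, not the final `Tracker` implementation:

```go
package memory

import "sync/atomic"

// MemUsageTop1Tracker points to the session-level Tracker that currently
// consumes the most memory.
var MemUsageTop1Tracker atomic.Pointer[Tracker]

// Tracker is a trimmed-down stand-in for TiDB's memory tracker; only the
// fields introduced by this proposal are shown.
type Tracker struct {
	// NeedKill is set by the observer when this tracker's SQL should be
	// cancelled; Tracker.Consume() checks it and panics if it is true.
	NeedKill atomic.Bool
	// SessionID identifies the session that owns this tracker, so the
	// observer can wait for it to exit before triggering GC.
	SessionID int64
}
```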

Implements:

#### How to get the current TiDB memory usage Top 1
When `Tracker.Consume()` is called, check the following conditions; if all of them hold, update `MemUsageTop1Tracker` (a sketch of this check follows the list).
1. Is it a session-level Tracker?
2. Is the flag `NeedKill` false? This avoids cancelling the current SQL twice.
3. Does the memory usage exceed the threshold `tidb_server_memory_limit_sess_min_size` (default 128 MB, dynamically adjustable)? Only such sessions are candidates for `MemUsageTop1Tracker`.
4. Is the memory usage of the current Tracker greater than that of the current `MemUsageTop1Tracker`?
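A hedged sketch of that check, continuing the illustrative `memory` package above; `ServerMemoryLimitSessMinSize`, `memUsageTop1Bytes`, and the parameters of `maybeUpdateTop1` are names introduced for the sketch, not names from the proposal:

```go
package memory

import "sync/atomic"

var (
	// ServerMemoryLimitSessMinSize mirrors tidb_server_memory_limit_sess_min_size.
	ServerMemoryLimitSessMinSize atomic.Int64
	// memUsageTop1Bytes caches the byte count of the current top-1 tracker.
	memUsageTop1Bytes atomic.Int64
)

// maybeUpdateTop1 applies the four conditions from the list above each time
// Tracker.Consume() is called. It is deliberately simplified (and racy in a
// way the real implementation would have to handle).
func (t *Tracker) maybeUpdateTop1(isSession bool, bytesConsumed int64) {
	if !isSession { // 1. only session-level trackers qualify
		return
	}
	if t.NeedKill.Load() { // 2. skip a tracker whose SQL is already being cancelled
		return
	}
	if bytesConsumed < ServerMemoryLimitSessMinSize.Load() { // 3. below the candidate threshold
		return
	}
	if bytesConsumed > memUsageTop1Bytes.Load() { // 4. out-consumes the current top 1
		memUsageTop1Bytes.Store(bytesConsumed)
		MemUsageTop1Tracker.Store(t)
	}
}
```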

#### How to cancel the SQL with the current top 1 memory usage and recycle memory in time
1. Create a goroutine that calls Golang's `runtime.ReadMemStats` every 100 ms. (Gets the memory usage of the current TiDB instance; a sketch follows this list.)
2. If the instance's `HeapInuse` is greater than `tidb_server_memory_limit`, set the `NeedKill` flag of the `MemUsageTop1Tracker`. (Sends a kill signal.)
3. When the SQL statement calls `Tracker.Consume()`, it checks its own `NeedKill` flag; if the flag is true, it panics and exits. (Terminates the execution of the SQL.)
4. Get the `SessionID` from the tracker and keep querying its status until it has exited. Once the SQL has exited, explicitly trigger a Golang GC to release memory. (Waits for the SQL to exit completely, then releases memory.)
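A minimal sketch of the observer loop, again in the illustrative `memory` package; `serverMemoryLimit` and the `sessionExited` callback are assumptions standing in for however the real implementation reads the limit and tracks session state:

```go
package memory

import (
	"runtime"
	"time"
)

// runObserver samples HeapInuse every 100 ms; when it exceeds the instance
// limit, it flags the top-1 tracker's SQL for cancellation, waits for that
// session to exit, and then triggers a GC to reclaim the freed memory.
func runObserver(serverMemoryLimit uint64, sessionExited func(sessionID int64) bool) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	var ms runtime.MemStats
	for range ticker.C {
		runtime.ReadMemStats(&ms) // note: this briefly stops the world
		if ms.HeapInuse <= serverMemoryLimit {
			continue
		}
		top := MemUsageTop1Tracker.Load()
		if top == nil {
			continue
		}
		top.NeedKill.Store(true) // Tracker.Consume() will panic on its next call
		for !sessionExited(top.SessionID) { // wait for the killed SQL to exit completely
			time.Sleep(100 * time.Millisecond)
		}
		runtime.GC() // explicitly release the memory held by the killed SQL
	}
}
```

The 100 ms sampling interval is a trade-off between detection latency and the stop-the-world cost of `ReadMemStats`.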

### Take the initiative to trigger GC

The inspiration for this design comes from uber-go-gc-tuner (a sketch follows this list):
1. Use the Go 1.19 `SetMemoryLimit` feature to set the soft limit to `tidb_server_memory_limit` * `tidb_server_memory_limit_gc_trigger`, so that a GC is triggered when that threshold is reached.
2. After each GC, check whether the GC was caused by the memory limit. If it was, temporarily set the memory limit to infinity and set it back to the configured threshold after 1 minute. This avoids the frequent GCs that would otherwise occur while `HeapInuse` stays above the soft limit.
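A hedged sketch of that tuner: the finalizer-based GC hook mirrors the uber-go-gc-tuner trick, and the "was this GC caused by the memory limit" check is a simplified heuristic assumed here, not the proposal's exact condition:

```go
package memory

import (
	"math"
	"runtime"
	"runtime/debug"
	"time"
)

// tuneGoMemLimit installs the soft memory limit and backs it off temporarily
// whenever a GC appears to have been forced by that limit.
func tuneGoMemLimit(limit int64) {
	debug.SetMemoryLimit(limit)
	onEachGC(func() {
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		// Simplified heuristic: if the heap is still at or above the soft limit
		// right after a GC, assume the GC was triggered by the limit.
		if int64(ms.HeapInuse) >= limit {
			// Lift the limit for a minute to avoid back-to-back GCs, then
			// restore the configured threshold.
			debug.SetMemoryLimit(math.MaxInt64)
			time.AfterFunc(time.Minute, func() { debug.SetMemoryLimit(limit) })
		}
	})
}

// gcSentinel is large enough to avoid the tiny allocator so its finalizer
// runs promptly after each GC cycle.
type gcSentinel struct{ _ [16]byte }

// onEachGC re-arms a finalizer on a sentinel object so that fn runs shortly
// after every GC cycle (the uber-go-gc-tuner technique).
func onEachGC(fn func()) {
	var hook func(*gcSentinel)
	hook = func(*gcSentinel) {
		fn()
		runtime.SetFinalizer(&gcSentinel{}, hook) // re-arm for the next GC
	}
	runtime.SetFinalizer(&gcSentinel{}, hook)
}
```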

### Introduce some memory tables

Introduce `performance_schema.memory_usage` and `performance_schema.memory_usage_ops_history` to display the current system memory usage and the history of memory-control operations.
This can be implemented by maintaining a set of global data and reading directly from it when these tables are queried.
14 changes: 11 additions & 3 deletions executor/fktest/foreign_key_test.go
@@ -486,14 +486,22 @@ func TestForeignKeyOnInsertIgnore(t *testing.T) {
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1")
tk.MustExec("use test")

// Test for foreign key index is primary key.
tk.MustExec("CREATE TABLE t1 (i INT PRIMARY KEY);")
tk.MustExec("CREATE TABLE t2 (i INT, FOREIGN KEY (i) REFERENCES t1 (i));")
tk.MustExec("INSERT INTO t1 VALUES (1),(3);")
tk.MustExec("INSERT IGNORE INTO t2 VALUES (1),(2),(3),(4);")
tk.MustExec("INSERT IGNORE INTO t2 VALUES (1), (null), (1), (2),(3),(4);")
warning := "Warning 1452 Cannot add or update a child row: a foreign key constraint fails (`test`.`t2`, CONSTRAINT `fk_1` FOREIGN KEY (`i`) REFERENCES `t1` (`i`))"
tk.MustQuery("show warnings;").Check(testkit.Rows(warning, warning))
tk.MustQuery("select * from t2").Check(testkit.Rows("1", "3"))
tk.MustQuery("select * from t2 order by i").Check(testkit.Rows("<nil>", "1", "1", "3"))
// Test for foreign key index is non-unique key.
tk.MustExec("drop table t1,t2")
tk.MustExec("CREATE TABLE t1 (i INT, index(i));")
tk.MustExec("CREATE TABLE t2 (i INT, FOREIGN KEY (i) REFERENCES t1 (i));")
tk.MustExec("INSERT INTO t1 VALUES (1),(3);")
tk.MustExec("INSERT IGNORE INTO t2 VALUES (1), (null), (1), (2), (3), (2);")
tk.MustQuery("show warnings;").Check(testkit.Rows(warning, warning))
tk.MustQuery("select * from t2 order by i").Check(testkit.Rows("<nil>", "1", "1", "3"))
}

func TestForeignKeyOnInsertOnDuplicateParentTableCheck(t *testing.T) {
3 changes: 2 additions & 1 deletion executor/foreign_key.go
@@ -533,8 +533,9 @@ func (fkc FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementConte
rows[i].ignored = true
sc.AppendWarning(fkc.FailedErr)
fkc.checkRowsCache[string(k)] = true
} else {
fkc.checkRowsCache[string(k)] = false
}
fkc.checkRowsCache[string(k)] = false
if fkc.stats != nil {
fkc.stats.Keys++
}
1 change: 1 addition & 0 deletions executor/main_test.go
@@ -59,6 +59,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
callback := func(i int) int {
testDataMap.GenerateOutputIfNeeded()
17 changes: 17 additions & 0 deletions expression/integration_test.go
@@ -7768,6 +7768,23 @@ func TestJSONStorageFree(t *testing.T) {
require.Error(t, err, "[json:3140]Invalid JSON text: The document root must not be followed by other values.")
}

func TestIssue38736(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("CREATE TABLE t0(c0 BOOL, c1 INT);")
tk.MustExec("CREATE TABLE t1 LIKE t0;")
tk.MustExec("CREATE definer='root'@'localhost' VIEW v0(c0) AS SELECT IS_IPV4(t0.c1) FROM t0, t1;")
tk.MustExec("INSERT INTO t0(c0, c1) VALUES (true, 0);")
tk.MustExec("INSERT INTO t1(c0, c1) VALUES (true, 2);")

// The filter is evaluated as false.
tk.MustQuery("SELECT v0.c0 FROM v0 WHERE (v0.c0)NOT LIKE(BINARY v0.c0);").Check(testkit.Rows())

// Also the filter is evaluated as false.
tk.MustQuery("SELECT v0.c0 FROM v0 WHERE (v0.c0)NOT LIKE(BINARY v0.c0) or v0.c0 > 0").Check(testkit.Rows())
}

func TestJSONExtractFromLast(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
1 change: 1 addition & 0 deletions expression/main_test.go
@@ -54,6 +54,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}

callback := func(i int) int {
53 changes: 22 additions & 31 deletions expression/util.go
@@ -58,7 +58,7 @@ func (c *cowExprRef) Set(i int, changed bool, val Expression) {
return
}
c.new = make([]Expression, len(c.ref))
copy(c.new, c.ref[:i])
copy(c.new, c.ref)
c.new[i] = val
}

@@ -382,17 +382,6 @@ func SetExprColumnInOperand(expr Expression) Expression {
return expr
}

// ColumnSubstitute4PPD substitutes the columns in filter to expressions in select fields.
// Only used for predicate push down to projection. Some columns can not be substituted for some reasons.
// So we should return the bool value to indicate some expressions can not be pushed down.
// e.g. CREATE TABLE t3(c0 INT, primary key(c0));
// SELECT v2.c0 FROM (select 1 as c0 from t3) v2 WHERE (v2.c0)like(True);
// The cond `(v2.c0)like(True)` can not be substituted when the new collation enable. So we shouldn't push the cond down to the projection.
func ColumnSubstitute4PPD(expr Expression, schema *Schema, newExprs []Expression) (bool, Expression) {
substituted, _, resExpr := ColumnSubstituteImpl(expr, schema, newExprs, false)
return substituted, resExpr
}

// ColumnSubstitute substitutes the columns in filter to expressions in select fields.
// e.g. select * from (select b as a from t) k where a < 10 => select * from (select b as a from t where b < 10) k.
func ColumnSubstitute(expr Expression, schema *Schema, newExprs []Expression) Expression {
@@ -432,41 +421,43 @@ func ColumnSubstituteImpl(expr Expression, schema *Schema, newExprs []Expression
substituted := false
hasFail := false
if v.FuncName.L == ast.Cast {
newFunc := v.Clone().(*ScalarFunction)
substituted, hasFail, newFunc.GetArgs()[0] = ColumnSubstituteImpl(newFunc.GetArgs()[0], schema, newExprs, fail1Return)
var newArg Expression
substituted, hasFail, newArg = ColumnSubstituteImpl(v.GetArgs()[0], schema, newExprs, fail1Return)
if fail1Return && hasFail {
return substituted, hasFail, newFunc
return substituted, hasFail, v
}
if substituted {
// Workaround for issue https://github.com/pingcap/tidb/issues/28804
e := NewFunctionInternal(v.GetCtx(), v.FuncName.L, v.RetType, newFunc.GetArgs()...)
e := BuildCastFunction(v.GetCtx(), newArg, v.RetType)
e.SetCoercibility(v.Coercibility())
return true, false, e
}
return false, false, newFunc
return false, false, v
}
// cowExprRef is a copy-on-write util, args array allocation happens only
// when expr in args is changed
refExprArr := cowExprRef{v.GetArgs(), nil}
_, coll := DeriveCollationFromExprs(v.GetCtx(), v.GetArgs()...)
var tmpArgForCollCheck []Expression
if collate.NewCollationEnabled() {
tmpArgForCollCheck = make([]Expression, len(v.GetArgs()))
}
for idx, arg := range v.GetArgs() {
changed, hasFail, newFuncExpr := ColumnSubstituteImpl(arg, schema, newExprs, fail1Return)
if fail1Return && hasFail {
return changed, hasFail, v
changed, failed, newFuncExpr := ColumnSubstituteImpl(arg, schema, newExprs, fail1Return)
if fail1Return && failed {
return changed, failed, v
}
oldChanged := changed
if collate.NewCollationEnabled() {
if collate.NewCollationEnabled() && changed {
// Make sure the collation used by the ScalarFunction isn't changed and its result collation is not weaker than the collation used by the ScalarFunction.
if changed {
changed = false
tmpArgs := make([]Expression, 0, len(v.GetArgs()))
_ = append(append(append(tmpArgs, refExprArr.Result()[0:idx]...), refExprArr.Result()[idx+1:]...), newFuncExpr)
_, newColl := DeriveCollationFromExprs(v.GetCtx(), append(v.GetArgs(), newFuncExpr)...)
if coll == newColl {
changed = checkCollationStrictness(coll, newFuncExpr.GetType().GetCollate())
}
changed = false
copy(tmpArgForCollCheck, refExprArr.Result())
tmpArgForCollCheck[idx] = newFuncExpr
_, newColl := DeriveCollationFromExprs(v.GetCtx(), tmpArgForCollCheck...)
if coll == newColl {
changed = checkCollationStrictness(coll, newFuncExpr.GetType().GetCollate())
}
}
hasFail = hasFail || failed || oldChanged != changed
if fail1Return && oldChanged != changed {
// Only when the oldChanged is true and changed is false, we will get here.
// And this means there some dependency in this arg can be substituted with
@@ -481,7 +472,7 @@ func ColumnSubstituteImpl(expr Expression, schema *Schema, newExprs []Expression
}
}
if substituted {
return true, false, NewFunctionInternal(v.GetCtx(), v.FuncName.L, v.RetType, refExprArr.Result()...)
return true, hasFail, NewFunctionInternal(v.GetCtx(), v.FuncName.L, v.RetType, refExprArr.Result()...)
}
}
return false, false, expr
1 change: 1 addition & 0 deletions extension/main_test.go
@@ -26,6 +26,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions planner/cascades/main_test.go
@@ -49,6 +49,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}

if err := goleak.Find(opts...); err != nil {
4 changes: 2 additions & 2 deletions planner/cascades/transformation_rules.go
@@ -550,8 +550,8 @@ func (*PushSelDownProjection) OnTransform(old *memo.ExprIter) (newExprs []*memo.
canBePushed := make([]expression.Expression, 0, len(sel.Conditions))
canNotBePushed := make([]expression.Expression, 0, len(sel.Conditions))
for _, cond := range sel.Conditions {
substituted, newFilter := expression.ColumnSubstitute4PPD(cond, projSchema, proj.Exprs)
if substituted && !expression.HasGetSetVarFunc(newFilter) {
substituted, hasFailed, newFilter := expression.ColumnSubstituteImpl(cond, projSchema, proj.Exprs, true)
if substituted && !hasFailed && !expression.HasGetSetVarFunc(newFilter) {
canBePushed = append(canBePushed, newFilter)
} else {
canNotBePushed = append(canNotBePushed, cond)
1 change: 1 addition & 0 deletions planner/core/main_test.go
@@ -59,6 +59,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}

callback := func(i int) int {
4 changes: 2 additions & 2 deletions planner/core/rule_predicate_push_down.go
@@ -474,8 +474,8 @@ func (p *LogicalProjection) PredicatePushDown(predicates []expression.Expression
}
}
for _, cond := range predicates {
substituted, newFilter := expression.ColumnSubstitute4PPD(cond, p.Schema(), p.Exprs)
if substituted && !expression.HasGetSetVarFunc(newFilter) {
substituted, hasFailed, newFilter := expression.ColumnSubstituteImpl(cond, p.Schema(), p.Exprs, true)
if substituted && !hasFailed && !expression.HasGetSetVarFunc(newFilter) {
canBePushed = append(canBePushed, newFilter)
} else {
canNotBePushed = append(canNotBePushed, cond)
1 change: 1 addition & 0 deletions planner/funcdep/main_test.go
@@ -26,6 +26,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions planner/memo/main_test.go
@@ -26,6 +26,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
2 changes: 1 addition & 1 deletion session/advisory_locks.go
@@ -50,7 +50,7 @@ func (a *advisoryLock) DecrReferences() {
a.referenceCount--
}

// References returns the current reference count for the advisory lock.
// ReferenceCount returns the current reference count for the advisory lock.
func (a *advisoryLock) ReferenceCount() int {
return a.referenceCount
}