From 4d47aa3add01971dbfba95d49fc0ed00fb5fefee Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Wed, 14 Sep 2022 14:42:59 +0800
Subject: [PATCH 1/4] stmtctx: add mutex to protect stmtCache(#36159) (#36351) (#36389)

close pingcap/tidb#36159
---
 sessionctx/stmtctx/stmtctx.go | 27 +++++++++++++++++++--------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go
index 862a3c7dba6f8..01dc0b64f284b 100644
--- a/sessionctx/stmtctx/stmtctx.go
+++ b/sessionctx/stmtctx/stmtctx.go
@@ -179,7 +179,12 @@ type StatementContext struct {
 	TaskMapBakTS uint64 // counter for

 	// stmtCache is used to store some statement-related values.
-	stmtCache map[StmtCacheKey]interface{}
+	// add mutex to protect stmtCache concurrent access
+	// https://github.com/pingcap/tidb/issues/36159
+	stmtCache struct {
+		mu   sync.Mutex
+		data map[StmtCacheKey]interface{}
+	}

 	// Map to store all CTE storages of current SQL.
 	// Will clean up at the end of the execution.
@@ -277,23 +282,29 @@ const (

 // GetOrStoreStmtCache gets the cached value of the given key if it exists, otherwise stores the value.
 func (sc *StatementContext) GetOrStoreStmtCache(key StmtCacheKey, value interface{}) interface{} {
-	if sc.stmtCache == nil {
-		sc.stmtCache = make(map[StmtCacheKey]interface{})
+	sc.stmtCache.mu.Lock()
+	defer sc.stmtCache.mu.Unlock()
+	if sc.stmtCache.data == nil {
+		sc.stmtCache.data = make(map[StmtCacheKey]interface{})
 	}
-	if _, ok := sc.stmtCache[key]; !ok {
-		sc.stmtCache[key] = value
+	if _, ok := sc.stmtCache.data[key]; !ok {
+		sc.stmtCache.data[key] = value
 	}
-	return sc.stmtCache[key]
+	return sc.stmtCache.data[key]
 }

 // ResetInStmtCache resets the cache of given key.
 func (sc *StatementContext) ResetInStmtCache(key StmtCacheKey) {
-	delete(sc.stmtCache, key)
+	sc.stmtCache.mu.Lock()
+	defer sc.stmtCache.mu.Unlock()
+	delete(sc.stmtCache.data, key)
 }

 // ResetStmtCache resets all cached values.
 func (sc *StatementContext) ResetStmtCache() {
-	sc.stmtCache = make(map[StmtCacheKey]interface{})
+	sc.stmtCache.mu.Lock()
+	defer sc.stmtCache.mu.Unlock()
+	sc.stmtCache.data = make(map[StmtCacheKey]interface{})
 }

 // SQLDigest gets normalized and digest for provided sql.
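
The race fixed above comes from GetOrStoreStmtCache doing a check-then-insert on a lazily created map: two goroutines that miss the key at the same time can both write to the map, which Go's runtime reports as a fatal concurrent map write. Bundling the map with a sync.Mutex in one struct and taking the lock in every accessor, as the hunks do for stmtCache, removes the race and makes it hard to reach the map without the lock. The standalone sketch below shows the same pattern in isolation; it is illustrative only, and Cache, GetOrStore, and Reset are placeholder names rather than TiDB identifiers.

package main

import (
	"fmt"
	"sync"
)

// Cache pairs the mutex with the map it guards, mirroring the stmtCache struct above.
type Cache struct {
	mu   sync.Mutex
	data map[string]interface{}
}

// GetOrStore returns the cached value for key, storing value first if the key
// is absent. Holding the lock for the whole check-then-insert makes it atomic.
func (c *Cache) GetOrStore(key string, value interface{}) interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.data == nil {
		c.data = make(map[string]interface{})
	}
	if v, ok := c.data[key]; ok {
		return v
	}
	c.data[key] = value
	return value
}

// Reset drops every cached entry under the same lock.
func (c *Cache) Reset() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data = make(map[string]interface{})
}

func main() {
	var c Cache
	var wg sync.WaitGroup
	// Without the mutex this loop is exactly the kind of access that the
	// race detector flags as concurrent map writes.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.GetOrStore("first-ts", n)
		}(i)
	}
	wg.Wait()
	fmt.Println(c.GetOrStore("first-ts", -1))
}

Keeping the lock and the data in one named struct, as the patch does, also documents which mutex guards which field, so later call sites are less likely to touch the map directly.
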
From a6f53b2e13bd44f2cc403c04d1a44b9845d3f877 Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Wed, 14 Sep 2022 14:58:58 +0800
Subject: [PATCH 2/4] *: only add default value for final aggregation to fix
 the aggregate push down (partition) union case (#35443) (#35770)

close pingcap/tidb#35295
---
 executor/aggregate_test.go                 | 40 ++++++++++++++++++++++
 executor/builder.go                        | 10 ++++--
 expression/aggregation/descriptor.go       |  2 --
 planner/core/physical_plans.go             |  2 +-
 planner/core/rule_aggregation_push_down.go | 10 ++++++
 planner/core/rule_eliminate_projection.go  |  4 +--
 planner/core/task.go                       | 22 ++++++++++--
 7 files changed, 80 insertions(+), 10 deletions(-)

diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go
index 976775ce5dc4c..bd2861446c9a5 100644
--- a/executor/aggregate_test.go
+++ b/executor/aggregate_test.go
@@ -1597,3 +1597,43 @@ func TestRandomPanicAggConsume(t *testing.T) {
 		require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
 	}
 }
+
+func TestIssue35295(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t100")
+	// This bug only happens on partition prune mode = 'static'
+	tk.MustExec("set @@tidb_partition_prune_mode = 'static'")
+	tk.MustExec(`CREATE TABLE t100 (
+ID bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+col1 int(10) NOT NULL DEFAULT '0' COMMENT 'test',
+money bigint(20) NOT NULL COMMENT 'test',
+logtime datetime NOT NULL COMMENT '记录时间',
+PRIMARY KEY (ID,logtime)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='test'
+PARTITION BY RANGE COLUMNS(logtime) (
+PARTITION p20220608 VALUES LESS THAN ("20220609"),
+PARTITION p20220609 VALUES LESS THAN ("20220610"),
+PARTITION p20220610 VALUES LESS THAN ("20220611"),
+PARTITION p20220611 VALUES LESS THAN ("20220612"),
+PARTITION p20220612 VALUES LESS THAN ("20220613"),
+PARTITION p20220613 VALUES LESS THAN ("20220614"),
+PARTITION p20220614 VALUES LESS THAN ("20220615"),
+PARTITION p20220615 VALUES LESS THAN ("20220616"),
+PARTITION p20220616 VALUES LESS THAN ("20220617"),
+PARTITION p20220617 VALUES LESS THAN ("20220618"),
+PARTITION p20220618 VALUES LESS THAN ("20220619"),
+PARTITION p20220619 VALUES LESS THAN ("20220620"),
+PARTITION p20220620 VALUES LESS THAN ("20220621"),
+PARTITION p20220621 VALUES LESS THAN ("20220622"),
+PARTITION p20220622 VALUES LESS THAN ("20220623"),
+PARTITION p20220623 VALUES LESS THAN ("20220624"),
+PARTITION p20220624 VALUES LESS THAN ("20220625")
+ );`)
+	tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-09 00:00:00');")
+	tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-10 00:00:00');")
+	tk.MustQuery("SELECT /*+STREAM_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
+	tk.MustQuery("SELECT /*+HASH_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
+}
diff --git a/executor/builder.go b/executor/builder.go
index e609b54604a12..eae7d7f0e435c 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1350,7 +1350,9 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
 	if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
 		e.defaultVal = nil
 	} else {
-		e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		if v.IsFinalAgg() {
+			e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		}
 	}
 	for _, aggDesc := range v.AggFuncs {
 		if aggDesc.HasDistinct || len(aggDesc.OrderByItems) > 0 {
@@ -1406,10 +1408,14 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
 		groupChecker: newVecGroupChecker(b.ctx, v.GroupByItems),
 		aggFuncs:     make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)),
 	}
+
 	if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
 		e.defaultVal = nil
 	} else {
-		e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		// Only do this for final agg, see issue #35295, #30923
+		if v.IsFinalAgg() {
+			e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		}
 	}
 	for i, aggDesc := range v.AggFuncs {
 		aggFunc := aggfuncs.Build(b.ctx, aggDesc, i)
diff --git a/expression/aggregation/descriptor.go b/expression/aggregation/descriptor.go
index 8882b93ec05d7..57cd52bad052d 100644
--- a/expression/aggregation/descriptor.go
+++ b/expression/aggregation/descriptor.go
@@ -123,8 +123,6 @@ func (a *AggFuncDesc) Split(ordinal []int) (partialAggDesc, finalAggDesc *AggFun
 		partialAggDesc.Mode = Partial1Mode
 	} else if a.Mode == FinalMode {
 		partialAggDesc.Mode = Partial2Mode
-	} else {
-		panic("Error happened during AggFuncDesc.Split, the AggFunctionMode is not CompleteMode or FinalMode.")
 	}
 	finalAggDesc = &AggFuncDesc{
 		Mode: FinalMode, // We only support FinalMode now in final phase.
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 3d96dd71de13a..71862d2092374 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -1027,7 +1027,7 @@ type basePhysicalAgg struct {
 	MppPartitionCols []*property.MPPPartitionColumn
 }

-func (p *basePhysicalAgg) isFinalAgg() bool {
+func (p *basePhysicalAgg) IsFinalAgg() bool {
 	if len(p.AggFuncs) > 0 {
 		if p.AggFuncs[0].Mode == aggregation.FinalMode || p.AggFuncs[0].Mode == aggregation.CompleteMode {
 			return true
diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go
index 7e874369e6027..7f8f4fdbf6baf 100644
--- a/planner/core/rule_aggregation_push_down.go
+++ b/planner/core/rule_aggregation_push_down.go
@@ -404,6 +404,16 @@ func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAl
 	if pushedAgg == nil {
 		return nil
 	}
+
+	// Update the agg mode for the pushed down aggregation.
+	for _, aggFunc := range pushedAgg.AggFuncs {
+		if aggFunc.Mode == aggregation.CompleteMode {
+			aggFunc.Mode = aggregation.Partial1Mode
+		} else if aggFunc.Mode == aggregation.FinalMode {
+			aggFunc.Mode = aggregation.Partial2Mode
+		}
+	}
+
 	newChildren := make([]LogicalPlan, 0, len(union.Children()))
 	for _, child := range union.Children() {
 		newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
diff --git a/planner/core/rule_eliminate_projection.go b/planner/core/rule_eliminate_projection.go
index 0daaf4616185a..137b6ca87c976 100644
--- a/planner/core/rule_eliminate_projection.go
+++ b/planner/core/rule_eliminate_projection.go
@@ -48,14 +48,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
 	// passing down the aggregation mode to TiFlash.
 	if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
 		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
-			if physicalAgg.isFinalAgg() {
+			if physicalAgg.IsFinalAgg() {
 				return false
 			}
 		}
 	}
 	if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
 		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
-			if physicalAgg.isFinalAgg() {
+			if physicalAgg.IsFinalAgg() {
 				return false
 			}
 		}
diff --git a/planner/core/task.go b/planner/core/task.go
index f70966ee5448c..18363c92721e5 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1712,7 +1712,15 @@ func BuildFinalModeAggregation(

 			finalAggFunc.OrderByItems = byItems
 			finalAggFunc.HasDistinct = aggFunc.HasDistinct
-			finalAggFunc.Mode = aggregation.CompleteMode
+			// In logical optimize phase, the Agg->PartitionUnion->TableReader may become
+			// Agg1->PartitionUnion->Agg2->TableReader, and the Agg2 is a partial aggregation.
+			// So in the push down here, we need to add a new if-condition check:
+			// If the original agg mode is partial already, the finalAggFunc's mode becomes Partial2.
+			if aggFunc.Mode == aggregation.CompleteMode {
+				finalAggFunc.Mode = aggregation.CompleteMode
+			} else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
+				finalAggFunc.Mode = aggregation.Partial2Mode
+			}
 		} else {
 			if aggFunc.Name == ast.AggFuncGroupConcat && len(aggFunc.OrderByItems) > 0 {
 				// group_concat can only run in one phase if it has order by items but without distinct property
@@ -1789,7 +1797,15 @@ func BuildFinalModeAggregation(
 			}
 		}

-		finalAggFunc.Mode = aggregation.FinalMode
+		// In logical optimize phase, the Agg->PartitionUnion->TableReader may become
+		// Agg1->PartitionUnion->Agg2->TableReader, and the Agg2 is a partial aggregation.
+		// So in the push down here, we need to add a new if-condition check:
+		// If the original agg mode is partial already, the finalAggFunc's mode becomes Partial2.
+		if aggFunc.Mode == aggregation.CompleteMode {
+			finalAggFunc.Mode = aggregation.FinalMode
+		} else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
+			finalAggFunc.Mode = aggregation.Partial2Mode
+		}
 	}

 	finalAggFunc.Args = args
@@ -1855,7 +1871,7 @@ func (p *basePhysicalAgg) convertAvgForMPP() *PhysicalProjection {
 	}
 	// no avgs
 	// for final agg, always add project due to in-compatibility between TiDB and TiFlash
-	if len(p.schema.Columns) == len(newSchema.Columns) && !p.isFinalAgg() {
+	if len(p.schema.Columns) == len(newSchema.Columns) && !p.IsFinalAgg() {
 		return nil
 	}
 	// add remaining columns to exprs

From fab67cca48a9752b097baf7c07f5a92c439aeb0c Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Thu, 15 Sep 2022 10:32:58 +0800
Subject: [PATCH 3/4] domain: sync the access of InfoSyncer.SessionManager
 (#33924) (#33943)

close pingcap/tidb#33335
---
 domain/infosync/info.go | 30 +++++++++++++++++++-----------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/domain/infosync/info.go b/domain/infosync/info.go
index dcaf12143126b..168030da890c7 100644
--- a/domain/infosync/info.go
+++ b/domain/infosync/info.go
@@ -25,6 +25,7 @@ import (
 	"path"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -91,12 +92,15 @@ var ErrPrometheusAddrIsNotSet = dbterror.ClassDomain.NewStd(errno.ErrPrometheusA

 // InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down.
 type InfoSyncer struct {
-	etcdCli        *clientv3.Client
-	info           *ServerInfo
-	serverInfoPath string
-	minStartTS     uint64
-	minStartTSPath string
-	manager        util2.SessionManager
+	etcdCli        *clientv3.Client
+	info           *ServerInfo
+	serverInfoPath string
+	minStartTS     uint64
+	minStartTSPath string
+	managerMu      struct {
+		mu sync.RWMutex
+		util2.SessionManager
+	}
 	session         *concurrency.Session
 	topologySession *concurrency.Session
 	prometheusAddr  string
@@ -201,12 +205,16 @@ func (is *InfoSyncer) init(ctx context.Context, skipRegisterToDashboard bool) er

 // SetSessionManager set the session manager for InfoSyncer.
 func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) {
-	is.manager = manager
+	is.managerMu.mu.Lock()
+	defer is.managerMu.mu.Unlock()
+	is.managerMu.SessionManager = manager
 }

 // GetSessionManager get the session manager.
 func (is *InfoSyncer) GetSessionManager() util2.SessionManager {
-	return is.manager
+	is.managerMu.mu.RLock()
+	defer is.managerMu.mu.RUnlock()
+	return is.managerMu.SessionManager
 }

 func initLabelRuleManager(etcdCli *clientv3.Client) LabelRuleManager {
@@ -586,11 +594,11 @@ func (is *InfoSyncer) RemoveMinStartTS() {

 // ReportMinStartTS reports self server min start timestamp to ETCD.
 func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
-	if is.manager == nil {
-		// Server may not start in time.
+	sm := is.GetSessionManager()
+	if sm == nil {
 		return
 	}
-	pl := is.manager.ShowProcessList()
+	pl := sm.ShowProcessList()
 	// Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC.
 	currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)

From cd5db428300feb51fbde00adcf1878aedda37147 Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Thu, 15 Sep 2022 11:13:00 +0800
Subject: [PATCH 4/4] server: a better way to handle killed connection
 (#32809) (#37834)

close pingcap/tidb#24031, ref pingcap/tidb#29212
---
 server/conn.go   | 16 ++++++++++------
 server/server.go |  8 +++++---
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/server/conn.go b/server/conn.go
index 769fe7eb7e80f..f8fef4eafcbb3 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -1067,12 +1067,16 @@ func (cc *clientConn) Run(ctx context.Context) {
 		if err != nil {
 			if terror.ErrorNotEqual(err, io.EOF) {
 				if netErr, isNetErr := errors.Cause(err).(net.Error); isNetErr && netErr.Timeout() {
-					idleTime := time.Since(start)
-					logutil.Logger(ctx).Info("read packet timeout, close this connection",
-						zap.Duration("idle", idleTime),
-						zap.Uint64("waitTimeout", waitTimeout),
-						zap.Error(err),
-					)
+					if atomic.LoadInt32(&cc.status) == connStatusWaitShutdown {
+						logutil.Logger(ctx).Info("read packet timeout because of killed connection")
+					} else {
+						idleTime := time.Since(start)
+						logutil.Logger(ctx).Info("read packet timeout, close this connection",
+							zap.Duration("idle", idleTime),
+							zap.Uint64("waitTimeout", waitTimeout),
+							zap.Error(err),
+						)
+					}
 				} else {
 					errStack := errors.ErrorStack(err)
 					if !strings.Contains(errStack, "use of closed network connection") {
diff --git a/server/server.go b/server/server.go
index 2be15b20b7bf9..a81b4c3b74d6c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -618,9 +618,6 @@ func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo {
 	defer s.rwlock.RUnlock()
 	rs := make(map[uint64]*util.ProcessInfo, len(s.clients))
 	for _, client := range s.clients {
-		if atomic.LoadInt32(&client.status) == connStatusWaitShutdown {
-			continue
-		}
 		if pi := client.ctx.ShowProcess(); pi != nil {
 			rs[pi.ID] = pi
 		}
@@ -693,6 +690,11 @@ func killConn(conn *clientConn) {
 	if cancelFunc != nil {
 		cancelFunc()
 	}
+	if conn.bufReadConn != nil {
+		if err := conn.bufReadConn.SetReadDeadline(time.Now()); err != nil {
+			logutil.BgLogger().Warn("error setting read deadline for kill.", zap.Error(err))
+		}
+	}
 }

 // KillAllConnections kills all connections when server is not gracefully shutdown.
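
Patch 3 above is the read-write variant of the same locking idea used in patch 1: SetSessionManager publishes the manager under the write lock, GetSessionManager reads it under the read lock, and ReportMinStartTS fetches the manager once through the getter into a local variable, so the nil check and the later ShowProcessList call cannot observe two different values. A minimal standalone sketch of that shape follows; SessionManager, fakeManager, and reportMinStartTS are illustrative stand-ins, not the real util.SessionManager or InfoSyncer API.

package main

import (
	"fmt"
	"sync"
)

// SessionManager is a stand-in interface with only the method the sketch needs.
type SessionManager interface {
	ShowProcessList() map[uint64]string
}

type fakeManager struct{}

func (fakeManager) ShowProcessList() map[uint64]string {
	return map[uint64]string{1: "select 1"}
}

// InfoSyncer keeps the RWMutex and the field it guards in one struct,
// mirroring the managerMu field introduced by the patch.
type InfoSyncer struct {
	managerMu struct {
		sync.RWMutex
		manager SessionManager
	}
}

// SetSessionManager publishes the manager under the write lock.
func (is *InfoSyncer) SetSessionManager(m SessionManager) {
	is.managerMu.Lock()
	defer is.managerMu.Unlock()
	is.managerMu.manager = m
}

// GetSessionManager reads the manager under the read lock.
func (is *InfoSyncer) GetSessionManager() SessionManager {
	is.managerMu.RLock()
	defer is.managerMu.RUnlock()
	return is.managerMu.manager
}

// reportMinStartTS mirrors the fixed call site: read the field once via the
// getter, then use only the local copy.
func (is *InfoSyncer) reportMinStartTS() {
	sm := is.GetSessionManager()
	if sm == nil {
		return // the server may not have registered a manager yet
	}
	fmt.Println(len(sm.ShowProcessList()), "session(s)")
}

func main() {
	var is InfoSyncer
	is.reportMinStartTS() // no manager yet: returns early instead of racing on the field
	is.SetSessionManager(fakeManager{})
	is.reportMinStartTS() // prints: 1 session(s)
}
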
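
Patch 4 leans on a property of Go's net.Conn: moving the read deadline into the past makes a Read that is already blocked return immediately with an error whose Timeout() method reports true. killConn uses that to wake a connection stuck waiting for the next client packet, and the read loop in clientConn.Run then checks the connection status to tell a kill apart from an ordinary idle timeout. The self-contained sketch below reproduces just that mechanism on a local TCP connection; the status constants and messages are illustrative, not the tidb-server ones.

package main

import (
	"errors"
	"fmt"
	"net"
	"sync/atomic"
	"time"
)

const (
	statusReading int32 = iota
	statusWaitShutdown
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	// A client that connects but never writes, so the server side stays
	// blocked in Read until something unblocks it.
	go func() {
		c, err := net.Dial("tcp", ln.Addr().String())
		if err != nil {
			return
		}
		defer c.Close()
		time.Sleep(2 * time.Second)
	}()

	conn, err := ln.Accept()
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	status := statusReading

	// The "killConn" side: mark the connection as shutting down, then force
	// the pending Read to return by setting the read deadline to now.
	go func() {
		time.Sleep(100 * time.Millisecond)
		atomic.StoreInt32(&status, statusWaitShutdown)
		_ = conn.SetReadDeadline(time.Now())
	}()

	buf := make([]byte, 1)
	_, err = conn.Read(buf) // blocks until the deadline fires
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		if atomic.LoadInt32(&status) == statusWaitShutdown {
			fmt.Println("read unblocked because the connection was killed")
		} else {
			fmt.Println("read timed out while the connection was idle")
		}
	}
}
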