
Commit

Merge branch 'release-5.4' into release-5.4-0703a64f76ba
you06 authored Sep 15, 2022
2 parents 16b9f3b + cd5db42 commit 56a20db
Showing 11 changed files with 133 additions and 38 deletions.
30 changes: 19 additions & 11 deletions domain/infosync/info.go
@@ -25,6 +25,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

@@ -91,12 +92,15 @@ var ErrPrometheusAddrIsNotSet = dbterror.ClassDomain.NewStd(errno.ErrPrometheusA

// InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down.
type InfoSyncer struct {
etcdCli *clientv3.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
minStartTSPath string
manager util2.SessionManager
etcdCli *clientv3.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
minStartTSPath string
managerMu struct {
mu sync.RWMutex
util2.SessionManager
}
session *concurrency.Session
topologySession *concurrency.Session
prometheusAddr string
@@ -201,12 +205,16 @@ func (is *InfoSyncer) init(ctx context.Context, skipRegisterToDashboard bool) er

// SetSessionManager set the session manager for InfoSyncer.
func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) {
is.manager = manager
is.managerMu.mu.Lock()
defer is.managerMu.mu.Unlock()
is.managerMu.SessionManager = manager
}

// GetSessionManager get the session manager.
func (is *InfoSyncer) GetSessionManager() util2.SessionManager {
return is.manager
is.managerMu.mu.RLock()
defer is.managerMu.mu.RUnlock()
return is.managerMu.SessionManager
}

func initLabelRuleManager(etcdCli *clientv3.Client) LabelRuleManager {
@@ -586,11 +594,11 @@ func (is *InfoSyncer) RemoveMinStartTS() {

// ReportMinStartTS reports self server min start timestamp to ETCD.
func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
if is.manager == nil {
// Server may not start in time.
sm := is.GetSessionManager()
if sm == nil {
return
}
pl := is.manager.ShowProcessList()
pl := sm.ShowProcessList()

// Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC.
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
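The info.go hunks replace direct reads of is.manager with accessors guarded by an RWMutex, so SetSessionManager and ReportMinStartTS no longer race on the field. Below is a minimal standalone sketch of the same pattern — bundling the interface with its lock in an anonymous struct — using hypothetical stand-in types rather than the real util.SessionManager:

package main

import (
	"fmt"
	"sync"
)

// SessionManager is a stand-in for util.SessionManager.
type SessionManager interface {
	ShowProcessList() map[uint64]string
}

type fakeManager struct{}

func (fakeManager) ShowProcessList() map[uint64]string {
	return map[uint64]string{1: "select 1"}
}

// InfoSyncer mirrors the struct layout used in the commit: the manager and its
// lock live together in one anonymous struct, so callers cannot touch the
// field without going through the accessors.
type InfoSyncer struct {
	managerMu struct {
		mu sync.RWMutex
		SessionManager
	}
}

func (is *InfoSyncer) SetSessionManager(m SessionManager) {
	is.managerMu.mu.Lock()
	defer is.managerMu.mu.Unlock()
	is.managerMu.SessionManager = m
}

func (is *InfoSyncer) GetSessionManager() SessionManager {
	is.managerMu.mu.RLock()
	defer is.managerMu.mu.RUnlock()
	return is.managerMu.SessionManager
}

func main() {
	is := &InfoSyncer{}
	// Like ReportMinStartTS: read the manager once through the getter and
	// bail out if the server has not registered it yet.
	if sm := is.GetSessionManager(); sm == nil {
		fmt.Println("session manager not ready")
	}
	is.SetSessionManager(fakeManager{})
	fmt.Println(is.GetSessionManager().ShowProcessList())
}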
40 changes: 40 additions & 0 deletions executor/aggregate_test.go
@@ -1597,3 +1597,43 @@ func TestRandomPanicAggConsume(t *testing.T) {
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
}
}

func TestIssue35295(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t100")
// This bug only happens on partition prune mode = 'static'
tk.MustExec("set @@tidb_partition_prune_mode = 'static'")
tk.MustExec(`CREATE TABLE t100 (
ID bigint(20) unsigned NOT NULL AUTO_INCREMENT,
col1 int(10) NOT NULL DEFAULT '0' COMMENT 'test',
money bigint(20) NOT NULL COMMENT 'test',
logtime datetime NOT NULL COMMENT '记录时间',
PRIMARY KEY (ID,logtime)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='test'
PARTITION BY RANGE COLUMNS(logtime) (
PARTITION p20220608 VALUES LESS THAN ("20220609"),
PARTITION p20220609 VALUES LESS THAN ("20220610"),
PARTITION p20220610 VALUES LESS THAN ("20220611"),
PARTITION p20220611 VALUES LESS THAN ("20220612"),
PARTITION p20220612 VALUES LESS THAN ("20220613"),
PARTITION p20220613 VALUES LESS THAN ("20220614"),
PARTITION p20220614 VALUES LESS THAN ("20220615"),
PARTITION p20220615 VALUES LESS THAN ("20220616"),
PARTITION p20220616 VALUES LESS THAN ("20220617"),
PARTITION p20220617 VALUES LESS THAN ("20220618"),
PARTITION p20220618 VALUES LESS THAN ("20220619"),
PARTITION p20220619 VALUES LESS THAN ("20220620"),
PARTITION p20220620 VALUES LESS THAN ("20220621"),
PARTITION p20220621 VALUES LESS THAN ("20220622"),
PARTITION p20220622 VALUES LESS THAN ("20220623"),
PARTITION p20220623 VALUES LESS THAN ("20220624"),
PARTITION p20220624 VALUES LESS THAN ("20220625")
);`)
tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-09 00:00:00');")
tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-10 00:00:00');")
tk.MustQuery("SELECT /*+STREAM_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
tk.MustQuery("SELECT /*+HASH_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
}
10 changes: 8 additions & 2 deletions executor/builder.go
@@ -1350,7 +1350,9 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
e.defaultVal = nil
} else {
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
if v.IsFinalAgg() {
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
}
}
for _, aggDesc := range v.AggFuncs {
if aggDesc.HasDistinct || len(aggDesc.OrderByItems) > 0 {
@@ -1406,10 +1408,14 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
groupChecker: newVecGroupChecker(b.ctx, v.GroupByItems),
aggFuncs: make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)),
}

if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
e.defaultVal = nil
} else {
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
// Only do this for final agg, see issue #35295, #30923
if v.IsFinalAgg() {
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
}
}
for i, aggDesc := range v.AggFuncs {
aggFunc := aggfuncs.Build(b.ctx, aggDesc, i)
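Why defaultVal is now gated on v.IsFinalAgg(): a group-by-less aggregate must return exactly one row over empty input, and that default row should come from the final aggregate only. Under static partition pruning the plan is roughly Agg(final) over PartitionUnion over one Agg(partial) per partition, so a partial aggregate that also emits a default row feeds spurious rows from empty partitions into the union (issues #35295, #30923). A simplified, self-contained simulation of that effect — hypothetical helpers, not the executor code, and the exact wrong result in the original issue may differ:

package main

import "fmt"

// partialFirstRow mimics the per-partition (partial) aggregate that carries a
// select column through a group-by-less aggregation. emitDefault controls
// whether an empty partition still emits a default row, which is what the
// builder.go change stops doing for non-final aggregates.
func partialFirstRow(rows []int, emitDefault bool) []*int {
	if len(rows) == 0 {
		if emitDefault {
			return []*int{nil} // default row: firstrow(col1) is NULL
		}
		return nil
	}
	v := rows[0]
	return []*int{&v}
}

// finalFirstRow mimics the final aggregate merging the partial results.
func finalFirstRow(partials []*int) string {
	if len(partials) == 0 || partials[0] == nil {
		return "NULL"
	}
	return fmt.Sprint(*partials[0])
}

func main() {
	// Partition p20220608 is empty, the later partitions hold col1 = 100,
	// mirroring the TestIssue35295 table.
	partitions := [][]int{{}, {100}, {100}}

	for _, emitDefault := range []bool{true, false} {
		var merged []*int
		for _, p := range partitions {
			merged = append(merged, partialFirstRow(p, emitDefault)...)
		}
		fmt.Printf("partial default rows enabled=%v -> col1=%s\n",
			emitDefault, finalFirstRow(merged))
	}
	// Output:
	// partial default rows enabled=true -> col1=NULL
	// partial default rows enabled=false -> col1=100
}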
2 changes: 0 additions & 2 deletions expression/aggregation/descriptor.go
@@ -123,8 +123,6 @@ func (a *AggFuncDesc) Split(ordinal []int) (partialAggDesc, finalAggDesc *AggFun
partialAggDesc.Mode = Partial1Mode
} else if a.Mode == FinalMode {
partialAggDesc.Mode = Partial2Mode
} else {
panic("Error happened during AggFuncDesc.Split, the AggFunctionMode is not CompleteMode or FinalMode.")
}
finalAggDesc = &AggFuncDesc{
Mode: FinalMode, // We only support FinalMode now in final phase.
2 changes: 1 addition & 1 deletion planner/core/physical_plans.go
@@ -1027,7 +1027,7 @@ type basePhysicalAgg struct {
MppPartitionCols []*property.MPPPartitionColumn
}

func (p *basePhysicalAgg) isFinalAgg() bool {
func (p *basePhysicalAgg) IsFinalAgg() bool {
if len(p.AggFuncs) > 0 {
if p.AggFuncs[0].Mode == aggregation.FinalMode || p.AggFuncs[0].Mode == aggregation.CompleteMode {
return true
10 changes: 10 additions & 0 deletions planner/core/rule_aggregation_push_down.go
@@ -404,6 +404,16 @@ func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAl
if pushedAgg == nil {
return nil
}

// Update the agg mode for the pushed down aggregation.
for _, aggFunc := range pushedAgg.AggFuncs {
if aggFunc.Mode == aggregation.CompleteMode {
aggFunc.Mode = aggregation.Partial1Mode
} else if aggFunc.Mode == aggregation.FinalMode {
aggFunc.Mode = aggregation.Partial2Mode
}
}

newChildren := make([]LogicalPlan, 0, len(union.Children()))
for _, child := range union.Children() {
newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
4 changes: 2 additions & 2 deletions planner/core/rule_eliminate_projection.go
@@ -48,14 +48,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
// passing down the aggregation mode to TiFlash.
if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
if physicalAgg.isFinalAgg() {
if physicalAgg.IsFinalAgg() {
return false
}
}
}
if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
if physicalAgg.isFinalAgg() {
if physicalAgg.IsFinalAgg() {
return false
}
}
22 changes: 19 additions & 3 deletions planner/core/task.go
@@ -1712,7 +1712,15 @@ func BuildFinalModeAggregation(

finalAggFunc.OrderByItems = byItems
finalAggFunc.HasDistinct = aggFunc.HasDistinct
finalAggFunc.Mode = aggregation.CompleteMode
// In logical optimize phase, the Agg->PartitionUnion->TableReader may become
// Agg1->PartitionUnion->Agg2->TableReader, and the Agg2 is a partial aggregation.
// So in the push down here, we need to add a new if-condition check:
// If the original agg mode is partial already, the finalAggFunc's mode becomes Partial2.
if aggFunc.Mode == aggregation.CompleteMode {
finalAggFunc.Mode = aggregation.CompleteMode
} else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
finalAggFunc.Mode = aggregation.Partial2Mode
}
} else {
if aggFunc.Name == ast.AggFuncGroupConcat && len(aggFunc.OrderByItems) > 0 {
// group_concat can only run in one phase if it has order by items but without distinct property
@@ -1789,7 +1797,15 @@
}
}

finalAggFunc.Mode = aggregation.FinalMode
// In logical optimize phase, the Agg->PartitionUnion->TableReader may become
// Agg1->PartitionUnion->Agg2->TableReader, and the Agg2 is a partial aggregation.
// So in the push down here, we need to add a new if-condition check:
// If the original agg mode is partial already, the finalAggFunc's mode becomes Partial2.
if aggFunc.Mode == aggregation.CompleteMode {
finalAggFunc.Mode = aggregation.FinalMode
} else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
finalAggFunc.Mode = aggregation.Partial2Mode
}
}

finalAggFunc.Args = args
@@ -1855,7 +1871,7 @@ func (p *basePhysicalAgg) convertAvgForMPP() *PhysicalProjection {
}
// no avgs
// for final agg, always add project due to in-compatibility between TiDB and TiFlash
if len(p.schema.Columns) == len(newSchema.Columns) && !p.isFinalAgg() {
if len(p.schema.Columns) == len(newSchema.Columns) && !p.IsFinalAgg() {
return nil
}
// add remaining columns to exprs
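Both task.go hunks teach BuildFinalModeAggregation that, after the union push-down above, the aggregate being split may already be in a partial mode, in which case the upper aggregate has to merge partial results rather than treat its input as raw rows. A compact sketch of the resulting mode mapping, using a hypothetical helper rather than the real function:

package main

import "fmt"

// AggFunctionMode mirrors the aggregation modes referenced in the diff.
type AggFunctionMode int

const (
	CompleteMode AggFunctionMode = iota
	FinalMode
	Partial1Mode
	Partial2Mode
)

func (m AggFunctionMode) String() string {
	return [...]string{"Complete", "Final", "Partial1", "Partial2"}[m]
}

// upperModeAfterSplit sketches the second hunk: in the normal two-phase split
// a Complete child yields a Final upper aggregate, but a child that is already
// partial (rewritten by the union push-down) yields a Partial2 upper aggregate.
// The first hunk applies the same idea to the one-phase distinct/group_concat
// branch, keeping CompleteMode only when the child really was CompleteMode.
func upperModeAfterSplit(original AggFunctionMode) AggFunctionMode {
	switch original {
	case CompleteMode:
		return FinalMode
	case Partial1Mode, Partial2Mode:
		return Partial2Mode
	default:
		return original
	}
}

func main() {
	for _, m := range []AggFunctionMode{CompleteMode, Partial1Mode} {
		fmt.Printf("child %s -> upper %s\n", m, upperModeAfterSplit(m))
	}
}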
16 changes: 10 additions & 6 deletions server/conn.go
@@ -1067,12 +1067,16 @@ func (cc *clientConn) Run(ctx context.Context) {
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
if netErr, isNetErr := errors.Cause(err).(net.Error); isNetErr && netErr.Timeout() {
idleTime := time.Since(start)
logutil.Logger(ctx).Info("read packet timeout, close this connection",
zap.Duration("idle", idleTime),
zap.Uint64("waitTimeout", waitTimeout),
zap.Error(err),
)
if atomic.LoadInt32(&cc.status) == connStatusWaitShutdown {
logutil.Logger(ctx).Info("read packet timeout because of killed connection")
} else {
idleTime := time.Since(start)
logutil.Logger(ctx).Info("read packet timeout, close this connection",
zap.Duration("idle", idleTime),
zap.Uint64("waitTimeout", waitTimeout),
zap.Error(err),
)
}
} else {
errStack := errors.ErrorStack(err)
if !strings.Contains(errStack, "use of closed network connection") {
8 changes: 5 additions & 3 deletions server/server.go
@@ -618,9 +618,6 @@ func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo {
defer s.rwlock.RUnlock()
rs := make(map[uint64]*util.ProcessInfo, len(s.clients))
for _, client := range s.clients {
if atomic.LoadInt32(&client.status) == connStatusWaitShutdown {
continue
}
if pi := client.ctx.ShowProcess(); pi != nil {
rs[pi.ID] = pi
}
@@ -693,6 +690,11 @@ func killConn(conn *clientConn) {
if cancelFunc != nil {
cancelFunc()
}
if conn.bufReadConn != nil {
if err := conn.bufReadConn.SetReadDeadline(time.Now()); err != nil {
logutil.BgLogger().Warn("error setting read deadline for kill.", zap.Error(err))
}
}
}

// KillAllConnections kills all connections when server is not gracefully shutdown.
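The killConn change relies on a standard net.Conn trick: setting a read deadline in the past makes a blocked Read return immediately with a timeout error, and the connStatusWaitShutdown check added in conn.go then distinguishes that wake-up from a genuine idle timeout. A self-contained sketch of the unblocking mechanism (plain TCP, not the MySQL protocol loop):

package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	go func() {
		// A client that connects but never sends anything, like an idle
		// connection waiting for the next command packet.
		c, err := net.Dial("tcp", ln.Addr().String())
		if err != nil {
			return
		}
		defer c.Close()
		time.Sleep(2 * time.Second)
	}()

	conn, err := ln.Accept()
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Simulate killConn: after a while, set the read deadline to "now" so the
	// blocked Read below wakes up instead of waiting for the full timeout.
	go func() {
		time.Sleep(200 * time.Millisecond)
		_ = conn.SetReadDeadline(time.Now())
	}()

	buf := make([]byte, 1)
	_, err = conn.Read(buf)
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		// This is where conn.go now checks connStatusWaitShutdown to decide
		// whether the timeout came from a kill or from real idleness.
		fmt.Println("read unblocked by deadline:", err)
	}
}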
27 changes: 19 additions & 8 deletions sessionctx/stmtctx/stmtctx.go
@@ -179,7 +179,12 @@ type StatementContext struct {
TaskMapBakTS uint64 // counter for

// stmtCache is used to store some statement-related values.
stmtCache map[StmtCacheKey]interface{}
// add mutex to protect stmtCache concurrent access
// https://github.com/pingcap/tidb/issues/36159
stmtCache struct {
mu sync.Mutex
data map[StmtCacheKey]interface{}
}

// Map to store all CTE storages of current SQL.
// Will clean up at the end of the execution.
@@ -277,23 +282,29 @@ const (

// GetOrStoreStmtCache gets the cached value of the given key if it exists, otherwise stores the value.
func (sc *StatementContext) GetOrStoreStmtCache(key StmtCacheKey, value interface{}) interface{} {
if sc.stmtCache == nil {
sc.stmtCache = make(map[StmtCacheKey]interface{})
sc.stmtCache.mu.Lock()
defer sc.stmtCache.mu.Unlock()
if sc.stmtCache.data == nil {
sc.stmtCache.data = make(map[StmtCacheKey]interface{})
}
if _, ok := sc.stmtCache[key]; !ok {
sc.stmtCache[key] = value
if _, ok := sc.stmtCache.data[key]; !ok {
sc.stmtCache.data[key] = value
}
return sc.stmtCache[key]
return sc.stmtCache.data[key]
}

// ResetInStmtCache resets the cache of given key.
func (sc *StatementContext) ResetInStmtCache(key StmtCacheKey) {
delete(sc.stmtCache, key)
sc.stmtCache.mu.Lock()
defer sc.stmtCache.mu.Unlock()
delete(sc.stmtCache.data, key)
}

// ResetStmtCache resets all cached values.
func (sc *StatementContext) ResetStmtCache() {
sc.stmtCache = make(map[StmtCacheKey]interface{})
sc.stmtCache.mu.Lock()
defer sc.stmtCache.mu.Unlock()
sc.stmtCache.data = make(map[StmtCacheKey]interface{})
}

// SQLDigest gets normalized and digest for provided sql.
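The stmtctx.go change bundles the statement cache map with a mutex so every access path locks consistently; issue #36159 was a concurrent read/write on the lazily created map. The same get-or-store-under-lock shape, reduced to a standalone sketch with hypothetical key and value types:

package main

import (
	"fmt"
	"sync"
)

type cacheKey int

const firstKey cacheKey = iota

// stmtCache bundles the map with the mutex that protects it, mirroring the
// anonymous struct introduced in StatementContext.
type stmtCache struct {
	mu   sync.Mutex
	data map[cacheKey]interface{}
}

// getOrStore returns the cached value for key, storing value on first use.
// The map is lazily allocated under the lock, so concurrent callers are safe.
func (c *stmtCache) getOrStore(key cacheKey, value interface{}) interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.data == nil {
		c.data = make(map[cacheKey]interface{})
	}
	if v, ok := c.data[key]; ok {
		return v
	}
	c.data[key] = value
	return value
}

func main() {
	var c stmtCache
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Every goroutine sees the same first-stored value; without the
			// mutex this pattern is a data race on the lazily created map.
			_ = c.getOrStore(firstKey, i)
		}(i)
	}
	wg.Wait()
	fmt.Println("cached value:", c.getOrStore(firstKey, -1))
}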
