Skip to content

Commit

Permalink
Merge branch 'main' into 0109-debug-cte
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 10, 2025
2 parents bfef2e3 + 442fe7d commit 542eb2a
Show file tree
Hide file tree
Showing 127 changed files with 2,656 additions and 16,842 deletions.
2 changes: 1 addition & 1 deletion pkg/bootstrap/custom_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *service) UpgradeOneTenant(ctx context.Context, tenantID int32) error {
return err
}
if latestVersion.Version != currentCN.Version {
panic("BUG: current cn's version(" +
s.logger.Fatal("BUG: current cn's version(" +
currentCN.Version +
") must equal cluster latest version(" +
latestVersion.Version +
Expand Down
10 changes: 6 additions & 4 deletions pkg/bootstrap/service_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ func (s *service) doCheckUpgrade(ctx context.Context) error {
// cluster is upgrading to v1, only v1's cn can start up.
if !v.IsReady() {
if v.Version != final.Version {
panic(fmt.Sprintf("cannot upgrade to version %s, because version %s is in upgrading",
s.logger.Fatal(fmt.Sprintf("cannot upgrade to version %s, because version %s is in upgrading",
final.Version,
v.Version))
} else if v.VersionOffset != final.VersionOffset {
panic(fmt.Sprintf("cannot upgrade to version %s with versionOffset[%d], because version %s with versionOffset[%d] is in upgrading",
s.logger.Fatal(fmt.Sprintf("cannot upgrade to version %s with versionOffset[%d], because version %s with versionOffset[%d] is in upgrading",
final.Version,
final.VersionOffset,
v.Version,
Expand All @@ -183,7 +183,7 @@ func (s *service) doCheckUpgrade(ctx context.Context) error {

// cluster is running at v1, cannot startup a old version to join cluster.
if v.IsReady() && versions.Compare(final.Version, v.Version) < 0 {
panic(fmt.Sprintf("cannot startup a old version %s to join cluster, current version is %s",
s.logger.Fatal(fmt.Sprintf("cannot startup a old version %s to join cluster, current version is %s",
final.Version,
v.Version))
}
Expand All @@ -193,6 +193,8 @@ func (s *service) doCheckUpgrade(ctx context.Context) error {
// 2: add upgrades from latest version to final version
checker := func() (bool, error) {
if v.Version == final.Version && v.VersionOffset >= final.VersionOffset {
// if the schema version has reached finalVersion, mark finalVersion Completed and return
s.upgrade.finalVersionCompleted.Store(true)
return true, nil
}

Expand Down Expand Up @@ -396,7 +398,7 @@ func (s *service) performUpgrade(
zap.String("final", final.Version))
return false, nil
default:
panic(fmt.Sprintf("BUG: invalid state %d", state))
s.logger.Fatal(fmt.Sprintf("BUG: invalid state %d", state))
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/bootstrap/service_upgrade_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *service) MaybeUpgradeTenant(
return err
}
if latestVersion.Version != currentCN.Version {
panic("BUG: current cn's version(" +
s.logger.Fatal("BUG: current cn's version(" +
currentCN.Version +
") must equal cluster latest version(" +
latestVersion.Version +
Expand Down
5 changes: 3 additions & 2 deletions pkg/bootstrap/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *service) getFinalVersionHandle() VersionHandle {
// Get the original version of the upgrade framework
func (s *service) getFounderVersionHandle() VersionHandle {
if len(s.handles) == 0 {
panic("Waring: no upgrade version handles available, please check the code")
s.logger.Fatal("Waring: no upgrade version handles available, please check the code")
}
return s.handles[0]
}
Expand All @@ -66,5 +66,6 @@ func (s *service) getVersionHandle(version string) VersionHandle {
return h
}
}
panic("missing upgrade handle for version: " + version)
s.logger.Fatal("missing upgrade handle for version: " + version)
return nil
}
2 changes: 1 addition & 1 deletion pkg/bootstrap/versions/upgrade_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ var CheckViewDefinition = func(txn executor.TxnExecutor, accountId uint32, schem
})

if loaded && n > 1 {
panic("BUG: Duplicate column names in table")
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: Duplicate column names in table")
}
return loaded, view_def, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/upgrade_tenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func GetTenantCreateVersionForUpdate(
return true
})
if version == "" {
panic(fmt.Sprintf("BUG: missing tenant: %d", tenantID))
getLogger(txn.Txn().TxnOptions().CN).Fatal(fmt.Sprintf("BUG: missing tenant: %d", tenantID))
}
return version, nil
}
Expand All @@ -153,7 +153,7 @@ func UpgradeTenantVersion(
}
defer res.Close()
if res.AffectedRows != 1 {
panic(fmt.Sprintf("BUG: update tenant: %d failed with AffectedRows %d",
getLogger(txn.Txn().TxnOptions().CN).Fatal(fmt.Sprintf("BUG: update tenant: %d failed with AffectedRows %d",
tenantID, res.AffectedRows))
}
return nil
Expand All @@ -178,7 +178,7 @@ func GetTenantVersion(
return true
})
if version == "" {
panic(fmt.Sprintf("BUG: missing tenant: %d", tenantID))
getLogger(txn.Txn().TxnOptions().CN).Fatal(fmt.Sprintf("BUG: missing tenant: %d", tenantID))
}
return version, nil
}
4 changes: 2 additions & 2 deletions pkg/bootstrap/versions/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func MustGetLatestReadyVersion(
return true
})
if version == "" {
panic("missing latest ready version")
getLogger(txn.Txn().TxnOptions().CN).Fatal("missing latest ready version")
}
return version, nil
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func GetVersionState(
return true
})
if loaded && n > 1 {
panic("BUG: missing version " + version)
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: missing version " + version)
}
return state, loaded, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/version_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func GetUpgradeVersions(
return nil, err
}
if len(values) == 0 && mustHave {
panic("BUG: missing version upgrade")
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: missing version upgrade")
}
return values, nil
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func GetUpgradingTenantVersion(txn executor.TxnExecutor) (VersionUpgrade, bool,
return VersionUpgrade{}, false, nil
}
if len(values) != 1 {
panic("BUG: invalid version upgrade")
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: invalid version upgrade")
}
return values[0], true, nil
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func GetUpgradeVersionForUpdateByID(
return VersionUpgrade{}, err
}
if len(values) != 1 {
panic(fmt.Sprintf("BUG: can not get version upgrade by primary key: %d", id))
getLogger(txn.Txn().TxnOptions().CN).Fatal(fmt.Sprintf("BUG: can not get version upgrade by primary key: %d", id))
}
return values[0], nil
}
Expand Down
79 changes: 5 additions & 74 deletions pkg/frontend/compiler_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,48 +879,11 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps
if !needUpdate {
return cached, nil
}
tableDefs, err := table.TableDefs(ctx)
if err != nil {
return nil, err
}
var partitionInfo *plan.PartitionByDef
for _, def := range tableDefs {
if partitionDef, ok := def.(*engine.PartitionDef); ok {
if partitionDef.Partitioned > 0 {
p := &plan.PartitionByDef{}
err = p.UnMarshalPartitionInfo(([]byte)(partitionDef.Partition))
if err != nil {
return nil, err
}
partitionInfo = p
}
break
}
}

var statsInfo *pb.StatsInfo
// This is a partition table.
if partitionInfo != nil {
statsInfo = plan2.NewStatsInfo()
for _, partitionTable := range partitionInfo.PartitionTableNames {
parCtx, parTable, err := tcc.getRelation(dbName, partitionTable, sub, snapshot)
if err != nil {
return cached, err
}

newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx)
parStats, err := parTable.Stats(newParCtx, true)
if err != nil {
return cached, err
}
statsInfo.Merge(parStats)
}
} else {
newCtx := perfcounter.AttachCalcTableStatsKey(ctx)
statsInfo, err = table.Stats(newCtx, true)
if err != nil {
return cached, err
}
newCtx := perfcounter.AttachCalcTableStatsKey(ctx)
statsInfo, err := table.Stats(newCtx, true)
if err != nil {
return cached, err
}

if statsInfo != nil {
Expand Down Expand Up @@ -948,47 +911,15 @@ func (tcc *TxnCompilerContext) statsInCache(ctx context.Context, dbName string,
return nil, false
}

var partitionInfo *plan2.PartitionByDef
engineDefs, err := table.TableDefs(ctx)
if err != nil {
return nil, false
}
for _, def := range engineDefs {
if partitionDef, ok := def.(*engine.PartitionDef); ok {
if partitionDef.Partitioned > 0 {
p := &plan2.PartitionByDef{}
err = p.UnMarshalPartitionInfo(([]byte)(partitionDef.Partition))
if err != nil {
return nil, false
}
partitionInfo = p
}
}
}

second := time.Now().Unix()
var diff int64 = 3
if partitionInfo != nil {
diff = 30
}
if s.ApproxObjectNumber > 0 && second-s.TimeSecond < diff {
// do not call ApproxObjectsNum within a short time limit
return s, false
}
s.TimeSecond = second

approxNumObjects := 0
if partitionInfo != nil {
for _, PartitionTableName := range partitionInfo.PartitionTableNames {
_, ptable, err := tcc.getRelation(dbName, PartitionTableName, nil, snapshot)
if err != nil {
return nil, false
}
approxNumObjects += ptable.ApproxObjectsNum(ctx)
}
} else {
approxNumObjects = table.ApproxObjectsNum(ctx)
}
approxNumObjects := table.ApproxObjectsNum(ctx)
if approxNumObjects == 0 {
return nil, false
}
Expand Down
13 changes: 0 additions & 13 deletions pkg/pb/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,6 @@ func NewRemoveColumnReq(did, tid uint64, idx, seqnum uint32) *AlterTableReq {
}
}

func NewAddPartitionReq(did, tid uint64, partitionDef *plan.PartitionByDef) *AlterTableReq {
return &AlterTableReq{
DbId: did,
TableId: tid,
Kind: AlterKind_AddPartition,
Operation: &AlterTableReq_AddPartition{
AddPartition: &AlterTableAddPartition{
PartitionDef: partitionDef,
},
},
}
}

func NewRenameColumnReq(did, tid uint64, oldname, newname string, seqnum uint32) *AlterTableReq {
return &AlterTableReq{
DbId: did,
Expand Down
Loading

0 comments on commit 542eb2a

Please sign in to comment.