
Merge branch 'main' into moremetrics
reusee authored Jul 31, 2024
2 parents 5d4a8bd + e31b2ed commit 190ad27
Showing 8 changed files with 8,576 additions and 8,544 deletions.
28 changes: 22 additions & 6 deletions pkg/frontend/pitr.go
@@ -642,12 +642,6 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (e
return moerr.NewInternalError(ctx, "pitr %s does not exist", pitrName)
}

// check privilege
// only sys account can restore other account's pitr
if len(srcAccountName) != 0 && tenantInfo.GetTenant() != sysAccountName {
return moerr.NewInternalError(ctx, "only sys account can restore other account's pitr")
}

// check if the database can be restored
if len(dbName) != 0 && needSkipDb(dbName) {
return moerr.NewInternalError(ctx, "database %s can not be restore", dbName)
@@ -787,6 +781,10 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (e

// restore according to the restore level
switch restoreLevel {
case tree.RESTORELEVELCLUSTER:
if err = restoreToCluster(ctx, ses, bh, pitrName, ts); err != nil {
return
}
case tree.RESTORELEVELACCOUNT:
if err = restoreToAccountWithPitr(ctx, ses.GetService(), bh, pitrName, ts, fkTableMap, viewMap, tenantInfo.TenantID); err != nil {
return
@@ -1527,6 +1525,16 @@ func addTimeSpan(length int, unit string) (time.Time, error) {
func checkPitrValidOrNot(pitrRecord *pitrRecord, stmt *tree.RestorePitr, tenantInfo *TenantInfo) (err error) {
restoreLevel := stmt.Level
switch restoreLevel {
case tree.RESTORELEVELCLUSTER:
// check the level
// if the pitr level is account/database/table, return err
if pitrRecord.level == tree.PITRLEVELACCOUNT.String() || pitrRecord.level == tree.PITRLEVELDATABASE.String() || pitrRecord.level == tree.PITRLEVELTABLE.String() {
return moerr.NewInternalErrorNoCtx("restore level %v is not allowed for cluster restore", pitrRecord.level)
}
// if the account is not the sys account, return err
if tenantInfo.GetTenantID() != sysAccountID {
return moerr.NewInternalErrorNoCtx("account %s is not allowed to restore cluster level pitr %s", tenantInfo.GetTenant(), pitrRecord.pitrName)
}
case tree.RESTORELEVELACCOUNT:

if len(stmt.AccountName) == 0 { // restore self account
@@ -1537,8 +1545,16 @@ func checkPitrValidOrNot(pitrRecord *pitrRecord, stmt *tree.RestorePitr, tenantI
if pitrRecord.level == tree.PITRLEVELACCOUNT.String() && pitrRecord.accountId != uint64(tenantInfo.TenantID) {
return moerr.NewInternalErrorNoCtx("pitr %s is not allowed to restore account %v", pitrRecord.pitrName, tenantInfo.GetTenant())
}
// if the pitr level is cluster, the tenant must be the sys account
if pitrRecord.level == tree.PITRLEVELCLUSTER.String() && tenantInfo.GetTenantID() != sysAccountID {
return moerr.NewInternalErrorNoCtx("account %s is not allowed to restore cluster level pitr %s", tenantInfo.GetTenant(), pitrRecord.pitrName)
}
} else {
// sys restores another account's pitr
// if the account is not the sys account, return err
if tenantInfo.GetTenantID() != sysAccountID {
return moerr.NewInternalErrorNoCtx("account %s is not allowed to restore other account %s", tenantInfo.GetTenant(), string(stmt.AccountName))
}
// if the pitr level is cluster, the source account cannot be empty
if pitrRecord.level == tree.PITRLEVELCLUSTER.String() && len(stmt.SrcAccountName) == 0 {
return moerr.NewInternalErrorNoCtx("source account %s can not be empty when restore cluster level pitr %s", string(stmt.AccountName), pitrRecord.pitrName)
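The pitr.go change moves the "only sys can restore another account" guard out of doRestorePitr and into checkPitrValidOrNot, which now also gates cluster-level restores: the PITR record must itself be cluster-level, and the caller must be the sys tenant. A minimal standalone sketch of that cluster branch, using simplified stand-ins (pitrRecord here is a toy struct and sysAccountID an assumed constant, not the real pkg/frontend types):

package main

import "fmt"

const sysAccountID = 0 // assumption: the sys tenant's fixed account ID

// Toy stand-in for the frontend's pitrRecord.
type pitrRecord struct {
	level    string // "cluster", "account", "database", or "table"
	pitrName string
}

// validateClusterRestore mirrors the RESTORELEVELCLUSTER branch added to
// checkPitrValidOrNot: reject account/database/table-level PITR records
// (i.e. anything that is not cluster-level), then reject non-sys callers.
func validateClusterRestore(rec pitrRecord, tenantID uint32, tenant string) error {
	if rec.level != "cluster" {
		return fmt.Errorf("restore level %v is not allowed for cluster restore", rec.level)
	}
	if tenantID != sysAccountID {
		return fmt.Errorf("account %s is not allowed to restore cluster level pitr %s",
			tenant, rec.pitrName)
	}
	return nil
}

func main() {
	err := validateClusterRestore(pitrRecord{level: "account", pitrName: "p1"}, sysAccountID, "sys")
	fmt.Println(err) // rejected: an account-level PITR cannot drive a cluster restore
}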
83 changes: 41 additions & 42 deletions pkg/sql/compile/compile.go
@@ -1814,6 +1814,7 @@ func (c *Compile) compileExternScan(n *plan.Node) ([]*Scope, error) {

if len(fileList) == 0 {
ret := newScope(Normal)
ret.NodeInfo = getEngineNode(c)
ret.DataSource = &Source{isConst: true, node: n}

currentFirstFlag := c.anal.isFirst
@@ -1994,6 +1995,7 @@ func (c *Compile) compileTableFunction(n *plan.Node, ss []*Scope) []*Scope {

func (c *Compile) compileValueScan(n *plan.Node) ([]*Scope, error) {
ds := newScope(Normal)
ds.NodeInfo = getEngineNode(c)
ds.DataSource = &Source{isConst: true, node: n}
ds.NodeInfo = engine.Node{Addr: c.addr, Mcpu: 1}
ds.Proc = process.NewFromProc(c.proc, c.proc.Ctx, 0)
@@ -2457,10 +2459,6 @@ func (c *Compile) compileBroadcastJoin(node, left, right *plan.Node, ns []*plan.
leftTyps[i] = dupType(&expr.Typ)
}

if plan2.IsShuffleChildren(left, ns) {
probeScopes = c.mergeShuffleJoinScopeList(probeScopes)
}

switch node.JoinType {
case plan.Node_INNER:
rs = c.newBroadcastJoinScopeList(probeScopes, buildScopes, node)
@@ -3688,59 +3686,60 @@ func (c *Compile) newJoinScopeListWithBucket(rs, left, right []*Scope, n *plan.N
return rs
}

func (c *Compile) newMergeRemoteScopeByCN(ss []*Scope) []*Scope {
rs := make([]*Scope, 0, len(c.cnList))
for i := range c.cnList {
cn := c.cnList[i]
currentSS := make([]*Scope, 0, cn.Mcpu)
for j := range ss {
if isSameCN(ss[j].NodeInfo.Addr, cn.Addr) {
currentSS = append(currentSS, ss[j])
}
}
if len(currentSS) > 0 {
mergeScope := c.newMergeRemoteScope(currentSS, cn)
rs = append(rs, mergeScope)
}
}

return rs
}

func (c *Compile) newBroadcastJoinScopeList(probeScopes []*Scope, buildScopes []*Scope, n *plan.Node) []*Scope {
length := len(probeScopes)
rs := make([]*Scope, length)
idx := 0
for i := range probeScopes {
rs[i] = newScope(Remote)
rs := c.newMergeRemoteScopeByCN(probeScopes)
for i := range rs {
rs[i].IsJoin = true
rs[i].NodeInfo = probeScopes[i].NodeInfo
rs[i].BuildIdx = 1
if isSameCN(rs[i].NodeInfo.Addr, c.addr) {
idx = i
rs[i].NodeInfo.Mcpu = c.generateCPUNumber(ncpu, int(n.Stats.BlockNum))
rs[i].BuildIdx = len(rs[i].Proc.Reg.MergeReceivers)
w := &process.WaitRegister{
Ctx: rs[i].Proc.Ctx,
Ch: make(chan *process.RegisterMessage, 10),
}
rs[i].PreScopes = []*Scope{probeScopes[i]}
rs[i].Proc = process.NewFromProc(c.proc, c.proc.Ctx, 2)
probeScopes[i].setRootOperator(
connector.NewArgument().
WithReg(rs[i].Proc.Reg.MergeReceivers[0]))
rs[i].Proc.Reg.MergeReceivers = append(rs[i].Proc.Reg.MergeReceivers, w)
}

// all joins' first flags will be set in newLeftScope and newRightScope
// so we set it to false now
if c.IsTpQuery() {
rs[0].PreScopes = append(rs[0].PreScopes, buildScopes[0])
} else {
c.anal.isFirst = false
mergeChildren := c.newMergeScope(buildScopes)

mergeChildren.setRootOperator(constructDispatch(1, rs, c.addr, n, false))
mergeChildren.IsEnd = true
rs[idx].PreScopes = append(rs[idx].PreScopes, mergeChildren)
}

for i := range rs {
mergeOp := merge.NewArgument()
rs[i].setRootOperator(mergeOp)
for i := range rs {
if isSameCN(rs[i].NodeInfo.Addr, c.addr) {
mergeBuild := buildScopes[0]
if len(buildScopes) > 1 {
mergeBuild = c.newMergeScope(buildScopes)
}
mergeBuild.setRootOperator(constructDispatch(rs[i].BuildIdx, rs, c.addr, n, false))
mergeBuild.IsEnd = true
rs[i].PreScopes = append(rs[i].PreScopes, mergeBuild)
break
}
}
}

return rs
}

func (c *Compile) mergeShuffleJoinScopeList(child []*Scope) []*Scope {
lenCN := len(c.cnList)
dop := len(child) / lenCN
mergeScope := make([]*Scope, 0, lenCN)
for i, n := range c.cnList {
start := i * dop
end := start + dop
ss := child[start:end]
mergeScope = append(mergeScope, c.newMergeRemoteScope(ss, n))
}
return mergeScope
}

func (c *Compile) newShuffleJoinScopeList(left, right []*Scope, n *plan.Node) ([]*Scope, []*Scope) {
single := len(c.cnList) <= 1
if single {
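The heart of the compile.go change is newMergeRemoteScopeByCN, which newBroadcastJoinScopeList now uses in place of its old per-probe-scope loop (and which also supersedes the deleted mergeShuffleJoinScopeList): probe scopes are bucketed by the CN node that owns them, and each non-empty bucket collapses into one merge scope. A minimal sketch of that grouping step under simplified stand-in types (Node, Scope, and isSameCN here are toys, not the real engine/compile types):

package main

import "fmt"

// Toy stand-ins for engine.Node and compile.Scope.
type Node struct {
	Addr string
	Mcpu int
}

type Scope struct {
	NodeInfo Node
}

func isSameCN(a, b string) bool { return a == b }

// mergeByCN mirrors the shape of newMergeRemoteScopeByCN: for each CN in
// cnList, collect the scopes pinned to that CN's address, and emit one
// merged scope per CN that actually owns work.
func mergeByCN(cnList []Node, ss []*Scope) []*Scope {
	rs := make([]*Scope, 0, len(cnList))
	for _, cn := range cnList {
		var current []*Scope
		for _, s := range ss {
			if isSameCN(s.NodeInfo.Addr, cn.Addr) {
				current = append(current, s)
			}
		}
		if len(current) > 0 {
			// The real code wraps current in a remote merge scope via
			// newMergeRemoteScope; here we only record the target CN.
			rs = append(rs, &Scope{NodeInfo: cn})
		}
	}
	return rs
}

func main() {
	cns := []Node{{Addr: "cn-a", Mcpu: 4}, {Addr: "cn-b", Mcpu: 4}}
	ss := []*Scope{
		{NodeInfo: Node{Addr: "cn-a"}},
		{NodeInfo: Node{Addr: "cn-b"}},
		{NodeInfo: Node{Addr: "cn-a"}},
	}
	fmt.Println(len(mergeByCN(cns, ss))) // 2: one merged scope per CN
}

One consequence visible in the diff: the result now has one scope per CN rather than one per probe scope, so the build side is dispatched once to the scope on the local CN (the loop that sets BuildIdx and breaks) instead of being indexed by probe position.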
2 changes: 1 addition & 1 deletion pkg/sql/compile/debugTools.go
@@ -252,7 +252,7 @@ func debugShowScopes(ss []*Scope, gap int, rmp map[*process.WaitRegister]int) st
if ss[i].Proc != nil {
receiverStr = getReceiverStr(ss[i], ss[i].Proc.Reg.MergeReceivers)
}
str += fmt.Sprintf("Scope %d (Magic: %s, Receiver: %s): [", i+1, magicShow(ss[i].Magic), receiverStr)
str += fmt.Sprintf("Scope %d (Magic: %s, mcpu: %v, Receiver: %s): [", i+1, magicShow(ss[i].Magic), ss[i].NodeInfo.Mcpu, receiverStr)

vm.HandleAllOp(ss[i].RootOp, func(parentOp vm.Operator, op vm.Operator) error {
if op.GetOperatorBase().NumChildren() != 0 {
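The debugTools.go tweak only extends the header line that debugShowScopes prints, adding each scope's Mcpu next to its magic and receiver info. A tiny runnable illustration of the new format, with invented values standing in for the real ss[i] fields:

package main

import "fmt"

func main() {
	// Hypothetical values; the real call site reads these from ss[i].
	i, magic, mcpu, receivers := 0, "Merge", 4, "[0, 1]"
	header := fmt.Sprintf("Scope %d (Magic: %s, mcpu: %v, Receiver: %s): [",
		i+1, magic, mcpu, receivers)
	fmt.Println(header) // Scope 1 (Magic: Merge, mcpu: 4, Receiver: [0, 1]): [
}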