Skip to content

Commit

Permalink
[opt] optimize point queries
Browse files Browse the repository at this point in the history
in join operators, don't build hashtable if build side has only 1 row
  • Loading branch information
aunjgr committed Nov 27, 2024
1 parent 8cb120e commit d6c79a9
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 42 deletions.
116 changes: 84 additions & 32 deletions pkg/sql/colexec/dedupjoin/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (dedupJoin *DedupJoin) build(analyzer process.Analyzer, proc *process.Proce
if dedupJoin.OnDuplicateAction != plan.Node_UPDATE {
ctr.matched.InitWithSize(ctr.batchRowCount)
} else {
ctr.matched.InitWithSize(int64(ctr.mp.GetGroupCount()) + 1)
ctr.matched.InitWithSize(int64(ctr.mp.GetGroupCount()))
}
}
return
Expand Down Expand Up @@ -194,10 +194,27 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
}
}

if ap.OnDuplicateAction != plan.Node_UPDATE {
if ap.OnDuplicateAction != plan.Node_UPDATE || ctr.mp.HashOnUnique() {
if ctr.matched.Count() == 0 {
ap.ctr.buf = ctr.batches
ctr.batches = nil
ap.ctr.buf = make([]*batch.Batch, len(ctr.batches))
for i := range ap.ctr.buf {
ap.ctr.buf[i] = batch.NewWithSize(len(ap.Result))
batSize := ctr.batches[i].RowCount()
for j, rp := range ap.Result {
if rp.Rel == 1 {
ap.ctr.buf[i].Vecs[j] = vector.NewVec(ap.RightTypes[rp.Pos])
if err := ap.ctr.buf[i].Vecs[j].UnionBatch(ctr.batches[i].Vecs[rp.Pos], 0, batSize, nil, proc.Mp()); err != nil {
return err
}
} else {
ap.ctr.buf[i].Vecs[j] = vector.NewVec(ap.LeftTypes[rp.Pos])
if err := vector.AppendMultiFixed(ap.ctr.buf[i].Vecs[j], 0, true, batSize, proc.Mp()); err != nil {
return err
}
}
}
ap.ctr.buf[i].SetRowCount(batSize)
}

return nil
}
Expand Down Expand Up @@ -293,7 +310,7 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
ctr.joinBat1, ctr.cfs1 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp())

bitmapLen := uint64(ctr.matched.Len())
for i := uint64(1); i < bitmapLen; i++ {
for i := uint64(0); i < bitmapLen; i++ {
if ctr.matched.Contains(i) {
continue
}
Expand All @@ -309,41 +326,55 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
}
}

sels = ctr.mp.GetSels(i)
sels = ctr.mp.GetSels(i + 1)
idx1, idx2 := sels[0]/colexec.DefaultBatchSize, sels[0]%colexec.DefaultBatchSize
err := colexec.SetJoinBatchValues(ctr.joinBat1, ctr.batches[idx1], int64(idx2), 1, ctr.cfs1)
if err != nil {
return err
}

for _, sel := range sels[1:] {
idx1, idx2 = sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
if len(sels) == 1 {
for j, rp := range ap.Result {
if rp.Rel == 1 {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.batches[idx1].Vecs[rp.Pos], int64(idx2), proc.Mp()); err != nil {
return err
}
} else {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil {
return err
}
}
}
} else {
err := colexec.SetJoinBatchValues(ctr.joinBat1, ctr.batches[idx1], int64(idx2), 1, ctr.cfs1)
if err != nil {
return err
}

vecs := make([]*vector.Vector, len(ctr.exprExecs))
for j, exprExec := range ctr.exprExecs {
vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
for _, sel := range sels[1:] {
idx1, idx2 = sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
if err != nil {
return err
}
}

for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}
}
vecs := make([]*vector.Vector, len(ctr.exprExecs))
for j, exprExec := range ctr.exprExecs {
vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
if err != nil {
return err
}
}

for j, rp := range ap.Result {
if rp.Rel == 1 {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil {
return err
for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}
} else {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil {
return err
}

for j, rp := range ap.Result {
if rp.Rel == 1 {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil {
return err
}
} else {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -428,8 +459,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce
return err
}

sels := ctr.mp.GetSels(vals[k])
for _, sel := range sels {
if ctr.mp.HashOnUnique() {
sel := vals[k] - 1
idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
if err != nil {
Expand All @@ -447,6 +478,27 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce
for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}
} else {
sels := ctr.mp.GetSels(vals[k])
for _, sel := range sels {
idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
if err != nil {
return err
}

vecs := make([]*vector.Vector, len(ctr.exprExecs))
for j, exprExec := range ctr.exprExecs {
vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
if err != nil {
return err
}
}

for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}
}
}

for j, rp := range ap.Result {
Expand All @@ -461,7 +513,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce
}
}

ctr.matched.Add(vals[k])
ctr.matched.Add(vals[k] - 1)
rowCntInc++
}
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/sql/colexec/hashmap_util/hashmap_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,15 @@ func (hb *HashmapBuilder) BuildHashmap(hashOnPK bool, needAllocateSels bool, nee

// If the group count equals the input row count, the hashmap was built on unique rows,
// so the selection lists (MultiSels) are no longer needed and can be freed now.
if !hb.IsDedup {
if hb.keyWidth <= 8 {
if hb.InputBatchRowCount == int(hb.IntHashMap.GroupCount()) {
hb.MultiSels.Free()
}
} else {
if hb.InputBatchRowCount == int(hb.StrHashMap.GroupCount()) {
hb.MultiSels.Free()
}
if hb.keyWidth <= 8 {
if hb.InputBatchRowCount == int(hb.IntHashMap.GroupCount()) {
hb.MultiSels.Free()
}
} else {
if hb.InputBatchRowCount == int(hb.StrHashMap.GroupCount()) {
hb.MultiSels.Free()
}
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/sql/plan/bind_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(

// handle primary/unique key conflicts
if builder.canSkipDedup(dmlCtx.tableDefs[0]) {
// load do not handle primary/unique key confliction
// restore/load does not handle primary/unique key conflicts
for i, idxDef := range tableDef.Indexes {
if !idxDef.TableExist || skipUniqueIdx[i] {
continue
Expand Down

0 comments on commit d6c79a9

Please sign in to comment.