Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: reduce memory usage and GC overhead for hash join. #2957

Merged
merged 3 commits into from
Mar 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ const (

func resultRowToRow(t table.Table, h int64, data []types.Datum, tableAsName *model.CIStr) *Row {
entry := &RowKeyEntry{
Handle: h,
Tbl: t,
TableAsName: tableAsName,
Handle: h,
Tbl: t,
}
if tableAsName != nil && tableAsName.L != "" {
entry.TableName = tableAsName.L
} else {
entry.TableName = t.Meta().Name.L
}
return &Row{Data: data, RowKeys: []*RowKeyEntry{entry}}
}
Expand Down
12 changes: 8 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type RowKeyEntry struct {
// Row key.
Handle int64
// Table alias name.
TableAsName *model.CIStr
TableName string
}

// Executor executes a query.
Expand Down Expand Up @@ -691,9 +691,13 @@ func (e *TableScanExec) getRow(handle int64) (*Row, error) {

// Put rowKey to the tail of record row.
rke := &RowKeyEntry{
Tbl: e.t,
Handle: handle,
TableAsName: e.asName,
Tbl: e.t,
Handle: handle,
}
if e.asName != nil && e.asName.L != "" {
rke.TableName = e.asName.L
} else {
rke.TableName = e.t.Meta().Name.L
}
row.RowKeys = append(row.RowKeys, rke)
return row, nil
Expand Down
91 changes: 75 additions & 16 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mvmap"
"github.com/pingcap/tidb/util/types"
)

Expand All @@ -34,7 +35,7 @@ var (

// HashJoinExec implements the hash join algorithm.
type HashJoinExec struct {
hashTable map[string][]*Row
hashTable *mvmap.MVMap
smallHashKey []*expression.Column
bigHashKey []*expression.Column
smallExec Executor
Expand Down Expand Up @@ -66,6 +67,10 @@ type HashJoinExec struct {

// Channels for output.
resultCh chan *execResult

// rowKeyCache is used to store the table and table name from a row.
// Because every row has the same table name and table, we can use a single row key cache.
rowKeyCache []*RowKeyEntry
}

// hashJoinCtx holds the variables needed to do a hash join in one of many concurrent goroutines.
Expand Down Expand Up @@ -104,9 +109,9 @@ func makeJoinRow(a *Row, b *Row) *Row {
return ret
}

// getHashKey gets the hash key when given a row and hash columns.
// getJoinKey gets the hash key when given a row and hash columns.
// It will return a boolean value representing if the hash key has null, a byte slice representing the result hash code.
func getHashKey(sc *variable.StatementContext, cols []*expression.Column, row *Row, targetTypes []*types.FieldType,
func getJoinKey(sc *variable.StatementContext, cols []*expression.Column, row *Row, targetTypes []*types.FieldType,
vals []types.Datum, bytes []byte) (bool, []byte, error) {
var err error
for i, col := range cols {
Expand Down Expand Up @@ -208,9 +213,10 @@ func (e *HashJoinExec) prepare() error {
e.wg.Add(1)
go e.fetchBigExec()

e.hashTable = make(map[string][]*Row)
e.hashTable = mvmap.NewMVMap()
e.cursor = 0
sc := e.ctx.GetSessionVars().StmtCtx
var buffer []byte
for {
row, err := e.smallExec.Next()
if err != nil {
Expand All @@ -231,18 +237,19 @@ func (e *HashJoinExec) prepare() error {
continue
}
}
hasNull, hashcode, err := getHashKey(sc, e.smallHashKey, row, e.targetTypes, e.hashJoinContexts[0].datumBuffer, nil)
hasNull, joinKey, err := getJoinKey(sc, e.smallHashKey, row, e.targetTypes, e.hashJoinContexts[0].datumBuffer, nil)
if err != nil {
return errors.Trace(err)
}
if hasNull {
continue
}
if rows, ok := e.hashTable[string(hashcode)]; !ok {
e.hashTable[string(hashcode)] = []*Row{row}
} else {
e.hashTable[string(hashcode)] = append(rows, row)
buffer = buffer[:0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do this ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reuse the buffer memory.

buffer, err = e.encodeRow(buffer, row)
if err != nil {
return errors.Trace(err)
}
e.hashTable.Put(joinKey, buffer)
}

e.resultCh = make(chan *execResult, e.concurrency)
Expand All @@ -257,6 +264,54 @@ func (e *HashJoinExec) prepare() error {
return nil
}

func (e *HashJoinExec) encodeRow(b []byte, row *Row) ([]byte, error) {
numRowKeys := int64(len(row.RowKeys))
b = codec.EncodeVarint(b, numRowKeys)
for _, rowKey := range row.RowKeys {
b = codec.EncodeVarint(b, rowKey.Handle)
}
if numRowKeys > 0 && e.rowKeyCache == nil {
e.rowKeyCache = make([]*RowKeyEntry, len(row.RowKeys))
for i := 0; i < len(row.RowKeys); i++ {
rk := new(RowKeyEntry)
rk.Tbl = row.RowKeys[i].Tbl
rk.TableName = row.RowKeys[i].TableName
e.rowKeyCache[i] = rk
}
}
b, err := codec.EncodeValue(b, row.Data...)
return b, errors.Trace(err)
}

func (e *HashJoinExec) decodeRow(data []byte) (*Row, error) {
row := new(Row)
data, entryLen, err := codec.DecodeVarint(data)
if err != nil {
return nil, errors.Trace(err)
}
for i := 0; i < int(entryLen); i++ {
entry := new(RowKeyEntry)
data, entry.Handle, err = codec.DecodeVarint(data)
if err != nil {
return nil, errors.Trace(err)
}
entry.Tbl = e.rowKeyCache[i].Tbl
entry.TableName = e.rowKeyCache[i].TableName
row.RowKeys = append(row.RowKeys, entry)
}
values := make([]types.Datum, e.smallExec.Schema().Len())
err = codec.SetRawValues(data, values)
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, e.smallExec.Schema())
if err != nil {
return nil, errors.Trace(err)
}
row.Data = values
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused with Row struct, it has a Data and a RowKeys fields, what's the difference?

return row, nil
}

func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() {
e.wg.Wait()
close(e.resultCh)
Expand Down Expand Up @@ -341,20 +396,25 @@ func (e *HashJoinExec) joinOneBigRow(ctx *hashJoinCtx, bigRow *Row, result *exec
// constructMatchedRows creates matching result rows from a row in the big table.
func (e *HashJoinExec) constructMatchedRows(ctx *hashJoinCtx, bigRow *Row) (matchedRows []*Row, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
hasNull, hashcode, err := getHashKey(sc, e.bigHashKey, bigRow, e.targetTypes, ctx.datumBuffer, ctx.hashKeyBuffer[0:0:cap(ctx.hashKeyBuffer)])
hasNull, joinKey, err := getJoinKey(sc, e.bigHashKey, bigRow, e.targetTypes, ctx.datumBuffer, ctx.hashKeyBuffer[0:0:cap(ctx.hashKeyBuffer)])
if err != nil {
return nil, errors.Trace(err)
}

if hasNull {
return
}
rows, ok := e.hashTable[string(hashcode)]
if !ok {
values := e.hashTable.Get(joinKey)
if len(values) == 0 {
return
}
// match eq condition
for _, smallRow := range rows {
for _, value := range values {
var smallRow *Row
smallRow, err = e.decodeRow(value)
if err != nil {
return nil, errors.Trace(err)
}
otherMatched := true
var matchedRow *Row
if e.leftSmall {
Expand All @@ -372,7 +432,6 @@ func (e *HashJoinExec) constructMatchedRows(ctx *hashJoinCtx, bigRow *Row) (matc
matchedRows = append(matchedRows, matchedRow)
}
}

return matchedRows, nil
}

Expand Down Expand Up @@ -662,7 +721,7 @@ func (e *HashSemiJoinExec) prepare() error {
continue
}
}
hasNull, hashcode, err := getHashKey(sc, e.smallHashKey, row, e.targetTypes, make([]types.Datum, len(e.smallHashKey)), nil)
hasNull, hashcode, err := getJoinKey(sc, e.smallHashKey, row, e.targetTypes, make([]types.Datum, len(e.smallHashKey)), nil)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -683,7 +742,7 @@ func (e *HashSemiJoinExec) prepare() error {

func (e *HashSemiJoinExec) rowIsMatched(bigRow *Row) (matched bool, hasNull bool, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
hasNull, hashcode, err := getHashKey(sc, e.bigHashKey, bigRow, e.targetTypes, make([]types.Datum, len(e.smallHashKey)), nil)
hasNull, hashcode, err := getJoinKey(sc, e.bigHashKey, bigRow, e.targetTypes, make([]types.Datum, len(e.smallHashKey)), nil)
if err != nil {
return false, false, errors.Trace(err)
}
Expand Down
8 changes: 7 additions & 1 deletion executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,13 @@ func (us *UnionScanExec) buildAndSortAddedRows(t table.Table, asName *model.CISt
continue
}
}
rowKeyEntry := &RowKeyEntry{Handle: h, Tbl: t, TableAsName: asName}
rowKeyEntry := &RowKeyEntry{Handle: h, Tbl: t}
if asName != nil && asName.L != "" {
rowKeyEntry.TableName = asName.L
} else {
rowKeyEntry.TableName = t.Meta().Name.L
}

row := &Row{Data: newData, RowKeys: []*RowKeyEntry{rowKeyEntry}}
us.addedRows = append(us.addedRows, row)
}
Expand Down
19 changes: 2 additions & 17 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,12 @@ func (e *DeleteExec) deleteMultiTables() error {
}

func isMatchTableName(entry *RowKeyEntry, tblMap map[int64][]string) bool {
var name string
if entry.TableAsName != nil {
name = entry.TableAsName.L
}
if len(name) == 0 {
name = entry.Tbl.Meta().Name.L
}

names, ok := tblMap[entry.Tbl.Meta().ID]
if !ok {
return false
}
for _, n := range names {
if n == name {
if n == entry.TableName {
return true
}
}
Expand Down Expand Up @@ -1202,16 +1194,9 @@ func (e *UpdateExec) fetchRows() error {
}

func getTableOffset(schema *expression.Schema, entry *RowKeyEntry) int {
t := entry.Tbl
var tblName string
if entry.TableAsName == nil || len(entry.TableAsName.L) == 0 {
tblName = t.Meta().Name.L
} else {
tblName = entry.TableAsName.L
}
for i := 0; i < schema.Len(); i++ {
s := schema.Columns[i]
if s.TblName.L == tblName {
if s.TblName.L == entry.TableName {
return i
}
}
Expand Down
Loading