Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#41081
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
wshwsh12 authored and ti-chi-bot committed Feb 9, 2023
1 parent 5ccc10b commit a7cc0bc
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 0 deletions.
121 changes: 121 additions & 0 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ type hashRowContainer struct {
hashTable baseHashTable

rowContainer *chunk.RowContainer
<<<<<<< HEAD
=======
memTracker *memory.Tracker

// chkBuf buffer the data reads from the disk if rowContainer is spilled.
chkBuf *chunk.Chunk
chkBufSizeForOneProbe int64
>>>>>>> 5cb84186dc (executor: track the memroy usage in HashJoin probe phase (#41081))
}

func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
Expand All @@ -104,6 +112,88 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer {
return &newHRC
}

<<<<<<< HEAD
=======
// GetMatchedRows get matched rows from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
func (c *hashRowContainer) GetMatchedRows(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row) ([]chunk.Row, error) {
matchedRows, _, err := c.GetMatchedRowsAndPtrs(probeKey, probeRow, hCtx, matched, nil, false)
return matchedRows, err
}

func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRow chunk.Row,
probeKeyNullBits *bitmap.ConcurrentBitmap, matched []chunk.Row, needCheckBuildRowPos, needCheckProbeRowPos []int) ([]chunk.Row, error) {
// for NAAJ probe row with null, we should match them with all build rows.
var (
ok bool
err error
innerPtrs []chunk.RowPtr
)
c.hashTable.Iter(
func(_ uint64, e *entry) {
entryAddr := e
for entryAddr != nil {
innerPtrs = append(innerPtrs, entryAddr.ptr)
entryAddr = entryAddr.next
}
})
matched = matched[:0]
if len(innerPtrs) == 0 {
return matched, nil
}
// all built bucket rows come from hash table, their bitmap are all nil (doesn't contain any null). so
// we could only use the probe null bits to filter valid rows.
if probeKeyNullBits != nil && len(probeHCtx.naKeyColIdx) > 1 {
// if len(probeHCtx.naKeyColIdx)=1
// that means the NA-Join probe key is directly a (null) <-> (fetch all buckets), nothing to do.
// else like
// (null, 1, 2), we should use the not-null probe bit to filter rows. Only fetch rows like
// ( ? , 1, 2), that exactly with value as 1 and 2 in the second and third join key column.
needCheckProbeRowPos = needCheckProbeRowPos[:0]
needCheckBuildRowPos = needCheckBuildRowPos[:0]
keyColLen := len(c.hCtx.naKeyColIdx)
for i := 0; i < keyColLen; i++ {
// since all bucket is from hash table (Not Null), so the buildSideNullBits check is eliminated.
if probeKeyNullBits.UnsafeIsSet(i) {
continue
}
needCheckBuildRowPos = append(needCheckBuildRowPos, c.hCtx.naKeyColIdx[i])
needCheckProbeRowPos = append(needCheckProbeRowPos, probeHCtx.naKeyColIdx[i])
}
}
var mayMatchedRow chunk.Row
for _, ptr := range innerPtrs {
mayMatchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
if err != nil {
return nil, err
}
if probeKeyNullBits != nil && len(probeHCtx.naKeyColIdx) > 1 {
// check the idxs-th value of the join columns.
ok, err = codec.EqualChunkRow(c.sc, mayMatchedRow, c.hCtx.allTypes, needCheckBuildRowPos, probeSideRow, probeHCtx.allTypes, needCheckProbeRowPos)
if err != nil {
return nil, err
}
if !ok {
continue
}
// once ok. just append the (maybe) valid build row for latter other conditions check if any.
}
matched = append(matched, mayMatchedRow)
}
return matched, nil
}

// signalCheckpointForJoin indicates the times of row probe that a signal detection will be triggered.
const signalCheckpointForJoin int = 1 << 14

// rowSize is the size of Row.
const rowSize = int64(unsafe.Sizeof(chunk.Row{}))

// rowPtrSize is the size of RowPtr.
const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{}))

>>>>>>> 5cb84186dc (executor: track the memroy usage in HashJoin probe phase (#41081))
// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
Expand All @@ -114,9 +204,27 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
}
matched = make([]chunk.Row, 0, len(innerPtrs))
var matchedRow chunk.Row
<<<<<<< HEAD
matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs))
for _, ptr := range innerPtrs {
matchedRow, err = c.rowContainer.GetRow(ptr)
=======
matchedPtrs = matchedPtrs[:0]

// Some variables used for memTracker.
var (
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
lastChunkBufPointer *chunk.Chunk = nil
memDelta int64 = 0
)
c.chkBuf = nil
c.memTracker.Consume(-c.chkBufSizeForOneProbe + int64(cap(innerPtrs))*rowPtrSize)
defer c.memTracker.Consume(-int64(cap(innerPtrs))*rowPtrSize + memDelta)
c.chkBufSizeForOneProbe = 0

for i, ptr := range innerPtrs {
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
>>>>>>> 5cb84186dc (executor: track the memroy usage in HashJoin probe phase (#41081))
if err != nil {
return
}
Expand All @@ -125,6 +233,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
if err != nil {
return
}
if c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil {
lastChunkSize := lastChunkBufPointer.MemoryUsage()
c.chkBufSizeForOneProbe += lastChunkSize
memDelta += lastChunkSize
}
lastChunkBufPointer = c.chkBuf
if i&signalCheckpointForJoin == 0 {
// Trigger Consume for checking the OOM Action signal
memDelta += int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize - matchedDataSize
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
c.memTracker.Consume(memDelta + 1)
memDelta = 0
}
if !ok {
atomic.AddInt64(&c.stat.probeCollision, 1)
continue
Expand Down
17 changes: 17 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2728,3 +2728,20 @@ func (s *testSuiteJoinSerial) TestIssue31129(c *C) {
c.Assert(failpoint.Disable(fpName1), IsNil)
c.Assert(failpoint.Disable(fpName2), IsNil)
}

func TestCartesianJoinPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")
tk.MustExec("set tidb_mem_quota_query = 1 << 30")
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'")
tk.MustExec("set global tidb_enable_tmp_storage_on_oom = off;")
for i := 0; i < 14; i++ {
tk.MustExec("insert into t select * from t")
}
err := tk.QueryToErr("desc analyze select * from t t1, t t2, t t3, t t4, t t5, t t6;")
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
}

0 comments on commit a7cc0bc

Please sign in to comment.