From d844ef281f9fef23334607c2169d0903605e338f Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Fri, 3 Feb 2023 14:36:13 +0800 Subject: [PATCH 1/8] fix --- executor/hash_table.go | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index 2ba840d04fdc9..c387a776d6bb9 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -114,7 +114,8 @@ type hashRowContainer struct { memTracker *memory.Tracker // chkBuf buffer the data reads from the disk if rowContainer is spilled. - chkBuf *chunk.Chunk + chkBuf *chunk.Chunk + chkBufSizeForOneProbe int64 } func newHashRowContainer(sCtx sessionctx.Context, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { @@ -213,6 +214,15 @@ func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRo return matched, nil } +// SignalCheckpointForJoin indicates the times of row probe that a signal detection will be triggered. +const SignalCheckpointForJoin int = 1 << 14 + +// RowSize is the size of Row. +const RowSize = int64(unsafe.Sizeof(chunk.Row{})) + +// RowPtrSize is the size of RowPtr. +const RowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{})) + // GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called // in multiple goroutines while each goroutine should keep its own // h and buf. @@ -225,7 +235,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk matched = matched[:0] var matchedRow chunk.Row matchedPtrs = matchedPtrs[:0] - for _, ptr := range innerPtrs { + + // Some variables used for memTracker. + var ( + matchedDataSize = int64(cap(matched))*RowSize + int64(cap(matchedPtrs))*RowPtrSize + lastChunkBufPointer *chunk.Chunk = nil + memDelta int64 = 0 + ) + c.chkBuf = nil + c.memTracker.Consume(-c.chkBufSizeForOneProbe + int64(cap(innerPtrs))*RowPtrSize) + defer c.memTracker.Consume(-int64(cap(innerPtrs)) * RowPtrSize) + c.chkBufSizeForOneProbe = 0 + + for i, ptr := range innerPtrs { matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf) if err != nil { return nil, nil, err @@ -239,6 +261,21 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk atomic.AddInt64(&c.stat.probeCollision, 1) continue } + if c.chkBuf != lastChunkBufPointer { + if lastChunkBufPointer != nil { + lastChunkSize := lastChunkBufPointer.MemoryUsage() + c.chkBufSizeForOneProbe += lastChunkSize + memDelta += lastChunkSize + } + lastChunkBufPointer = c.chkBuf + } + if i&SignalCheckpointForJoin == 0 { + // Trigger Consume for checking the OOM Action signal + memDelta += int64(cap(matched))*RowSize + int64(cap(matchedPtrs))*RowPtrSize - matchedDataSize + matchedDataSize = int64(cap(matched))*RowSize + int64(cap(matchedPtrs))*RowPtrSize + c.memTracker.Consume(memDelta + 1) + memDelta = 0 + } matched = append(matched, matchedRow) if needPtr { matchedPtrs = append(matchedPtrs, ptr) From 2cec13c5e17d16b911fb2465fc1fcc86b842d873 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 6 Feb 2023 12:37:33 +0800 Subject: [PATCH 2/8] test --- executor/join_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/executor/join_test.go b/executor/join_test.go index a5d5f6efc9fb5..6f56d0a18dc8e 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2892,3 +2892,20 @@ func TestOuterJoin(t *testing.T) { ), ) } + +func TestCartesianJoinPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values(1)") + tk.MustExec("set tidb_mem_quota_query = 1 << 30") + tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'") + tk.MustExec("set global tidb_enable_tmp_storage_on_oom = off;") + for i := 0; i < 14; i++ { + tk.MustExec("insert into t select * from t") + } + err := tk.QueryToErr("desc analyze select * from t t1, t t2, t t3, t t4, t t5, t t6;") + require.NotNil(t, err) + require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) +} From 9dae4188e472d20bc5017afa5f723c76e1fc53b4 Mon Sep 17 00:00:00 2001 From: Shenghui Wu Date: Mon, 6 Feb 2023 23:28:08 +0800 Subject: [PATCH 3/8] Update executor/hash_table.go Co-authored-by: HuaiyuXu --- executor/hash_table.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index c387a776d6bb9..ed6ed3de86153 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -214,14 +214,14 @@ func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRo return matched, nil } -// SignalCheckpointForJoin indicates the times of row probe that a signal detection will be triggered. -const SignalCheckpointForJoin int = 1 << 14 +// signalCheckpointForJoin indicates the times of row probe that a signal detection will be triggered. +const signalCheckpointForJoin int = 1 << 14 -// RowSize is the size of Row. -const RowSize = int64(unsafe.Sizeof(chunk.Row{})) +// rowSize is the size of Row. +const rowSize = int64(unsafe.Sizeof(chunk.Row{})) -// RowPtrSize is the size of RowPtr. -const RowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{})) +// rowPtrSize is the size of RowPtr. +const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{})) // GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called // in multiple goroutines while each goroutine should keep its own From 6e26138850483c84f6bf8a02c9244438d9dc33e6 Mon Sep 17 00:00:00 2001 From: Shenghui Wu Date: Mon, 6 Feb 2023 23:28:15 +0800 Subject: [PATCH 4/8] Update executor/hash_table.go Co-authored-by: HuaiyuXu --- executor/hash_table.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index ed6ed3de86153..9e22b90ab11b0 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -261,14 +261,12 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk atomic.AddInt64(&c.stat.probeCollision, 1) continue } - if c.chkBuf != lastChunkBufPointer { - if lastChunkBufPointer != nil { + if c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil { lastChunkSize := lastChunkBufPointer.MemoryUsage() c.chkBufSizeForOneProbe += lastChunkSize memDelta += lastChunkSize - } - lastChunkBufPointer = c.chkBuf } + lastChunkBufPointer = c.chkBuf if i&SignalCheckpointForJoin == 0 { // Trigger Consume for checking the OOM Action signal memDelta += int64(cap(matched))*RowSize + int64(cap(matchedPtrs))*RowPtrSize - matchedDataSize From 6c5cc0b47bea0a60e95cd7b8ec92830e0896b9b6 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 6 Feb 2023 23:36:35 +0800 Subject: [PATCH 5/8] fix build --- executor/hash_table.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index 9e22b90ab11b0..ffc60443b1cde 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -238,13 +238,13 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk // Some variables used for memTracker. var ( - matchedDataSize = int64(cap(matched))*RowSize + int64(cap(matchedPtrs))*RowPtrSize + matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize lastChunkBufPointer *chunk.Chunk = nil memDelta int64 = 0 ) c.chkBuf = nil - c.memTracker.Consume(-c.chkBufSizeForOneProbe + int64(cap(innerPtrs))*RowPtrSize) - defer c.memTracker.Consume(-int64(cap(innerPtrs)) * RowPtrSize) + c.memTracker.Consume(-c.chkBufSizeForOneProbe + int64(cap(innerPtrs))*rowPtrSize) + defer c.memTracker.Consume(-int64(cap(innerPtrs)) * rowPtrSize) c.chkBufSizeForOneProbe = 0 for i, ptr := range innerPtrs { @@ -267,10 +267,10 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk memDelta += lastChunkSize } lastChunkBufPointer = c.chkBuf - if i&SignalCheckpointForJoin == 0 { + if i&signalCheckpointForJoin == 0 { // Trigger Consume for checking the OOM Action signal - memDelta += int64(cap(matched))*RowSize + int64(cap(matchedPtrs))*RowPtrSize - matchedDataSize - matchedDataSize = int64(cap(matched))*RowSize + int64(cap(matchedPtrs))*RowPtrSize + memDelta += int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize - matchedDataSize + matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize c.memTracker.Consume(memDelta + 1) memDelta = 0 } From 9fbb0adb55e6f414a077a42634dad985b80530b7 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 7 Feb 2023 10:26:03 +0800 Subject: [PATCH 6/8] fmt --- executor/hash_table.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index ffc60443b1cde..d0a46d8f50303 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -262,9 +262,9 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk continue } if c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil { - lastChunkSize := lastChunkBufPointer.MemoryUsage() - c.chkBufSizeForOneProbe += lastChunkSize - memDelta += lastChunkSize + lastChunkSize := lastChunkBufPointer.MemoryUsage() + c.chkBufSizeForOneProbe += lastChunkSize + memDelta += lastChunkSize } lastChunkBufPointer = c.chkBuf if i&signalCheckpointForJoin == 0 { From 4981e19494dd11f49637839bb0105422764b4e0b Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 7 Feb 2023 14:22:01 +0800 Subject: [PATCH 7/8] fix --- executor/hash_table.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/hash_table.go b/executor/hash_table.go index d0a46d8f50303..44d3477560bfd 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -279,6 +279,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk matchedPtrs = append(matchedPtrs, ptr) } } + c.memTracker.Consume(memDelta) return matched, matchedPtrs, err } From f1cbab875a470475d81e4e5ec104b262acb14fac Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 7 Feb 2023 14:26:53 +0800 Subject: [PATCH 8/8] fix --- executor/hash_table.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index 44d3477560bfd..57d4519be50b8 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -244,7 +244,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk ) c.chkBuf = nil c.memTracker.Consume(-c.chkBufSizeForOneProbe + int64(cap(innerPtrs))*rowPtrSize) - defer c.memTracker.Consume(-int64(cap(innerPtrs)) * rowPtrSize) + defer c.memTracker.Consume(-int64(cap(innerPtrs))*rowPtrSize + memDelta) c.chkBufSizeForOneProbe = 0 for i, ptr := range innerPtrs { @@ -257,10 +257,6 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk if err != nil { return nil, nil, err } - if !ok { - atomic.AddInt64(&c.stat.probeCollision, 1) - continue - } if c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil { lastChunkSize := lastChunkBufPointer.MemoryUsage() c.chkBufSizeForOneProbe += lastChunkSize @@ -274,12 +270,15 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk c.memTracker.Consume(memDelta + 1) memDelta = 0 } + if !ok { + atomic.AddInt64(&c.stat.probeCollision, 1) + continue + } matched = append(matched, matchedRow) if needPtr { matchedPtrs = append(matchedPtrs, ptr) } } - c.memTracker.Consume(memDelta) return matched, matchedPtrs, err }