Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Fix crash during sort spill (#47581) #47625

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
}
})
if e.rowChunks.NumRow() > 0 {
e.rowChunks.Sort()
err := e.rowChunks.Sort()
if err != nil {
return err
}
e.partitionList = append(e.partitionList, e.rowChunks)
}
return nil
Expand Down
147 changes: 80 additions & 67 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (m *mutexForRowContainer) RUnlock() {
m.rLock.RUnlock()
}

type spillHelper interface {
SpillToDisk()
hasEnoughDataToSpill(t *memory.Tracker) bool
}

// RowContainer provides a place for many rows, so many that we might want to spill them into disk.
// nolint:structcheck
type RowContainer struct {
Expand Down Expand Up @@ -121,6 +126,14 @@ func (c *RowContainer) ShallowCopyWithNewMutex() *RowContainer {

// SpillToDisk spills data to disk. This function may be called in parallel.
func (c *RowContainer) SpillToDisk() {
c.spillToDisk(nil)
}

func (*RowContainer) hasEnoughDataToSpill(_ *memory.Tracker) bool {
return true
}

func (c *RowContainer) spillToDisk(preSpillError error) {
c.m.Lock()
defer c.m.Unlock()
if c.alreadySpilled() {
Expand Down Expand Up @@ -153,6 +166,10 @@ func (c *RowContainer) SpillToDisk() {
panic("out of disk quota when spilling")
}
})
if preSpillError != nil {
c.m.records.spillError = preSpillError
return
}
for i := 0; i < n; i++ {
chk := c.m.records.inMemory.GetChunk(i)
err = c.m.records.inDisk.Add(chk)
Expand Down Expand Up @@ -313,8 +330,9 @@ func (c *RowContainer) Close() (err error) {
func (c *RowContainer) ActionSpill() *SpillDiskAction {
if c.actionSpill == nil {
c.actionSpill = &SpillDiskAction{
c: c,
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}}
c: c,
baseSpillDiskAction: &baseSpillDiskAction{cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}},
}
}
return c.actionSpill
}
Expand All @@ -323,23 +341,21 @@ func (c *RowContainer) ActionSpill() *SpillDiskAction {
func (c *RowContainer) ActionSpillForTest() *SpillDiskAction {
c.actionSpill = &SpillDiskAction{
c: c,
testSyncInputFunc: func() {
c.actionSpill.testWg.Add(1)
baseSpillDiskAction: &baseSpillDiskAction{
testSyncInputFunc: func() {
c.actionSpill.testWg.Add(1)
},
testSyncOutputFunc: func() {
c.actionSpill.testWg.Done()
},
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
},
testSyncOutputFunc: func() {
c.actionSpill.testWg.Done()
},
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
}
return c.actionSpill
}

// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
type SpillDiskAction struct {
type baseSpillDiskAction struct {
memory.BaseOOMAction
c *RowContainer
m sync.Mutex
once sync.Once
cond spillStatusCond
Expand All @@ -350,6 +366,20 @@ type SpillDiskAction struct {
testWg sync.WaitGroup
}

// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
type SpillDiskAction struct {
c *RowContainer
*baseSpillDiskAction
}

// Action sends a signal to trigger spillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SpillDiskAction) Action(t *memory.Tracker) {
a.action(t, a.c)
}

type spillStatusCond struct {
*sync.Cond
// status indicates different stages for the Action
Expand All @@ -367,38 +397,35 @@ const (
spilledYet
)

func (a *SpillDiskAction) setStatus(status spillStatus) {
func (a *baseSpillDiskAction) setStatus(status spillStatus) {
a.cond.L.Lock()
defer a.cond.L.Unlock()
a.cond.status = status
}

func (a *SpillDiskAction) getStatus() spillStatus {
func (a *baseSpillDiskAction) getStatus() spillStatus {
a.cond.L.Lock()
defer a.cond.L.Unlock()
return a.cond.status
}

// Action sends a signal to trigger spillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SpillDiskAction) Action(t *memory.Tracker) {
func (a *baseSpillDiskAction) action(t *memory.Tracker, spillHelper spillHelper) {
a.m.Lock()
defer a.m.Unlock()

if a.getStatus() == notSpilled {
if a.getStatus() == notSpilled && spillHelper.hasEnoughDataToSpill(t) {
a.once.Do(func() {
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
if a.testSyncInputFunc != nil {
a.testSyncInputFunc()
c := a.c
go func() {
c.SpillToDisk()
spillHelper.SpillToDisk()
a.testSyncOutputFunc()
}()
return
}
go a.c.SpillToDisk()
go spillHelper.SpillToDisk()
})
return
}
Expand All @@ -418,20 +445,20 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) {
}

// Reset resets the status for SpillDiskAction.
func (a *SpillDiskAction) Reset() {
func (a *baseSpillDiskAction) Reset() {
a.m.Lock()
defer a.m.Unlock()
a.setStatus(notSpilled)
a.once = sync.Once{}
}

// GetPriority get the priority of the Action.
func (*SpillDiskAction) GetPriority() int64 {
func (*baseSpillDiskAction) GetPriority() int64 {
return memory.DefSpillPriority
}

// WaitForTest waits all goroutine have gone.
func (a *SpillDiskAction) WaitForTest() {
func (a *baseSpillDiskAction) WaitForTest() {
a.testWg.Wait()
}

Expand Down Expand Up @@ -522,9 +549,15 @@ func (c *SortedRowContainer) keyColumnsLess(i, j int) bool {
}

// Sort inits pointers and sorts the records.
func (c *SortedRowContainer) Sort() {
func (c *SortedRowContainer) Sort() (ret error) {
c.ptrM.Lock()
defer c.ptrM.Unlock()
ret = nil
defer func() {
if r := recover(); r != nil {
ret = fmt.Errorf("%v", r)
}
}()
if c.ptrM.rowPtrs != nil {
return
}
Expand All @@ -539,12 +572,24 @@ func (c *SortedRowContainer) Sort() {
c.ptrM.rowPtrs = append(c.ptrM.rowPtrs, RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
failpoint.Inject("errorDuringSortRowContainer", func(val failpoint.Value) {
if val.(bool) {
panic("sort meet error")
}
})
sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess)
return
}

func (c *SortedRowContainer) sortAndSpillToDisk() {
c.Sort()
c.RowContainer.SpillToDisk()
// SpillToDisk spills data to disk. This function may be called in parallel.
func (c *SortedRowContainer) SpillToDisk() {
err := c.Sort()
c.RowContainer.spillToDisk(err)
}

func (c *SortedRowContainer) hasEnoughDataToSpill(t *memory.Tracker) bool {
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
return c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10
}

// Add appends a chunk into the SortedRowContainer.
Expand All @@ -571,8 +616,8 @@ func (c *SortedRowContainer) GetSortedRow(idx int) (Row, error) {
func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
if c.actionSpill == nil {
c.actionSpill = &SortAndSpillDiskAction{
c: c,
SpillDiskAction: c.RowContainer.ActionSpill(),
c: c,
baseSpillDiskAction: c.RowContainer.ActionSpill().baseSpillDiskAction,
}
}
return c.actionSpill
Expand All @@ -581,8 +626,8 @@ func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
// ActionSpillForTest returns a SortAndSpillDiskAction for sorting and spilling over to disk for test.
func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction {
c.actionSpill = &SortAndSpillDiskAction{
c: c,
SpillDiskAction: c.RowContainer.ActionSpillForTest(),
c: c,
baseSpillDiskAction: c.RowContainer.ActionSpillForTest().baseSpillDiskAction,
}
return c.actionSpill
}
Expand All @@ -597,45 +642,13 @@ func (c *SortedRowContainer) GetMemTracker() *memory.Tracker {
// triggered.
type SortAndSpillDiskAction struct {
c *SortedRowContainer
*SpillDiskAction
*baseSpillDiskAction
}

// Action sends a signal to trigger sortAndSpillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) {
a.m.Lock()
defer a.m.Unlock()
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
if a.getStatus() == notSpilled && a.c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 {
a.once.Do(func() {
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
if a.testSyncInputFunc != nil {
a.testSyncInputFunc()
c := a.c
go func() {
c.sortAndSpillToDisk()
a.testSyncOutputFunc()
}()
return
}
go a.c.sortAndSpillToDisk()
})
return
}

a.cond.L.Lock()
for a.cond.status == spilling {
a.cond.Wait()
}
a.cond.L.Unlock()

if !t.CheckExceed() {
return
}
if fallback := a.GetFallback(); fallback != nil {
fallback.Action(t)
}
a.action(t, a.c)
}

// WaitForTest waits all goroutine have gone.
Expand Down
36 changes: 36 additions & 0 deletions util/chunk/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,42 @@ func TestPanicWhenSpillToDisk(t *testing.T) {
require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
}

func TestPanicDuringSortedRowContainerSpill(t *testing.T) {
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
byItemsDesc := []bool{false}
keyColumns := []int{0}
keyCmpFuncs := []CompareFunc{cmpInt64}
sz := 20
rc := NewSortedRowContainer(fields, sz, byItemsDesc, keyColumns, keyCmpFuncs)

chk := NewChunkWithCapacity(fields, sz)
for i := 0; i < sz; i++ {
chk.AppendInt64(0, int64(i))
}
var tracker *memory.Tracker
var err error
tracker = rc.GetMemTracker()
tracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1)
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
require.False(t, rc.AlreadySpilledSafeForTest())
err = rc.Add(chk)
require.NoError(t, err)
rc.actionSpill.WaitForTest()
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer"))
}()
err = rc.Add(chk)
require.NoError(t, err)
rc.actionSpill.WaitForTest()
require.True(t, rc.AlreadySpilledSafeForTest())

_, err = rc.GetRow(RowPtr{})
require.EqualError(t, err, "sort meet error")
}

func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) {
benchmarkRowContainerReaderInDiskWithRowLength(b, 512)
}
Expand Down