diff --git a/util/chunk/BUILD.bazel b/util/chunk/BUILD.bazel index 1ff89ec1d07f2..94132870f766c 100644 --- a/util/chunk/BUILD.bazel +++ b/util/chunk/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "pool.go", "row.go", "row_container.go", + "row_container_reader.go", ], importpath = "github.com/pingcap/tidb/util/chunk", visibility = ["//visibility:public"], diff --git a/util/chunk/disk.go b/util/chunk/disk.go index 2bca7a62e17c5..3c40e3861665b 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -15,6 +15,7 @@ package chunk import ( + "bufio" "io" "os" "strconv" @@ -172,13 +173,37 @@ func (l *ListInDisk) Add(chk *Chunk) (err error) { func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx)) chkSize := l.numRowsOfEachChunk[chkIdx] - for rowIdx := 0; rowIdx < chkSize; rowIdx++ { - _, _, err := l.GetRowAndAppendToChunk(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}, chk) - if err != nil { - return chk, err + + firstRowOffset, err := l.getOffset(uint32(chkIdx), 0) + if err != nil { + return nil, err + } + + // this channel is big enough and will never be blocked. + formatCh := make(chan rowInDisk, chkSize) + var formatChErr error + go func() { + defer close(formatCh) + + // If the row is small, a bufio can significantly improve the performance. As benchmark shows, it's still not bad + // for longer rows. + r := bufio.NewReader(l.dataFile.getSectionReader(firstRowOffset)) + format := rowInDisk{numCol: len(l.fieldTypes)} + for rowIdx := 0; rowIdx < chkSize; rowIdx++ { + _, err = format.ReadFrom(r) + if err != nil { + formatChErr = err + break + } + + formatCh <- format } + }() + + for format := range formatCh { + _, chk = format.toRow(l.fieldTypes, chk) } - return chk, nil + return chk, formatChErr } // GetRow gets a Row from the ListInDisk by RowPtr. diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go index 1217e86f31944..2b80fe644cd58 100644 --- a/util/chunk/disk_test.go +++ b/util/chunk/disk_test.go @@ -259,6 +259,7 @@ func testListInDisk(t *testing.T, concurrency int) { } func BenchmarkListInDisk_GetChunk(b *testing.B) { + b.StopTimer() numChk, numRow := 10, 1000 chks, fields := initChunks(numChk, numRow) l := NewListInDisk(fields) @@ -267,6 +268,7 @@ func BenchmarkListInDisk_GetChunk(b *testing.B) { _ = l.Add(chk) } + b.StartTimer() for i := 0; i < b.N; i++ { v := i % numChk _, _ = l.GetChunk(v) diff --git a/util/chunk/row_container_reader.go b/util/chunk/row_container_reader.go new file mode 100644 index 0000000000000..b96b20c6921c1 --- /dev/null +++ b/util/chunk/row_container_reader.go @@ -0,0 +1,163 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunk + +import ( + "context" + "runtime" + "sync" + + "github.com/pingcap/tidb/util/logutil" +) + +// RowContainerReader is a forward-only iterator for the row container. It provides an interface similar to other +// iterators, but it doesn't provide `ReachEnd` function and requires manually closing to release goroutine. +// +// It's recommended to use the following pattern to use it: +// +// for iter := NewRowContainerReader(rc); iter.Current() != iter.End(); iter.Next() { +// ... +// } +// iter.Close() +// if iter.Error() != nil { +// } +type RowContainerReader interface { + // Next returns the next Row. + Next() Row + + // Current returns the current Row. + Current() Row + + // End returns the invalid end Row. + End() Row + + // Error returns none-nil error if anything wrong happens during the iteration. + Error() error + + // Close closes the dumper + Close() +} + +var _ RowContainerReader = &rowContainerReader{} + +// rowContainerReader is a forward-only iterator for the row container +// It will spawn two goroutines for reading chunks from disk, and converting the chunk to rows. The row will only be sent +// to `rowCh` inside only after when the full chunk has been read, to avoid concurrently read/write to the chunk. +// +// TODO: record the memory allocated for the channel and chunks. +type rowContainerReader struct { + // context, cancel and waitgroup are used to stop and wait until all goroutine stops. + ctx context.Context + cancel func() + wg sync.WaitGroup + + rc *RowContainer + + currentRow Row + rowCh chan Row + + // this error will only be set by worker + err error +} + +// Next implements RowContainerReader +func (reader *rowContainerReader) Next() Row { + for row := range reader.rowCh { + reader.currentRow = row + return row + } + reader.currentRow = reader.End() + return reader.End() +} + +// Current implements RowContainerReader +func (reader *rowContainerReader) Current() Row { + return reader.currentRow +} + +// End implements RowContainerReader +func (*rowContainerReader) End() Row { + return Row{} +} + +// Error implements RowContainerReader +func (reader *rowContainerReader) Error() error { + return reader.err +} + +func (reader *rowContainerReader) initializeChannel() { + if reader.rc.NumChunks() == 0 { + reader.rowCh = make(chan Row, 1024) + } else { + assumeChunkSize := reader.rc.NumRowsOfChunk(0) + // To avoid blocking in sending to `rowCh` and don't start reading the next chunk, it'd be better to give it + // a buffer at least larger than a single chunk. Here it's allocated twice the chunk size to leave some margin. + reader.rowCh = make(chan Row, 2*assumeChunkSize) + } +} + +// Close implements RowContainerReader +func (reader *rowContainerReader) Close() { + reader.cancel() + reader.wg.Wait() +} + +func (reader *rowContainerReader) startWorker() { + reader.wg.Add(1) + go func() { + defer close(reader.rowCh) + defer reader.wg.Done() + + for chkIdx := 0; chkIdx < reader.rc.NumChunks(); chkIdx++ { + chk, err := reader.rc.GetChunk(chkIdx) + if err != nil { + reader.err = err + return + } + + for i := 0; i < chk.NumRows(); i++ { + select { + case reader.rowCh <- chk.GetRow(i): + case <-reader.ctx.Done(): + return + } + } + } + }() +} + +// NewRowContainerReader creates a forward only iterator for row container +func NewRowContainerReader(rc *RowContainer) *rowContainerReader { + ctx, cancel := context.WithCancel(context.Background()) + + reader := &rowContainerReader{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + + rc: rc, + } + reader.initializeChannel() + reader.startWorker() + reader.Next() + runtime.SetFinalizer(reader, func(reader *rowContainerReader) { + if reader.ctx.Err() == nil { + logutil.BgLogger().Warn("rowContainerReader is closed by finalizer") + reader.Close() + } + }) + + return reader +} diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index b0972c179388a..2160496813cfd 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -15,10 +15,14 @@ package chunk import ( + "crypto/rand" + rand2 "math/rand" + "sync" "testing" "time" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/memory" @@ -303,3 +307,187 @@ func TestActionBlocked(t *testing.T) { ac.Action(tracker) require.GreaterOrEqual(t, time.Since(starttime), 200*time.Millisecond) } + +func insertBytesRowsIntoRowContainer(t *testing.T, chkCount int, rowPerChk int) (*RowContainer, [][]byte) { + longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(4096).Build() + fields := []*types.FieldType{&longVarCharTyp} + + rc := NewRowContainer(fields, chkCount) + + allRows := [][]byte{} + // insert chunks + for i := 0; i < chkCount; i++ { + chk := NewChunkWithCapacity(fields, rowPerChk) + // insert rows for each chunk + for j := 0; j < rowPerChk; j++ { + length := rand2.Uint32() + randomBytes := make([]byte, length%4096) + _, err := rand.Read(randomBytes) + require.NoError(t, err) + + chk.AppendBytes(0, randomBytes) + allRows = append(allRows, randomBytes) + } + require.NoError(t, rc.Add(chk)) + } + + return rc, allRows +} + +func TestRowContainerReaderInDisk(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = t.TempDir() + }) + + rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16) + rc.SpillToDisk() + + reader := NewRowContainerReader(rc) + defer reader.Close() + for i := 0; i < 16; i++ { + for j := 0; j < 16; j++ { + row := reader.Current() + require.Equal(t, allRows[i*16+j], row.GetBytes(0)) + reader.Next() + } + } +} + +func TestCloseRowContainerReader(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = t.TempDir() + }) + + rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16) + rc.SpillToDisk() + + // read 8.5 of these chunks + reader := NewRowContainerReader(rc) + defer reader.Close() + for i := 0; i < 8; i++ { + for j := 0; j < 16; j++ { + row := reader.Current() + require.Equal(t, allRows[i*16+j], row.GetBytes(0)) + reader.Next() + } + } + for j := 0; j < 8; j++ { + row := reader.Current() + require.Equal(t, allRows[8*16+j], row.GetBytes(0)) + reader.Next() + } +} + +func TestConcurrentSpillWithRowContainerReader(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = t.TempDir() + }) + + rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024) + + var wg sync.WaitGroup + // concurrently read and spill to disk + wg.Add(1) + go func() { + defer wg.Done() + reader := NewRowContainerReader(rc) + defer reader.Close() + + for i := 0; i < 16; i++ { + for j := 0; j < 1024; j++ { + row := reader.Current() + require.Equal(t, allRows[i*1024+j], row.GetBytes(0)) + reader.Next() + } + } + }() + rc.SpillToDisk() + wg.Wait() +} + +func TestReadAfterSpillWithRowContainerReader(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = t.TempDir() + }) + + rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024) + + reader := NewRowContainerReader(rc) + defer reader.Close() + for i := 0; i < 8; i++ { + for j := 0; j < 1024; j++ { + row := reader.Current() + require.Equal(t, allRows[i*1024+j], row.GetBytes(0)) + reader.Next() + } + } + rc.SpillToDisk() + for i := 8; i < 16; i++ { + for j := 0; j < 1024; j++ { + row := reader.Current() + require.Equal(t, allRows[i*1024+j], row.GetBytes(0)) + reader.Next() + } + } +} + +func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) { + benchmarkRowContainerReaderInDiskWithRowLength(b, 512) +} + +func BenchmarkRowContainerReaderInDiskWithRowSize1024(b *testing.B) { + benchmarkRowContainerReaderInDiskWithRowLength(b, 1024) +} + +func BenchmarkRowContainerReaderInDiskWithRowSize4096(b *testing.B) { + benchmarkRowContainerReaderInDiskWithRowLength(b, 4096) +} + +func benchmarkRowContainerReaderInDiskWithRowLength(b *testing.B, rowLength int) { + b.StopTimer() + + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = b.TempDir() + }) + + longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(rowLength).Build() + fields := []*types.FieldType{&longVarCharTyp} + + randomBytes := make([]byte, rowLength) + _, err := rand.Read(randomBytes) + require.NoError(b, err) + + // create a row container which stores the data in disk + rc := NewRowContainer(fields, 1<<10) + rc.SpillToDisk() + + // insert `b.N * 1<<10` rows (`b.N` chunks) into the rc + for i := 0; i < b.N; i++ { + chk := NewChunkWithCapacity(fields, 1<<10) + for j := 0; j < 1<<10; j++ { + chk.AppendBytes(0, randomBytes) + } + + rc.Add(chk) + } + + reader := NewRowContainerReader(rc) + defer reader.Close() + b.StartTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < 1<<10; i++ { + reader.Next() + } + } + require.NoError(b, reader.Error()) +}