importinto: support split large csv file (#46480)
ref #42930
D3Hunter authored Sep 1, 2023
1 parent 60b1019 commit 3036b6d
Showing 15 changed files with 212 additions and 46 deletions.
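This commit lets IMPORT INTO split a single large CSV file into chunks of roughly 256 MiB so they can be encoded in parallel. The split_file and skip_rows option names below come from the tests in this diff; an illustrative statement might look like IMPORT INTO t FROM 'big.csv' WITH split_file, skip_rows=1. What follows is a minimal, hypothetical sketch of the splitting arithmetic only, not code from this commit: the real splitting is done by mydump.MakeTableRegions with StrictFormat enabled (wired up in executor/importer/table_import.go below), which also takes care of row boundaries and row-id estimation, and only the first chunk of each file handles skip_rows.

// NOTE: hypothetical sketch; splitFile and chunkSize are illustrative names, not part of this commit.
package main

import "fmt"

// chunkSize mirrors the 256 MiB chunk size mentioned in executor/importer/import.go.
const chunkSize int64 = 256 << 20

// splitFile divides a file of the given size into naive [offset, endOffset) byte ranges.
func splitFile(fileSize int64) [][2]int64 {
	var chunks [][2]int64
	for offset := int64(0); offset < fileSize; offset += chunkSize {
		end := offset + chunkSize
		if end > fileSize {
			end = fileSize
		}
		chunks = append(chunks, [2]int64{offset, end})
	}
	return chunks
}

func main() {
	// a 600 MiB CSV becomes three chunks; only the chunk with offset 0 handles skip_rows.
	for i, c := range splitFile(600 << 20) {
		fmt.Printf("chunk %d: offset=%d endOffset=%d\n", i, c[0], c[1])
	}
}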
17 changes: 13 additions & 4 deletions br/pkg/lightning/mydump/parser.go
@@ -40,8 +40,10 @@ import (

type blockParser struct {
// states for the lexer
reader PooledReader
buf []byte
reader PooledReader
// stores data that has NOT been parsed yet; it shares the same memory as appendBuf.
buf []byte
// used to read data from the reader; the data will be moved to other buffers.
blockBuf []byte
isLastChunk bool

@@ -50,7 +52,11 @@ type blockParser struct {

rowPool *zeropool.Pool[[]types.Datum]
lastRow Row
// Current file offset.
// the reader position we have parsed to; if the underlying reader is not
// a compressed file, it is also the file position we have parsed to.
// this value may go backward when reading a quoted field fails, but then it is
// only used for printing an error message and the parser should not be used
// afterwards, so it's ok, see readQuotedField.
pos int64

// cache
@@ -101,7 +107,7 @@ type Chunk struct {
// we estimate the row-id range of the chunk using file-size divided by some factor (depends on column count).
// after estimation, we will rebase them for all chunks of this table in this instance,
// then they are rebased again based on all instances of parallel import.
// allocatable row-id is in range [PrevRowIDMax, RowIDMax).
// allocatable row-id is in range (PrevRowIDMax, RowIDMax].
// PrevRowIDMax will be increased during local encoding
PrevRowIDMax int64
RowIDMax int64
@@ -111,6 +117,9 @@ type Chunk struct {

// Row is the content of a row.
type Row struct {
// RowID is the row id of the row.
// as objects of this struct are reused, this RowID is increased when reading
// the next row.
RowID int64
Row []types.Datum
Length int
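The comment change above tightens the allocatable row-id range to (PrevRowIDMax, RowIDMax]. Below is a simplified, hypothetical model of that bookkeeping (rowIDRange is an illustrative type, not the real mydump.Chunk): the lower bound is exclusive because PrevRowIDMax is the last id already consumed, and it advances during local encoding.

package main

import "fmt"

// rowIDRange is a hypothetical stand-in for the row-id fields of mydump.Chunk above.
type rowIDRange struct {
	prevRowIDMax int64 // last row id already handed out (exclusive lower bound)
	rowIDMax     int64 // largest row id this chunk may use (inclusive upper bound)
}

// next returns the next row id in (prevRowIDMax, rowIDMax], or false when the range is exhausted.
func (r *rowIDRange) next() (int64, bool) {
	if r.prevRowIDMax >= r.rowIDMax {
		return 0, false
	}
	r.prevRowIDMax++ // mirrors "PrevRowIDMax will be increased during local encoding"
	return r.prevRowIDMax, true
}

func main() {
	r := rowIDRange{prevRowIDMax: 33, rowIDMax: 36}
	for id, ok := r.next(); ok; id, ok = r.next() {
		fmt.Println(id) // prints 34, 35, 36
	}
}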
4 changes: 4 additions & 0 deletions disttask/importinto/BUILD.bazel
@@ -61,11 +61,15 @@ go_test(
srcs = [
"dispatcher_test.go",
"subtask_executor_test.go",
"wrapper_test.go",
],
embed = [":importinto"],
flaky = True,
race = "on",
shard_count = 3,
deps = [
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//disttask/framework/proto",
"//disttask/framework/storage",
1 change: 1 addition & 0 deletions disttask/importinto/proto.go
@@ -125,6 +125,7 @@ func (*postProcessStepMinimalTask) String() string {
// Chunk records the chunk information.
type Chunk struct {
Path string
FileSize int64
Offset int64
EndOffset int64
PrevRowIDMax int64
6 changes: 5 additions & 1 deletion disttask/importinto/wrapper.go
@@ -29,11 +29,13 @@ func toChunkCheckpoint(chunk Chunk) checkpoints.ChunkCheckpoint {
Path: chunk.Path,
Type: chunk.Type,
Compression: chunk.Compression,
FileSize: chunk.EndOffset,
FileSize: chunk.FileSize,
},
Chunk: mydump.Chunk{
PrevRowIDMax: chunk.PrevRowIDMax,
RowIDMax: chunk.RowIDMax,
Offset: chunk.Offset,
EndOffset: chunk.EndOffset,
},
Timestamp: chunk.Timestamp,
}
@@ -42,9 +44,11 @@ func toChunkCheckpoint(chunk Chunk) checkpoints.ChunkCheckpoint {
func toChunk(chunkCheckpoint checkpoints.ChunkCheckpoint) Chunk {
return Chunk{
Path: chunkCheckpoint.FileMeta.Path,
FileSize: chunkCheckpoint.FileMeta.FileSize,
Offset: chunkCheckpoint.Chunk.Offset,
EndOffset: chunkCheckpoint.Chunk.EndOffset,
PrevRowIDMax: chunkCheckpoint.Chunk.PrevRowIDMax,
RowIDMax: chunkCheckpoint.Chunk.RowIDMax,
Type: chunkCheckpoint.FileMeta.Type,
Compression: chunkCheckpoint.FileMeta.Compression,
Timestamp: chunkCheckpoint.Timestamp,
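The wrapper change above separates two values that were previously conflated: FileSize used to be filled from EndOffset, which only coincide when a whole file is imported as a single chunk. With split files a chunk covers just a slice of the file, so FileSize, Offset, and EndOffset must round-trip between Chunk and ChunkCheckpoint independently, which the new wrapper_test.go below verifies. A hypothetical split chunk illustrating the distinction, assuming it sits next to the Chunk type in disttask/importinto/proto.go:

package importinto

// exampleSplitChunk is a hypothetical middle chunk of a 1 GiB CSV: the chunk
// covers only its own 256 MiB slice, while FileSize still describes the whole
// file, so EndOffset can no longer double as FileSize.
var exampleSplitChunk = Chunk{
	Path:      "big.csv",
	FileSize:  1 << 30,   // size of the whole data file
	Offset:    256 << 20, // where this chunk starts reading
	EndOffset: 512 << 20, // where this chunk stops; not equal to FileSize for split files
}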
46 changes: 46 additions & 0 deletions disttask/importinto/wrapper_test.go
@@ -0,0 +1,46 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importinto

import (
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/stretchr/testify/require"
)

func TestChunkConvert(t *testing.T) {
chunkCheckpoint := checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{
Path: "a.csv",
Offset: 11,
},
FileMeta: mydump.SourceFileMeta{
Path: "a.csv",
FileSize: 123,
Type: mydump.SourceTypeCSV,
Compression: mydump.CompressionGZ,
},
Chunk: mydump.Chunk{
PrevRowIDMax: 33,
RowIDMax: 789,
Offset: 11,
EndOffset: 123,
},
}
restored := toChunkCheckpoint(toChunk(chunkCheckpoint))
require.Equal(t, chunkCheckpoint, restored)
}
1 change: 1 addition & 0 deletions executor/import_into_test.go
@@ -102,6 +102,7 @@ func TestImportIntoOptionsNegativeCase(t *testing.T) {
{OptionStr: "skip_rows=true", Err: exeerrors.ErrInvalidOptionVal},

{OptionStr: "split_file='aa'", Err: exeerrors.ErrInvalidOptionVal},
{OptionStr: "split_file, skip_rows=2", Err: exeerrors.ErrInvalidOptionVal},

{OptionStr: "disk_quota='aa'", Err: exeerrors.ErrInvalidOptionVal},
{OptionStr: "disk_quota='220MiBxxx'", Err: exeerrors.ErrInvalidOptionVal},
11 changes: 6 additions & 5 deletions executor/importer/chunk_process.go
@@ -50,10 +50,6 @@ type deliveredRow struct {
offset int64
}

type deliverResult struct {
err error
}

type deliverKVBatch struct {
dataKVs kv.Pairs
indexKVs kv.Pairs
@@ -174,6 +170,10 @@ func (p *chunkProcessor) encodeLoop(ctx context.Context) error {
var err error
reachEOF := false
for !reachEOF {
readPos, _ := p.parser.Pos()
if readPos >= p.chunkInfo.Chunk.EndOffset {
break
}
var readDur, encodeDur time.Duration
canDeliver := false
rowBatch := make([]deliveredRow, 0, MinDeliverRowCnt)
@@ -183,6 +183,7 @@ for !canDeliver {
for !canDeliver {
readDurStart := time.Now()
err = p.parser.ReadRow()
readPos, _ = p.parser.Pos()
// todo: we can implement a ScannedPos which doesn't return an error; will change it later.
newOffset, _ = p.parser.ScannedPos()

@@ -217,7 +218,7 @@ func (p *chunkProcessor) encodeLoop(ctx context.Context) error {
// pebble cannot allow > 4.0G kv in one batch.
// we will hit a pebble panic when importing a sql file in which each kv is larger than 4G / maxKvPairsCnt.
// so add this check.
if kvSize >= MinDeliverBytes || len(rowBatch) >= MinDeliverRowCnt {
if kvSize >= MinDeliverBytes || len(rowBatch) >= MinDeliverRowCnt || readPos == p.chunkInfo.Chunk.EndOffset {
canDeliver = true
kvSize = 0
}
38 changes: 27 additions & 11 deletions executor/importer/import.go
@@ -151,7 +151,7 @@ type LoadDataReaderInfo struct {
Remote *mydump.SourceFileMeta
}

// Plan describes the plan of LOAD DATA.
// Plan describes the plan of LOAD DATA and IMPORT INTO.
type Plan struct {
DBName string
DBID int64
@@ -170,7 +170,10 @@ type Plan struct {
// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
Restrictive bool

SQLMode mysql.SQLMode
SQLMode mysql.SQLMode
// Charset is the charset of the data file when the file is CSV or TSV.
// it might be nil when using LOAD DATA and no charset is specified.
// for IMPORT INTO, it is always non-nil.
Charset *string
ImportantSysVars map[string]string

@@ -230,11 +233,11 @@ type LoadDataController struct {
Table table.Table

// how an input field (or input column) from the data file is mapped, either to a column or a variable.
// if there's NO column list clause in load data statement, then it's table's columns
// if there's NO column list clause in the SQL statement, then it's the table's columns
// else it's a user-defined list.
FieldMappings []*FieldMapping
// see InsertValues.InsertColumns
// todo: our behavior is different with mysql. such as for table t(a,b)
// Note: our behavior is different from mysql's. e.g. for table t(a,b)
// - "...(a,a) set a=100" is allowed in mysql, but not in tidb
// - "...(a,b) set b=100" will set b=100 in mysql, but in tidb the set is ignored.
// - ref columns in set clause is allowed in mysql, but not in tidb
@@ -463,7 +466,6 @@ func (e *LoadDataController) checkFieldParams() error {
}
// NOTE: IMPORT INTO also doesn't support an empty LinesTerminatedBy or FieldsTerminatedBy,
// but that is checked in initOptions.
// TODO: support lines terminated is "".
if len(e.LinesTerminatedBy) == 0 {
return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("LINES TERMINATED BY is empty")
}
@@ -662,6 +664,15 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.Load
}
}

// when split-file is set, the data file will be split into chunks of 256 MiB.
// skip_rows must be 0 or 1; we add this restriction to simplify the skip_rows
// logic, so we only need to skip rows on the first chunk of each data file.
// the CSV parser limits each row to LargestEntryLimit (120M), so the first row
// will NOT cross a file chunk boundary.
if p.SplitFile && p.IgnoreLines > 1 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs("skip_rows, should be <= 1 when split-file is enabled")
}

p.adjustOptions()
return nil
}
@@ -852,7 +863,7 @@ func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig {
return csvConfig
}

// InitDataFiles initializes the data store and load data files.
// InitDataFiles initializes the data store and files.
func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
u, err2 := storage.ParseRawURL(e.Path)
if err2 != nil {
@@ -1063,6 +1074,11 @@ func (e *LoadDataController) GetParser(
}
parser.SetLogger(litlog.Logger{Logger: logutil.Logger(ctx)})

return parser, nil
}

// HandleSkipNRows skips the first N rows of the data file.
func (e *LoadDataController) HandleSkipNRows(parser mydump.Parser) error {
// handle IGNORE N LINES
ignoreOneLineFn := parser.ReadRow
if csvParser, ok := parser.(*mydump.CSVParser); ok {
@@ -1074,17 +1090,17 @@ func (e *LoadDataController) GetParser(

ignoreLineCnt := e.IgnoreLines
for ignoreLineCnt > 0 {
err = ignoreOneLineFn()
err := ignoreOneLineFn()
if err != nil {
if errors.Cause(err) == io.EOF {
return parser, nil
return nil
}
return nil, err
return err
}

ignoreLineCnt--
}
return parser, nil
return nil
}

func (e *LoadDataController) toMyDumpFiles() []mydump.FileInfo {
@@ -1174,5 +1190,5 @@ func GetMsgFromBRError(err error) string {
return raw[:len(raw)-len(berrMsg)-len(": ")]
}

// TestSyncCh is used in unit test to synchronize the execution of LOAD DATA.
// TestSyncCh is used in unit tests to synchronize the execution.
var TestSyncCh = make(chan struct{})
4 changes: 2 additions & 2 deletions executor/importer/import_test.go
@@ -80,7 +80,7 @@ func TestInitOptionsPositiveCase(t *testing.T) {
fieldsEscapedByOption+"='', "+
fieldsDefinedNullByOption+"='N', "+
linesTerminatedByOption+"='END', "+
skipRowsOption+"=3, "+
skipRowsOption+"=1, "+
diskQuotaOption+"='100gib', "+
checksumTableOption+"='optional', "+
threadOption+"=100000, "+
@@ -101,7 +101,7 @@
require.Equal(t, "", plan.FieldsEscapedBy, sql)
require.Equal(t, []string{"N"}, plan.FieldNullDef, sql)
require.Equal(t, "END", plan.LinesTerminatedBy, sql)
require.Equal(t, uint64(3), plan.IgnoreLines, sql)
require.Equal(t, uint64(1), plan.IgnoreLines, sql)
require.Equal(t, config.ByteSize(100<<30), plan.DiskQuota, sql)
require.Equal(t, config.OpLevelOptional, plan.Checksum, sql)
require.Equal(t, int64(runtime.GOMAXPROCS(0)), plan.ThreadCnt, sql) // it's adjusted to the number of CPUs
23 changes: 20 additions & 3 deletions executor/importer/table_import.go
@@ -26,6 +26,7 @@ import (
"strconv"
"sync"
"time"
"unicode/utf8"

"github.com/docker/go-units"
"github.com/pingcap/errors"
@@ -270,9 +271,19 @@ func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.Chunk
if err != nil {
return nil, err
}
// todo: when support checkpoint, we should set pos too.
// WARN: parser.SetPos can only be set before we read anything now. should fix it before set pos.
parser.SetRowID(chunk.Chunk.PrevRowIDMax)
if chunk.Chunk.Offset == 0 {
// if the data file is split, only the first chunk needs to skip rows.
// see the check in initOptions.
if err = ti.LoadDataController.HandleSkipNRows(parser); err != nil {
return nil, err
}
parser.SetRowID(chunk.Chunk.PrevRowIDMax)
} else {
// if we reached here, the file must be an uncompressed CSV file.
if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
return nil, err
}
}
return parser, nil
}

@@ -341,6 +352,12 @@ func (e *LoadDataController) PopulateChunks(ctx context.Context) (ecp map[int32]
IOWorkers: nil,
Store: e.dataStore,
TableMeta: tableMeta,

StrictFormat: e.SplitFile,
DataCharacterSet: *e.Charset,
DataInvalidCharReplace: string(utf8.RuneError),
ReadBlockSize: LoadDataReadBlockSize,
CSV: *e.GenerateCSVConfig(),
}
tableRegions, err2 := mydump.MakeTableRegions(ctx, dataDivideCfg)

3 changes: 3 additions & 0 deletions executor/load_data.go
@@ -322,6 +322,9 @@ func (w *encodeWorker) processStream(
if err != nil {
return err
}
if err = w.controller.HandleSkipNRows(dataParser); err != nil {
return err
}
err = w.processOneStream(ctx, dataParser, outCh)
terror.Log(dataParser.Close())
if err != nil {
5 changes: 4 additions & 1 deletion tests/realtikvtest/importintotest4/BUILD.bazel
@@ -4,13 +4,16 @@ go_test(
name = "importintotest4_test",
timeout = "moderate",
srcs = [
"dummy_test.go",
"main_test.go",
"split_file_test.go",
],
flaky = True,
race = "on",
deps = [
"//br/pkg/lightning/config",
"//config",
"//disttask/framework/storage",
"//disttask/importinto",
"//kv",
"//testkit",
"//tests/realtikvtest",