importccl: add mysqloutfile converter
Release note: none.
dt committed May 17, 2018
1 parent 6062937 commit 9c3fa62
Showing 14 changed files with 4,910 additions and 62 deletions.
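For context: MySQL's SELECT ... INTO OUTFILE (and mysqldump --tab) writes tab-separated fields, newline-terminated rows, no enclosing quotes, and backslash escape sequences by default. A hedged sketch of options matching those defaults, using the MySQLOutfileOptions fields that appear in this diff (the exact field types are an assumption, not shown in this commit):

	// Sketch only, not code from this commit: MySQL OUTFILE defaults.
	opts := roachpb.MySQLOutfileOptions{
		FieldSeparator: '\t',                              // FIELDS TERMINATED BY '\t'
		RowSeparator:   '\n',                              // LINES TERMINATED BY '\n'
		HasEscape:      true,                              // FIELDS ESCAPED BY '\'
		Escape:         '\\',
		Enclose:        roachpb.MySQLOutfileOptions_Never, // FIELDS ENCLOSED BY ''
	}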
pkg/ccl/importccl/import_stmt_test.go (2 changes: 1 addition & 1 deletion)
@@ -709,7 +709,7 @@ func BenchmarkConvertRecord(b *testing.B) {
 	// start up workers.
 	for i := 0; i < runtime.NumCPU(); i++ {
 		group.Go(func() error {
-			return c.convertRecord(ctx, kvCh)
+			return c.convertRecord(ctx)
 		})
 	}
 	const batchSize = 500
pkg/ccl/importccl/read_import_csv.go (74 changes: 43 additions & 31 deletions)
@@ -24,31 +24,38 @@ import (
 )
 
 type csvInputReader struct {
-	expectedCols int
+	kvCh         chan kvBatch
 	recordCh     chan csvRecord
+	batchSize    int
+	batch        csvRecord
 	opts         roachpb.CSVOptions
 	tableDesc    *sqlbase.TableDescriptor
+	expectedCols int
 }
 
 var _ inputConverter = &csvInputReader{}
 
 func newCSVInputReader(
-	opts roachpb.CSVOptions, tableDesc *sqlbase.TableDescriptor, expectedCols int,
+	kvCh chan kvBatch, opts roachpb.CSVOptions, tableDesc *sqlbase.TableDescriptor, expectedCols int,
 ) *csvInputReader {
 	return &csvInputReader{
 		opts:         opts,
+		kvCh:         kvCh,
 		expectedCols: expectedCols,
 		tableDesc:    tableDesc,
 		recordCh:     make(chan csvRecord),
+		batchSize:    500,
 	}
 }
 
-func (c *csvInputReader) start(ctx context.Context, group *errgroup.Group, kvCh chan kvBatch) {
+func (c *csvInputReader) start(ctx context.Context, group *errgroup.Group) {
 	group.Go(func() error {
 		sCtx, span := tracing.ChildSpan(ctx, "convertcsv")
 		defer tracing.FinishSpan(span)
 
-		defer close(kvCh)
+		defer close(c.kvCh)
 		return groupWorkers(sCtx, runtime.NumCPU(), func(ctx context.Context) error {
-			return c.convertRecord(ctx, kvCh)
+			return c.convertRecord(ctx)
 		})
 	})
 }
@@ -57,14 +64,31 @@ func (c *csvInputReader) inputFinished() {
 	close(c.recordCh)
 }
 
+func (c *csvInputReader) flushBatch(ctx context.Context, finished bool, progFn func(finished bool) error) error {
+	// if the batch isn't empty, we need to flush it.
+	if len(c.batch.r) > 0 {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case c.recordCh <- c.batch:
+		}
+	}
+	if progressErr := progFn(finished); progressErr != nil {
+		return progressErr
+	}
+	if !finished {
+		c.batch.r = make([][]string, 0, c.batchSize)
+	}
+	return nil
+}
+
 func (c *csvInputReader) readFile(
 	ctx context.Context,
 	input io.Reader,
 	inputIdx int32,
 	inputName string,
 	progressFn func(finished bool) error,
 ) error {
-	done := ctx.Done()
 	cr := csv.NewReader(input)
 	if c.opts.Comma != 0 {
 		cr.Comma = c.opts.Comma
@@ -73,36 +97,24 @@ func (c *csvInputReader) readFile(
 	cr.LazyQuotes = true
 	cr.Comment = c.opts.Comment
 
-	const batchSize = 500
-
-	batch := csvRecord{
+	c.batch = csvRecord{
 		file:      inputName,
 		fileIndex: inputIdx,
 		rowOffset: 1,
-		r:         make([][]string, 0, batchSize),
+		r:         make([][]string, 0, c.batchSize),
 	}
 
-	var count int64
 	for i := 1; ; i++ {
 		record, err := cr.Read()
-		if err == io.EOF || len(batch.r) >= batchSize {
-			// if the batch isn't empty, we need to flush it.
-			if len(batch.r) > 0 {
-				select {
-				case <-done:
-					return ctx.Err()
-				case c.recordCh <- batch:
-					count += int64(len(batch.r))
-				}
+		finished := err == io.EOF
+		if finished || len(c.batch.r) >= c.batchSize {
+			if err := c.flushBatch(ctx, finished, progressFn); err != nil {
+				return err
 			}
-			if progressErr := progressFn(err == io.EOF); progressErr != nil {
-				return progressErr
-			}
-			if err == io.EOF {
-				break
-			}
-			batch.rowOffset = i
-			batch.r = make([][]string, 0, batchSize)
+			c.batch.rowOffset = i
 		}
+		if finished {
+			break
+		}
 		if err != nil {
 			return errors.Wrapf(err, "row %d: reading CSV record", i)
@@ -119,7 +131,7 @@ func (c *csvInputReader) readFile(
 		} else {
 			return errors.Errorf("row %d: expected %d fields, got %d", i, c.expectedCols, len(record))
 		}
-		batch.r = append(batch.r, record)
+		c.batch.r = append(c.batch.r, record)
 	}
 	return nil
 }
@@ -133,8 +145,8 @@ type csvRecord struct {
 
 // convertRecord converts CSV records into KV pairs and sends them on the
 // kvCh chan.
-func (c *csvInputReader) convertRecord(ctx context.Context, kvCh chan<- kvBatch) error {
-	conv, err := newRowConverter(ctx, c.tableDesc, kvCh)
+func (c *csvInputReader) convertRecord(ctx context.Context) error {
+	conv, err := newRowConverter(ctx, c.tableDesc, c.kvCh)
 	if err != nil {
 		return err
 	}
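Taken together, the read_import_csv.go changes move the kv channel into the reader at construction time and hoist the batch bookkeeping onto the struct, so the flush logic can be reused by the new mysqloutfile reader below. A hedged caller-side sketch of the resulting lifecycle (ctx, group, opts, tableDesc, expectedCols, input, and progressFn are assumed plumbing, not shown in this commit):

	kvCh := make(chan kvBatch)
	c := newCSVInputReader(kvCh, opts, tableDesc, expectedCols)
	c.start(ctx, group) // spawns convertRecord workers; closes kvCh when they finish
	if err := c.readFile(ctx, input, 1, "data.csv", progressFn); err != nil {
		return err
	}
	c.inputFinished() // closes recordCh so the conversion workers drain and exit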
pkg/ccl/importccl/read_import_mysqlout.go (175 changes: 175 additions & 0 deletions)
@@ -0,0 +1,175 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package importccl

import (
	"bufio"
	"context"
	"io"
	"unicode"

	"github.com/pkg/errors"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)

type mysqloutfileReader struct {
	csvInputReader
	opts roachpb.MySQLOutfileOptions
}

var _ inputConverter = &mysqloutfileReader{}

func newMysqloutfileReader(
	kvCh chan kvBatch,
	opts roachpb.MySQLOutfileOptions,
	tableDesc *sqlbase.TableDescriptor,
	expectedCols int,
) *mysqloutfileReader {
	return &mysqloutfileReader{
		csvInputReader: *newCSVInputReader(kvCh, roachpb.CSVOptions{}, tableDesc, expectedCols),
		opts:           opts,
	}
}

func (d *mysqloutfileReader) readFile(
	ctx context.Context,
	input io.Reader,
	inputIdx int32,
	inputName string,
	progressFn func(finished bool) error,
) error {

	var count int

	// The current row being read.
	var row []string
	// The current field being read.
	var field []byte

	// If we have an escaping char defined, seeing it means the next char is to be
	// treated as escaped -- usually that means literal but has some specific
	// mappings defined as well.
	var nextLiteral bool

	// If we have an enclosing char defined, seeing it begins reading a field --
	// which means we do not look for separators until we see the end of the field
	// as indicated by the matching enclosing char.
	var readingField bool

	reader := bufio.NewReaderSize(input, 1024*64)

	for {
		c, w, err := reader.ReadRune()
		finished := err == io.EOF

		// First, check whether we're done and, if so, that everything looks good.
		if finished {
			if nextLiteral {
				return errors.New("unmatched literal")
			}
			if readingField {
				return errors.New("unmatched field enclosure")
			}
			// Flush the last row if we have one.
			if len(row) > 0 {
				d.csvInputReader.batch.r = append(d.csvInputReader.batch.r, row)
			}
		}

		// Check if we need to flush the batch, either because it is full or
		// because we are finished.
		if finished || len(d.csvInputReader.batch.r) > d.csvInputReader.batchSize {
			if err := d.csvInputReader.flushBatch(ctx, finished, progressFn); err != nil {
				return err
			}
			d.csvInputReader.batch.rowOffset = count
		}

		if finished {
			break
		}

		if err != nil {
			return err
		}
		if c == unicode.ReplacementChar && w == 1 {
			if err := reader.UnreadRune(); err != nil {
				return err
			}
			raw, err := reader.ReadByte()
			if err != nil {
				return err
			}
			field = append(field, raw)
			continue
		}

		// Do we need to check for escaping?
		if d.opts.HasEscape {
			if nextLiteral {
				nextLiteral = false
				// See https://dev.mysql.com/doc/refman/8.0/en/load-data.html.
				switch c {
				case '0':
					field = append(field, byte(0))
				case 'b':
					field = append(field, '\b')
				case 'n':
					field = append(field, '\n')
				case 'r':
					field = append(field, '\r')
				case 't':
					field = append(field, '\t')
				case 'Z':
					field = append(field, byte(26))
				case 'N':
					field = append(field, '\\', 'N')
				default:
					field = append(field, string(c)...)
				}
				continue
			}

			if c == d.opts.Escape {
				nextLiteral = true
				continue
			}
		}

		// If we have a defined enclose char, check if we're starting or stopping
		// an enclosed field. Technically, the enclose char can be made mandatory,
		// so potentially we could have a flag to enforce that, returning an error
		// if it is missing, but we don't need to care for simply decoding.
		if d.opts.Enclose != roachpb.MySQLOutfileOptions_Never && c == d.opts.Encloser {
			readingField = !readingField
			continue
		}

		// Are we done with the field, or even the whole row?
		if !readingField && (c == d.opts.FieldSeparator || c == d.opts.RowSeparator) {
			row = append(row, string(field))
			field = field[:0]

			if c == d.opts.RowSeparator {
				if expected := d.csvInputReader.expectedCols; expected != len(row) {
					return errors.Errorf("row %d: expected %d columns, got %d: %#v", count+1, expected, len(row), row)
				}
				d.csvInputReader.batch.r = append(d.csvInputReader.batch.r, row)
				count++
				row = make([]string, 0, len(row))
			}
			continue
		}

		field = append(field, string(c)...)
	}

	return nil
}
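To make the escape handling concrete, here is a hypothetical decoding under MySQL's defaults (tab field separator, newline row separator, backslash escape), not a test from this commit:

	input bytes:   1<TAB>fo\no<NEWLINE>
	decoded row:   ["1", "fo\no"]  // the escaped 'n' becomes a literal newline byte

	input bytes:   2<TAB>\N<NEWLINE>
	decoded row:   ["2", "\\N"]    // kept as the two bytes \N, MySQL's NULL marker

Note the asymmetry: every other escape collapses to a single byte, while \N is preserved verbatim so a later stage can distinguish SQL NULL from the literal string "N".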