Skip to content

Commit

Permalink
Merge #38451
Browse files Browse the repository at this point in the history
38451: ccl: Move rowConverter to OSS r=adityamaru27 a=adityamaru27

`rowConverter` was previously in the `importccl` package. This change
moves it into its own file in `pkg/sql`.

This change is a setup for the new CTAS processor, which will require
the `RowConverter` to stream KVs, which can then be written to the
newly created table via AddSSTable.

Release note: None

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru27 committed Jun 26, 2019
2 parents d443002 + b185ed1 commit ac1900a
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 235 deletions.
26 changes: 1 addition & 25 deletions pkg/ccl/importccl/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func insertStmtToKVs(
return errors.Errorf("load insert: expected VALUES clause: %q", stmt)
}

b := inserter(f)
b := sql.Inserter(f)
computedIVarContainer := sqlbase.RowIndexedVarContainer{
Mapping: ri.InsertColIDtoRowIndex,
Cols: tableDesc.Columns,
Expand Down Expand Up @@ -330,30 +330,6 @@ func insertStmtToKVs(
return nil
}

type inserter func(roachpb.KeyValue)

func (i inserter) CPut(key, value, expValue interface{}) {
panic("unimplemented")
}

func (i inserter) Del(key ...interface{}) {
panic("unimplemented")
}

func (i inserter) Put(key, value interface{}) {
i(roachpb.KeyValue{
Key: *key.(*roachpb.Key),
Value: *value.(*roachpb.Value),
})
}

func (i inserter) InitPut(key, value interface{}, failOnTombstones bool) {
i(roachpb.KeyValue{
Key: *key.(*roachpb.Key),
Value: *value.(*roachpb.Value),
})
}

func writeSST(
ctx context.Context,
backup *backupccl.BackupDescriptor,
Expand Down
15 changes: 8 additions & 7 deletions pkg/ccl/importccl/read_import_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -161,34 +162,34 @@ func (c *csvInputReader) convertRecordWorker(ctx context.Context) error {
// Create a new evalCtx per converter so each go routine gets its own
// collationenv, which can't be accessed in parallel.
evalCtx := c.evalCtx.Copy()
conv, err := newRowConverter(c.tableDesc, evalCtx, c.kvCh)
conv, err := sql.NewRowConverter(c.tableDesc, evalCtx, c.kvCh)
if err != nil {
return err
}
if conv.evalCtx.SessionData == nil {
if conv.EvalCtx.SessionData == nil {
panic("uninitialized session data")
}

for batch := range c.recordCh {
for batchIdx, record := range batch.r {
rowNum := int64(batch.rowOffset + batchIdx)
for i, v := range record {
col := conv.visibleCols[i]
col := conv.VisibleCols[i]
if c.opts.NullEncoding != nil && v == *c.opts.NullEncoding {
conv.datums[i] = tree.DNull
conv.Datums[i] = tree.DNull
} else {
var err error
conv.datums[i], err = tree.ParseDatumStringAs(conv.visibleColTypes[i], v, conv.evalCtx)
conv.Datums[i], err = tree.ParseDatumStringAs(conv.VisibleColTypes[i], v, conv.EvalCtx)
if err != nil {
return wrapRowErr(err, batch.file, rowNum, pgcode.Syntax,
"parse %q as %s", col.Name, col.Type.SQLString())
}
}
}
if err := conv.row(ctx, batch.fileIndex, rowNum); err != nil {
if err := conv.Row(ctx, batch.fileIndex, rowNum); err != nil {
return wrapRowErr(err, batch.file, rowNum, pgcode.Uncategorized, "")
}
}
}
return conv.sendBatch(ctx)
return conv.SendBatch(ctx)
}
18 changes: 9 additions & 9 deletions pkg/ccl/importccl/read_import_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
// KVs using the mapped converter and sent to kvCh.
type mysqldumpReader struct {
evalCtx *tree.EvalContext
tables map[string]*rowConverter
tables map[string]*sql.RowConverter
kvCh chan []roachpb.KeyValue
debugRow func(tree.Datums)
}
Expand All @@ -56,13 +56,13 @@ func newMysqldumpReader(
) (*mysqldumpReader, error) {
res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh}

converters := make(map[string]*rowConverter, len(tables))
converters := make(map[string]*sql.RowConverter, len(tables))
for name, table := range tables {
if table == nil {
converters[name] = nil
continue
}
conv, err := newRowConverter(table, evalCtx, kvCh)
conv, err := sql.NewRowConverter(table, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -129,22 +129,22 @@ func (m *mysqldumpReader) readFile(
startingCount := count
for _, inputRow := range rows {
count++
if expected, got := len(conv.visibleCols), len(inputRow); expected != got {
if expected, got := len(conv.VisibleCols), len(inputRow); expected != got {
return errors.Errorf("expected %d values, got %d: %v", expected, got, inputRow)
}
for i, raw := range inputRow {
converted, err := mysqlValueToDatum(raw, conv.visibleColTypes[i], conv.evalCtx)
converted, err := mysqlValueToDatum(raw, conv.VisibleColTypes[i], conv.EvalCtx)
if err != nil {
return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
count, count-startingCount, inserts)
}
conv.datums[i] = converted
conv.Datums[i] = converted
}
if err := conv.row(ctx, inputIdx, count); err != nil {
if err := conv.Row(ctx, inputIdx, count); err != nil {
return err
}
if m.debugRow != nil {
m.debugRow(conv.datums)
m.debugRow(conv.Datums)
}
}
default:
Expand All @@ -155,7 +155,7 @@ func (m *mysqldumpReader) readFile(
}
}
for _, conv := range m.tables {
if err := conv.sendBatch(ctx); err != nil {
if err := conv.SendBatch(ctx); err != nil {
return err
}
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/ccl/importccl/read_import_mysqlout.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
)

type mysqloutfileReader struct {
conv rowConverter
conv sql.RowConverter
opts roachpb.MySQLOutfileOptions
}

Expand All @@ -35,7 +36,7 @@ func newMysqloutfileReader(
tableDesc *sqlbase.TableDescriptor,
evalCtx *tree.EvalContext,
) (*mysqloutfileReader, error) {
conv, err := newRowConverter(tableDesc, evalCtx, kvCh)
conv, err := sql.NewRowConverter(tableDesc, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand All @@ -49,7 +50,7 @@ func (d *mysqloutfileReader) start(ctx ctxgroup.Group) {
}

func (d *mysqloutfileReader) inputFinished(ctx context.Context) {
close(d.conv.kvCh)
close(d.conv.KvCh)
}

func (d *mysqloutfileReader) readFiles(
Expand Down Expand Up @@ -85,9 +86,9 @@ func (d *mysqloutfileReader) readFile(

reader := bufio.NewReaderSize(input, 1024*64)
addField := func() error {
if len(row) >= len(d.conv.visibleCols) {
if len(row) >= len(d.conv.VisibleCols) {
return makeRowErr(inputName, count, pgcode.Syntax,
"too many columns, expected %d: %#v", len(d.conv.visibleCols), row)
"too many columns, expected %d: %#v", len(d.conv.VisibleCols), row)
}
if gotNull {
if len(field) != 0 {
Expand All @@ -99,9 +100,9 @@ func (d *mysqloutfileReader) readFile(
} else if !d.opts.HasEscape && string(field) == "NULL" {
row = append(row, tree.DNull)
} else {
datum, err := tree.ParseStringAs(d.conv.visibleColTypes[len(row)], string(field), d.conv.evalCtx)
datum, err := tree.ParseStringAs(d.conv.VisibleColTypes[len(row)], string(field), d.conv.EvalCtx)
if err != nil {
col := d.conv.visibleCols[len(row)]
col := d.conv.VisibleCols[len(row)]
return wrapRowErr(err, inputName, count, pgcode.Syntax,
"parse %q as %s", col.Name, col.Type.SQLString())
}
Expand All @@ -112,8 +113,8 @@ func (d *mysqloutfileReader) readFile(
return nil
}
addRow := func() error {
copy(d.conv.datums, row)
if err := d.conv.row(ctx, inputIdx, count); err != nil {
copy(d.conv.Datums, row)
if err := d.conv.Row(ctx, inputIdx, count); err != nil {
return wrapRowErr(err, inputName, count, pgcode.Uncategorized, "")
}
count++
Expand Down Expand Up @@ -226,5 +227,5 @@ func (d *mysqloutfileReader) readFile(
field = append(field, string(c)...)
}

return d.conv.sendBatch(ctx)
return d.conv.SendBatch(ctx)
}
21 changes: 11 additions & 10 deletions pkg/ccl/importccl/read_import_pgcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand All @@ -31,7 +32,7 @@ import (
const defaultScanBuffer = 1024 * 1024 * 4

type pgCopyReader struct {
conv rowConverter
conv sql.RowConverter
opts roachpb.PgCopyOptions
}

Expand All @@ -43,7 +44,7 @@ func newPgCopyReader(
tableDesc *sqlbase.TableDescriptor,
evalCtx *tree.EvalContext,
) (*pgCopyReader, error) {
conv, err := newRowConverter(tableDesc, evalCtx, kvCh)
conv, err := sql.NewRowConverter(tableDesc, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand All @@ -57,7 +58,7 @@ func (d *pgCopyReader) start(ctx ctxgroup.Group) {
}

func (d *pgCopyReader) inputFinished(ctx context.Context) {
close(d.conv.kvCh)
close(d.conv.KvCh)
}

func (d *pgCopyReader) readFiles(
Expand Down Expand Up @@ -276,27 +277,27 @@ func (d *pgCopyReader) readFile(
if err != nil {
return wrapRowErr(err, inputName, count, pgcode.Uncategorized, "")
}
if len(row) != len(d.conv.visibleColTypes) {
if len(row) != len(d.conv.VisibleColTypes) {
return makeRowErr(inputName, count, pgcode.Syntax,
"expected %d values, got %d", len(d.conv.visibleColTypes), len(row))
"expected %d values, got %d", len(d.conv.VisibleColTypes), len(row))
}
for i, s := range row {
if s == nil {
d.conv.datums[i] = tree.DNull
d.conv.Datums[i] = tree.DNull
} else {
d.conv.datums[i], err = tree.ParseDatumStringAs(d.conv.visibleColTypes[i], *s, d.conv.evalCtx)
d.conv.Datums[i], err = tree.ParseDatumStringAs(d.conv.VisibleColTypes[i], *s, d.conv.EvalCtx)
if err != nil {
col := d.conv.visibleCols[i]
col := d.conv.VisibleCols[i]
return wrapRowErr(err, inputName, count, pgcode.Syntax,
"parse %q as %s", col.Name, col.Type.SQLString())
}
}
}

if err := d.conv.row(ctx, inputIdx, count); err != nil {
if err := d.conv.Row(ctx, inputIdx, count); err != nil {
return wrapRowErr(err, inputName, count, pgcode.Uncategorized, "")
}
}

return d.conv.sendBatch(ctx)
return d.conv.SendBatch(ctx)
}
Loading

0 comments on commit ac1900a

Please sign in to comment.