importccl: pre-read schemas in mysqldump import
This switches mysqldump import to read schemas during setup on the
gateway, rather than during sampling, similar to how pgdump operates
(i.e. in three passes over the input rather than two).
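
For context, a minimal sketch of the shape of the new flow, assuming a
heavily simplified planning path: planMysqldumpImport and planImportJob
are hypothetical stand-ins for importPlanHook and the rest of the plan
hook, and only readMysqlCreateTable is real.

    // Sketch only, not code from this commit: error handling and the other
    // input formats are omitted.
    package importccl

    import (
    	"context"
    	"io"

    	"github.com/cockroachdb/cockroach/pkg/sql"
    	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
    )

    func planMysqldumpImport(
    	ctx context.Context, p sql.PlanHookState, input io.Reader, parentID sqlbase.ID, match string,
    ) error {
    	evalCtx := &p.ExtendedEvalContext().EvalContext

    	// Pass 1 (gateway, at setup time): read the whole dump up front and
    	// collect every CREATE TABLE it contains.
    	tableDescs, err := readMysqlCreateTable(input, evalCtx, parentID, match)
    	if err != nil {
    		return err
    	}

    	// Passes 2 and 3 (distributed): sample the input for split points, then
    	// convert rows to KVs and ingest, both using the pre-read schemas.
    	return planImportJob(ctx, p, tableDescs) // hypothetical helper
    }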

This simplifies handling foreign keys, which can appear in a table
definition before the table they reference, making them hard to resolve
correctly right away. Reading through the whole file to capture all the
schemas before evaluating them should make that a bit easier.
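
As a made-up illustration of the ordering problem: in the dump below the
referencing table precedes the referenced one, so a single pass that
resolves constraints as it reads has nothing to point the foreign key at
yet, whereas reading the whole file first puts both descriptors in hand
before any resolution happens. The dump text and wrapper are
hypothetical; readMysqlCreateTable is the real entry point.

    package importccl

    import (
    	"strings"

    	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
    	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
    )

    // Hypothetical dump: `orders` declares a foreign key to `customers`,
    // which is not defined until later in the file.
    const exampleDump = `
    CREATE TABLE orders (
      id INT PRIMARY KEY,
      customer_id INT,
      FOREIGN KEY (customer_id) REFERENCES customers (id)
    );
    CREATE TABLE customers (id INT PRIMARY KEY);
    `

    // readSchemasExample reads every CREATE TABLE in the dump before anything
    // is evaluated, so both descriptors are available regardless of the order
    // in which the file defines them.
    func readSchemasExample(
    	evalCtx *tree.EvalContext, parentID sqlbase.ID,
    ) ([]*sqlbase.TableDescriptor, error) {
    	// Passing an empty match is assumed here to mean "all tables".
    	return readMysqlCreateTable(strings.NewReader(exampleDump), evalCtx, parentID, "")
    }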

In the future, a return to two passes could be possible, either if it
turns out that KVs can be produced correctly even if the schema is later
changed by a foreign key, or by oversampling raw rows of the input
*without* converting them during the schema-extracting read, then using
those schemas to convert the sampled rows to the KVs from which the
splits can be chosen.
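
Purely as a sketch of that speculative 2-pass variant (every name below
is hypothetical; nothing like this exists in the codebase): pass 1
extracts schemas while retaining an oversampled set of raw, unconverted
rows, and pass 2 converts just those sampled rows using the now-known
schemas to pick split points.

    package importccl

    import (
    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
    )

    // rawSample is a hypothetical record of an unconverted sampled row.
    type rawSample struct {
    	table string
    	row   []string
    }

    func twoPassSplits(
    	readAndSample func() ([]*sqlbase.TableDescriptor, []rawSample, error),
    	convert func(*sqlbase.TableDescriptor, rawSample) (roachpb.Key, error),
    ) ([]roachpb.Key, error) {
    	// Pass 1: schemas plus oversampled raw rows, no conversion yet.
    	tables, samples, err := readAndSample()
    	if err != nil {
    		return nil, err
    	}
    	byName := make(map[string]*sqlbase.TableDescriptor, len(tables))
    	for _, t := range tables {
    		byName[t.Name] = t
    	}
    	// Convert only the sampled rows to keys and use them as split points;
    	// the full conversion and ingestion pass would follow.
    	splits := make([]roachpb.Key, 0, len(samples))
    	for _, s := range samples {
    		key, err := convert(byName[s.table], s)
    		if err != nil {
    			return nil, err
    		}
    		splits = append(splits, key)
    	}
    	return splits, nil
    }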

Release note: none.
dt committed Jul 20, 2018
1 parent 80958ce commit e984ec5
Showing 3 changed files with 69 additions and 113 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
@@ -687,6 +687,8 @@ func importPlanHook(
fks := fkHandler{skip: skipFKs, allowed: true, resolver: make(fkResolver)}
switch format.Format {
case roachpb.IOFileFormat_Mysqldump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readMysqlCreateTable(reader, evalCtx, parentID, match)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readPostgresCreateTable(reader, evalCtx, p.ExecCfg().Settings, match, parentID, walltime, fks, int(format.PgDump.MaxRowSize))
51 changes: 4 additions & 47 deletions pkg/ccl/importccl/read_import_mysql.go
@@ -17,8 +17,6 @@ import (
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"

"github.com/pkg/errors"
mysqltypes "vitess.io/vitess/go/sqltypes"
mysql "vitess.io/vitess/go/vt/sqlparser"
@@ -41,11 +39,9 @@ import (
// tables with names that appear in the `tables` map is converted to Cockroach
// KVs using the mapped converter and sent to kvCh.
type mysqldumpReader struct {
evalCtx *tree.EvalContext
tables map[string]*rowConverter
importAll bool // import any table encountered.
kvCh chan kvBatch

evalCtx *tree.EvalContext
tables map[string]*rowConverter
kvCh chan kvBatch
debugRow func(tree.Datums)
}

@@ -54,7 +50,7 @@ var _ inputConverter = &mysqldumpReader{}
func newMysqldumpReader(
kvCh chan kvBatch, tables map[string]*sqlbase.TableDescriptor, evalCtx *tree.EvalContext,
) (*mysqldumpReader, error) {
res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh, importAll: len(tables) == 0}
res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh}

converters := make(map[string]*rowConverter, len(tables))
for name, table := range tables {
@@ -69,7 +65,6 @@ func newMysqldumpReader(
converters[name] = conv
}
res.tables = converters

return res, nil
}

@@ -83,7 +78,6 @@ func (m *mysqldumpReader) inputFinished(ctx context.Context) {
func (m *mysqldumpReader) readFile(
ctx context.Context, input io.Reader, inputIdx int32, inputName string, progressFn progressFn,
) error {
var generatedIDs sqlbase.ID
var inserts, count int64
r := bufio.NewReaderSize(input, 1024*64)
tokens := mysql.NewTokenizer(r)
@@ -100,43 +94,6 @@ func (m *mysqldumpReader) readFile(
return errors.Wrap(err, "mysql parse error")
}
switch i := stmt.(type) {
case *mysql.DDL:
if i.Action == mysql.DropStr {
continue
}
if i.Action != mysql.CreateStr {
return errors.Errorf("unsupported %q statement in mysqldump", i.Action)
}
name := i.NewName.Name.String()
conv, ok := m.tables[name]
// If we already have this schema, skip it.
if conv != nil {
continue
}
// If we're only importing the named tables and this is not one, skip it.
if !m.importAll && !ok {
continue
}

generatedIDs++
id := defaultCSVTableID + generatedIDs
tbl, err := mysqlTableToCockroach(m.evalCtx, defaultCSVParentID, id, name, i.TableSpec)
if err != nil {
return err
}
conv, err = newRowConverter(tbl, m.evalCtx, m.kvCh)
if err != nil {
return err
}
kv := roachpb.KeyValue{Key: sqlbase.MakeDescMetadataKey(id)}
if err := kv.Value.SetProto(tbl); err != nil {
return err
}
kv.Value.InitChecksum(kv.Key)
conv.kvBatch = append(conv.kvBatch, kv)

m.tables[name] = conv

case *mysql.Insert:
name := i.Table.Name.String()
conv, ok := m.tables[name]
129 changes: 63 additions & 66 deletions pkg/sql/distsql_plan_csv.go
@@ -248,61 +248,63 @@ func LoadCSV(

details := job.Details().(jobspb.ImportDetails)
samples := details.Samples
var parsedTables map[sqlbase.ID]*sqlbase.TableDescriptor
if samples == nil {
var err error
samples, parsedTables, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, oversample, &planCtx, inputSpecs, sstSpecs)
samples, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, oversample, &planCtx, inputSpecs, sstSpecs)
if err != nil {
return err
}
}

// If sampling returns parsed table definitions, we need to potentially assign
// them real IDs and re-key the samples with those IDs, then update the job
// details to record the tables and their matching samples.
if len(parsedTables) > 0 {
importing := to == "" // are we actually ingesting, or just transforming?

rekeys := make(map[sqlbase.ID]*sqlbase.TableDescriptor, len(parsedTables))

// Update the tables map with the parsed tables and allocate them real IDs.
for _, parsed := range parsedTables {
name := parsed.Name
if existing, ok := tables[name]; ok && existing != nil {
return errors.Errorf("unexpected parsed table definition for %q", name)
}
tables[name] = parsed

// If we're actually importing, we'll need a real ID for this table.
if importing {
rekeys[parsed.ID] = parsed
parsed.ID, err = GenerateUniqueDescID(ctx, phs.ExecCfg().DB)
if err != nil {
return err
/*
TODO(dt): when we enable reading schemas during sampling, might do this:
// If sampling returns parsed table definitions, we need to potentially assign
// them real IDs and re-key the samples with those IDs, then update the job
// details to record the tables and their matching samples.
if len(parsedTables) > 0 {
importing := to == "" // are we actually ingesting, or just transforming?
rekeys := make(map[sqlbase.ID]*sqlbase.TableDescriptor, len(parsedTables))
// Update the tables map with the parsed tables and allocate them real IDs.
for _, parsed := range parsedTables {
name := parsed.Name
if existing, ok := tables[name]; ok && existing != nil {
return errors.Errorf("unexpected parsed table definition for %q", name)
}
tables[name] = parsed
// If we're actually importing, we'll need a real ID for this table.
if importing {
rekeys[parsed.ID] = parsed
parsed.ID, err = GenerateUniqueDescID(ctx, phs.ExecCfg().DB)
if err != nil {
return err
}
}
}
}
}
// The samples were created using the dummy IDs, but the IMPORT run will use
// the actual IDs, so we need to re-key the samples so that they actually
// act as splits in the IMPORTed key-space.
if importing {
kr, err := makeRewriter(rekeys)
if err != nil {
return err
}
for i := range samples {
var ok bool
samples[i], ok, err = kr.RewriteKey(samples[i])
if err != nil {
return err
}
if !ok {
return errors.Errorf("expected rewriter to rewrite key %v", samples[i])
// The samples were created using the dummy IDs, but the IMPORT run will use
// the actual IDs, so we need to re-key the samples so that they actually
// act as splits in the IMPORTed key-space.
if importing {
kr, err := makeRewriter(rekeys)
if err != nil {
return err
}
for i := range samples {
var ok bool
samples[i], ok, err = kr.RewriteKey(samples[i])
if err != nil {
return err
}
if !ok {
return errors.Errorf("expected rewriter to rewrite key %v", samples[i])
}
}
}
}
}
}
*/

if len(tables) == 0 {
return errors.Errorf("must specify table(s) to import")
@@ -438,12 +440,6 @@ func LoadCSV(

d := details.(*jobspb.Payload_Import).Import
d.Samples = samples
if len(parsedTables) > 0 {
d.Tables = make([]jobspb.ImportDetails_Table, 0, len(tables))
for _, tbl := range tables {
d.Tables = append(d.Tables, jobspb.ImportDetails_Table{Desc: tbl})
}
}
return prog.Completed()
},
); err != nil {
@@ -483,7 +479,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
planCtx *planningCtx,
csvSpecs []*distsqlrun.ReadImportDataSpec,
sstSpecs []distsqlrun.SSTWriterSpec,
) ([][]byte, map[sqlbase.ID]*sqlbase.TableDescriptor, error) {
) ([][]byte, error) {
// splitSize is the target number of bytes at which to create SST files. We
// attempt to do this by sampling, which is what the first DistSQL plan of this
// function does. CSV rows are converted into KVs. The total size of the KV is
@@ -501,7 +497,7 @@
}
sampleSize := splitSize / oversample
if sampleSize > math.MaxInt32 {
return nil, nil, errors.Errorf("SST size must fit in an int32: %d", splitSize)
return nil, errors.Errorf("SST size must fit in an int32: %d", splitSize)
}

var p physicalPlan
@@ -530,7 +526,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
d.SamplingProgress = make([]float32, len(csvSpecs))
return d.Completed()
}); err != nil {
return nil, nil, err
return nil, err
}

// We only need the key during sorting.
@@ -558,23 +554,24 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
)

var samples [][]byte
parsedTables := make(map[sqlbase.ID]*sqlbase.TableDescriptor)

sampleCount := 0
rowResultWriter := newCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
key := roachpb.Key(*row[0].(*tree.DBytes))

if keys.IsDescriptorKey(key) {
kv := roachpb.KeyValue{Key: key}
kv.Value.RawBytes = []byte(*row[1].(*tree.DBytes))
var desc sqlbase.TableDescriptor
if err := kv.Value.GetProto(&desc); err != nil {
return err
/*
TODO(dt): when we enable reading schemas during sampling, might do this:
if keys.IsDescriptorKey(key) {
kv := roachpb.KeyValue{Key: key}
kv.Value.RawBytes = []byte(*row[1].(*tree.DBytes))
var desc sqlbase.TableDescriptor
if err := kv.Value.GetProto(&desc); err != nil {
return err
}
parsedTables[desc.ID] = &desc
return nil
}
parsedTables[desc.ID] = &desc
return nil
}

*/
sampleCount++
sampleCount = sampleCount % int(oversample)
if sampleCount == 0 {
@@ -606,10 +603,10 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
samples = nil
dsp.Run(planCtx, nil, &p, recv, evalCtx)
if err := rowResultWriter.Err(); err != nil {
return nil, nil, err
return nil, err
}

log.VEventf(ctx, 1, "generated %d splits; begin routing for job %s", len(samples), job.Payload().Description)

return samples, parsedTables, nil
return samples, nil
}
