From e984ec55948810deb602632887b68baebbf2449e Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Mon, 16 Jul 2018 14:20:35 +0000
Subject: [PATCH] importccl: pre-read schemas in mysqldump import
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This switches mysqldump import to read schemas during setup on the
gateway, rather than during sampling, similar to how pgdump operates
(i.e. in three passes over the input rather than two).

This simplifies handling foreign keys, which can sometimes appear in a
table definition before the table they reference, making them hard to
resolve correctly right away. Reading through the whole file to capture
all the schemas before evaluating them should make that a bit easier.

In the future, a return to two passes could be possible, either if it
turns out that KVs can be produced correctly even if a foreign key later
changes the schema, or by oversampling raw rows of the input *without*
converting them during the read that extracts the schemas, then using
those schemas to convert the sampled rows to KVs from which the splits
can be sampled.

Release note: none.
---
 pkg/ccl/importccl/import_stmt.go       |   2 +
 pkg/ccl/importccl/read_import_mysql.go |  51 +---------
 pkg/sql/distsql_plan_csv.go            | 129 ++++++++++++-------------
 3 files changed, 69 insertions(+), 113 deletions(-)

diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index c0f8839c9832..4312d3e5d421 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -687,6 +687,8 @@ func importPlanHook(
 		fks := fkHandler{skip: skipFKs, allowed: true, resolver: make(fkResolver)}
 		switch format.Format {
 		case roachpb.IOFileFormat_Mysqldump:
+			evalCtx := &p.ExtendedEvalContext().EvalContext
+			tableDescs, err = readMysqlCreateTable(reader, evalCtx, parentID, match)
 		case roachpb.IOFileFormat_PgDump:
 			evalCtx := &p.ExtendedEvalContext().EvalContext
 			tableDescs, err = readPostgresCreateTable(reader, evalCtx, p.ExecCfg().Settings, match, parentID, walltime, fks, int(format.PgDump.MaxRowSize))
diff --git a/pkg/ccl/importccl/read_import_mysql.go b/pkg/ccl/importccl/read_import_mysql.go
index 6705ef83511c..bdba0e320ace 100644
--- a/pkg/ccl/importccl/read_import_mysql.go
+++ b/pkg/ccl/importccl/read_import_mysql.go
@@ -17,8 +17,6 @@ import (
 	"strconv"
 	"strings"
 
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
-
 	"github.com/pkg/errors"
 	mysqltypes "vitess.io/vitess/go/sqltypes"
 	mysql "vitess.io/vitess/go/vt/sqlparser"
@@ -41,11 +39,9 @@ import (
 // tables with names that appear in the `tables` map is converted to Cockroach
 // KVs using the mapped converter and sent to kvCh.
 type mysqldumpReader struct {
-	evalCtx   *tree.EvalContext
-	tables    map[string]*rowConverter
-	importAll bool // import any table encountered.
-	kvCh      chan kvBatch
-
+	evalCtx *tree.EvalContext
+	tables  map[string]*rowConverter
+	kvCh    chan kvBatch
 	debugRow func(tree.Datums)
 }
 
@@ -54,7 +50,7 @@ var _ inputConverter = &mysqldumpReader{}
 func newMysqldumpReader(
 	kvCh chan kvBatch, tables map[string]*sqlbase.TableDescriptor, evalCtx *tree.EvalContext,
 ) (*mysqldumpReader, error) {
-	res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh, importAll: len(tables) == 0}
+	res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh}
 
 	converters := make(map[string]*rowConverter, len(tables))
 	for name, table := range tables {
@@ -69,7 +65,6 @@ func newMysqldumpReader(
 		converters[name] = conv
 	}
 	res.tables = converters
-
 	return res, nil
 }
 
@@ -83,7 +78,6 @@ func (m *mysqldumpReader) inputFinished(ctx context.Context) {
 func (m *mysqldumpReader) readFile(
 	ctx context.Context, input io.Reader, inputIdx int32, inputName string, progressFn progressFn,
 ) error {
-	var generatedIDs sqlbase.ID
 	var inserts, count int64
 	r := bufio.NewReaderSize(input, 1024*64)
 	tokens := mysql.NewTokenizer(r)
@@ -100,43 +94,6 @@ func (m *mysqldumpReader) readFile(
 			return errors.Wrap(err, "mysql parse error")
 		}
 		switch i := stmt.(type) {
-		case *mysql.DDL:
-			if i.Action == mysql.DropStr {
-				continue
-			}
-			if i.Action != mysql.CreateStr {
-				return errors.Errorf("unsupported %q statement in mysqldump", i.Action)
-			}
-			name := i.NewName.Name.String()
-			conv, ok := m.tables[name]
-			// If we already have this schema, skip it.
-			if conv != nil {
-				continue
-			}
-			// If we're only importing the named tables and this is not one, skip it.
-			if !m.importAll && !ok {
-				continue
-			}
-
-			generatedIDs++
-			id := defaultCSVTableID + generatedIDs
-			tbl, err := mysqlTableToCockroach(m.evalCtx, defaultCSVParentID, id, name, i.TableSpec)
-			if err != nil {
-				return err
-			}
-			conv, err = newRowConverter(tbl, m.evalCtx, m.kvCh)
-			if err != nil {
-				return err
-			}
-			kv := roachpb.KeyValue{Key: sqlbase.MakeDescMetadataKey(id)}
-			if err := kv.Value.SetProto(tbl); err != nil {
-				return err
-			}
-			kv.Value.InitChecksum(kv.Key)
-			conv.kvBatch = append(conv.kvBatch, kv)
-
-			m.tables[name] = conv
-
 		case *mysql.Insert:
 			name := i.Table.Name.String()
 			conv, ok := m.tables[name]
diff --git a/pkg/sql/distsql_plan_csv.go b/pkg/sql/distsql_plan_csv.go
index b029e8d4b517..8c79b2d74323 100644
--- a/pkg/sql/distsql_plan_csv.go
+++ b/pkg/sql/distsql_plan_csv.go
@@ -248,61 +248,63 @@ func LoadCSV(
 
 	details := job.Details().(jobspb.ImportDetails)
 	samples := details.Samples
-	var parsedTables map[sqlbase.ID]*sqlbase.TableDescriptor
 	if samples == nil {
 		var err error
-		samples, parsedTables, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, oversample, &planCtx, inputSpecs, sstSpecs)
+		samples, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, oversample, &planCtx, inputSpecs, sstSpecs)
 		if err != nil {
 			return err
 		}
 	}
 
-	// If sampling returns parsed table definitions, we need to potentially assign
-	// them real IDs and re-key the samples with those IDs, then update the job
-	// details to record the tables and their matching samples.
-	if len(parsedTables) > 0 {
-		importing := to == "" // are we actually ingesting, or just transforming?
-
-		rekeys := make(map[sqlbase.ID]*sqlbase.TableDescriptor, len(parsedTables))
-
-		// Update the tables map with the parsed tables and allocate them real IDs.
-		for _, parsed := range parsedTables {
-			name := parsed.Name
-			if existing, ok := tables[name]; ok && existing != nil {
-				return errors.Errorf("unexpected parsed table definition for %q", name)
-			}
-			tables[name] = parsed
-
-			// If we're actually importing, we'll need a real ID for this table.
-			if importing {
-				rekeys[parsed.ID] = parsed
-				parsed.ID, err = GenerateUniqueDescID(ctx, phs.ExecCfg().DB)
-				if err != nil {
-					return err
-				}
-			}
-		}
-
-		// The samples were created using the dummy IDs, but the IMPORT run will use
-		// the actual IDs, so we need to re-key the samples so that they actually
-		// act as splits in the IMPORTed key-space.
-		if importing {
-			kr, err := makeRewriter(rekeys)
-			if err != nil {
-				return err
-			}
-			for i := range samples {
-				var ok bool
-				samples[i], ok, err = kr.RewriteKey(samples[i])
-				if err != nil {
-					return err
-				}
-				if !ok {
-					return errors.Errorf("expected rewriter to rewrite key %v", samples[i])
-				}
-			}
-		}
-	}
+	/*
+		TODO(dt): when we enable reading schemas during sampling, might do this:
+		// If sampling returns parsed table definitions, we need to potentially assign
+		// them real IDs and re-key the samples with those IDs, then update the job
+		// details to record the tables and their matching samples.
+		if len(parsedTables) > 0 {
+			importing := to == "" // are we actually ingesting, or just transforming?
+
+			rekeys := make(map[sqlbase.ID]*sqlbase.TableDescriptor, len(parsedTables))
+
+			// Update the tables map with the parsed tables and allocate them real IDs.
+			for _, parsed := range parsedTables {
+				name := parsed.Name
+				if existing, ok := tables[name]; ok && existing != nil {
+					return errors.Errorf("unexpected parsed table definition for %q", name)
+				}
+				tables[name] = parsed
+
+				// If we're actually importing, we'll need a real ID for this table.
+				if importing {
+					rekeys[parsed.ID] = parsed
+					parsed.ID, err = GenerateUniqueDescID(ctx, phs.ExecCfg().DB)
+					if err != nil {
+						return err
+					}
+				}
+			}
+
+			// The samples were created using the dummy IDs, but the IMPORT run will use
+			// the actual IDs, so we need to re-key the samples so that they actually
+			// act as splits in the IMPORTed key-space.
+			if importing {
+				kr, err := makeRewriter(rekeys)
+				if err != nil {
+					return err
+				}
+				for i := range samples {
+					var ok bool
+					samples[i], ok, err = kr.RewriteKey(samples[i])
+					if err != nil {
+						return err
+					}
+					if !ok {
+						return errors.Errorf("expected rewriter to rewrite key %v", samples[i])
+					}
+				}
+			}
+		}
+	*/
 
 	if len(tables) == 0 {
 		return errors.Errorf("must specify table(s) to import")
@@ -438,12 +440,6 @@ func LoadCSV(
 
 			d := details.(*jobspb.Payload_Import).Import
 			d.Samples = samples
-			if len(parsedTables) > 0 {
-				d.Tables = make([]jobspb.ImportDetails_Table, 0, len(tables))
-				for _, tbl := range tables {
-					d.Tables = append(d.Tables, jobspb.ImportDetails_Table{Desc: tbl})
-				}
-			}
 			return prog.Completed()
 		},
 	); err != nil {
@@ -483,7 +479,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 	planCtx *planningCtx,
 	csvSpecs []*distsqlrun.ReadImportDataSpec,
 	sstSpecs []distsqlrun.SSTWriterSpec,
-) ([][]byte, map[sqlbase.ID]*sqlbase.TableDescriptor, error) {
+) ([][]byte, error) {
 	// splitSize is the target number of bytes at which to create SST files. We
 	// attempt to do this by sampling, which is what the first DistSQL plan of this
 	// function does. CSV rows are converted into KVs. The total size of the KV is
@@ -501,7 +497,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 	}
 	sampleSize := splitSize / oversample
 	if sampleSize > math.MaxInt32 {
-		return nil, nil, errors.Errorf("SST size must fit in an int32: %d", splitSize)
+		return nil, errors.Errorf("SST size must fit in an int32: %d", splitSize)
 	}
 
 	var p physicalPlan
@@ -530,7 +526,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 		d.SamplingProgress = make([]float32, len(csvSpecs))
 		return d.Completed()
 	}); err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	// We only need the key during sorting.
@@ -558,23 +554,24 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 	)
 
 	var samples [][]byte
-	parsedTables := make(map[sqlbase.ID]*sqlbase.TableDescriptor)
 	sampleCount := 0
 	rowResultWriter := newCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
 		key := roachpb.Key(*row[0].(*tree.DBytes))
-		if keys.IsDescriptorKey(key) {
-			kv := roachpb.KeyValue{Key: key}
-			kv.Value.RawBytes = []byte(*row[1].(*tree.DBytes))
-			var desc sqlbase.TableDescriptor
-			if err := kv.Value.GetProto(&desc); err != nil {
-				return err
+		/*
+			TODO(dt): when we enable reading schemas during sampling, might do this:
+			if keys.IsDescriptorKey(key) {
+				kv := roachpb.KeyValue{Key: key}
+				kv.Value.RawBytes = []byte(*row[1].(*tree.DBytes))
+				var desc sqlbase.TableDescriptor
+				if err := kv.Value.GetProto(&desc); err != nil {
+					return err
+				}
+				parsedTables[desc.ID] = &desc
+				return nil
 			}
-			parsedTables[desc.ID] = &desc
-			return nil
-		}
-
+		*/
 		sampleCount++
 		sampleCount = sampleCount % int(oversample)
 		if sampleCount == 0 {
@@ -606,10 +603,10 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 	samples = nil
 	dsp.Run(planCtx, nil, &p, recv, evalCtx)
 	if err := rowResultWriter.Err(); err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	log.VEventf(ctx, 1, "generated %d splits; begin routing for job %s", len(samples), job.Payload().Description)
-	return samples, parsedTables, nil
+	return samples, nil
 }
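
Below the patch, for reference, is a minimal standalone sketch of the kind of schema pre-pass the
commit message describes: read the whole mysqldump once, collect every CREATE TABLE, and only then
evaluate the schemas, so a foreign key can name a table that is defined later in the file. It is not
part of the patch. The helper name collectCreateTables, its error handling, and the sample dump are
illustrative assumptions; the parser calls (mysql.NewTokenizer, mysql.ParseNext, *mysql.DDL,
mysql.CreateStr) are the same vitess APIs used in the code above.

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"

	mysql "vitess.io/vitess/go/vt/sqlparser"
)

// collectCreateTables is a hypothetical helper, not part of the patch. It scans the
// entire dump and returns every CREATE TABLE statement keyed by table name, before
// any row data is converted.
func collectCreateTables(input io.Reader) (map[string]*mysql.DDL, error) {
	created := make(map[string]*mysql.DDL)
	tokens := mysql.NewTokenizer(bufio.NewReaderSize(input, 1024*64))
	for {
		stmt, err := mysql.ParseNext(tokens)
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		// Only keep CREATE statements; DROPs and INSERTs are ignored in this pass.
		if ddl, ok := stmt.(*mysql.DDL); ok && ddl.Action == mysql.CreateStr {
			created[ddl.NewName.Name.String()] = ddl
		}
	}
	return created, nil
}

func main() {
	// In a real dump, `child` could declare a FOREIGN KEY referencing `parent` even
	// though `parent` is defined later in the file; collecting all the CREATE TABLEs
	// first means both definitions are in hand before any of them is evaluated.
	dump := `CREATE TABLE child (id INT, parent_id INT);
CREATE TABLE parent (id INT);`
	tables, err := collectCreateTables(strings.NewReader(dump))
	if err != nil {
		panic(err)
	}
	for name := range tables {
		fmt.Println("found table:", name)
	}
}

In the patch itself this pre-pass runs once on the gateway during setup (readMysqlCreateTable), which
is why the per-row readFile pass above no longer needs to allocate table IDs or emit descriptor KVs.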