diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index c0f8839c9832..4312d3e5d421 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -687,6 +687,8 @@ func importPlanHook(
 		fks := fkHandler{skip: skipFKs, allowed: true, resolver: make(fkResolver)}
 		switch format.Format {
 		case roachpb.IOFileFormat_Mysqldump:
+			evalCtx := &p.ExtendedEvalContext().EvalContext
+			tableDescs, err = readMysqlCreateTable(reader, evalCtx, parentID, match)
 		case roachpb.IOFileFormat_PgDump:
 			evalCtx := &p.ExtendedEvalContext().EvalContext
 			tableDescs, err = readPostgresCreateTable(reader, evalCtx, p.ExecCfg().Settings, match, parentID, walltime, fks, int(format.PgDump.MaxRowSize))
diff --git a/pkg/ccl/importccl/read_import_mysql.go b/pkg/ccl/importccl/read_import_mysql.go
index 6705ef83511c..bdba0e320ace 100644
--- a/pkg/ccl/importccl/read_import_mysql.go
+++ b/pkg/ccl/importccl/read_import_mysql.go
@@ -17,8 +17,6 @@ import (
 	"strconv"
 	"strings"
 
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
-
 	"github.com/pkg/errors"
 	mysqltypes "vitess.io/vitess/go/sqltypes"
 	mysql "vitess.io/vitess/go/vt/sqlparser"
@@ -41,11 +39,9 @@
 // tables with names that appear in the `tables` map is converted to Cockroach
 // KVs using the mapped converter and sent to kvCh.
 type mysqldumpReader struct {
-	evalCtx   *tree.EvalContext
-	tables    map[string]*rowConverter
-	importAll bool // import any table encountered.
-	kvCh      chan kvBatch
-
+	evalCtx  *tree.EvalContext
+	tables   map[string]*rowConverter
+	kvCh     chan kvBatch
 	debugRow func(tree.Datums)
 }
 
@@ -54,7 +50,7 @@ var _ inputConverter = &mysqldumpReader{}
 func newMysqldumpReader(
 	kvCh chan kvBatch, tables map[string]*sqlbase.TableDescriptor, evalCtx *tree.EvalContext,
 ) (*mysqldumpReader, error) {
-	res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh, importAll: len(tables) == 0}
+	res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh}
 
 	converters := make(map[string]*rowConverter, len(tables))
 	for name, table := range tables {
@@ -69,7 +65,6 @@ func newMysqldumpReader(
 		converters[name] = conv
 	}
 	res.tables = converters
-
 	return res, nil
 }
 
@@ -83,7 +78,6 @@ func (m *mysqldumpReader) inputFinished(ctx context.Context) {
 func (m *mysqldumpReader) readFile(
 	ctx context.Context, input io.Reader, inputIdx int32, inputName string, progressFn progressFn,
 ) error {
-	var generatedIDs sqlbase.ID
 	var inserts, count int64
 	r := bufio.NewReaderSize(input, 1024*64)
 	tokens := mysql.NewTokenizer(r)
@@ -100,43 +94,6 @@ func (m *mysqldumpReader) readFile(
 			return errors.Wrap(err, "mysql parse error")
 		}
 		switch i := stmt.(type) {
-		case *mysql.DDL:
-			if i.Action == mysql.DropStr {
-				continue
-			}
-			if i.Action != mysql.CreateStr {
-				return errors.Errorf("unsupported %q statement in mysqldump", i.Action)
-			}
-			name := i.NewName.Name.String()
-			conv, ok := m.tables[name]
-			// If we already have this schema, skip it.
-			if conv != nil {
-				continue
-			}
-			// If we're only importing the named tables and this is not one, skip it.
-			if !m.importAll && !ok {
-				continue
-			}
-
-			generatedIDs++
-			id := defaultCSVTableID + generatedIDs
-			tbl, err := mysqlTableToCockroach(m.evalCtx, defaultCSVParentID, id, name, i.TableSpec)
-			if err != nil {
-				return err
-			}
-			conv, err = newRowConverter(tbl, m.evalCtx, m.kvCh)
-			if err != nil {
-				return err
-			}
-			kv := roachpb.KeyValue{Key: sqlbase.MakeDescMetadataKey(id)}
-			if err := kv.Value.SetProto(tbl); err != nil {
-				return err
-			}
-			kv.Value.InitChecksum(kv.Key)
-			conv.kvBatch = append(conv.kvBatch, kv)
-
-			m.tables[name] = conv
-
 		case *mysql.Insert:
 			name := i.Table.Name.String()
 			conv, ok := m.tables[name]
diff --git a/pkg/sql/distsql_plan_csv.go b/pkg/sql/distsql_plan_csv.go
index b029e8d4b517..8c79b2d74323 100644
--- a/pkg/sql/distsql_plan_csv.go
+++ b/pkg/sql/distsql_plan_csv.go
@@ -248,61 +248,63 @@ func LoadCSV(
 	details := job.Details().(jobspb.ImportDetails)
 	samples := details.Samples
 
-	var parsedTables map[sqlbase.ID]*sqlbase.TableDescriptor
 	if samples == nil {
 		var err error
-		samples, parsedTables, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, oversample, &planCtx, inputSpecs, sstSpecs)
+		samples, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, oversample, &planCtx, inputSpecs, sstSpecs)
 		if err != nil {
 			return err
 		}
 	}
 
-	// If sampling returns parsed table definitions, we need to potentially assign
-	// them real IDs and re-key the samples with those IDs, then update the job
-	// details to record the tables and their matching samples.
-	if len(parsedTables) > 0 {
-		importing := to == "" // are we actually ingesting, or just transforming?
-
-		rekeys := make(map[sqlbase.ID]*sqlbase.TableDescriptor, len(parsedTables))
-
-		// Update the tables map with the parsed tables and allocate them real IDs.
-		for _, parsed := range parsedTables {
-			name := parsed.Name
-			if existing, ok := tables[name]; ok && existing != nil {
-				return errors.Errorf("unexpected parsed table definition for %q", name)
-			}
-			tables[name] = parsed
-
-			// If we're actually importing, we'll need a real ID for this table.
-			if importing {
-				rekeys[parsed.ID] = parsed
-				parsed.ID, err = GenerateUniqueDescID(ctx, phs.ExecCfg().DB)
-				if err != nil {
-					return err
+	/*
+		TODO(dt): when we enable reading schemas during sampling, might do this:
+		// If sampling returns parsed table definitions, we need to potentially assign
+		// them real IDs and re-key the samples with those IDs, then update the job
+		// details to record the tables and their matching samples.
+		if len(parsedTables) > 0 {
+			importing := to == "" // are we actually ingesting, or just transforming?
+
+			rekeys := make(map[sqlbase.ID]*sqlbase.TableDescriptor, len(parsedTables))
+
+			// Update the tables map with the parsed tables and allocate them real IDs.
+			for _, parsed := range parsedTables {
+				name := parsed.Name
+				if existing, ok := tables[name]; ok && existing != nil {
+					return errors.Errorf("unexpected parsed table definition for %q", name)
+				}
+				tables[name] = parsed
+
+				// If we're actually importing, we'll need a real ID for this table.
+				if importing {
+					rekeys[parsed.ID] = parsed
+					parsed.ID, err = GenerateUniqueDescID(ctx, phs.ExecCfg().DB)
+					if err != nil {
+						return err
+					}
+				}
 				}
-			}
-		}
 
-		// The samples were created using the dummy IDs, but the IMPORT run will use
-		// the actual IDs, so we need to re-key the samples so that they actually
-		// act as splits in the IMPORTed key-space.
-		if importing {
-			kr, err := makeRewriter(rekeys)
-			if err != nil {
-				return err
-			}
-			for i := range samples {
-				var ok bool
-				samples[i], ok, err = kr.RewriteKey(samples[i])
-				if err != nil {
-					return err
-				}
-				if !ok {
-					return errors.Errorf("expected rewriter to rewrite key %v", samples[i])
+			// The samples were created using the dummy IDs, but the IMPORT run will use
+			// the actual IDs, so we need to re-key the samples so that they actually
+			// act as splits in the IMPORTed key-space.
+			if importing {
+				kr, err := makeRewriter(rekeys)
+				if err != nil {
+					return err
+				}
+				for i := range samples {
+					var ok bool
+					samples[i], ok, err = kr.RewriteKey(samples[i])
+					if err != nil {
+						return err
+					}
+					if !ok {
+						return errors.Errorf("expected rewriter to rewrite key %v", samples[i])
+					}
+				}
 				}
 			}
-		}
-	}
+	*/
 
 	if len(tables) == 0 {
 		return errors.Errorf("must specify table(s) to import")
@@ -438,12 +440,6 @@
 			d := details.(*jobspb.Payload_Import).Import
 			d.Samples = samples
 
-			if len(parsedTables) > 0 {
-				d.Tables = make([]jobspb.ImportDetails_Table, 0, len(tables))
-				for _, tbl := range tables {
-					d.Tables = append(d.Tables, jobspb.ImportDetails_Table{Desc: tbl})
-				}
-			}
 			return prog.Completed()
 		},
 	); err != nil {
@@ -483,7 +479,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 	planCtx *planningCtx,
 	csvSpecs []*distsqlrun.ReadImportDataSpec,
 	sstSpecs []distsqlrun.SSTWriterSpec,
-) ([][]byte, map[sqlbase.ID]*sqlbase.TableDescriptor, error) {
+) ([][]byte, error) {
 	// splitSize is the target number of bytes at which to create SST files. We
 	// attempt to do this by sampling, which is what the first DistSQL plan of this
 	// function does. CSV rows are converted into KVs. The total size of the KV is
@@ -501,7 +497,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 	}
 	sampleSize := splitSize / oversample
 	if sampleSize > math.MaxInt32 {
-		return nil, nil, errors.Errorf("SST size must fit in an int32: %d", splitSize)
+		return nil, errors.Errorf("SST size must fit in an int32: %d", splitSize)
 	}
 
 	var p physicalPlan
@@ -530,7 +526,7 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 		d.SamplingProgress = make([]float32, len(csvSpecs))
 		return d.Completed()
 	}); err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	// We only need the key during sorting.
@@ -558,23 +554,24 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 	)
 
 	var samples [][]byte
-	parsedTables := make(map[sqlbase.ID]*sqlbase.TableDescriptor)
 	sampleCount := 0
 
 	rowResultWriter := newCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
 		key := roachpb.Key(*row[0].(*tree.DBytes))
 
-		if keys.IsDescriptorKey(key) {
-			kv := roachpb.KeyValue{Key: key}
-			kv.Value.RawBytes = []byte(*row[1].(*tree.DBytes))
-			var desc sqlbase.TableDescriptor
-			if err := kv.Value.GetProto(&desc); err != nil {
-				return err
+		/*
+			TODO(dt): when we enable reading schemas during sampling, might do this:
+			if keys.IsDescriptorKey(key) {
+				kv := roachpb.KeyValue{Key: key}
+				kv.Value.RawBytes = []byte(*row[1].(*tree.DBytes))
+				var desc sqlbase.TableDescriptor
+				if err := kv.Value.GetProto(&desc); err != nil {
+					return err
+				}
+				parsedTables[desc.ID] = &desc
+				return nil
 			}
-			parsedTables[desc.ID] = &desc
-			return nil
-		}
-
+		*/
 		sampleCount++
 		sampleCount = sampleCount % int(oversample)
 		if sampleCount == 0 {
@@ -606,10 +603,10 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
 	samples = nil
 	dsp.Run(planCtx, nil, &p, recv, evalCtx)
 
 	if err := rowResultWriter.Err(); err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	log.VEventf(ctx, 1, "generated %d splits; begin routing for job %s", len(samples), job.Payload().Description)
-	return samples, parsedTables, nil
+	return samples, nil
 }
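Reviewer note: the readMysqlCreateTable function that importPlanHook now calls for mysqldump input is not included in this diff. As a reference point only, here is a minimal sketch of what it plausibly looks like, pieced together from the DDL handling removed from readFile above; the exact signature, the treatment of match as a single requested table name, and the placeholder-ID scheme are assumptions, not something this diff confirms.

package importccl

import (
	"bufio"
	"io"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
	"github.com/pkg/errors"
	mysql "vitess.io/vitess/go/vt/sqlparser"
)

// readMysqlCreateTable (sketch): walk the dump once at plan time and convert
// each matching CREATE TABLE into a table descriptor, mirroring the per-file
// DDL handling that readFile no longer performs.
func readMysqlCreateTable(
	input io.Reader, evalCtx *tree.EvalContext, parentID sqlbase.ID, match string,
) ([]*sqlbase.TableDescriptor, error) {
	var ret []*sqlbase.TableDescriptor
	tokens := mysql.NewTokenizer(bufio.NewReaderSize(input, 1024*64))
	// Assumption: descriptors get placeholder IDs here, as the removed readFile
	// code did with defaultCSVTableID, and are re-keyed later if actually ingested.
	id := sqlbase.ID(defaultCSVTableID)
	for {
		stmt, err := mysql.ParseNext(tokens)
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, errors.Wrap(err, "mysql parse error")
		}
		ddl, ok := stmt.(*mysql.DDL)
		if !ok {
			// INSERTs and other data statements are handled later, in readFile.
			continue
		}
		if ddl.Action == mysql.DropStr {
			continue
		}
		if ddl.Action != mysql.CreateStr {
			return nil, errors.Errorf("unsupported %q statement in mysqldump", ddl.Action)
		}
		name := ddl.NewName.Name.String()
		// Assumption: a non-empty match restricts the import to that one table.
		if match != "" && match != name {
			continue
		}
		id++
		tbl, err := mysqlTableToCockroach(evalCtx, parentID, id, name, ddl.TableSpec)
		if err != nil {
			return nil, err
		}
		ret = append(ret, tbl)
	}
	return ret, nil
}

The real implementation may well differ in how it allocates IDs and reports missing tables; the sketch only shows the overall shape of moving schema parsing out of readFile and into planning.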
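Separately, on the sampling hunks: the callback result writer keeps only every oversample-th sampled key (the sampleCount % oversample bookkeeping retained as context above), so the kept samples end up roughly splitSize bytes apart. A tiny standalone illustration of that thinning step, with made-up names and data:

package main

import "fmt"

// keepEveryNth mirrors the sampleCount % oversample check in the sampling
// callback: processors sample keys about splitSize/oversample bytes apart,
// and only every oversample-th sampled key is kept as an actual split point.
func keepEveryNth(sampled []string, oversample int) []string {
	var kept []string
	count := 0
	for _, k := range sampled {
		count++
		count = count % oversample
		if count == 0 {
			kept = append(kept, k)
		}
	}
	return kept
}

func main() {
	sampled := []string{"a", "b", "c", "d", "e", "f", "g"}
	fmt.Println(keepEveryNth(sampled, 3)) // prints [c f]
}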