From 9804737e205f7de96c1f0383d90340b974e1ed18 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Sun, 26 Sep 2021 12:49:38 -0400 Subject: [PATCH] importccl,workloadccl: plumb semaCtx with type resolver to workload This change plumbs the semaCtx that has a type resolver through to the import workload workers. This allows for resolution of UDTs in tables being imported into via workload fixtures. Release note: None --- pkg/ccl/importccl/bench_test.go | 3 ++- pkg/ccl/importccl/import_processor.go | 2 +- pkg/ccl/importccl/read_import_workload.go | 17 +++++++++++------ pkg/ccl/workloadccl/format/sstable.go | 3 ++- pkg/sql/sem/tree/type_name.go | 2 +- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/importccl/bench_test.go b/pkg/ccl/importccl/bench_test.go index 24ea1aaf03ba..4e241908afde 100644 --- a/pkg/ccl/importccl/bench_test.go +++ b/pkg/ccl/importccl/bench_test.go @@ -179,7 +179,8 @@ func benchmarkConvertToKVs(b *testing.B, g workload.Generator) { SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}), Codec: keys.SystemSQLCodec, } - return wc.Worker(ctx, evalCtx) + semaCtx := tree.MakeSemaContext() + return wc.Worker(ctx, evalCtx, &semaCtx) }) for kvBatch := range kvCh { for i := range kvBatch.KVs { diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index 0925491792e5..6938267bcadb 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -273,7 +273,7 @@ func makeInputConverter( } } if isWorkload { - return newWorkloadReader(kvCh, singleTable, evalCtx), nil + return newWorkloadReader(semaCtx, evalCtx, singleTable, kvCh), nil } return newCSVInputReader( semaCtx, kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism), diff --git a/pkg/ccl/importccl/read_import_workload.go b/pkg/ccl/importccl/read_import_workload.go index 69d7f5f4b9dd..bb86bd82793e 100644 --- a/pkg/ccl/importccl/read_import_workload.go +++ b/pkg/ccl/importccl/read_import_workload.go @@ -35,6 +35,7 @@ import ( ) type workloadReader struct { + semaCtx *tree.SemaContext evalCtx *tree.EvalContext table catalog.TableDescriptor kvCh chan row.KVBatch @@ -43,9 +44,12 @@ type workloadReader struct { var _ inputConverter = &workloadReader{} func newWorkloadReader( - kvCh chan row.KVBatch, table catalog.TableDescriptor, evalCtx *tree.EvalContext, + semaCtx *tree.SemaContext, + evalCtx *tree.EvalContext, + table catalog.TableDescriptor, + kvCh chan row.KVBatch, ) *workloadReader { - return &workloadReader{evalCtx: evalCtx, table: table, kvCh: kvCh} + return &workloadReader{semaCtx: semaCtx, evalCtx: evalCtx, table: table, kvCh: kvCh} } func (w *workloadReader) start(ctx ctxgroup.Group) { @@ -164,7 +168,7 @@ func (w *workloadReader) readFiles( for _, wc := range wcs { if err := ctxgroup.GroupWorkers(ctx, runtime.GOMAXPROCS(0), func(ctx context.Context, _ int) error { evalCtx := w.evalCtx.Copy() - return wc.Worker(ctx, evalCtx) + return wc.Worker(ctx, evalCtx, w.semaCtx) }); err != nil { return err } @@ -216,9 +220,10 @@ func NewWorkloadKVConverter( // minimzing the amount of overlapping SSTs ingested. // // This worker needs its own EvalContext and DatumAlloc. -func (w *WorkloadKVConverter) Worker(ctx context.Context, evalCtx *tree.EvalContext) error { - semaCtx := tree.MakeSemaContext() - conv, err := row.NewDatumRowConverter(ctx, &semaCtx, w.tableDesc, nil, /* targetColNames */ +func (w *WorkloadKVConverter) Worker( + ctx context.Context, evalCtx *tree.EvalContext, semaCtx *tree.SemaContext, +) error { + conv, err := row.NewDatumRowConverter(ctx, semaCtx, w.tableDesc, nil, /* targetColNames */ evalCtx, w.kvCh, nil /* seqChunkProvider */, nil /* metrics */) if err != nil { return err diff --git a/pkg/ccl/workloadccl/format/sstable.go b/pkg/ccl/workloadccl/format/sstable.go index 5fced8ab0306..bbd404d80bf1 100644 --- a/pkg/ccl/workloadccl/format/sstable.go +++ b/pkg/ccl/workloadccl/format/sstable.go @@ -85,7 +85,8 @@ func ToSSTable(t workload.Table, tableID descpb.ID, ts time.Time) ([]byte, error SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}), Codec: keys.SystemSQLCodec, } - return wc.Worker(ctx, evalCtx) + semaCtx := tree.MakeSemaContext() + return wc.Worker(ctx, evalCtx, &semaCtx) }) var sst []byte var kvs sortableKVs diff --git a/pkg/sql/sem/tree/type_name.go b/pkg/sql/sem/tree/type_name.go index 9285ff1ffb42..78103db8171a 100644 --- a/pkg/sql/sem/tree/type_name.go +++ b/pkg/sql/sem/tree/type_name.go @@ -164,7 +164,7 @@ func ResolveType( return resolver.ResolveType(ctx, t) case *OIDTypeReference: if resolver == nil { - return nil, pgerror.Newf(pgcode.UndefinedObject, "type OID %d does not exist", t.OID) + return nil, pgerror.Newf(pgcode.UndefinedObject, "type resolver unavailable to resolve type OID %d", t.OID) } return resolver.ResolveTypeByOID(ctx, t.OID) default: