Skip to content

Commit

Permalink
importccl,workloadccl: plumb semaCtx with type resolver to workload
Browse files Browse the repository at this point in the history
This change plumbs the semaCtx that has a type resolver through
to the import workload workers. This allows for resolution of
UDTs in tables being imported into via workload fixtures.

Release note: None
  • Loading branch information
adityamaru committed Sep 27, 2021
1 parent a0d6d46 commit 9804737
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func benchmarkConvertToKVs(b *testing.B, g workload.Generator) {
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
Codec: keys.SystemSQLCodec,
}
return wc.Worker(ctx, evalCtx)
semaCtx := tree.MakeSemaContext()
return wc.Worker(ctx, evalCtx, &semaCtx)
})
for kvBatch := range kvCh {
for i := range kvBatch.KVs {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func makeInputConverter(
}
}
if isWorkload {
return newWorkloadReader(kvCh, singleTable, evalCtx), nil
return newWorkloadReader(semaCtx, evalCtx, singleTable, kvCh), nil
}
return newCSVInputReader(
semaCtx, kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
Expand Down
17 changes: 11 additions & 6 deletions pkg/ccl/importccl/read_import_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
)

type workloadReader struct {
semaCtx *tree.SemaContext
evalCtx *tree.EvalContext
table catalog.TableDescriptor
kvCh chan row.KVBatch
Expand All @@ -43,9 +44,12 @@ type workloadReader struct {
var _ inputConverter = &workloadReader{}

func newWorkloadReader(
kvCh chan row.KVBatch, table catalog.TableDescriptor, evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
evalCtx *tree.EvalContext,
table catalog.TableDescriptor,
kvCh chan row.KVBatch,
) *workloadReader {
return &workloadReader{evalCtx: evalCtx, table: table, kvCh: kvCh}
return &workloadReader{semaCtx: semaCtx, evalCtx: evalCtx, table: table, kvCh: kvCh}
}

func (w *workloadReader) start(ctx ctxgroup.Group) {
Expand Down Expand Up @@ -164,7 +168,7 @@ func (w *workloadReader) readFiles(
for _, wc := range wcs {
if err := ctxgroup.GroupWorkers(ctx, runtime.GOMAXPROCS(0), func(ctx context.Context, _ int) error {
evalCtx := w.evalCtx.Copy()
return wc.Worker(ctx, evalCtx)
return wc.Worker(ctx, evalCtx, w.semaCtx)
}); err != nil {
return err
}
Expand Down Expand Up @@ -216,9 +220,10 @@ func NewWorkloadKVConverter(
// minimzing the amount of overlapping SSTs ingested.
//
// This worker needs its own EvalContext and DatumAlloc.
func (w *WorkloadKVConverter) Worker(ctx context.Context, evalCtx *tree.EvalContext) error {
semaCtx := tree.MakeSemaContext()
conv, err := row.NewDatumRowConverter(ctx, &semaCtx, w.tableDesc, nil, /* targetColNames */
func (w *WorkloadKVConverter) Worker(
ctx context.Context, evalCtx *tree.EvalContext, semaCtx *tree.SemaContext,
) error {
conv, err := row.NewDatumRowConverter(ctx, semaCtx, w.tableDesc, nil, /* targetColNames */
evalCtx, w.kvCh, nil /* seqChunkProvider */, nil /* metrics */)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/workloadccl/format/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func ToSSTable(t workload.Table, tableID descpb.ID, ts time.Time) ([]byte, error
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
Codec: keys.SystemSQLCodec,
}
return wc.Worker(ctx, evalCtx)
semaCtx := tree.MakeSemaContext()
return wc.Worker(ctx, evalCtx, &semaCtx)
})
var sst []byte
var kvs sortableKVs
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sem/tree/type_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func ResolveType(
return resolver.ResolveType(ctx, t)
case *OIDTypeReference:
if resolver == nil {
return nil, pgerror.Newf(pgcode.UndefinedObject, "type OID %d does not exist", t.OID)
return nil, pgerror.Newf(pgcode.UndefinedObject, "type resolver unavailable to resolve type OID %d", t.OID)
}
return resolver.ResolveTypeByOID(ctx, t.OID)
default:
Expand Down

0 comments on commit 9804737

Please sign in to comment.