diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index ae8b64764751..ad2436a2cf30 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -243,8 +243,11 @@ func createBenchmarkChangefeed( return nil, nil, err } serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig - eventConsumer := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater, + eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater, sink, encoder, details, TestingKnobs{}, nil) + if err != nil { + return nil, nil, err + } tickFn := func(ctx context.Context) (*jobspb.ResolvedSpan, error) { event, err := buf.Get(ctx) if err != nil { diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 4d96c5fb7a19..95030180f167 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -173,7 +173,7 @@ func newChangeAggregatorProcessor( } if _, needTopics := ca.spec.Feed.Opts[changefeedbase.OptTopicInValue]; needTopics { - ca.topicNamer, err = MakeTopicNamer(ca.spec.Feed.TargetSpecifications) + ca.topicNamer, err = MakeTopicNamer(AllTargets(ca.spec.Feed)) if err != nil { return nil, err } @@ -302,10 +302,16 @@ func (ca *changeAggregator) Start(ctx context.Context) { if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) { ca.eventConsumer = newNativeKVConsumer(ca.sink) } else { - ca.eventConsumer = newKVEventToRowConsumer( + ca.eventConsumer, err = newKVEventToRowConsumer( ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), initialHighWater, ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer) } + if err != nil { + // Early abort in the case that there is an error creating the consumer. + ca.MoveToDraining(err) + ca.cancel() + return + } } func (ca *changeAggregator) startKVFeed( @@ -704,8 +710,8 @@ func newKVEventToRowConsumer( details jobspb.ChangefeedDetails, knobs TestingKnobs, topicNamer *TopicNamer, -) kvEventConsumer { - rfCache := newRowFetcherCache( +) (kvEventConsumer, error) { + rfCache, err := newRowFetcherCache( ctx, cfg.Codec, cfg.LeaseManager.(*lease.Manager), @@ -713,6 +719,9 @@ func newKVEventToRowConsumer( cfg.DB, details, ) + if err != nil { + return nil, err + } return &kvEventToRowConsumer{ frontier: frontier, @@ -724,7 +733,7 @@ func newKVEventToRowConsumer( knobs: knobs, topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor), topicNamer: topicNamer, - } + }, nil } type tableDescriptorTopic struct { @@ -860,7 +869,7 @@ func (c *kvEventToRowConsumer) topicForRow(r encodeRow) (TopicDescriptor, error) if err != nil { return noTopic{}, err } - for _, s := range c.details.TargetSpecifications { + for _, s := range AllTargets(c.details) { if s.TableID == r.tableDesc.GetID() && (s.FamilyName == "" || s.FamilyName == family.Name) { topic, err := makeTopicDescriptorFromSpecForRow(s, r) if err != nil { diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index 8f87f007e88e..8fd620a9b529 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -79,8 +79,11 @@ func newRowFetcherCache( cf *descs.CollectionFactory, db *kv.DB, details jobspb.ChangefeedDetails, -) *rowFetcherCache { - specs := details.TargetSpecifications +) (*rowFetcherCache, error) { + specs := AllTargets(details) + if len(specs) == 0 { + return nil, errors.Newf("Could not derive any target specifications from %v", details) + } watchedFamilies := make(map[watchedFamily]struct{}, len(specs)) for _, s := range specs { watchedFamilies[watchedFamily{tableID: s.TableID, familyName: s.FamilyName}] = struct{}{} @@ -92,7 +95,7 @@ func newRowFetcherCache( db: db, fetchers: cache.NewUnorderedCache(rfCacheConfig), watchedFamilies: watchedFamilies, - } + }, nil } func (c *rowFetcherCache) TableDescForKey(