Skip to content

Commit

Permalink
Merge #82371
Browse files Browse the repository at this point in the history
82371: changefeedccl: correctly handle old-style protobufs in rowfetcher_cache r=miretskiy a=miretskiy

Fixes #82309.

Release note (bug fix): Fixed a bug where changefeeds created before
upgrading to 22.1 would silently fail to emit any data other than
resolved timestamps.

Co-authored-by: Aaron Zinger <[email protected]>
  • Loading branch information
craig[bot] and HonoreDB committed Jun 3, 2022
2 parents 88834c3 + c3fffbf commit d8d995d
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 55 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ go_test(
"bench_test.go",
"changefeed_test.go",
"encoder_test.go",
"event_processing_test.go",
"helpers_tenant_shim_test.go",
"helpers_test.go",
"main_test.go",
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,11 @@ func createBenchmarkChangefeed(
return nil, nil, err
}
serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig
eventConsumer := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater,
eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater,
sink, encoder, details, TestingKnobs{}, nil)
if err != nil {
return nil, nil, err
}
tickFn := func(ctx context.Context) (*jobspb.ResolvedSpan, error) {
event, err := buf.Get(ctx)
if err != nil {
Expand Down
34 changes: 17 additions & 17 deletions pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func TestNoopPredicate(t *testing.T) {

serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig
ctx := context.Background()
decoder := cdcevent.NewEventDecoder(ctx, &serverCfg,
jobspb.ChangefeedDetails{
TargetSpecifications: []jobspb.ChangefeedTargetSpecification{
{
Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY,
TableID: desc.GetID(),
FamilyName: "most",
},
decoder, err := cdcevent.NewEventDecoder(
ctx, &serverCfg,
[]jobspb.ChangefeedTargetSpecification{
{
Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY,
TableID: desc.GetID(),
FamilyName: "most",
},
})
}, false)
require.NoError(t, err)

popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), desc)
defer cleanup()
Expand Down Expand Up @@ -452,18 +452,18 @@ CREATE TABLE foo (
targetType = jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY
}

details := jobspb.ChangefeedDetails{
TargetSpecifications: []jobspb.ChangefeedTargetSpecification{
{
Type: targetType,
TableID: desc.GetID(),
FamilyName: tc.familyName,
},
targets := []jobspb.ChangefeedTargetSpecification{
{
Type: targetType,
TableID: desc.GetID(),
FamilyName: tc.familyName,
},
}

serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig
ctx := context.Background()
decoder := cdcevent.NewEventDecoder(ctx, &serverCfg, details)
decoder, err := cdcevent.NewEventDecoder(ctx, &serverCfg, targets, false)
require.NoError(t, err)

for _, action := range tc.setupActions {
sqlDB.Exec(t, action)
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ go_test(
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/utilccl",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
Expand Down
18 changes: 11 additions & 7 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand Down Expand Up @@ -346,18 +345,23 @@ func getEventDescriptorCached(

// NewEventDecoder returns key value decoder.
func NewEventDecoder(
ctx context.Context, cfg *execinfra.ServerConfig, details jobspb.ChangefeedDetails,
) Decoder {
rfCache := newRowFetcherCache(
ctx context.Context,
cfg *execinfra.ServerConfig,
targets []jobspb.ChangefeedTargetSpecification,
includeVirtual bool,
) (Decoder, error) {
rfCache, err := newRowFetcherCache(
ctx,
cfg.Codec,
cfg.LeaseManager.(*lease.Manager),
cfg.CollectionFactory,
cfg.DB,
details.TargetSpecifications,
targets,
)
if err != nil {
return nil, err
}

includeVirtual := details.Opts[changefeedbase.OptVirtualColumns] == string(changefeedbase.OptVirtualColumnsNull)
eventDescriptorCache := cache.NewUnorderedCache(defaultCacheConfig)
getEventDescriptor := func(
desc catalog.TableDescriptor,
Expand All @@ -370,7 +374,7 @@ func NewEventDecoder(
return &eventDecoder{
getEventDescriptor: getEventDescriptor,
rfCache: rfCache,
}
}, nil
}

// DecodeKV decodes key value at specified schema timestamp.
Expand Down
34 changes: 14 additions & 20 deletions pkg/ccl/changefeedccl/cdcevent/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand Down Expand Up @@ -149,7 +148,7 @@ CREATE TABLE foo (
for _, tc := range []struct {
testName string
familyName string // Must be set if targetType ChangefeedTargetSpecification_COLUMN_FAMILY
virtualColumn changefeedbase.VirtualColumnVisibility
includeVirtual bool
actions []string
expectMainFamily []decodeExpectation
expectOnlyCFamily []decodeExpectation
Expand All @@ -167,10 +166,10 @@ CREATE TABLE foo (
},
},
{
testName: "main/primary_cols_with_virtual",
familyName: "main",
actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'second test')"},
virtualColumn: changefeedbase.OptVirtualColumnsNull,
testName: "main/primary_cols_with_virtual",
familyName: "main",
actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'second test')"},
includeVirtual: true,
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"second test", "1"},
Expand Down Expand Up @@ -286,26 +285,21 @@ CREATE TABLE foo (
targetType = jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY
}

details := jobspb.ChangefeedDetails{
Opts: map[string]string{
changefeedbase.OptVirtualColumns: string(tc.virtualColumn),
},
TargetSpecifications: []jobspb.ChangefeedTargetSpecification{
{
Type: targetType,
TableID: tableDesc.GetID(),
FamilyName: tc.familyName,
},
},
}

for _, action := range tc.actions {
sqlDB.Exec(t, action)
}

targets := []jobspb.ChangefeedTargetSpecification{
{
Type: targetType,
TableID: tableDesc.GetID(),
FamilyName: tc.familyName,
},
}
serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig
ctx := context.Background()
decoder := NewEventDecoder(ctx, &serverCfg, details)
decoder, err := NewEventDecoder(ctx, &serverCfg, targets, tc.includeVirtual)
require.NoError(t, err)
expectedEvents := len(tc.expectMainFamily) + len(tc.expectOnlyCFamily)
for i := 0; i < expectedEvents; i++ {
v := popRow(t)
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ func newRowFetcherCache(
cf *descs.CollectionFactory,
db *kv.DB,
specs []jobspb.ChangefeedTargetSpecification,
) *rowFetcherCache {
) (*rowFetcherCache, error) {
if len(specs) == 0 {
return nil, errors.AssertionFailedf("Expected at least one spec, found 0")
}
watchedFamilies := make(map[watchedFamily]struct{}, len(specs))
for _, s := range specs {
watchedFamilies[watchedFamily{tableID: s.TableID, familyName: s.FamilyName}] = struct{}{}
Expand All @@ -92,7 +95,7 @@ func newRowFetcherCache(
db: db,
fetchers: cache.NewUnorderedCache(defaultCacheConfig),
watchedFamilies: watchedFamilies,
}
}, nil
}

func refreshUDT(
Expand Down
11 changes: 9 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func newChangeAggregatorProcessor(
}

if _, needTopics := ca.spec.Feed.Opts[changefeedbase.OptTopicInValue]; needTopics {
ca.topicNamer, err = MakeTopicNamer(ca.spec.Feed.TargetSpecifications)
ca.topicNamer, err = MakeTopicNamer(AllTargets(ca.spec.Feed))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -297,9 +297,16 @@ func (ca *changeAggregator) Start(ctx context.Context) {
return
}

ca.eventConsumer = newKVEventToRowConsumer(
ca.eventConsumer, err = newKVEventToRowConsumer(
ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater,
ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)

if err != nil {
// Early abort in the case that there is an error setting up the consumption.
ca.MoveToDraining(err)
ca.cancel()
return
}
}

func (ca *changeAggregator) startKVFeed(
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,9 @@ func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetS
if len(cd.TargetSpecifications) > 0 {
for _, ts := range cd.TargetSpecifications {
if ts.TableID > 0 {
ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName
if ts.StatementTimeName == "" {
ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName
}
targets = append(targets, ts)
}
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/ccl/changefeedccl/event_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,23 @@ func newKVEventToRowConsumer(
details jobspb.ChangefeedDetails,
knobs TestingKnobs,
topicNamer *TopicNamer,
) *kvEventToRowConsumer {
) (*kvEventToRowConsumer, error) {
includeVirtual := details.Opts[changefeedbase.OptVirtualColumns] == string(changefeedbase.OptVirtualColumnsNull)
decoder, err := cdcevent.NewEventDecoder(ctx, cfg, AllTargets(details), includeVirtual)
if err != nil {
return nil, err
}
return &kvEventToRowConsumer{
frontier: frontier,
encoder: encoder,
decoder: cdcevent.NewEventDecoder(ctx, cfg, details),
decoder: decoder,
sink: sink,
cursor: cursor,
details: details,
knobs: knobs,
topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor),
topicNamer: topicNamer,
}
}, nil
}

func (c *kvEventToRowConsumer) topicForEvent(eventMeta cdcevent.Metadata) (TopicDescriptor, error) {
Expand All @@ -75,7 +80,7 @@ func (c *kvEventToRowConsumer) topicForEvent(eventMeta cdcevent.Metadata) (Topic
return topic, nil
}
}
for _, s := range c.details.TargetSpecifications {
for _, s := range AllTargets(c.details) {
if s.TableID == eventMeta.TableID && (s.FamilyName == "" || s.FamilyName == eventMeta.FamilyName) {
topic, err := makeTopicDescriptorFromSpec(s, eventMeta)
if err != nil {
Expand Down
Loading

0 comments on commit d8d995d

Please sign in to comment.