Skip to content

Commit

Permalink
changefeedccl: move PTS code to dedicated files
Browse files Browse the repository at this point in the history
This change moves tests and helpers related to managing PTS
records to their own dedicated files. This change also
adds comments to each test related to PTS records.

Epic: None
Release note: None
  • Loading branch information
jayshrivastava committed Jul 13, 2023
1 parent 78f390d commit 8c7a79c
Show file tree
Hide file tree
Showing 6 changed files with 553 additions and 547 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"parallel_io.go",
"parquet.go",
"parquet_sink_cloudstorage.go",
"protected_timestamps.go",
"retry.go",
"scheduled_changefeed.go",
"schema_registry.go",
Expand Down Expand Up @@ -180,7 +181,6 @@ go_test(
srcs = [
"alter_changefeed_test.go",
"avro_test.go",
"changefeed_dist_test.go",
"changefeed_test.go",
"csv_test.go",
"encoder_test.go",
Expand All @@ -190,6 +190,7 @@ go_test(
"name_test.go",
"nemeses_test.go",
"parquet_test.go",
"protected_timestamps_test.go",
"scheduled_changefeed_test.go",
"schema_registry_test.go",
"show_changefeed_jobs_test.go",
Expand Down
58 changes: 0 additions & 58 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/gogo/protobuf/jsonpb"
)

Expand Down Expand Up @@ -108,58 +102,6 @@ func emitResolvedTimestamp(
return nil
}

// createProtectedTimestampRecord will create a record to protect the spans for
// this changefeed at the resolved timestamp.
func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
jobID jobspb.JobID,
targets changefeedbase.Targets,
resolved hlc.Timestamp,
) *ptpb.Record {
ptsID := uuid.MakeV4()
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
targetToProtect := makeTargetToProtect(targets)

log.VEventf(ctx, 2, "creating protected timestamp %v at %v", ptsID, resolved)
return jobsprotectedts.MakeRecord(
ptsID, int64(jobID), resolved, deprecatedSpansToProtect,
jobsprotectedts.Jobs, targetToProtect)
}

func makeTargetToProtect(targets changefeedbase.Targets) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, targets.NumUniqueTables()+1)
_ = targets.EachTableID(func(id descpb.ID) error {
tablesToProtect = append(tablesToProtect, id)
return nil
})
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets changefeedbase.Targets) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
spansToProtect := make([]roachpb.Span, 0, targets.NumUniqueTables()+1)
addTablePrefix := func(id uint32) {
tablePrefix := codec.TablePrefix(id)
spansToProtect = append(spansToProtect, roachpb.Span{
Key: tablePrefix,
EndKey: tablePrefix.PrefixEnd(),
})
}
_ = targets.EachTableID(func(id descpb.ID) error {
addTablePrefix(uint32(id))
return nil
})
addTablePrefix(keys.DescriptorTableID)
return spansToProtect
}

// Inject the change feed details marshal logic into the jobspb package.
func init() {
jobspb.ChangefeedDetailsMarshaler = func(m *jobspb.ChangefeedDetails, marshaller *jsonpb.Marshaler) ([]byte, error) {
Expand Down
61 changes: 0 additions & 61 deletions pkg/ccl/changefeedccl/changefeed_dist_test.go

This file was deleted.

Loading

0 comments on commit 8c7a79c

Please sign in to comment.