span: Rewrite span frontier to use btree
Rewrite the span frontier to use a btree instead of an LLRB
(left-leaning red-black tree).

The old implementation was very CPU intensive and very inefficient
in its memory allocation. The btree-based implementation corrects
these deficiencies.

Both implementations are available, with the ability to revert to
the old LLRB-based implementation by setting
`COCKROACH_BTREE_SPAN_FRONTIER_ENABLED` to false.

In addition to performance enhancements, the following
changes were made:
  * The new implementation assumes that spans passed to the frontier
    during construction or Forward() are never mutated by the caller.
    This assumption is verified in unit tests.
  * The default Frontier implementation is no longer thread safe. Instead,
    the caller may explicitly construct a thread-safe frontier via
    `MakeConcurrentFrontier` (see the sketch after this list).
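
A minimal sketch of how callers use the updated API, based on the call
sites changed in this diff (`MakeFrontier`, `MakeConcurrentFrontier`,
`Forward`, `Frontier`, `Release`); the keys and timestamps below are
placeholders, not part of the change:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/span"
)

func main() {
	// MakeFrontier returns the default (non-thread-safe) frontier.
	f, err := span.MakeFrontier(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")})
	if err != nil {
		panic(err)
	}
	// Wrap the frontier only when it is shared across goroutines.
	f = span.MakeConcurrentFrontier(f)
	// Callers release the frontier when done, as the updated call sites do.
	defer f.Release()

	// Spans passed to Forward must not be mutated by the caller afterwards.
	if _, err := f.Forward(
		roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")},
		hlc.Timestamp{WallTime: 10},
	); err != nil {
		panic(err)
	}
	// Frontier() reports the minimum timestamp tracked across the span set.
	fmt.Println(f.Frontier())
}
```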

Enhance frontier tests by implementing fuzz tests.
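
The fuzz tests themselves are not shown in this excerpt; the following is
only a sketch of the kind of invariant such a test could check, using the
API surface visible in this diff. The test name and the specific invariant
are assumptions, not the actual tests added in pkg/util/span.

```go
package span_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/span"
)

// FuzzFrontierForward is a hypothetical fuzz test: it forwards arbitrary
// sub-spans and checks that the frontier only advances once the entire
// tracked span has been forwarded.
func FuzzFrontierForward(f *testing.F) {
	f.Add(uint8(10), uint8(100), int64(1), int64(2))
	f.Fuzz(func(t *testing.T, start, end uint8, wall1, wall2 int64) {
		if start == 0 || start >= end || wall1 <= 0 || wall2 <= 0 {
			t.Skip()
		}
		tracked := roachpb.Span{Key: roachpb.Key{0x00}, EndKey: roachpb.Key{0xff}}
		fr, err := span.MakeFrontier(tracked)
		if err != nil {
			t.Fatal(err)
		}
		defer fr.Release()

		// Forwarding only a strict sub-span leaves part of the tracked span at
		// the zero timestamp, so the frontier must not advance yet.
		sub := roachpb.Span{Key: roachpb.Key{start}, EndKey: roachpb.Key{end}}
		if _, err := fr.Forward(sub, hlc.Timestamp{WallTime: wall1}); err != nil {
			t.Fatal(err)
		}
		if got := fr.Frontier(); !got.IsEmpty() {
			t.Fatalf("expected empty frontier, got %s", got)
		}

		// Forwarding the entire tracked span must advance the frontier.
		if _, err := fr.Forward(tracked, hlc.Timestamp{WallTime: wall2}); err != nil {
			t.Fatal(err)
		}
		if got := fr.Frontier(); got.WallTime < wall2 {
			t.Fatalf("frontier %s did not advance to wall time %d", got, wall2)
		}
	})
}
```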

Epic: CRDB-26372

Release note: None
Yevgeniy Miretskiy committed Sep 12, 2023
1 parent 35175d4 commit b061ed4
Showing 24 changed files with 4,028 additions and 605 deletions.
1 change: 1 addition & 0 deletions build/bazelutil/check.sh
@@ -49,6 +49,7 @@ pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto log_ch
pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto logging.md ../../../docs/generated/logging.md
pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto severity.go severity/severity_generated.go
pkg/util/log/sinks.go://go:generate mockgen -package=log -destination=mocks_generated_test.go --mock_names=TestingLogSink=MockLogSink . TestingLogSink
pkg/util/span/frontier.go://go:generate ../interval/generic/gen.sh *frontierEntry span
pkg/util/timeutil/zoneinfo.go://go:generate go run gen/main.go
"

1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_span_coverage.go
@@ -29,6 +29,7 @@ func checkCoverage(
if err != nil {
return err
}
defer frontier.Release()

// The main loop below requires the entire frontier be caught up to the start
// time of each step it proceeds, however a span introduced in a later backup
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -289,6 +289,8 @@ func runGenerativeSplitAndScatter(
if err != nil {
return err
}
defer introducedSpanFrontier.Release()

backupLocalityMap, err := makeBackupLocalityMap(spec.BackupLocalityInfo, spec.User())
if err != nil {
return err
@@ -297,6 +299,8 @@ func runGenerativeSplitAndScatter(
if err != nil {
return err
}
defer checkpointFrontier.Release()

filter, err := makeSpanCoveringFilter(
checkpointFrontier,
spec.HighWater,
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
@@ -295,6 +295,7 @@ func restore(
if err != nil {
return emptyRowCount, err
}
defer introducedSpanFrontier.Release()

if err := checkCoverage(restoreCtx, dataToRestore.getSpans(), backupManifests); err != nil {
return emptyRowCount, err
@@ -310,6 +311,7 @@
if err != nil {
return emptyRowCount, err
}
defer progressTracker.close()

var filter spanCoveringFilter
if filter, err = func() (spanCoveringFilter, error) {
16 changes: 11 additions & 5 deletions pkg/ccl/backupccl/restore_progress.go
@@ -49,7 +49,7 @@ type progressTracker struct {
// fields that may get updated while read are put in the lock.
syncutil.Mutex

checkpointFrontier *spanUtils.Frontier
checkpointFrontier spanUtils.Frontier

// res tracks the amount of data that has been ingested.
res roachpb.RowCount
@@ -87,7 +87,7 @@ func makeProgressTracker(
) (*progressTracker, error) {

var (
checkpointFrontier *spanUtils.Frontier
checkpointFrontier spanUtils.Frontier
err error
nextRequiredSpanKey map[string]roachpb.Key
inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry
@@ -119,10 +119,16 @@
pt.endTime = endTime
return pt, nil
}

func (pt *progressTracker) close() {
pt.mu.Lock()
defer pt.mu.Unlock()
if pt.mu.checkpointFrontier != nil {
pt.mu.checkpointFrontier.Release()
}
}
func loadCheckpointFrontier(
requiredSpans roachpb.Spans, persistedSpans []jobspb.RestoreProgress_FrontierEntry,
) (*spanUtils.Frontier, error) {
) (spanUtils.Frontier, error) {
numRequiredSpans := len(requiredSpans) - 1
contiguousSpan := roachpb.Span{
Key: requiredSpans[0].Key,
@@ -146,7 +152,7 @@
// first N spans in the frontier that remain below the maxBytes memory limit
// will return.
func persistFrontier(
frontier *spanUtils.Frontier, maxBytes int64,
frontier spanUtils.Frontier, maxBytes int64,
) []jobspb.RestoreProgress_FrontierEntry {
var used int64
completedSpansSlice := make([]jobspb.RestoreProgress_FrontierEntry, 0)
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/restore_span_covering.go
@@ -340,7 +340,7 @@ func makeSimpleImportSpans(
// that manifests are sorted in increasing EndTime.
func createIntroducedSpanFrontier(
manifests []backuppb.BackupManifest, asOf hlc.Timestamp,
) (*spanUtils.Frontier, error) {
) (spanUtils.Frontier, error) {
introducedSpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{})
if err != nil {
return nil, err
@@ -368,17 +368,17 @@ func makeEntry(start, end roachpb.Key) execinfrapb.RestoreSpanEntry {
// spanCoveringFilter holds metadata that filters which backups and required spans are used to
// populate a restoreSpanEntry
type spanCoveringFilter struct {
checkpointFrontier *spanUtils.Frontier
checkpointFrontier spanUtils.Frontier
highWaterMark roachpb.Key
introducedSpanFrontier *spanUtils.Frontier
introducedSpanFrontier spanUtils.Frontier
useFrontierCheckpointing bool
targetSize int64
}

func makeSpanCoveringFilter(
checkpointFrontier *spanUtils.Frontier,
checkpointFrontier spanUtils.Frontier,
highWater roachpb.Key,
introducedSpanFrontier *spanUtils.Frontier,
introducedSpanFrontier spanUtils.Frontier,
targetSize int64,
useFrontierCheckpointing bool,
) (spanCoveringFilter, error) {
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_span_covering_test.go
@@ -270,7 +270,7 @@ func makeImportSpans(
layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
highWaterMark []byte,
targetSize int64,
introducedSpanFrontier *spanUtils.Frontier,
introducedSpanFrontier spanUtils.Frontier,
completedSpans []jobspb.RestoreProgress_FrontierEntry,
useSimpleImportSpans bool,
) ([]execinfrapb.RestoreSpanEntry, error) {
44 changes: 23 additions & 21 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -111,7 +111,7 @@ type timestampLowerBoundOracle interface {
}

type changeAggregatorLowerBoundOracle struct {
sf *span.Frontier
sf frontier
initialInclusiveLowerBound hlc.Timestamp
}

@@ -245,7 +245,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
return
}
timestampOracle := &changeAggregatorLowerBoundOracle{
sf: ca.frontier.SpanFrontier(),
sf: ca.frontier,
initialInclusiveLowerBound: feed.ScanTime,
}

@@ -318,7 +318,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
}
ca.sink = &errorWrapperSink{wrapped: ca.sink}
ca.eventConsumer, ca.sink, err = newEventConsumer(
ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier.SpanFrontier(), kvFeedHighWater,
ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
if err != nil {
ca.MoveToDraining(err)
@@ -1672,20 +1672,11 @@ func (cf *changeFrontier) isSinkless() bool {
return cf.spec.JobID == 0
}

// type to make embedding span.Frontier in schemaChangeFrontier convenient.
type spanFrontier struct {
*span.Frontier
}

func (s *spanFrontier) frontierTimestamp() hlc.Timestamp {
return s.Frontier.Frontier()
}

// schemaChangeFrontier encapsulates the span frontier, which keeps track of the
// per-span timestamps we no longer need to emit, along with information about
// the most recently observed schema change boundary.
type schemaChangeFrontier struct {
*spanFrontier
frontier span.Frontier

// schemaChangeBoundary values are communicated to the changeFrontier via
// Resolved messages send from the changeAggregators. The policy regarding
@@ -1729,8 +1720,7 @@ func makeSchemaChangeFrontier(
if err != nil {
return nil, err
}
return &schemaChangeFrontier{
spanFrontier: &spanFrontier{Frontier: sf},
scf := &schemaChangeFrontier{
initialHighWater: initialHighWater,
latestTs: initialHighWater,

@@ -1741,7 +1731,9 @@
// to ensure that sql pod in serverless deployment does
// not get shutdown immediately after changefeed starts.
latestKV: timeutil.Now(),
}, nil
}
scf.frontier = span.MakeConcurrentFrontier(sf)
return scf, nil
}

// ForwardResolvedSpan advances the timestamp for a resolved span, taking care
Expand Down Expand Up @@ -1774,12 +1766,22 @@ func (f *schemaChangeFrontier) ForwardLatestKV(ts time.Time) {

// Frontier returns the minimum timestamp being tracked.
func (f *schemaChangeFrontier) Frontier() hlc.Timestamp {
return f.frontierTimestamp()
return f.frontier.Frontier()
}

// Forward forwards span timestamp.
func (f *schemaChangeFrontier) Forward(s roachpb.Span, ts hlc.Timestamp) (bool, error) {
return f.frontier.Forward(s, ts)
}

// Entries forwards call to the underlying span frontier.
func (f *schemaChangeFrontier) Entries(fn span.Operation) {
f.frontier.Entries(fn)
}

// SpanFrontier returns underlying span.Frontier.
func (f *schemaChangeFrontier) SpanFrontier() *span.Frontier {
return f.spanFrontier.Frontier
// PeekFrontierSpan forwards call to the underlying span frontier.
func (f *schemaChangeFrontier) PeekFrontierSpan() roachpb.Span {
return f.frontier.PeekFrontierSpan()
}

type spanIter func(forEachSpan span.Operation)
@@ -1822,7 +1824,7 @@ func getCheckpointSpans(
func (f *schemaChangeFrontier) getCheckpointSpans(
maxBytes int64,
) (spans []roachpb.Span, timestamp hlc.Timestamp) {
return getCheckpointSpans(f.frontierTimestamp(), f.Entries, maxBytes)
return getCheckpointSpans(f.frontier.Frontier(), f.frontier.Entries, maxBytes)
}

// BackfillTS returns the timestamp of the incoming spans for an ongoing
5 changes: 2 additions & 3 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -90,7 +89,7 @@ func newEventConsumer(
cfg *execinfra.ServerConfig,
spec execinfrapb.ChangeAggregatorSpec,
feed ChangefeedConfig,
spanFrontier *span.Frontier,
spanFrontier frontier,
cursor hlc.Timestamp,
sink EventSink,
metrics *Metrics,
@@ -526,7 +525,7 @@ type parallelEventConsumer struct {

// spanFrontier stores the frontier for the aggregator
// that spawned this event consumer.
spanFrontier *span.Frontier
spanFrontier frontier

// termErr and termCh are used to save the first error that occurs
// in any worker and signal all workers to stop.
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -325,6 +325,8 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
if err != nil {
return err
}
rangeFeedResumeFrontier = span.MakeConcurrentFrontier(rangeFeedResumeFrontier)
defer rangeFeedResumeFrontier.Release()

for i := 0; ; i++ {
initialScan := i == 0
@@ -526,9 +528,7 @@ func (f *kvFeed) scanIfShould(
return spansToScan, scanTime, nil
}

func (f *kvFeed) runUntilTableEvent(
ctx context.Context, resumeFrontier *span.Frontier,
) (err error) {
func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Frontier) (err error) {
startFrom := resumeFrontier.Frontier()

// Determine whether to request the previous value of each update from
@@ -645,7 +645,7 @@ func copyFromSourceToDestUntilTableEvent(
ctx context.Context,
dest kvevent.Writer,
source kvevent.Reader,
frontier *span.Frontier,
frontier span.Frontier,
tables schemafeed.SchemaFeed,
endTime hlc.Timestamp,
knobs TestingKnobs,
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -233,7 +233,7 @@ func TestCloudStorageSink(t *testing.T) {
require.Equal(t, `{"resolved":"5.0000000000"}`, string(resolvedFile))
})

forwardFrontier := func(f *span.Frontier, s roachpb.Span, wall int64) bool {
forwardFrontier := func(f span.Frontier, s roachpb.Span, wall int64) bool {
forwarded, err := f.Forward(s, ts(wall))
require.NoError(t, err)
return forwarded
4 changes: 3 additions & 1 deletion pkg/ccl/cmdccl/clusterrepl/main.go
@@ -225,7 +225,7 @@ func rawStream(
}

func subscriptionConsumer(
sub streamclient.Subscription, frontier *span.Frontier,
sub streamclient.Subscription, frontier span.Frontier,
) func(ctx context.Context) error {
return func(ctx context.Context) error {
var (
@@ -235,6 +235,8 @@
totalEventCount int
intervalEventCount int
)
defer frontier.Release()

intervalStart := timeutil.Now()
for {
var sz int
@@ -60,7 +60,7 @@ type streamIngestionFrontier struct {

// frontier contains the current resolved timestamp high-water for the tracked
// span set.
frontier *span.Frontier
frontier span.Frontier

// metrics are monitoring all running ingestion jobs.
metrics *Metrics
@@ -328,6 +328,8 @@ func (sf *streamIngestionFrontier) Next() (
}

func (sf *streamIngestionFrontier) close() {
defer sf.frontier.Release()

if err := sf.heartbeatSender.stop(); err != nil {
log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
@@ -224,7 +224,7 @@ type streamIngestionProcessor struct {

// frontier keeps track of the progress for the spans tracked by this processor
// and is used forward resolved spans
frontier *span.Frontier
frontier span.Frontier
// lastFlushTime keeps track of the last time that we flushed due to a
// checkpoint timestamp event.
lastFlushTime time.Time
@@ -523,6 +523,8 @@ func (sip *streamIngestionProcessor) close() {
return
}

defer sip.frontier.Release()

// Stop the partition client, mergedSubscription, and
// cutoverPoller. All other goroutines should exit based on
// channel close events.
@@ -1230,7 +1232,7 @@ func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, erro
// frontierForSpan returns the lowest timestamp in the frontier within
// the given subspans. If the subspans are entirely outside the
// Frontier's tracked span an empty timestamp is returned.
func frontierForSpans(f *span.Frontier, spans ...roachpb.Span) hlc.Timestamp {
func frontierForSpans(f span.Frontier, spans ...roachpb.Span) hlc.Timestamp {
var (
minTimestamp hlc.Timestamp
sawEmptyTS bool