110516: span: Rewrite span frontier to use btree  r=miretskiy a=miretskiy

Rewrite the span frontier to use a btree instead of an
LLRB (left-leaning red-black) tree.

The old implementation was CPU intensive and allocated
memory inefficiently. The btree-based implementation
corrects these deficiencies.

Both implementations are available; the old LLRB-based
implementation can be restored by setting
`COCKROACH_BTREE_SPAN_FRONTIER_ENABLED` to false.

In addition to performance enhancements, the following
changes were made:
  * The new implementation assumes that spans passed to the frontier
    during construction or Forward() are never mutated by the caller.
    This assumption is verified in unit tests.
  * The default Frontier implementation is no longer thread safe. Instead,
    the caller may explicitly construct a thread-safe frontier via
    `MakeConcurrentFrontier`.

Frontier tests are enhanced with fuzz tests.
The new implementation is ~50% faster and performs zero
allocations in the amortized sense.
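
As a minimal sketch (not code from this commit), the snippet below strings
together the frontier calls that appear in this diff (`MakeFrontier`,
`Forward`, `Frontier`, `Release`, `MakeConcurrentFrontier`) with made-up
keys and timestamps to show how a caller is expected to use the new API:

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/span"
)

// trackResolved is a hypothetical caller of the frontier API.
func trackResolved() (hlc.Timestamp, error) {
	// Spans handed to MakeFrontier (and later to Forward) must not be
	// mutated by the caller afterwards.
	f, err := span.MakeFrontier(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")})
	if err != nil {
		return hlc.Timestamp{}, err
	}
	// The btree-based frontier pools its memory; release it when done.
	defer f.Release()

	// Advance part of the tracked span set to wall time 10.
	if _, err := f.Forward(
		roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")},
		hlc.Timestamp{WallTime: 10},
	); err != nil {
		return hlc.Timestamp{}, err
	}

	// The default frontier is no longer thread safe; a caller that shares it
	// across goroutines wraps it instead:
	//   shared := span.MakeConcurrentFrontier(f)
	return f.Frontier(), nil // minimum timestamp across all tracked spans
}
```

Either implementation sits behind the same `span.Frontier` interface, so
flipping `COCKROACH_BTREE_SPAN_FRONTIER_ENABLED` should not require changing
call sites.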

Epic: CRDB-26372

Release note: None


114513: kvserver,storage: optimize rangefeed reads for 1PC transactions r=nvanbenschoten a=sumeerbhola

Writes from 1PC transactions now use Pebble's NewBatchOnlyIter to read the value back from the batch, avoiding a seek across all levels of the engine.
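
As a rough sketch of the idea (not the rangefeed code itself), the snippet
below reads a just-written value back from the batch alone, skipping the
memtables and sstables. The `NewBatchOnlyIter` signature and the options
shown are assumptions about Pebble's API and may not match it exactly:

```go
package example

import (
	"bytes"
	"context"

	"github.com/cockroachdb/pebble"
)

// readBackFromBatch is hypothetical; it shows the shape of the optimization,
// not the actual rangefeed read path.
func readBackFromBatch(ctx context.Context, b *pebble.Batch, key []byte) ([]byte, error) {
	// Assumed signature: NewBatchOnlyIter iterates over the batch contents
	// only, so no seek of the rest of the LSM is needed.
	it, err := b.NewBatchOnlyIter(ctx, &pebble.IterOptions{})
	if err != nil {
		return nil, err
	}
	defer it.Close()

	if it.SeekGE(key) && bytes.Equal(it.Key(), key) {
		// Copy the value out; iterator-owned memory is only valid until the
		// iterator is repositioned or closed.
		return append([]byte(nil), it.Value()...), nil
	}
	return nil, nil
}
```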

Fixes #113090

Epic: none

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
3 people committed Nov 30, 2023
3 parents 334e6d0 + c726497 + 56a8061 commit 04d27d9
Showing 33 changed files with 4,346 additions and 633 deletions.
1 change: 1 addition & 0 deletions build/bazelutil/check.sh
@@ -43,6 +43,7 @@ pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto log_ch
pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto logging.md ../../../docs/generated/logging.md
pkg/util/log/channels.go://go:generate go run gen/main.go logpb/log.proto severity.go severity/severity_generated.go
pkg/util/log/sinks.go://go:generate mockgen -package=log -destination=mocks_generated_test.go --mock_names=TestingLogSink=MockLogSink . TestingLogSink
pkg/util/span/frontier.go://go:generate ../interval/generic/gen.sh *frontierEntry span
pkg/util/timeutil/zoneinfo.go://go:generate go run gen/main.go
pkg/internal/team/team.go://go:generate cp ../../../TEAMS.yaml TEAMS.yaml
"
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_span_coverage.go
@@ -29,6 +29,7 @@ func checkCoverage(
if err != nil {
return err
}
defer frontier.Release()

// The main loop below requires the entire frontier be caught up to the start
// time of each step it proceeds, however a span introduced in a later backup
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -458,6 +458,8 @@ func runGenerativeSplitAndScatter(
if err != nil {
return errors.Wrap(err, "making introduced span frontier")
}
defer introducedSpanFrontier.Release()

backupLocalityMap, err := makeBackupLocalityMap(spec.BackupLocalityInfo, spec.User())
if err != nil {
return errors.Wrap(err, "making backup locality map")
@@ -466,6 +468,8 @@
if err != nil {
return errors.Wrap(err, "loading checkpoint frontier")
}
defer checkpointFrontier.Release()

filter, err := makeSpanCoveringFilter(
checkpointFrontier,
spec.HighWater,
12 changes: 7 additions & 5 deletions pkg/ccl/backupccl/restore_job.go
@@ -281,11 +281,6 @@ func restore(
return emptyRowCount, errors.Wrap(err, "resolving locality locations")
}

introducedSpanFrontier, err := createIntroducedSpanFrontier(backupManifests, endTime)
if err != nil {
return emptyRowCount, err
}

if err := checkCoverage(restoreCtx, dataToRestore.getSpans(), backupManifests); err != nil {
return emptyRowCount, err
}
@@ -300,6 +295,13 @@
if err != nil {
return emptyRowCount, err
}
defer progressTracker.close()

introducedSpanFrontier, err := createIntroducedSpanFrontier(backupManifests, endTime)
if err != nil {
return emptyRowCount, err
}
defer introducedSpanFrontier.Release()

targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV)
if details.ExperimentalOnline {
16 changes: 11 additions & 5 deletions pkg/ccl/backupccl/restore_progress.go
@@ -49,7 +49,7 @@ type progressTracker struct {
// fields that may get updated while read are put in the lock.
syncutil.Mutex

checkpointFrontier *spanUtils.Frontier
checkpointFrontier spanUtils.Frontier

// res tracks the amount of data that has been ingested.
res roachpb.RowCount
@@ -87,7 +87,7 @@ func makeProgressTracker(
) (*progressTracker, error) {

var (
checkpointFrontier *spanUtils.Frontier
checkpointFrontier spanUtils.Frontier
err error
nextRequiredSpanKey map[string]roachpb.Key
inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry
@@ -119,10 +119,16 @@
pt.endTime = endTime
return pt, nil
}

func (pt *progressTracker) close() {
pt.mu.Lock()
defer pt.mu.Unlock()
if pt.mu.checkpointFrontier != nil {
pt.mu.checkpointFrontier.Release()
}
}
func loadCheckpointFrontier(
requiredSpans roachpb.Spans, persistedSpans []jobspb.RestoreProgress_FrontierEntry,
) (*spanUtils.Frontier, error) {
) (spanUtils.Frontier, error) {
numRequiredSpans := len(requiredSpans) - 1
contiguousSpan := roachpb.Span{
Key: requiredSpans[0].Key,
@@ -146,7 +152,7 @@ func loadCheckpointFrontier(
// first N spans in the frontier that remain below the maxBytes memory limit
// will return.
func persistFrontier(
frontier *spanUtils.Frontier, maxBytes int64,
frontier spanUtils.Frontier, maxBytes int64,
) []jobspb.RestoreProgress_FrontierEntry {
var used int64
completedSpansSlice := make([]jobspb.RestoreProgress_FrontierEntry, 0)
12 changes: 6 additions & 6 deletions pkg/ccl/backupccl/restore_span_covering.go
@@ -110,8 +110,8 @@ var _ backupManifestFileIterator = &sstFileIterator{}
// Note: this function assumes that manifests are sorted in increasing EndTime.
func createIntroducedSpanFrontier(
manifests []backuppb.BackupManifest, asOf hlc.Timestamp,
) (*spanUtils.Frontier, error) {
introducedSpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{})
) (spanUtils.Frontier, error) {
introducedSpanFrontier, err := spanUtils.MakeFrontier()
if err != nil {
return nil, err
}
@@ -132,17 +132,17 @@
// spanCoveringFilter holds metadata that filters which backups and required spans are used to
// populate a restoreSpanEntry
type spanCoveringFilter struct {
checkpointFrontier *spanUtils.Frontier
checkpointFrontier spanUtils.Frontier
highWaterMark roachpb.Key
introducedSpanFrontier *spanUtils.Frontier
introducedSpanFrontier spanUtils.Frontier
useFrontierCheckpointing bool
targetSize int64
}

func makeSpanCoveringFilter(
checkpointFrontier *spanUtils.Frontier,
checkpointFrontier spanUtils.Frontier,
highWater roachpb.Key,
introducedSpanFrontier *spanUtils.Frontier,
introducedSpanFrontier spanUtils.Frontier,
targetSize int64,
useFrontierCheckpointing bool,
) (spanCoveringFilter, error) {
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_span_covering_test.go
@@ -269,7 +269,7 @@ func makeImportSpans(
layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
highWaterMark []byte,
targetSize int64,
introducedSpanFrontier *spanUtils.Frontier,
introducedSpanFrontier spanUtils.Frontier,
completedSpans []jobspb.RestoreProgress_FrontierEntry,
) ([]execinfrapb.RestoreSpanEntry, error) {
cover := make([]execinfrapb.RestoreSpanEntry, 0)
@@ -387,7 +387,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
{c.sp("a", "h"), c.sp("j", "k")},
{c.sp("h", "i"), c.sp("l", "m")}})

emptySpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{})
emptySpanFrontier, err := spanUtils.MakeFrontier()
require.NoError(t, err)

layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage,
38 changes: 12 additions & 26 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -112,7 +112,7 @@ type timestampLowerBoundOracle interface {
}

type changeAggregatorLowerBoundOracle struct {
sf *span.Frontier
sf frontier
initialInclusiveLowerBound hlc.Timestamp
}

@@ -246,7 +246,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
return
}
timestampOracle := &changeAggregatorLowerBoundOracle{
sf: ca.frontier.SpanFrontier(),
sf: ca.frontier,
initialInclusiveLowerBound: feed.ScanTime,
}

@@ -319,7 +319,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
}
ca.sink = &errorWrapperSink{wrapped: ca.sink}
ca.eventConsumer, ca.sink, err = newEventConsumer(
ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier.SpanFrontier(), kvFeedHighWater,
ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
if err != nil {
ca.MoveToDraining(err)
@@ -1705,20 +1705,15 @@ func (cf *changeFrontier) isSinkless() bool {
return cf.spec.JobID == 0
}

// type to make embedding span.Frontier in schemaChangeFrontier convenient.
type spanFrontier struct {
*span.Frontier
}

func (s *spanFrontier) frontierTimestamp() hlc.Timestamp {
return s.Frontier.Frontier()
}
// type alias to make it possible to embed and forward calls (e.g. Frontier())
// to the underlying span.Frontier.
type spanFrontier = span.Frontier

// schemaChangeFrontier encapsulates the span frontier, which keeps track of the
// per-span timestamps we no longer need to emit, along with information about
// the most recently observed schema change boundary.
type schemaChangeFrontier struct {
*spanFrontier
spanFrontier

// schemaChangeBoundary values are communicated to the changeFrontier via
// Resolved messages send from the changeAggregators. The policy regarding
@@ -1762,8 +1757,7 @@ func makeSchemaChangeFrontier(
if err != nil {
return nil, err
}
return &schemaChangeFrontier{
spanFrontier: &spanFrontier{Frontier: sf},
scf := &schemaChangeFrontier{
initialHighWater: initialHighWater,
latestTs: initialHighWater,

@@ -1774,7 +1768,9 @@
// to ensure that sql pod in serverless deployment does
// not get shutdown immediately after changefeed starts.
latestKV: timeutil.Now(),
}, nil
}
scf.spanFrontier = span.MakeConcurrentFrontier(sf)
return scf, nil
}

// ForwardResolvedSpan advances the timestamp for a resolved span, taking care
Expand Down Expand Up @@ -1805,16 +1801,6 @@ func (f *schemaChangeFrontier) ForwardLatestKV(ts time.Time) {
}
}

// Frontier returns the minimum timestamp being tracked.
func (f *schemaChangeFrontier) Frontier() hlc.Timestamp {
return f.frontierTimestamp()
}

// SpanFrontier returns underlying span.Frontier.
func (f *schemaChangeFrontier) SpanFrontier() *span.Frontier {
return f.spanFrontier.Frontier
}

type spanIter func(forEachSpan span.Operation)

// getCheckpointSpans returns as many spans that should be checkpointed (are
@@ -1855,7 +1841,7 @@ func getCheckpointSpans(
func (f *schemaChangeFrontier) getCheckpointSpans(
maxBytes int64,
) (spans []roachpb.Span, timestamp hlc.Timestamp) {
return getCheckpointSpans(f.frontierTimestamp(), f.Entries, maxBytes)
return getCheckpointSpans(f.Frontier(), f.Entries, maxBytes)
}

// BackfillTS returns the timestamp of the incoming spans for an ongoing
5 changes: 2 additions & 3 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -84,7 +83,7 @@ func newEventConsumer(
cfg *execinfra.ServerConfig,
spec execinfrapb.ChangeAggregatorSpec,
feed ChangefeedConfig,
spanFrontier *span.Frontier,
spanFrontier frontier,
cursor hlc.Timestamp,
sink EventSink,
metrics *Metrics,
@@ -516,7 +515,7 @@ type parallelEventConsumer struct {

// spanFrontier stores the frontier for the aggregator
// that spawned this event consumer.
spanFrontier *span.Frontier
spanFrontier frontier

// termErr and termCh are used to save the first error that occurs
// in any worker and signal all workers to stop.
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -325,6 +325,8 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
if err != nil {
return err
}
rangeFeedResumeFrontier = span.MakeConcurrentFrontier(rangeFeedResumeFrontier)
defer rangeFeedResumeFrontier.Release()

for i := 0; ; i++ {
initialScan := i == 0
@@ -526,9 +528,7 @@ func (f *kvFeed) scanIfShould(
return spansToScan, scanTime, nil
}

func (f *kvFeed) runUntilTableEvent(
ctx context.Context, resumeFrontier *span.Frontier,
) (err error) {
func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Frontier) (err error) {
startFrom := resumeFrontier.Frontier()

// Determine whether to request the previous value of each update from
@@ -645,7 +645,7 @@ func copyFromSourceToDestUntilTableEvent(
ctx context.Context,
dest kvevent.Writer,
source kvevent.Reader,
frontier *span.Frontier,
frontier span.Frontier,
tables schemafeed.SchemaFeed,
endTime hlc.Timestamp,
knobs TestingKnobs,
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -233,7 +233,7 @@ func TestCloudStorageSink(t *testing.T) {
require.Equal(t, `{"resolved":"5.0000000000"}`, string(resolvedFile))
})

forwardFrontier := func(f *span.Frontier, s roachpb.Span, wall int64) bool {
forwardFrontier := func(f span.Frontier, s roachpb.Span, wall int64) bool {
forwarded, err := f.Forward(s, ts(wall))
require.NoError(t, err)
return forwarded
4 changes: 3 additions & 1 deletion pkg/ccl/cmdccl/clusterrepl/main.go
@@ -225,7 +225,7 @@ func rawStream(
}

func subscriptionConsumer(
sub streamclient.Subscription, frontier *span.Frontier,
sub streamclient.Subscription, frontier span.Frontier,
) func(ctx context.Context) error {
return func(ctx context.Context) error {
var (
@@ -235,6 +235,8 @@
totalEventCount int
intervalEventCount int
)
defer frontier.Release()

intervalStart := timeutil.Now()
for {
var sz int
@@ -66,7 +66,7 @@ func constructSpanFrontierExecutionDetails(
}

func constructSpanFrontierExecutionDetailsWithFrontier(
partitionSpecs execinfrapb.StreamIngestionPartitionSpecs, f *span.Frontier,
partitionSpecs execinfrapb.StreamIngestionPartitionSpecs, f span.Frontier,
) []frontierExecutionDetails {
now := timeutil.Now()
res := make([]frontierExecutionDetails, 0)
@@ -56,7 +56,7 @@ type streamIngestionFrontier struct {

// frontier contains the current resolved timestamp high-water for the tracked
// span set.
frontier *span.Frontier
frontier span.Frontier

// metrics are monitoring all running ingestion jobs.
metrics *Metrics
@@ -341,6 +341,8 @@ func (sf *streamIngestionFrontier) Next() (
}

func (sf *streamIngestionFrontier) close() {
defer sf.frontier.Release()

if err := sf.heartbeatSender.stop(); err != nil {
log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
@@ -229,7 +229,7 @@ type streamIngestionProcessor struct {

// frontier keeps track of the progress for the spans tracked by this processor
// and is used forward resolved spans
frontier *span.Frontier
frontier span.Frontier
// lastFlushTime keeps track of the last time that we flushed due to a
// checkpoint timestamp event.
lastFlushTime time.Time
@@ -528,6 +528,8 @@ func (sip *streamIngestionProcessor) close() {
return
}

defer sip.frontier.Release()

// Stop the partition client, mergedSubscription, and
// cutoverPoller. All other goroutines should exit based on
// channel close events.
@@ -1276,7 +1278,7 @@ func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, erro
// frontierForSpan returns the lowest timestamp in the frontier within
// the given subspans. If the subspans are entirely outside the
// Frontier's tracked span an empty timestamp is returned.
func frontierForSpans(f *span.Frontier, spans ...roachpb.Span) hlc.Timestamp {
func frontierForSpans(f span.Frontier, spans ...roachpb.Span) hlc.Timestamp {
var (
minTimestamp hlc.Timestamp
sawEmptyTS bool