changefeedccl: use ScanRequest instead of ExportRequest during backfills
This PR is motivated by the desire to get the memory usage of CDC under
control in the presence of much larger ranges. Currently, when a changefeed
decides it needs to do a backfill, it breaks the spans up along range
boundaries and then fetches the data for the backfill (with some parallelism).

The memory overhead was somewhat bounded by the range size, since each
request fetches at most a single range's worth of data. If we want to make
the range size dramatically larger, the memory usage would become a function
of that new, much larger range size.

Fortunately, we don't have much need for these `ExportRequest`s anymore.
Another fascinating recent revelation is that the `ScanResponse` does indeed
include MVCC timestamps (not that we necessarily needed them, but it's a good
idea to keep them for compatibility).
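
As an aside, a minimal sketch (not part of this commit) of how those MVCC
timestamps come back when a scan is issued with `ScanFormat = BATCH_RESPONSE`;
it mirrors the `slurpScanResponse` helper added below, and the `decodeKVs`
name is illustrative:

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
)

// decodeKVs peels the KVs (with their MVCC timestamps) out of the batched
// byte slices of a BATCH_RESPONSE-format ScanResponse.
func decodeKVs(res *roachpb.ScanResponse) ([]roachpb.KeyValue, error) {
	var kvs []roachpb.KeyValue
	for _, br := range res.BatchResponses {
		for len(br) > 0 {
			var kv roachpb.KeyValue
			var err error
			// ScanDecodeKeyValue decodes one KV from the front of the
			// batch and returns the remainder of the batch.
			kv.Key, kv.Value.Timestamp, kv.Value.RawBytes, br, err = enginepb.ScanDecodeKeyValue(br)
			if err != nil {
				return nil, err
			}
			kvs = append(kvs, kv)
		}
	}
	return kvs, nil
}
```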

The `ScanRequest` currently permits a limit on `NumRows`, which this commit
utilizes. I wanted to get this change typed in anticipation of cockroachdb#44341,
which will provide a limit on `NumBytes`.
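
A minimal sketch of how that `NumRows`-style limit is used (it corresponds to
`MaxSpanRequestKeys` on the batch header): chase the `ResumeSpan` until the
span is exhausted. This mirrors the loop in `exportSpan` below; the function
name and the `internal/client` import path are assumptions for illustration.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// scanSpanAt reads span as of ts in limited chunks, following ResumeSpan.
func scanSpanAt(
	ctx context.Context, txn *client.Txn, span roachpb.Span, ts hlc.Timestamp,
) error {
	const maxKeysPerScan = 1 << 18 // the NumRows-style limit
	txn.SetFixedTimestamp(ctx, ts)
	for remaining := span; ; {
		b := txn.NewBatch()
		r := roachpb.NewScan(remaining.Key, remaining.EndKey).(*roachpb.ScanRequest)
		r.ScanFormat = roachpb.BATCH_RESPONSE
		b.Header.MaxSpanRequestKeys = maxKeysPerScan
		// A raw request keeps the MVCC timestamps that Scan()'s result
		// parsing would strip.
		b.AddRawRequest(r)
		if err := txn.Run(ctx, b); err != nil {
			return err
		}
		res := b.RawResponse().Responses[0].GetScan()
		_ = res // hand off to decoding/buffering here
		if res.ResumeSpan == nil {
			return nil
		}
		remaining = *res.ResumeSpan
	}
}
```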

I retained the existing parallelism, as ScanRequests with limits are not
executed in parallel.
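
The retained pattern, sketched minimally (names here are illustrative; the
real code is `exportSpansParallel` below, with the semaphore sized from
`ExportRequestsLimit` times the node count):

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
)

// forEachSpanParallel runs fn over spans with at most maxConcurrent calls
// in flight at once, since limited ScanRequests are not parallelized by
// the KV layer.
func forEachSpanParallel(
	ctx context.Context,
	spans []roachpb.Span,
	maxConcurrent int,
	fn func(context.Context, roachpb.Span) error,
) error {
	sem := make(chan struct{}, maxConcurrent)
	g := ctxgroup.WithContext(ctx)
	for _, span := range spans {
		span := span
		sem <- struct{}{} // acquire a slot
		g.GoCtx(func(ctx context.Context) error {
			defer func() { <-sem }() // release the slot
			return fn(ctx, span)
		})
	}
	return g.Wait()
}
```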

I would like to do some benchmarking, but I feel pretty okay about the
testing we already have in place. @danhhz, what do you want to see here?

Relates to cockroachdb#39717.

Release note: None.
ajwerner committed Feb 4, 2020
1 parent 5224918 commit 78a3ee8
Showing 1 changed file with 69 additions and 115 deletions.
pkg/ccl/changefeedccl/poller.go (69 additions, 115 deletions)
@@ -25,8 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -201,8 +200,6 @@ func (p *poller) rangefeedImplIter(ctx context.Context, i int) error {
}
p.mu.Unlock()
if scanTime != (hlc.Timestamp{}) {
// TODO(dan): Now that we no longer have the poller, we should stop using
// ExportRequest and start using normal Scans.
if err := p.exportSpansParallel(ctx, spans, scanTime, backfillWithDiff); err != nil {
return err
}
@@ -414,6 +411,7 @@ func (p *poller) exportSpansParallel(
) error {
// Export requests for the various watched spans are executed in parallel,
// with a semaphore-enforced limit based on a cluster setting.
// The spans here generally correspond with range boundaries.
maxConcurrentExports := clusterNodeCount(p.gossip) *
int(storage.ExportRequestsLimit.Get(&p.settings.SV))
exportsSem := make(chan struct{}, maxConcurrentExports)
@@ -452,53 +450,83 @@
func (p *poller) exportSpan(
ctx context.Context, span roachpb.Span, ts hlc.Timestamp, withDiff bool,
) error {
sender := p.db.NonTransactionalSender()
txn := p.db.NewTxn(ctx, "changefeed backfill")
if log.V(2) {
log.Infof(ctx, `sending ExportRequest %s at %s`, span, ts)
}

header := roachpb.Header{Timestamp: ts}
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span),
MVCCFilter: roachpb.MVCCFilter_Latest,
ReturnSST: true,
OmitChecksum: true,
log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts)
}

txn.SetFixedTimestamp(ctx, ts)
stopwatchStart := timeutil.Now()
exported, pErr := client.SendWrappedWith(ctx, sender, header, req)
exportDuration := timeutil.Since(stopwatchStart)
if log.V(2) {
log.Infof(ctx, `finished ExportRequest %s at %s took %s`,
span, ts.AsOfSystemTime(), exportDuration)
}
slowExportThreshold := 10 * changefeedPollInterval.Get(&p.settings.SV)
if exportDuration > slowExportThreshold {
log.Infof(ctx, "finished ExportRequest %s at %s took %s behind by %s",
span, ts, exportDuration, timeutil.Since(ts.GoTime()))
}

if pErr != nil {
err := pErr.GoError()
return errors.Wrapf(err, `fetching changes for %s`, span)
}
p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds())

// When outputting a full scan, we want to use the schema at the scan
// timestamp, not the schema at the value timestamp.
schemaTimestamp := ts
stopwatchStart = timeutil.Now()
for _, file := range exported.(*roachpb.ExportResponse).Files {
if err := p.slurpSST(ctx, file.SST, schemaTimestamp, withDiff); err != nil {
var scanDuration, bufferDuration time.Duration
// TODO(ajwerner): Adopt the byte limit here as soon as it merges. This must
// happen in 20.1!
const maxKeysPerScan = 1 << 18
for remaining := span; ; {
start := timeutil.Now()
b := txn.NewBatch()
r := roachpb.NewScan(remaining.Key, remaining.EndKey).(*roachpb.ScanRequest)
r.ScanFormat = roachpb.BATCH_RESPONSE
b.Header.MaxSpanRequestKeys = maxKeysPerScan
// NB: We use a raw request rather than the Scan() method because we want
// the MVCC timestamps which are encoded in the response but are filtered
// during result parsing.
b.AddRawRequest(r)
if err := txn.Run(ctx, b); err != nil {
return errors.Wrapf(err, `fetching changes for %s`, span)
}
afterScan := timeutil.Now()
res := b.RawResponse().Responses[0].GetScan()
if err := p.slurpScanResponse(ctx, remaining, res, ts, withDiff); err != nil {
return err
}
afterBuffer := timeutil.Now()
scanDuration += afterScan.Sub(start)
bufferDuration += afterBuffer.Sub(afterScan)
if res.ResumeSpan != nil {
remaining = *res.ResumeSpan
} else {
break
}
}
p.metrics.PollRequestNanosHist.RecordValue(scanDuration.Nanoseconds())
if err := p.buf.AddResolved(ctx, span, ts); err != nil {
return err
}

if log.V(2) {
log.Infof(ctx, `finished buffering %s took %s`, span, timeutil.Since(stopwatchStart))
log.Infof(ctx, `finished Scan of %s at %s took %s`,
span, ts.AsOfSystemTime(), timeutil.Since(stopwatchStart))
}
return nil
}

// slurpScanResponse iterates the ScanResponse and inserts the contained kvs into
// the poller's buffer.
func (p *poller) slurpScanResponse(
ctx context.Context,
span roachpb.Span,
res *roachpb.ScanResponse,
ts hlc.Timestamp,
withDiff bool,
) error {
for _, br := range res.BatchResponses {
for len(br) > 0 {
var kv roachpb.KeyValue
var err error
kv.Key, kv.Value.Timestamp, kv.Value.RawBytes, br, err = enginepb.ScanDecodeKeyValue(br)
if err != nil {
return errors.Wrapf(err, `decoding changes for %s`, span)
}
var prevVal roachpb.Value
if withDiff {
// Include the same value for the "before" and "after" KV, but
// interpret them at different timestamps. Specifically, interpret
// the "before" KV at the timestamp immediately before the schema
// change. This is handled in kvsToRows.
prevVal = kv.Value
}
if err = p.buf.AddKV(ctx, kv, prevVal, ts); err != nil {
return errors.Wrapf(err, `buffering changes for %s`, span)
}
}
}
return nil
}
@@ -529,80 +557,6 @@ func (p *poller) pollTableHistory(ctx context.Context) error {
}
}

// slurpSST iterates an encoded sst and inserts the contained kvs into the
// buffer.
func (p *poller) slurpSST(
ctx context.Context, sst []byte, schemaTimestamp hlc.Timestamp, withDiff bool,
) error {
var previousKey roachpb.Key
var kvs []roachpb.KeyValue
slurpKVs := func() error {
sort.Sort(byValueTimestamp(kvs))
for _, kv := range kvs {
var prevVal roachpb.Value
if withDiff {
// Include the same value for the "before" and "after" KV, but
// interpret them at different timestamps. Specifically, interpret
// the "before" KV at the timestamp immediately before the schema
// change. This is handled in kvsToRows.
prevVal = kv.Value
}
if err := p.buf.AddKV(ctx, kv, prevVal, schemaTimestamp); err != nil {
return err
}
}
previousKey = previousKey[:0]
kvs = kvs[:0]
return nil
}

var scratch bufalloc.ByteAllocator
it, err := engine.NewMemSSTIterator(sst, false /* verify */)
if err != nil {
return err
}
defer it.Close()
for it.SeekGE(engine.NilKey); ; it.Next() {
if ok, err := it.Valid(); err != nil {
return err
} else if !ok {
break
}

unsafeKey := it.UnsafeKey()
var key roachpb.Key
var value []byte
scratch, key = scratch.Copy(unsafeKey.Key, 0 /* extraCap */)
scratch, value = scratch.Copy(it.UnsafeValue(), 0 /* extraCap */)

// The buffer currently requires that each key's mvcc revisions are
// added in increasing timestamp order. The sst is guaranteed to be in
// key order, but decreasing timestamp order. So, buffer up kvs until the
// key changes, then sort by increasing timestamp before handing them
// all to AddKV.
if !previousKey.Equal(key) {
if err := slurpKVs(); err != nil {
return err
}
previousKey = key
}
kvs = append(kvs, roachpb.KeyValue{
Key: key,
Value: roachpb.Value{RawBytes: value, Timestamp: unsafeKey.Timestamp},
})
}

return slurpKVs()
}

type byValueTimestamp []roachpb.KeyValue

func (b byValueTimestamp) Len() int { return len(b) }
func (b byValueTimestamp) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b byValueTimestamp) Less(i, j int) bool {
return b[i].Value.Timestamp.Less(b[j].Value.Timestamp)
}

func allRangeDescriptors(ctx context.Context, txn *client.Txn) ([]roachpb.RangeDescriptor, error) {
rows, err := txn.Scan(ctx, keys.Meta2Prefix, keys.MetaMax, 0)
if err != nil {
