From 78a3ee88ceafd08627d2defef3ae03e528440567 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 3 Feb 2020 17:42:53 -0500 Subject: [PATCH] changefeedccl: use ScanRequest instead of ExportRequest during backfills This PR is motivated by the desire to get the memory usage of CDC under control in the presense of much larger ranges. Currently when a changefeed decides it needs to do a backfill, it breaks the spans up along range boundaries and then fetches the data (with some parallelism) for the backfill. The memory overhead was somewhat bounded by the range size. If we want to make the range size dramatically larger, the memory usage would become a function of that new, much larger range size. Fortunately, we don't have much need for these `ExportRequest`s any more. Another fascinating revelation of late is that the `ScanResponse` does indeed include MVCC timestamps (not the we necessarily needed them but it's a good idea to keep them for compatibility). The `ScanRequest` permits currently a limit on `NumRows` which this commit utilized. I wanted to get this change typed in anticipation of #44341 which will provide a limit on `NumBytes`. I retained the existing parallelism as ScanRequests with limits are not parallel. I would like to do some benchmarking but I feel pretty okay about the testing we have in place already. @danhhz what do you want to see here? Relates to #39717. Release note: None. --- pkg/ccl/changefeedccl/poller.go | 184 ++++++++++++-------------------- 1 file changed, 69 insertions(+), 115 deletions(-) diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index 183d63eefad7..95a1cc2e4a80 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -25,8 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/covering" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/util/bufalloc" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -201,8 +200,6 @@ func (p *poller) rangefeedImplIter(ctx context.Context, i int) error { } p.mu.Unlock() if scanTime != (hlc.Timestamp{}) { - // TODO(dan): Now that we no longer have the poller, we should stop using - // ExportRequest and start using normal Scans. if err := p.exportSpansParallel(ctx, spans, scanTime, backfillWithDiff); err != nil { return err } @@ -414,6 +411,7 @@ func (p *poller) exportSpansParallel( ) error { // Export requests for the various watched spans are executed in parallel, // with a semaphore-enforced limit based on a cluster setting. + // The spans here generally correspond with range boundaries. maxConcurrentExports := clusterNodeCount(p.gossip) * int(storage.ExportRequestsLimit.Get(&p.settings.SV)) exportsSem := make(chan struct{}, maxConcurrentExports) @@ -452,53 +450,83 @@ func (p *poller) exportSpansParallel( func (p *poller) exportSpan( ctx context.Context, span roachpb.Span, ts hlc.Timestamp, withDiff bool, ) error { - sender := p.db.NonTransactionalSender() + txn := p.db.NewTxn(ctx, "changefeed backfill") if log.V(2) { - log.Infof(ctx, `sending ExportRequest %s at %s`, span, ts) - } - - header := roachpb.Header{Timestamp: ts} - req := &roachpb.ExportRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(span), - MVCCFilter: roachpb.MVCCFilter_Latest, - ReturnSST: true, - OmitChecksum: true, + log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts) } - + txn.SetFixedTimestamp(ctx, ts) stopwatchStart := timeutil.Now() - exported, pErr := client.SendWrappedWith(ctx, sender, header, req) - exportDuration := timeutil.Since(stopwatchStart) - if log.V(2) { - log.Infof(ctx, `finished ExportRequest %s at %s took %s`, - span, ts.AsOfSystemTime(), exportDuration) - } - slowExportThreshold := 10 * changefeedPollInterval.Get(&p.settings.SV) - if exportDuration > slowExportThreshold { - log.Infof(ctx, "finished ExportRequest %s at %s took %s behind by %s", - span, ts, exportDuration, timeutil.Since(ts.GoTime())) - } - - if pErr != nil { - err := pErr.GoError() - return errors.Wrapf(err, `fetching changes for %s`, span) - } - p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds()) - - // When outputting a full scan, we want to use the schema at the scan - // timestamp, not the schema at the value timestamp. - schemaTimestamp := ts - stopwatchStart = timeutil.Now() - for _, file := range exported.(*roachpb.ExportResponse).Files { - if err := p.slurpSST(ctx, file.SST, schemaTimestamp, withDiff); err != nil { + var scanDuration, bufferDuration time.Duration + // TODO(ajwerner): Adopt the byte limit here as soon as it merges. This must + // happen in 20.1! + const maxKeysPerScan = 1 << 18 + for remaining := span; ; { + start := timeutil.Now() + b := txn.NewBatch() + r := roachpb.NewScan(remaining.Key, remaining.EndKey).(*roachpb.ScanRequest) + r.ScanFormat = roachpb.BATCH_RESPONSE + b.Header.MaxSpanRequestKeys = maxKeysPerScan + // NB: We use a raw request rather than the Scan() method because we want + // the MVCC timestamps which are encoded in the response but are filtered + // during result parsing. + b.AddRawRequest(r) + if err := txn.Run(ctx, b); err != nil { + return errors.Wrapf(err, `fetching changes for %s`, span) + } + afterScan := timeutil.Now() + res := b.RawResponse().Responses[0].GetScan() + if err := p.slurpScanResponse(ctx, remaining, res, ts, withDiff); err != nil { return err } + afterBuffer := timeutil.Now() + scanDuration += afterScan.Sub(start) + bufferDuration += afterBuffer.Sub(afterScan) + if res.ResumeSpan != nil { + remaining = *res.ResumeSpan + } else { + break + } } + p.metrics.PollRequestNanosHist.RecordValue(scanDuration.Nanoseconds()) if err := p.buf.AddResolved(ctx, span, ts); err != nil { return err } - if log.V(2) { - log.Infof(ctx, `finished buffering %s took %s`, span, timeutil.Since(stopwatchStart)) + log.Infof(ctx, `finished Scan of %s at %s took %s`, + span, ts.AsOfSystemTime(), timeutil.Since(stopwatchStart)) + } + return nil +} + +// slurpScanResponse iterates the ScanResponse and inserts the contained kvs into +// the poller's buffer. +func (p *poller) slurpScanResponse( + ctx context.Context, + span roachpb.Span, + res *roachpb.ScanResponse, + ts hlc.Timestamp, + withDiff bool, +) error { + for _, br := range res.BatchResponses { + for len(br) > 0 { + var kv roachpb.KeyValue + var err error + kv.Key, kv.Value.Timestamp, kv.Value.RawBytes, br, err = enginepb.ScanDecodeKeyValue(br) + if err != nil { + return errors.Wrapf(err, `decoding changes for %s`, span) + } + var prevVal roachpb.Value + if withDiff { + // Include the same value for the "before" and "after" KV, but + // interpret them at different timestamp. Specifically, interpret + // the "before" KV at the timestamp immediately before the schema + // change. This is handled in kvsToRows. + prevVal = kv.Value + } + if err = p.buf.AddKV(ctx, kv, prevVal, ts); err != nil { + return errors.Wrapf(err, `buffering changes for %s`, span) + } + } } return nil } @@ -529,80 +557,6 @@ func (p *poller) pollTableHistory(ctx context.Context) error { } } -// slurpSST iterates an encoded sst and inserts the contained kvs into the -// buffer. -func (p *poller) slurpSST( - ctx context.Context, sst []byte, schemaTimestamp hlc.Timestamp, withDiff bool, -) error { - var previousKey roachpb.Key - var kvs []roachpb.KeyValue - slurpKVs := func() error { - sort.Sort(byValueTimestamp(kvs)) - for _, kv := range kvs { - var prevVal roachpb.Value - if withDiff { - // Include the same value for the "before" and "after" KV, but - // interpret them at different timestamp. Specifically, interpret - // the "before" KV at the timestamp immediately before the schema - // change. This is handled in kvsToRows. - prevVal = kv.Value - } - if err := p.buf.AddKV(ctx, kv, prevVal, schemaTimestamp); err != nil { - return err - } - } - previousKey = previousKey[:0] - kvs = kvs[:0] - return nil - } - - var scratch bufalloc.ByteAllocator - it, err := engine.NewMemSSTIterator(sst, false /* verify */) - if err != nil { - return err - } - defer it.Close() - for it.SeekGE(engine.NilKey); ; it.Next() { - if ok, err := it.Valid(); err != nil { - return err - } else if !ok { - break - } - - unsafeKey := it.UnsafeKey() - var key roachpb.Key - var value []byte - scratch, key = scratch.Copy(unsafeKey.Key, 0 /* extraCap */) - scratch, value = scratch.Copy(it.UnsafeValue(), 0 /* extraCap */) - - // The buffer currently requires that each key's mvcc revisions are - // added in increasing timestamp order. The sst is guaranteed to be in - // key order, but decresing timestamp order. So, buffer up kvs until the - // key changes, then sort by increasing timestamp before handing them - // all to AddKV. - if !previousKey.Equal(key) { - if err := slurpKVs(); err != nil { - return err - } - previousKey = key - } - kvs = append(kvs, roachpb.KeyValue{ - Key: key, - Value: roachpb.Value{RawBytes: value, Timestamp: unsafeKey.Timestamp}, - }) - } - - return slurpKVs() -} - -type byValueTimestamp []roachpb.KeyValue - -func (b byValueTimestamp) Len() int { return len(b) } -func (b byValueTimestamp) Swap(i, j int) { b[i], b[j] = b[j], b[i] } -func (b byValueTimestamp) Less(i, j int) bool { - return b[i].Value.Timestamp.Less(b[j].Value.Timestamp) -} - func allRangeDescriptors(ctx context.Context, txn *client.Txn) ([]roachpb.RangeDescriptor, error) { rows, err := txn.Scan(ctx, keys.Meta2Prefix, keys.MetaMax, 0) if err != nil {