diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go
index 183d63eefad7..95a1cc2e4a80 100644
--- a/pkg/ccl/changefeedccl/poller.go
+++ b/pkg/ccl/changefeedccl/poller.go
@@ -25,8 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/covering"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/storage"
-	"github.com/cockroachdb/cockroach/pkg/storage/engine"
-	"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -201,8 +200,6 @@ func (p *poller) rangefeedImplIter(ctx context.Context, i int) error {
 	}
 	p.mu.Unlock()
 	if scanTime != (hlc.Timestamp{}) {
-		// TODO(dan): Now that we no longer have the poller, we should stop using
-		// ExportRequest and start using normal Scans.
 		if err := p.exportSpansParallel(ctx, spans, scanTime, backfillWithDiff); err != nil {
 			return err
 		}
@@ -414,6 +411,7 @@ func (p *poller) exportSpansParallel(
 ) error {
 	// Export requests for the various watched spans are executed in parallel,
 	// with a semaphore-enforced limit based on a cluster setting.
+	// The spans here generally correspond with range boundaries.
 	maxConcurrentExports := clusterNodeCount(p.gossip) *
 		int(storage.ExportRequestsLimit.Get(&p.settings.SV))
 	exportsSem := make(chan struct{}, maxConcurrentExports)
@@ -452,53 +450,84 @@ func (p *poller) exportSpansParallel(
 func (p *poller) exportSpan(
 	ctx context.Context, span roachpb.Span, ts hlc.Timestamp, withDiff bool,
 ) error {
-	sender := p.db.NonTransactionalSender()
+	txn := p.db.NewTxn(ctx, "changefeed backfill")
 	if log.V(2) {
-		log.Infof(ctx, `sending ExportRequest %s at %s`, span, ts)
-	}
-
-	header := roachpb.Header{Timestamp: ts}
-	req := &roachpb.ExportRequest{
-		RequestHeader: roachpb.RequestHeaderFromSpan(span),
-		MVCCFilter:    roachpb.MVCCFilter_Latest,
-		ReturnSST:     true,
-		OmitChecksum:  true,
+		log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts)
 	}
-
+	txn.SetFixedTimestamp(ctx, ts)
 	stopwatchStart := timeutil.Now()
-	exported, pErr := client.SendWrappedWith(ctx, sender, header, req)
-	exportDuration := timeutil.Since(stopwatchStart)
-	if log.V(2) {
-		log.Infof(ctx, `finished ExportRequest %s at %s took %s`,
-			span, ts.AsOfSystemTime(), exportDuration)
-	}
-	slowExportThreshold := 10 * changefeedPollInterval.Get(&p.settings.SV)
-	if exportDuration > slowExportThreshold {
-		log.Infof(ctx, "finished ExportRequest %s at %s took %s behind by %s",
-			span, ts, exportDuration, timeutil.Since(ts.GoTime()))
-	}
-
-	if pErr != nil {
-		err := pErr.GoError()
-		return errors.Wrapf(err, `fetching changes for %s`, span)
-	}
-	p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds())
-
-	// When outputting a full scan, we want to use the schema at the scan
-	// timestamp, not the schema at the value timestamp.
-	schemaTimestamp := ts
-	stopwatchStart = timeutil.Now()
-	for _, file := range exported.(*roachpb.ExportResponse).Files {
-		if err := p.slurpSST(ctx, file.SST, schemaTimestamp, withDiff); err != nil {
+	var scanDuration, bufferDuration time.Duration
+	// TODO(ajwerner): Adopt the byte limit here as soon as it merges. This must
+	// happen in 20.1!
+	const maxKeysPerScan = 1 << 18
+	for remaining := span; ; {
+		start := timeutil.Now()
+		b := txn.NewBatch()
+		r := roachpb.NewScan(remaining.Key, remaining.EndKey).(*roachpb.ScanRequest)
+		r.ScanFormat = roachpb.BATCH_RESPONSE
+		b.Header.MaxSpanRequestKeys = maxKeysPerScan
+		// NB: We use a raw request rather than the Scan() method because we want
+		// the MVCC timestamps which are encoded in the response but are filtered
+		// during result parsing.
+		b.AddRawRequest(r)
+		if err := txn.Run(ctx, b); err != nil {
+			return errors.Wrapf(err, `fetching changes for %s`, span)
+		}
+		afterScan := timeutil.Now()
+		res := b.RawResponse().Responses[0].GetScan()
+		if err := p.slurpScanResponse(ctx, remaining, res, ts, withDiff); err != nil {
 			return err
 		}
+		afterBuffer := timeutil.Now()
+		scanDuration += afterScan.Sub(start)
+		bufferDuration += afterBuffer.Sub(afterScan)
+		if res.ResumeSpan != nil {
+			remaining = *res.ResumeSpan
+		} else {
+			break
+		}
 	}
+	p.metrics.PollRequestNanosHist.RecordValue(scanDuration.Nanoseconds())

 	if err := p.buf.AddResolved(ctx, span, ts); err != nil {
 		return err
 	}
-	if log.V(2) {
-		log.Infof(ctx, `finished buffering %s took %s`, span, timeutil.Since(stopwatchStart))
+	if log.V(2) {
+		log.Infof(ctx, `finished Scan of %s at %s took %s`,
+			span, ts.AsOfSystemTime(), timeutil.Since(stopwatchStart))
+	}
+	return nil
+}
+
+// slurpScanResponse iterates the ScanResponse and inserts the contained kvs into
+// the poller's buffer.
+func (p *poller) slurpScanResponse(
+	ctx context.Context,
+	span roachpb.Span,
+	res *roachpb.ScanResponse,
+	ts hlc.Timestamp,
+	withDiff bool,
+) error {
+	for _, br := range res.BatchResponses {
+		for len(br) > 0 {
+			var kv roachpb.KeyValue
+			var err error
+			kv.Key, kv.Value.Timestamp, kv.Value.RawBytes, br, err = enginepb.ScanDecodeKeyValue(br)
+			if err != nil {
+				return errors.Wrapf(err, `decoding changes for %s`, span)
+			}
+			var prevVal roachpb.Value
+			if withDiff {
+				// Include the same value for the "before" and "after" KV, but
+				// interpret them at different timestamp. Specifically, interpret
+				// the "before" KV at the timestamp immediately before the schema
+				// change. This is handled in kvsToRows.
+				prevVal = kv.Value
+			}
+			if err = p.buf.AddKV(ctx, kv, prevVal, ts); err != nil {
+				return errors.Wrapf(err, `buffering changes for %s`, span)
+			}
+		}
 	}
 	return nil
 }
@@ -529,80 +557,6 @@ func (p *poller) pollTableHistory(ctx context.Context) error {
 	}
 }

-// slurpSST iterates an encoded sst and inserts the contained kvs into the
-// buffer.
-func (p *poller) slurpSST(
-	ctx context.Context, sst []byte, schemaTimestamp hlc.Timestamp, withDiff bool,
-) error {
-	var previousKey roachpb.Key
-	var kvs []roachpb.KeyValue
-	slurpKVs := func() error {
-		sort.Sort(byValueTimestamp(kvs))
-		for _, kv := range kvs {
-			var prevVal roachpb.Value
-			if withDiff {
-				// Include the same value for the "before" and "after" KV, but
-				// interpret them at different timestamp. Specifically, interpret
-				// the "before" KV at the timestamp immediately before the schema
-				// change. This is handled in kvsToRows.
-				prevVal = kv.Value
-			}
-			if err := p.buf.AddKV(ctx, kv, prevVal, schemaTimestamp); err != nil {
-				return err
-			}
-		}
-		previousKey = previousKey[:0]
-		kvs = kvs[:0]
-		return nil
-	}
-
-	var scratch bufalloc.ByteAllocator
-	it, err := engine.NewMemSSTIterator(sst, false /* verify */)
-	if err != nil {
-		return err
-	}
-	defer it.Close()
-	for it.SeekGE(engine.NilKey); ; it.Next() {
-		if ok, err := it.Valid(); err != nil {
-			return err
-		} else if !ok {
-			break
-		}
-
-		unsafeKey := it.UnsafeKey()
-		var key roachpb.Key
-		var value []byte
-		scratch, key = scratch.Copy(unsafeKey.Key, 0 /* extraCap */)
-		scratch, value = scratch.Copy(it.UnsafeValue(), 0 /* extraCap */)
-
-		// The buffer currently requires that each key's mvcc revisions are
-		// added in increasing timestamp order. The sst is guaranteed to be in
-		// key order, but decresing timestamp order. So, buffer up kvs until the
-		// key changes, then sort by increasing timestamp before handing them
-		// all to AddKV.
-		if !previousKey.Equal(key) {
-			if err := slurpKVs(); err != nil {
-				return err
-			}
-			previousKey = key
-		}
-		kvs = append(kvs, roachpb.KeyValue{
-			Key:   key,
-			Value: roachpb.Value{RawBytes: value, Timestamp: unsafeKey.Timestamp},
-		})
-	}
-
-	return slurpKVs()
-}
-
-type byValueTimestamp []roachpb.KeyValue
-
-func (b byValueTimestamp) Len() int      { return len(b) }
-func (b byValueTimestamp) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
-func (b byValueTimestamp) Less(i, j int) bool {
-	return b[i].Value.Timestamp.Less(b[j].Value.Timestamp)
-}
-
 func allRangeDescriptors(ctx context.Context, txn *client.Txn) ([]roachpb.RangeDescriptor, error) {
 	rows, err := txn.Scan(ctx, keys.Meta2Prefix, keys.MetaMax, 0)
 	if err != nil {
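
For reference, the pattern the new exportSpan and slurpScanResponse introduce above can be condensed into one illustrative helper: page through a span at a fixed timestamp with ScanRequests in BATCH_RESPONSE format (so the per-KV MVCC timestamps survive), decode each KV with enginepb.ScanDecodeKeyValue, and follow ResumeSpan until the span is exhausted. This is a sketch, not part of the change: the scanSpanAtTime name and the processKV callback are invented for illustration, and the import paths assume the pkg/internal/client layout of the tree this diff targets.

package changefeedccl

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// scanSpanAtTime is a hypothetical helper showing the backfill pattern used by
// the change above; it is not part of the diff.
func scanSpanAtTime(
	ctx context.Context,
	db *client.DB,
	span roachpb.Span,
	ts hlc.Timestamp,
	processKV func(roachpb.KeyValue) error,
) error {
	// Pin the whole backfill to a single timestamp, as exportSpan does.
	txn := db.NewTxn(ctx, "illustrative backfill scan")
	txn.SetFixedTimestamp(ctx, ts)
	const maxKeysPerScan = 1 << 18 // same page size as the change above
	for remaining := span; ; {
		b := txn.NewBatch()
		r := roachpb.NewScan(remaining.Key, remaining.EndKey).(*roachpb.ScanRequest)
		// BATCH_RESPONSE plus a raw request keeps the per-KV MVCC timestamps
		// that the higher-level Scan() helper would strip.
		r.ScanFormat = roachpb.BATCH_RESPONSE
		b.Header.MaxSpanRequestKeys = maxKeysPerScan
		b.AddRawRequest(r)
		if err := txn.Run(ctx, b); err != nil {
			return err
		}
		res := b.RawResponse().Responses[0].GetScan()
		// Decode the raw batch responses into roachpb.KeyValues, timestamps
		// included, exactly as slurpScanResponse does.
		for _, br := range res.BatchResponses {
			for len(br) > 0 {
				var kv roachpb.KeyValue
				var err error
				kv.Key, kv.Value.Timestamp, kv.Value.RawBytes, br, err = enginepb.ScanDecodeKeyValue(br)
				if err != nil {
					return err
				}
				if err := processKV(kv); err != nil {
					return err
				}
			}
		}
		// A non-nil ResumeSpan means the scan hit MaxSpanRequestKeys; continue
		// from where it left off, otherwise the span is done.
		if res.ResumeSpan == nil {
			return nil
		}
		remaining = *res.ResumeSpan
	}
}

Fixing the timestamp on the txn, rather than on each request header, keeps every page of the backfill reading at the same snapshot even though the scan is split across many batches, which mirrors what the old ExportRequest achieved with a single header timestamp.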