Merge #49425 #49900
49425: backupccl: use distsql to distribute load of ExportRequests r=dt a=pbardea

Prior to this change, the backup job coordinated its parallel work by
managing various channels. This change leverages the DistSQL
infrastructure to distribute that work. In particular, it introduces a
BackupProcessor whose primary responsibility is to issue an
ExportRequest and report its progress back to the backup coordinator.

A BackupProcessor is scheduled on each node and is responsible for
exporting the spans whose leaseholders are on that node. Ideally, this
means each ExportRequest is executed on the same node that issues it.

Backup progress and checkpointing are maintained by having each
processor send the results of its export requests back to the
coordinator through DistSQL's metadata stream.

Part of #40239.

Release note: None
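
To make the coordinator/processor split concrete, here is a minimal,
hypothetical Go sketch (not the actual BackupProcessor or DistSQL code):
each worker goroutine stands in for a per-node processor that "exports"
its assigned spans, and the coordinator aggregates the per-span results
streamed back over a channel, loosely mirroring how progress flows back
over DistSQL's metadata stream.

```go
package main

import (
	"fmt"
	"sync"
)

// span stands in for a key span assigned to a node; progress stands in for
// the per-span result a processor would report via DistSQL metadata.
type span struct{ start, end string }
type progress struct {
	node  int
	sp    span
	bytes int64
}

// runProcessor plays the role of a per-node processor: it "exports" its
// spans and reports each result back to the coordinator.
func runProcessor(node int, spans []span, progCh chan<- progress, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, sp := range spans {
		// In the real processor this would be an ExportRequest; here we fake it.
		progCh <- progress{node: node, sp: sp, bytes: int64(len(sp.start) + len(sp.end))}
	}
}

func main() {
	// Spans partitioned by (hypothetical) leaseholder node.
	assignments := map[int][]span{
		1: {{"a", "c"}, {"c", "f"}},
		2: {{"f", "m"}},
		3: {{"m", "z"}},
	}

	progCh := make(chan progress)
	var wg sync.WaitGroup
	for node, spans := range assignments {
		wg.Add(1)
		go runProcessor(node, spans, progCh, &wg)
	}
	// Close the progress stream once every processor is done.
	go func() { wg.Wait(); close(progCh) }()

	// The coordinator aggregates progress as results stream in.
	var total int64
	var done int
	for p := range progCh {
		total += p.bytes
		done++
		fmt.Printf("node %d exported span [%s,%s): %d spans done, %d bytes total\n",
			p.node, p.sp.start, p.sp.end, done, total)
	}
}
```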

49900: colexec: increase support of aggregate functions r=yuzefovich a=yuzefovich

**colexec: add support of avg on ints and intervals**

This commit adds support for the `avg` aggregate function on ints and
intervals. Adding interval support was straightforward, but int support
was a little trickier: the average of ints is a decimal, so some
additional setup work is needed to override the result type. Another
complication is that the summation needs to be performed using the
"DECIMAL + INT" overload. To make the code more comprehensible, I
introduced a separate `avgTmplInfo` struct that contains all the
information necessary for the code generation.

Fixes: #38823.
Addresses: #38845.

Release note (sql change): Vectorized execution engine now supports
`AVG` aggregate function on `Int`s and `Interval`s.
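
The key point, that averaging integers must produce a decimal rather
than an integer, can be illustrated with a small standalone Go sketch;
it uses `math/big.Rat` purely as a stand-in for the engine's decimal
type (the actual code uses `apd` decimals and the "DECIMAL + INT"
overload):

```go
package main

import (
	"fmt"
	"math/big"
)

// avgInts mimics the shape of the problem: the inputs are integers, but the
// result type is overridden to a decimal, so the running sum is accumulated
// as a decimal ("DECIMAL + INT" style) rather than as an integer.
func avgInts(vals []int64) *big.Rat {
	if len(vals) == 0 {
		return nil // AVG over zero rows is NULL
	}
	sum := new(big.Rat) // stands in for the engine's decimal accumulator
	for _, v := range vals {
		sum.Add(sum, new(big.Rat).SetInt64(v)) // decimal += int
	}
	return new(big.Rat).Quo(sum, new(big.Rat).SetInt64(int64(len(vals))))
}

func main() {
	fmt.Println(avgInts([]int64{1, 2}).FloatString(4)) // 1.5000, not 1
}
```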

**colexec: add full support of sum and sum_int**

This commit adds full support for `sum` and `sum_int` when operating
on integers (previously, `sum` wasn't supported at all, whereas
`sum_int` was supported only on `INT8`). The complication here is that
`sum` on ints returns a decimal while `sum_int` always returns `INT8`
regardless of the argument type's width. The former is resolved in a
similar fashion to how `avg` is supported on ints, and the latter
required introducing a special `assignFunc` that promotes the return
type.

Fixes: #38845.

Release note (sql change): Vectorized execution engine now fully
supports `SUM` aggregate function (previously, the summation of integers
was incomplete).
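
The width promotion that the special `assignFunc` performs for
`sum_int` can be sketched in plain Go (an illustration, not the
generated code): the physical column may be int16 or int32, but the
accumulator and result are always int64, i.e. `INT8`.

```go
package main

import "fmt"

// sumInt16 shows the promotion idea: the column elements are INT2 (int16),
// but the running sum is kept, and returned, as INT8 (int64).
func sumInt16(col []int16) int64 {
	var sum int64
	for _, v := range col {
		sum += int64(v) // promote each element before adding
	}
	return sum
}

func main() {
	fmt.Println(sumInt16([]int16{30000, 30000, 30000})) // 90000: no int16 overflow
}
```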

**colexec: support min and max on int2 and int4**

Previously, `min` and `max` were disabled on INT2 and INT4 columns
because of a mismatch between the expected logical type (INT8) and the
actual physical type (the type of the argument). This is now fixed by
performing a cast along with some extra templating.

Additionally, this commit cleans up the way we populate the output types
of the aggregate functions and removes some redundant return values.

Release note (sql change): Vectorized execution engine now supports `MIN`
and `MAX` aggregate functions on columns of INT2 and INT4 types.
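
A rough illustration of the cast (not the generated templated code): the
aggregate iterates over the physical element type, here int16 for INT2,
and casts the winner to int64 to match the logical INT8 output type.

```go
package main

import "fmt"

// maxInt2 sketches the fix for min/max on narrow int columns: scan the
// physical int16 values, then cast the result to the logical INT8 type.
func maxInt2(col []int16) (int64, bool) {
	if len(col) == 0 {
		return 0, false // MAX over zero rows is NULL
	}
	best := col[0]
	for _, v := range col[1:] {
		if v > best {
			best = v
		}
	}
	return int64(best), true // cast from the physical type to the logical INT8
}

func main() {
	fmt.Println(maxInt2([]int16{3, 42, 7})) // 42 true
}
```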

**sql: miscellaneous cleanups**

This commit does the following:
1. unifies the way we set "no filter" on window functions and reuses the
same constant for the "column omitted" index
2. sets `VectorizeMode` of the testing eval context to `on` by default
3. disables the fallback to rowexec in the operators-against-processors tests
4. updates execplan to be defensive about a nil processorConstructor.

Release note: None

Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Jun 15, 2020
3 parents 9708a82 + 11d8188 + 946c87d commit 1a9c9f2
Showing 51 changed files with 2,849 additions and 1,150 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -869,6 +869,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/sort.eg.go \
pkg/sql/colexec/substring.eg.go \
pkg/sql/colexec/sum_agg.eg.go \
pkg/sql/colexec/sum_int_agg.eg.go \
pkg/sql/colexec/values_differ.eg.go \
pkg/sql/colexec/vec_comparators.eg.go \
pkg/sql/colexec/window_peer_grouper.eg.go
387 changes: 295 additions & 92 deletions pkg/ccl/backupccl/backup.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/backup.proto
@@ -61,6 +61,11 @@ message BackupManifest {
sql.sqlbase.Descriptor desc = 3;
}

message Progress {
repeated File files = 1 [(gogoproto.nullable) = false];
util.hlc.Timestamp rev_start_time = 2 [(gogoproto.nullable) = false];
}

util.hlc.Timestamp start_time = 1 [(gogoproto.nullable) = false];
util.hlc.Timestamp end_time = 2 [(gogoproto.nullable) = false];
MVCCFilter mvcc_filter = 13 [(gogoproto.customname) = "MVCCFilter"];
228 changes: 76 additions & 152 deletions pkg/ccl/backupccl/backup_job.go
@@ -11,21 +11,20 @@ package backupccl
import (
"context"
"fmt"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -34,10 +33,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
)

// BackupCheckpointInterval is the interval at which backup progress is saved
@@ -50,7 +49,7 @@ func (r *RowCount) add(other RowCount) {
r.IndexEntries += other.IndexEntries
}

func countRows(raw roachpb.BulkOpSummary, pkIDs map[uint64]struct{}) RowCount {
func countRows(raw roachpb.BulkOpSummary, pkIDs map[uint64]bool) RowCount {
res := RowCount{DataSize: raw.DataSize}
for id, count := range raw.EntryCounts {
if _, ok := pkIDs[id]; ok {
@@ -169,8 +168,10 @@ type spanAndTime struct {
// file.
func backup(
ctx context.Context,
phs sql.PlanHookState,
defaultURI string,
urisByLocalityKV map[string]string,
db *kv.DB,
numClusterNodes int,
settings *cluster.Settings,
defaultStore cloud.ExternalStorage,
storageByLocalityKV map[string]*roachpb.ExternalStorage,
@@ -183,14 +184,9 @@
// TODO(dan): Figure out how permissions should work. #6713 is tracking this
// for grpc.

mu := struct {
syncutil.Mutex
files []BackupManifest_File
exported RowCount
lastCheckpoint time.Time
}{}

var checkpointMu syncutil.Mutex
var files []BackupManifest_File
var exported RowCount
var lastCheckpoint time.Time

var ranges []roachpb.RangeDescriptor
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
@@ -209,8 +205,8 @@
// TODO(benesch): verify these files, rather than accepting them as truth
// blindly.
// No concurrency yet, so these assignments are safe.
mu.files = checkpointDesc.Files
mu.exported = checkpointDesc.EntryCounts
files = checkpointDesc.Files
exported = checkpointDesc.EntryCounts
for _, file := range checkpointDesc.Files {
if file.StartTime.IsEmpty() && !file.EndTime.IsEmpty() {
completedIntroducedSpans = append(completedIntroducedSpans, file.Span)
@@ -226,168 +222,94 @@
spans := splitAndFilterSpans(backupManifest.Spans, completedSpans, ranges)
introducedSpans := splitAndFilterSpans(backupManifest.IntroducedSpans, completedIntroducedSpans, ranges)

allSpans := make([]spanAndTime, 0, len(spans)+len(introducedSpans))
for _, s := range introducedSpans {
allSpans = append(allSpans, spanAndTime{span: s, start: hlc.Timestamp{}, end: backupManifest.StartTime})
}
for _, s := range spans {
allSpans = append(allSpans, spanAndTime{span: s, start: backupManifest.StartTime, end: backupManifest.EndTime})
}

// Sequential ranges may have clustered leaseholders, for example a
// geo-partitioned table likely has all the leaseholders for some contiguous
// span of the table (i.e. a partition) pinned to just the nodes in a region.
// In such cases, sending spans sequentially may under-utilize the rest of the
// cluster given that we have a limit on the number of spans we send out at
// a given time. Randomizing the order of spans should help ensure a more even
// distribution of work across the cluster regardless of how leaseholders may
// or may not be clustered.
rand.Shuffle(len(allSpans), func(i, j int) {
allSpans[i], allSpans[j] = allSpans[j], allSpans[i]
})

progressLogger := jobs.NewChunkProgressLogger(job, len(spans), job.FractionCompleted(), jobs.ProgressUpdateOnly)

pkIDs := make(map[uint64]struct{})
for _, desc := range backupManifest.Descriptors {
if t := desc.Table(hlc.Timestamp{}); t != nil {
pkIDs[roachpb.BulkOpSummaryID(uint64(t.ID), uint64(t.PrimaryIndex.ID))] = struct{}{}
}
}

// We're already limiting these on the server-side, but sending all the
// Export requests at once would fill up distsender/grpc/something and cause
// all sorts of badness (node liveness timeouts leading to mass leaseholder
// transfers, poor performance on SQL workloads, etc) as well as log spam
// about slow distsender requests. Rate limit them here, too.
//
// Each node limits the number of running Export & Import requests it serves
// to avoid overloading the network, so multiply that by the number of nodes
// in the cluster and use that as the number of outstanding Export requests
// for the rate limiting. This attempts to strike a balance between
// simplicity, not getting slow distsender log spam, and keeping the server
// side limiter full.
//
// TODO(dan): Make this limiting per node.
//
// TODO(dan): See if there's some better solution than rate-limiting #14798.
maxConcurrentExports := numClusterNodes * int(kvserver.ExportRequestsLimit.Get(&settings.SV)) * 10
exportsSem := make(chan struct{}, maxConcurrentExports)

g := ctxgroup.WithContext(ctx)

requestFinishedCh := make(chan struct{}, len(spans)) // enough buffer to never block

// Only start the progress logger if there are spans, otherwise this will
// block forever. This is needed for TestBackupRestoreResume which doesn't
// have any spans. Users should never hit this.
g := ctxgroup.WithContext(ctx)
if len(spans) > 0 {
g.GoCtx(func(ctx context.Context) error {
// Currently the granularity of backup progress is the % of spans
// exported. Would improve accuracy if we tracked the actual size of each
// file.
return progressLogger.Loop(ctx, requestFinishedCh)
})
}

progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
g.GoCtx(func(ctx context.Context) error {
for i := range allSpans {
{
select {
case exportsSem <- struct{}{}:
case <-ctx.Done():
// Break the for loop to avoid creating more work - the backup
// has failed because either the context has been canceled or an
// error has been returned. Either way, Wait() is guaranteed to
// return an error now.
return ctx.Err()
}
// When a processor is done exporting a span, it will send a progress update
// to progCh.
for progress := range progCh {
var progDetails BackupManifest_Progress
if err := types.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil {
log.Errorf(ctx, "unable to unmarshal backup progress details: %+v", err)
}
if backupManifest.RevisionStartTime.Less(progDetails.RevStartTime) {
backupManifest.RevisionStartTime = progDetails.RevStartTime
}
for _, file := range progDetails.Files {
files = append(files, file)
exported.add(file.EntryCounts)
}

span := allSpans[i]
g.GoCtx(func(ctx context.Context) error {
defer func() { <-exportsSem }()
header := roachpb.Header{Timestamp: span.end}
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span.span),
Storage: defaultStore.Conf(),
StorageByLocalityKV: storageByLocalityKV,
StartTime: span.start,
EnableTimeBoundIteratorOptimization: useTBI.Get(&settings.SV),
MVCCFilter: roachpb.MVCCFilter(backupManifest.MVCCFilter),
Encryption: encryption,
}
rawRes, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "exporting %s", span.span)
}
res := rawRes.(*roachpb.ExportResponse)
// Signal that an ExportRequest finished to update job progress.
requestFinishedCh <- struct{}{}
var checkpointFiles BackupFileDescriptors
if timeutil.Since(lastCheckpoint) > BackupCheckpointInterval {
checkpointFiles = append(checkpointFiles, files...)

mu.Lock()
if backupManifest.RevisionStartTime.Less(res.StartTime) {
backupManifest.RevisionStartTime = res.StartTime
}
for _, file := range res.Files {
f := BackupManifest_File{
Span: file.Span,
Path: file.Path,
Sha512: file.Sha512,
EntryCounts: countRows(file.Exported, pkIDs),
LocalityKV: file.LocalityKV,
}
if span.start != backupManifest.StartTime {
f.StartTime = span.start
f.EndTime = span.end
}
mu.files = append(mu.files, f)
mu.exported.add(f.EntryCounts)
}
var checkpointFiles BackupFileDescriptors
if timeutil.Since(mu.lastCheckpoint) > BackupCheckpointInterval {
// We optimistically assume the checkpoint will succeed to prevent
// multiple threads from attempting to checkpoint.
mu.lastCheckpoint = timeutil.Now()
checkpointFiles = append(checkpointFiles, mu.files...)
}
mu.Unlock()

requestFinishedCh <- struct{}{}

if checkpointFiles != nil {
// Make a copy while holding mu to avoid races while marshaling the
// manifest into the checkpoint file.
mu.Lock()
maninfestCopy := *backupManifest
mu.Unlock()

checkpointMu.Lock()
maninfestCopy.Files = checkpointFiles
err := writeBackupManifest(
ctx, settings, defaultStore, BackupManifestCheckpointName, encryption, &maninfestCopy,
)
checkpointMu.Unlock()
if err != nil {
log.Errorf(ctx, "unable to checkpoint backup descriptor: %+v", err)
}
backupManifest.Files = checkpointFiles
err := writeBackupManifest(
ctx, settings, defaultStore, BackupManifestCheckpointName, encryption, backupManifest,
)
if err != nil {
log.Errorf(ctx, "unable to checkpoint backup descriptor: %+v", err)
}
return nil
})

lastCheckpoint = timeutil.Now()
}
}
return nil
})

pkIDs := make(map[uint64]bool)
for _, desc := range backupManifest.Descriptors {
if t := desc.Table(hlc.Timestamp{}); t != nil {
pkIDs[roachpb.BulkOpSummaryID(uint64(t.ID), uint64(t.PrimaryIndex.ID))] = true
}
}

if err := distBackup(
ctx,
phs,
spans,
introducedSpans,
pkIDs,
defaultURI,
urisByLocalityKV,
encryption,
roachpb.MVCCFilter(backupManifest.MVCCFilter),
backupManifest.StartTime,
backupManifest.EndTime,
progCh,
); err != nil {
return RowCount{}, err
}

if err := g.Wait(); err != nil {
return RowCount{}, errors.Wrapf(err, "exporting %d ranges", errors.Safe(len(spans)))
}

// No more concurrency, so no need to acquire locks below.

backupManifest.Files = mu.files
backupManifest.EntryCounts = mu.exported
backupManifest.Files = files
backupManifest.EntryCounts = exported

backupID := uuid.MakeV4()
backupManifest.ID = backupID
// Write additional partial descriptors to each node for partitioned backups.
if len(storageByLocalityKV) > 0 {
filesByLocalityKV := make(map[string][]BackupManifest_File)
for i := range mu.files {
file := &mu.files[i]
for i := range files {
file := &files[i]
filesByLocalityKV[file.LocalityKV] = append(filesByLocalityKV[file.LocalityKV], *file)
}

@@ -424,7 +346,7 @@ func backup(
return RowCount{}, err
}

return mu.exported, nil
return exported, nil
}

func (b *backupResumer) releaseProtectedTimestamp(
@@ -529,8 +451,10 @@ func (b *backupResumer) Resume(

res, err := backup(
ctx,
p,
details.URI,
details.URIsByLocalityKV,
p.ExecCfg().DB,
numClusterNodes,
p.ExecCfg().Settings,
defaultStore,
storageByLocalityKV,