Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.2: backupccl: mark backup ExportRequests as Bulk priority #79367

Merged
merged 1 commit into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -380,7 +381,15 @@ func runBackupProcessor(
// value. The sentinel value of 1 forces the ExportRequest to paginate
// after creating a single SST.
header.TargetBytes = 1

admissionHeader := roachpb.AdmissionHeader{
// Export requests are currently assigned NormalPri.
//
// TODO(dt): Consider linking this to/from the UserPriority field.
Priority: int32(admission.BulkNormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}
log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
var rawRes roachpb.Response
Expand All @@ -398,8 +407,8 @@ func runBackupProcessor(
ReqSentTime: reqSentTime.String(),
})

rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(),
header, req)
rawRes, pErr = kv.SendWrappedWithAdmission(ctx, flowCtx.Cfg.DB.NonTransactionalSender(),
header, admissionHeader, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
return pErr.GoError()
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,24 @@ func (f NonTransactionalFactoryFunc) NonTransactionalSender() Sender {
// `nil` context; an empty one is used in that case.
func SendWrappedWith(
ctx context.Context, sender Sender, h roachpb.Header, args roachpb.Request,
) (roachpb.Response, *roachpb.Error) {
return SendWrappedWithAdmission(ctx, sender, h, roachpb.AdmissionHeader{}, args)
}

// SendWrappedWithAdmission is a convenience function which wraps the request
// in a batch and sends it via the provided Sender and headers. It returns the
// unwrapped response or an error. It's valid to pass a `nil` context; an
// empty one is used in that case.
func SendWrappedWithAdmission(
ctx context.Context,
sender Sender,
h roachpb.Header,
ah roachpb.AdmissionHeader,
args roachpb.Request,
) (roachpb.Response, *roachpb.Error) {
ba := roachpb.BatchRequest{}
ba.Header = h
ba.AdmissionHeader = ah
ba.Add(args)

br, pErr := sender.Send(ctx, ba)
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/admission/work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type WorkPriority int8
const (
// LowPri is low priority work.
LowPri WorkPriority = math.MinInt8
// BulkNormalPri is bulk priority work from bulk jobs, which could be run due
// to user submissions or be automatic.
BulkNormalPri WorkPriority = -30
// NormalPri is normal priority work.
NormalPri WorkPriority = 0
// HighPri is high priority work.
Expand Down