diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index d9f406132178..8893a7a27de8 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -380,7 +381,15 @@ func runBackupProcessor( // value. The sentinel value of 1 forces the ExportRequest to paginate // after creating a single SST. header.TargetBytes = 1 - + admissionHeader := roachpb.AdmissionHeader{ + // Export requests are currently assigned NormalPri. + // + // TODO(dt): Consider linking this to/from the UserPriority field. + Priority: int32(admission.BulkNormalPri), + CreateTime: timeutil.Now().UnixNano(), + Source: roachpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, + } log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)", span.span, span.attempts+1, header.UserPriority.String()) var rawRes roachpb.Response @@ -398,8 +407,8 @@ func runBackupProcessor( ReqSentTime: reqSentTime.String(), }) - rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), - header, req) + rawRes, pErr = kv.SendWrappedWithAdmission(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), + header, admissionHeader, req) respReceivedTime = timeutil.Now() if pErr != nil { return pErr.GoError() diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 9ab8e2e1ce44..607b7369c829 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -428,9 +428,24 @@ func (f NonTransactionalFactoryFunc) NonTransactionalSender() Sender { // `nil` context; an empty one is used in that case. func SendWrappedWith( ctx context.Context, sender Sender, h roachpb.Header, args roachpb.Request, +) (roachpb.Response, *roachpb.Error) { + return SendWrappedWithAdmission(ctx, sender, h, roachpb.AdmissionHeader{}, args) +} + +// SendWrappedWithAdmission is a convenience function which wraps the request +// in a batch and sends it via the provided Sender and headers. It returns the +// unwrapped response or an error. It's valid to pass a `nil` context; an +// empty one is used in that case. +func SendWrappedWithAdmission( + ctx context.Context, + sender Sender, + h roachpb.Header, + ah roachpb.AdmissionHeader, + args roachpb.Request, ) (roachpb.Response, *roachpb.Error) { ba := roachpb.BatchRequest{} ba.Header = h + ba.AdmissionHeader = ah ba.Add(args) br, pErr := sender.Send(ctx, ba) diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 6e44ec546385..06e25cfe230b 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -68,6 +68,9 @@ type WorkPriority int8 const ( // LowPri is low priority work. LowPri WorkPriority = math.MinInt8 + // BulkNormalPri is bulk priority work from bulk jobs, which could be run due + // to user submissions or be automatic. + BulkNormalPri WorkPriority = -30 // NormalPri is normal priority work. NormalPri WorkPriority = 0 // HighPri is high priority work.