Skip to content

Commit

Permalink
*: enables elastic CPU limiter for all users of ExportRequest
Browse files Browse the repository at this point in the history
Previously, there was a strange coupling between the elastic CPU
limiter and the `header.TargetBytes` DistSender limit set on each
ExportRequest. Even if a request was preempted on exhausting its
allotted CPU tokens, it would only return from kvserver by virtue
of its `header.TargetBytes` being set to a non-zero value. Out of the
four users of ExportRequest, only backup set this field to a sentinel
value of 1 to limit the number of SSTs we send back in an ExportResponse.
The remaining callers of ExportRequest would not return from the kvserver.
Instead they would evaluate the request from the resume key immediately,
not giving the scheduler a chance to take the goroutine off CPU.

This change breaks this coupling by introducing a `resumeInfo` object that
indicates whether the resumption was because we were over our CPU limit. If
it was, we return an ExportResponse with our progress so far. This change
shifts the burden of handling pagination to the client. This seems better than
having the server sleep or wait around until its CPU tokens are replenished
as the client would be left wondering why a request is taking so long.

To that effect this change adds pagination support to the other callers of
ExportRequest. Note, we do not set `SplitMidKey` at these other callsites
yet. Thus, all pagination will happen at key boundaries in the ExportRequest.
A follow-up will add support for `SplitMidKey` to these callers.

Informs: #96684

Release note: None
  • Loading branch information
adityamaru committed Feb 9, 2023
1 parent f8eb2ef commit 2033f54
Show file tree
Hide file tree
Showing 21 changed files with 782 additions and 291 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ ALL_TESTS = [
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test",
"//pkg/kv/kvclient/rangefeed/rangefeedcache:rangefeedcache_test",
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvclient:kvclient_test",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober_test",
"//pkg/kv/kvserver/abortspan:abortspan_test",
Expand Down Expand Up @@ -1166,6 +1167,7 @@ GO_TARGETS = [
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvclient/rangestats:rangestats",
"//pkg/kv/kvclient:kvclient",
"//pkg/kv/kvclient:kvclient_test",
"//pkg/kv/kvnemesis/kvnemesisutil:kvnemesisutil",
"//pkg/kv/kvnemesis:kvnemesis",
"//pkg/kv/kvnemesis:kvnemesis_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
47 changes: 47 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
Expand Down Expand Up @@ -10819,3 +10820,49 @@ func TestBackupInLocality(t *testing.T) {
db.ExpectErr(t, tc.err, "BACKUP system.users INTO $1 WITH coordinator_locality = $2", fmt.Sprintf("userfile:///tc%d", i), tc.filter)
}
}

// TestExportResponseDataSizeZeroCPUPagination verifies that an ExportRequest
// that is preempted by the elastic CPU limiter and has DataSize = 0, is
// returned to the client to handle pagination.
func TestExportResponseDataSizeZeroCPUPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
first := true
var numRequests int
externalDir, dirCleanup := testutils.TempDir(t)
defer dirCleanup()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
ExternalIODir: externalDir,
Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request *roachpb.BatchRequest) *roachpb.Error {
for _, ru := range request.Requests {
if _, ok := ru.GetInner().(*roachpb.ExportRequest); ok {
numRequests++
h := admission.ElasticCPUWorkHandleFromContext(ctx)
if h == nil {
t.Fatalf("expected context to have CPU work handle")
}
h.TestingOverrideOverLimit(func() (bool, time.Duration) {
if first {
first = false
return true, 0
}
return false, 0
})
}
}
return nil
},
}},
})
defer s.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2)`)
sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`)
sqlDB.Exec(t, `BACKUP TABLE foo INTO 'nodelocal://1/foo'`)
require.Equal(t, 2, numRequests)
}
76 changes: 44 additions & 32 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -191,46 +192,57 @@ func getAllDescChanges(
startKey := codec.TablePrefix(keys.DescriptorTableID)
endKey := startKey.PrefixEnd()

allRevs, err := kvclient.GetAllRevisions(ctx, db, startKey, endKey, startTime, endTime)
if err != nil {
return nil, err
}
g := ctxgroup.WithContext(ctx)
allRevs := make(chan []kvclient.VersionedValues)
g.GoCtx(func(ctx context.Context) error {
defer close(allRevs)
return kvclient.GetAllRevisions(ctx, db, startKey, endKey, startTime, endTime, allRevs)
})

var res []backuppb.BackupManifest_DescriptorRevision

for _, revs := range allRevs {
id, err := codec.DecodeDescMetadataID(revs.Key)
if err != nil {
return nil, err
}
for _, rev := range revs.Values {
r := backuppb.BackupManifest_DescriptorRevision{ID: descpb.ID(id), Time: rev.Timestamp}
if len(rev.RawBytes) != 0 {
// We update the modification time for the descriptors here with the
// timestamp of the KV row so that we can identify the appropriate
// descriptors to use during restore.
// Note that the modification time of descriptors on disk is usually 0.
// See the comment on descpb.FromSerializedValue for more details.
b, err := descbuilder.FromSerializedValue(&rev)
g.GoCtx(func(ctx context.Context) error {
for revs := range allRevs {
for _, rev := range revs {
id, err := codec.DecodeDescMetadataID(rev.Key)
if err != nil {
return nil, err
}
if b == nil {
continue
return err
}
desc := b.BuildCreatedMutable()
r.Desc = desc.DescriptorProto()
// Collect the prior IDs of table descriptors, as the ID may have been
// changed during truncate prior to 20.2.
switch t := desc.(type) {
case *tabledesc.Mutable:
if priorIDs != nil && t.ReplacementOf.ID != descpb.InvalidID {
priorIDs[t.ID] = t.ReplacementOf.ID
for _, values := range rev.Values {
r := backuppb.BackupManifest_DescriptorRevision{ID: descpb.ID(id), Time: values.Timestamp}
if len(values.RawBytes) != 0 {
// We update the modification time for the descriptors here with the
// timestamp of the KV row so that we can identify the appropriate
// descriptors to use during restore.
// Note that the modification time of descriptors on disk is usually 0.
// See the comment on descpb.FromSerializedValue for more details.
b, err := descbuilder.FromSerializedValue(&values)
if err != nil {
return err
}
if b == nil {
continue
}
desc := b.BuildCreatedMutable()
r.Desc = desc.DescriptorProto()
// Collect the prior IDs of table descriptors, as the ID may have been
// changed during truncate prior to 20.2.
switch t := desc.(type) {
case *tabledesc.Mutable:
if priorIDs != nil && t.ReplacementOf.ID != descpb.InvalidID {
priorIDs[t.ID] = t.ReplacementOf.ID
}
}
}
res = append(res, r)
}
}
res = append(res, r)
}

return nil
})

if err := g.Wait(); err != nil {
return nil, err
}
return res, nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/schemafeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_test(
"//pkg/ccl/changefeedccl/schemafeed/schematestutils",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand All @@ -69,15 +70,18 @@ go_test(
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/pgwire",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/admission",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
Expand Down
Loading

0 comments on commit 2033f54

Please sign in to comment.