Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: enables elastic CPU limiter for all users of ExportRequest #96691

Merged
merged 1 commit into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ ALL_TESTS = [
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test",
"//pkg/kv/kvclient/rangefeed/rangefeedcache:rangefeedcache_test",
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvclient:kvclient_test",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvpb:kvpb_disallowed_imports_test",
"//pkg/kv/kvpb:kvpb_test",
Expand Down Expand Up @@ -1179,6 +1180,7 @@ GO_TARGETS = [
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvclient/rangestats:rangestats",
"//pkg/kv/kvclient:kvclient",
"//pkg/kv/kvclient:kvclient_test",
"//pkg/kv/kvnemesis/kvnemesisutil:kvnemesisutil",
"//pkg/kv/kvnemesis:kvnemesis",
"//pkg/kv/kvnemesis:kvnemesis_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
47 changes: 47 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
Expand Down Expand Up @@ -10829,3 +10830,49 @@ func TestBackupInLocality(t *testing.T) {
db.ExpectErr(t, tc.err, "BACKUP system.users INTO $1 WITH coordinator_locality = $2", fmt.Sprintf("userfile:///tc%d", i), tc.filter)
}
}

// TestExportResponseDataSizeZeroCPUPagination verifies that an ExportRequest
// that is preempted by the elastic CPU limiter and has DataSize = 0, is
// returned to the client to handle pagination.
func TestExportResponseDataSizeZeroCPUPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
first := true
var numRequests int
externalDir, dirCleanup := testutils.TempDir(t)
defer dirCleanup()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
ExternalIODir: externalDir,
Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
for _, ru := range request.Requests {
if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok {
numRequests++
h := admission.ElasticCPUWorkHandleFromContext(ctx)
if h == nil {
t.Fatalf("expected context to have CPU work handle")
}
h.TestingOverrideOverLimit(func() (bool, time.Duration) {
if first {
first = false
return true, 0
}
return false, 0
})
}
}
return nil
},
}},
})
defer s.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2)`)
sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`)
sqlDB.Exec(t, `BACKUP TABLE foo INTO 'nodelocal://1/foo'`)
require.Equal(t, 2, numRequests)
}
76 changes: 44 additions & 32 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -191,46 +192,57 @@ func getAllDescChanges(
startKey := codec.TablePrefix(keys.DescriptorTableID)
endKey := startKey.PrefixEnd()

allRevs, err := kvclient.GetAllRevisions(ctx, db, startKey, endKey, startTime, endTime)
if err != nil {
return nil, err
}
g := ctxgroup.WithContext(ctx)
allRevs := make(chan []kvclient.VersionedValues)
g.GoCtx(func(ctx context.Context) error {
defer close(allRevs)
return kvclient.GetAllRevisions(ctx, db, startKey, endKey, startTime, endTime, allRevs)
})

var res []backuppb.BackupManifest_DescriptorRevision

for _, revs := range allRevs {
id, err := codec.DecodeDescMetadataID(revs.Key)
if err != nil {
return nil, err
}
for _, rev := range revs.Values {
r := backuppb.BackupManifest_DescriptorRevision{ID: descpb.ID(id), Time: rev.Timestamp}
if len(rev.RawBytes) != 0 {
// We update the modification time for the descriptors here with the
// timestamp of the KV row so that we can identify the appropriate
// descriptors to use during restore.
// Note that the modification time of descriptors on disk is usually 0.
// See the comment on descpb.FromSerializedValue for more details.
b, err := descbuilder.FromSerializedValue(&rev)
g.GoCtx(func(ctx context.Context) error {
for revs := range allRevs {
for _, rev := range revs {
id, err := codec.DecodeDescMetadataID(rev.Key)
if err != nil {
return nil, err
}
if b == nil {
continue
return err
}
desc := b.BuildCreatedMutable()
r.Desc = desc.DescriptorProto()
// Collect the prior IDs of table descriptors, as the ID may have been
// changed during truncate prior to 20.2.
switch t := desc.(type) {
case *tabledesc.Mutable:
if priorIDs != nil && t.ReplacementOf.ID != descpb.InvalidID {
priorIDs[t.ID] = t.ReplacementOf.ID
for _, values := range rev.Values {
r := backuppb.BackupManifest_DescriptorRevision{ID: descpb.ID(id), Time: values.Timestamp}
if len(values.RawBytes) != 0 {
// We update the modification time for the descriptors here with the
// timestamp of the KV row so that we can identify the appropriate
// descriptors to use during restore.
// Note that the modification time of descriptors on disk is usually 0.
// See the comment on descpb.FromSerializedValue for more details.
b, err := descbuilder.FromSerializedValue(&values)
if err != nil {
return err
}
if b == nil {
continue
}
desc := b.BuildCreatedMutable()
r.Desc = desc.DescriptorProto()
// Collect the prior IDs of table descriptors, as the ID may have been
// changed during truncate prior to 20.2.
switch t := desc.(type) {
case *tabledesc.Mutable:
if priorIDs != nil && t.ReplacementOf.ID != descpb.InvalidID {
priorIDs[t.ID] = t.ReplacementOf.ID
}
}
}
res = append(res, r)
}
}
res = append(res, r)
}

return nil
})

if err := g.Wait(); err != nil {
return nil, err
}
return res, nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/schemafeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,27 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/pgwire",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/admission",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
Expand Down
Loading