Skip to content

Commit

Permalink
Merge pull request #88802 from ajwerner/backport22.2-88670
Browse files Browse the repository at this point in the history
  • Loading branch information
ajwerner authored Sep 27, 2022
2 parents 745eae9 + 70b174d commit 3bb017b
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 10 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/schemachanger/rel/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ go_test(
"//pkg/sql/schemachanger/rel/internal/cyclegraphtest",
"//pkg/sql/schemachanger/rel/internal/entitynodetest",
"//pkg/sql/schemachanger/rel/reltest",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/rel/query_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (p *queryBuilder) processValueExpr(rawValue expr) slotIdx {
if err != nil {
panic(err)
}
return p.fillSlot(slot{not: tv}, false /* isEntity */)
return p.fillSlot(slot{not: &tv}, false /* isEntity */)
case containsExpr:
return p.processValueExpr(v.v)
default:
Expand Down
25 changes: 23 additions & 2 deletions pkg/sql/schemachanger/rel/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type slot struct {
// not holds a value which this slot must not be equal to. Additionally,
// the value which fills this slot must have the same type as the value
// in the not container.
not typedValue
not *typedValue
}

// typedValue is a value in its comparable form, which is to say, it is a
Expand Down Expand Up @@ -75,6 +75,10 @@ func (tv typedValue) toInterface() interface{} {
return tv.toValue().Interface()
}

// inlineValue populates the inline value for the typedValue. The inline
// value is a single scalar which can be used to efficiently compare
// values, but it only has meaning in the context of the current entitySet.
// It must be cleared when moving to a new entity set.
func (tv *typedValue) inlineValue(es *entitySet, attr ordinal) (uintptr, error) {
if tv.inlineSet {
return tv.inline, nil
Expand All @@ -87,6 +91,11 @@ func (tv *typedValue) inlineValue(es *entitySet, attr ordinal) (uintptr, error)
return tv.inline, nil
}

// resetInline clears the inline value.
func (tv *typedValue) resetInline() {
tv.inlineSet, tv.inline = false, 0
}

func (s *slot) eq(other slot) bool {
// TODO(ajwerner): Deal with types. We may have two slots which both have
// nil values but they differ in terms of types.
Expand All @@ -107,6 +116,18 @@ func (s *slot) empty() bool {
return s.value == nil
}

func (s *slot) reset() {
s.typedValue = typedValue{}
if s.any != nil {
for i := 0; i < len(s.any); i++ {
s.any[i].resetInline()
}
}
if s.not != nil {
s.not.resetInline()
}
}

func maybeSet(
slots []slot, idx slotIdx, tv typedValue, set *util.FastIntSet,
) (foundContradiction bool) {
Expand All @@ -131,7 +152,7 @@ func maybeSet(
}
return false, false
}
if s.not.typ != nil {
if s.not != nil {
if tv.typ != s.not.typ || eqNotNil(s.not.value, tv.value) {
return false, true
}
Expand Down
29 changes: 24 additions & 5 deletions pkg/sql/schemachanger/rel/query_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,32 @@ func newEvalContext(q *Query) *evalContext {
return &evalContext{
q: q,
depth: queryDepth(len(q.entities)),
slots: append(make([]slot, 0, len(q.slots)), q.slots...),
slots: cloneSlots(q.slots),
facts: q.facts,
}
}

// cloneSlots clones the slots of a query for use in an evalContext.
func cloneSlots(slots []slot) []slot {
clone := append(make([]slot, 0, len(slots)), slots...)
for i := range clone {
// If there are any slots which map to a set of allowed values, we need
// to clone those values because during query evaluation, we'll fill in
// inline values in the context of the current entity set. This matters
// in particular for constraints related to entities or strings; their
// inline values depend on the entitySet.
if clone[i].any != nil {
vals := clone[i].any
clone[i].any = append(make([]typedValue, 0, len(vals)), vals...)
}
if clone[i].not != nil {
cloned := *clone[i].not
clone[i].not = &cloned
}
}
return clone
}

type evalResult evalContext

func (ec *evalResult) Var(name Var) interface{} {
Expand Down Expand Up @@ -135,9 +156,7 @@ func (ec *evalContext) visit(e entity) error {
// evaluation and then unset them when we pop out of this stack frame.
var slotsFilled util.FastIntSet
defer func() {
slotsFilled.ForEach(func(i int) {
ec.slots[i].typedValue = typedValue{}
})
slotsFilled.ForEach(func(i int) { ec.slots[i].reset() })
}()

// Fill in the slot corresponding to this entity. It should not be filled
Expand Down Expand Up @@ -408,7 +427,7 @@ func (ec *evalContext) visitSubquery(query int) (done bool, _ error) {
defer sub.query.putEvalContext(sec)
defer func() { // reset the slots populated to run the subquery
sub.inputSlotMappings.ForEach(func(_, subSlot int) {
sec.slots[subSlot].typedValue = typedValue{}
sec.slots[subSlot].reset()
})
}()
if err := ec.bindSubQuerySlots(sub.inputSlotMappings, sec); err != nil {
Expand Down
106 changes: 104 additions & 2 deletions pkg/sql/schemachanger/rel/rel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package rel_test

import (
"fmt"
"math/rand"
"reflect"
"testing"

Expand All @@ -20,7 +21,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/internal/cyclegraphtest"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/internal/entitynodetest"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/reltest"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestRel(t *testing.T) {
Expand Down Expand Up @@ -337,12 +341,12 @@ func TestTooManyAttributesInValues(t *testing.T) {
}

func TestRuleValidation(t *testing.T) {
type tooManyAttrs struct {
type entity struct {
F1, F2 *uint32
}
a1, a2 := stringAttr("a1"), stringAttr("a2")
sc := rel.MustSchema("rules",
rel.EntityMapping(reflect.TypeOf((*tooManyAttrs)(nil)),
rel.EntityMapping(reflect.TypeOf((*entity)(nil)),
rel.EntityAttr(a1, "F1"),
rel.EntityAttr(a2, "F2"),
),
Expand Down Expand Up @@ -439,3 +443,101 @@ func TestEmbeddedFieldsWork(t *testing.T) {
"embedded pointer outer")
})
}

// TestConcurrentQueryInDifferentDatabases stresses some logic of the
// evalContext pooling to ensure that the state is properly reset between
// queries. An important property of this test is that it uses an any
// clause over entity pointers. When these pointers are inlined in the
// context of different databases, they will have different values. By
// randomizing the insertion order, we ensure that the inline values for
// the different entities differ.
//
// This test exercises the code which resets the inline values of slots in
// in the evalContext corresponding to the "not" and "any" constraints on that
// slot. These fields are pointers and need to be reset explicitly. The bug
// which motivated this test was that the slots were only being reset by value.
func TestConcurrentQueryInDifferentDatabases(t *testing.T) {
defer leaktest.AfterTest(t)()

type entity struct {
Str string
Other *entity
}
var str, other stringAttr = "str", "other"
schema := rel.MustSchema("test",
rel.EntityMapping(
reflect.TypeOf((*entity)(nil)),
rel.EntityAttr(str, "Str"),
rel.EntityAttr(other, "Other"),
),
)
newDB := func() *rel.Database {
db, err := rel.NewDatabase(schema, rel.Index{Attrs: []rel.Attr{other}})
require.NoError(t, err)
return db
}
const (
numDBs = 3
numEntities = 5
numContainsVals = 3
)
makeEntities := func() (ret []*entity) {
for i := 0; i < numEntities; i++ {
ret = append(ret, &entity{Str: fmt.Sprintf("s%d", i)})
}
for i := 0; i < numEntities; i++ {
ret[i].Other = ret[(i+1)%numEntities]
}
return ret
}
makeDBs := func() (ret []*rel.Database) {
for i := 0; i < numDBs; i++ {
ret = append(ret, newDB())
}
return ret
}
addEntitiesToDB := func(db *rel.Database, entities []*entity) {
for _, i := range rand.Perm(len(entities)) {
require.NoError(t, db.Insert(entities[i]))
}
}
addEntitiesToDBs := func(dbs []*rel.Database, entities []*entity) {
for _, db := range dbs {
addEntitiesToDB(db, entities)
}
}

dbs, entities := makeDBs(), makeEntities()
addEntitiesToDBs(dbs, entities)
assert.Less(t, numContainsVals, numEntities)
makeContainsVals := func(entities []*entity) (ret []interface{}) {
for i := 0; i < numContainsVals; i++ {
ret = append(ret, entities[i+1])
}
return ret
}
type v = rel.Var
q, err := rel.NewQuery(schema,
v("e").AttrIn(other, makeContainsVals(entities)...),
v("e").AttrNeq(rel.Self, entities[0]), // exclude the first entity
)
require.NoError(t, err)
var N = 8
exp := entities[1:numContainsVals] // the first entity is excluded
run := func(i int) func() error {
return func() error {
var got []*entity
assert.NoError(t, q.Iterate(dbs[i%len(dbs)], func(r rel.Result) error {
got = append(got, r.Var("e").(*entity))
return nil
}))
assert.EqualValues(t, exp, got)
return nil
}
}
var g errgroup.Group
for i := 0; i < N; i++ {
g.Go(run(i))
}
require.NoError(t, g.Wait())
}
1 change: 1 addition & 0 deletions pkg/sql/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_test(
"rename_column_test.go",
"repair_test.go",
"rsg_test.go",
"schema_changes_in_parallel_test.go",
"split_test.go",
"system_table_test.go",
"table_split_test.go",
Expand Down
77 changes: 77 additions & 0 deletions pkg/sql/tests/schema_changes_in_parallel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

// TestSchemaChangesInParallel exists to try to shake out races in the
// declarative schema changer infrastructure. At its time of writing, it
// effectively reproduced a race in the rules engine's object pooling.
func TestSchemaChangesInParallel(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
GCJob: &sql.GCJobTestingKnobs{
SkipWaitingForMVCCGC: true,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
})
defer s.Stopper().Stop(ctx)

const N = 4
run := func(i int) func() (retErr error) {
return func() (retErr error) {
conn, err := sqlDB.Conn(ctx)
if err != nil {
return err
}
defer func() {
retErr = errors.CombineErrors(retErr, conn.Close())
}()
for _, stmt := range []string{
fmt.Sprintf("CREATE DATABASE db%d", i),
fmt.Sprintf("USE db%d", i),
"CREATE TABLE t (i INT PRIMARY KEY, k INT)",
"ALTER TABLE t ADD COLUMN j INT DEFAULT 42",
"ALTER TABLE t DROP COLUMN k",
"CREATE SEQUENCE s",
"ALTER TABLE t ADD COLUMN l INT DEFAULT nextval('s')",
fmt.Sprintf("DROP DATABASE db%d", i),
} {
if _, err := conn.ExecContext(ctx, stmt); err != nil {
return err
}
}
return nil
}
}
var g errgroup.Group
for i := 0; i < N; i++ {
g.Go(run(i))
}
require.NoError(t, g.Wait())
}

0 comments on commit 3bb017b

Please sign in to comment.