Skip to content

Commit

Permalink
Merge pull request #397 from scylladb/396-values-are-not-cerculating-…
Browse files Browse the repository at this point in the history
…properly-from-new-to-old

fix(jobs): fix pk values circulating
  • Loading branch information
dkropachev authored Jul 16, 2023
2 parents 7e88328 + c402a25 commit 37b5a51
Show file tree
Hide file tree
Showing 22 changed files with 1,745 additions and 462 deletions.
8 changes: 8 additions & 0 deletions pkg/generators/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type GeneratorInterface interface {
Get() *typedef.ValueWithToken
GetOld() *typedef.ValueWithToken
GiveOld(_ *typedef.ValueWithToken)
GiveOlds(_ []*typedef.ValueWithToken)
ReleaseToken(_ uint64)
}

Expand Down Expand Up @@ -109,6 +110,13 @@ func (g *Generator) GiveOld(v *typedef.ValueWithToken) {
g.partitions.GetPartitionForToken(TokenIndex(v.Token)).giveOld(v)
}

// GiveOlds returns the supplied value for later reuse unless
func (g *Generator) GiveOlds(v []*typedef.ValueWithToken) {
for _, token := range v {
g.partitions.GetPartitionForToken(TokenIndex(token.Token)).giveOld(token)
}
}

// ReleaseToken removes the corresponding token from the in-flight tracking.
func (g *Generator) ReleaseToken(token uint64) {
g.partitions.GetPartitionForToken(TokenIndex(token)).releaseToken(token)
Expand Down
35 changes: 27 additions & 8 deletions pkg/jobs/gen_check_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func genSinglePartitionQuery(
Types: typs,
QueryType: typedef.SelectStatementType,
},
ValuesWithToken: valuesWithToken,
Values: values,
ValuesWithToken: []*typedef.ValueWithToken{valuesWithToken},
}
}

Expand Down Expand Up @@ -175,8 +175,8 @@ func genSinglePartitionQueryMv(
Types: typs,
QueryType: typedef.SelectStatementType,
},
ValuesWithToken: valuesWithToken,
Values: values,
ValuesWithToken: []*typedef.ValueWithToken{valuesWithToken},
}
}

Expand All @@ -192,12 +192,15 @@ func genMultiplePartitionQuery(
values := make([]interface{}, numQueryPKs*t.PartitionKeys.Len())

builder := qb.Select(s.Keyspace.Name + "." + t.Name)
tokens := make([]*typedef.ValueWithToken, 0, numQueryPKs)

for j := 0; j < numQueryPKs; j++ {
vs := g.GetOld()
if vs == nil {
g.GiveOlds(tokens)
return nil
}
tokens = append(tokens, vs)
for i := range vs.Value {
values[j+i*numQueryPKs] = vs.Value[i]
typs[j+i*numQueryPKs] = t.PartitionKeys[i].Type
Expand All @@ -212,7 +215,8 @@ func genMultiplePartitionQuery(
Types: typs,
QueryType: typedef.SelectStatementType,
},
Values: values,
Values: values,
ValuesWithToken: tokens,
}
}

Expand All @@ -232,12 +236,15 @@ func genMultiplePartitionQueryMv(
values := make([]interface{}, numQueryPKs*mv.PartitionKeys.Len())

builder := qb.Select(s.Keyspace.Name + "." + t.Name)
tokens := make([]*typedef.ValueWithToken, 0, numQueryPKs)

for j := 0; j < numQueryPKs; j++ {
vs := g.GetOld()
if vs == nil {
g.GiveOlds(tokens)
return nil
}
tokens = append(tokens, vs)
vals := make([]interface{}, mv.PartitionKeys.Len())
if mv.HaveNonPrimaryKey() {
vals[0] = mv.NonPrimaryKey.Type.GenValue(r, p)
Expand All @@ -259,7 +266,8 @@ func genMultiplePartitionQueryMv(
Types: typs,
QueryType: typedef.SelectStatementType,
},
Values: values,
Values: values,
ValuesWithToken: tokens,
}
}

Expand All @@ -280,6 +288,7 @@ func genClusteringRangeQuery(
var allTypes []typedef.Type
values := vs.Value.Copy()
builder := qb.Select(s.Keyspace.Name + "." + t.Name)

for _, pk := range t.PartitionKeys {
builder = builder.Where(qb.Eq(pk.Name))
allTypes = append(allTypes, pk.Type)
Expand All @@ -304,7 +313,8 @@ func genClusteringRangeQuery(
QueryType: typedef.SelectRangeStatementType,
Types: allTypes,
},
Values: values,
Values: values,
ValuesWithToken: []*typedef.ValueWithToken{vs},
}
}

Expand Down Expand Up @@ -356,7 +366,8 @@ func genClusteringRangeQueryMv(
QueryType: typedef.SelectRangeStatementType,
Types: allTypes,
},
Values: values,
Values: values,
ValuesWithToken: []*typedef.ValueWithToken{vs},
}
}

Expand All @@ -377,6 +388,7 @@ func genMultiplePartitionClusteringRangeQuery(
values := make(typedef.Values, pkValues*numQueryPKs, valuesCount)
typs := make(typedef.Types, pkValues*numQueryPKs, valuesCount)
builder := qb.Select(s.Keyspace.Name + "." + t.Name)
tokens := make([]*typedef.ValueWithToken, 0, numQueryPKs)

for _, pk := range t.PartitionKeys {
builder = builder.Where(qb.InTuple(pk.Name, numQueryPKs))
Expand All @@ -385,8 +397,10 @@ func genMultiplePartitionClusteringRangeQuery(
for j := 0; j < numQueryPKs; j++ {
vs := g.GetOld()
if vs == nil {
g.GiveOlds(tokens)
return nil
}
tokens = append(tokens, vs)
for id := range vs.Value {
idx := id*numQueryPKs + j
typs[idx] = t.PartitionKeys[id].Type
Expand All @@ -413,7 +427,8 @@ func genMultiplePartitionClusteringRangeQuery(
Types: typs,
QueryType: typedef.SelectRangeStatementType,
},
Values: values,
Values: values,
ValuesWithToken: tokens,
}
}

Expand Down Expand Up @@ -446,6 +461,7 @@ func genMultiplePartitionClusteringRangeQueryMv(
values := make(typedef.Values, pkValues*numQueryPKs, valuesCount)
typs := make(typedef.Types, pkValues*numQueryPKs, valuesCount)
builder := qb.Select(s.Keyspace.Name + "." + mv.Name)
tokens := make([]*typedef.ValueWithToken, 0, numQueryPKs)

for _, pk := range mv.PartitionKeys {
builder = builder.Where(qb.InTuple(pk.Name, numQueryPKs))
Expand All @@ -462,8 +478,10 @@ func genMultiplePartitionClusteringRangeQueryMv(
for j := 0; j < numQueryPKs; j++ {
vs := g.GetOld()
if vs == nil {
g.GiveOlds(tokens)
return nil
}
tokens = append(tokens, vs)
for id := range vs.Value {
idx := (baseID+id)*numQueryPKs + j
typs[idx] = mv.PartitionKeys[baseID+id].Type
Expand All @@ -490,7 +508,8 @@ func genMultiplePartitionClusteringRangeQueryMv(
Types: typs,
QueryType: typedef.SelectFromMaterializedViewStatementType,
},
Values: values,
Values: values,
ValuesWithToken: tokens,
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/jobs/gen_mutate_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func genUpdateStmt(_ *typedef.Schema, t *typedef.Table, valuesWithToken *typedef
}
return &typedef.Stmt{
StmtCache: stmtCache,
ValuesWithToken: valuesWithToken,
ValuesWithToken: []*typedef.ValueWithToken{valuesWithToken},
Values: values,
}, nil
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func genInsertStmt(
stmtCache := t.GetQueryCache(cacheType)
return &typedef.Stmt{
StmtCache: stmtCache,
ValuesWithToken: valuesWithToken,
ValuesWithToken: []*typedef.ValueWithToken{valuesWithToken},
Values: values,
}, nil
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func genInsertJSONStmt(
Types: []typedef.Type{typedef.TYPE_TEXT},
QueryType: typedef.InsertJSONStatementType,
},
ValuesWithToken: valuesWithToken,
ValuesWithToken: []*typedef.ValueWithToken{valuesWithToken},
Values: []interface{}{string(jsonString)},
}, nil
}
Expand All @@ -175,7 +175,7 @@ func genDeleteRows(_ *typedef.Schema, t *typedef.Table, valuesWithToken *typedef
}
return &typedef.Stmt{
StmtCache: stmtCache,
ValuesWithToken: valuesWithToken,
ValuesWithToken: []*typedef.ValueWithToken{valuesWithToken},
Values: values,
}, nil
}
Expand Down
79 changes: 66 additions & 13 deletions pkg/jobs/gen_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,64 @@ import (
"github.com/scylladb/gemini/pkg/utils"
)

type result struct {
type resultToken struct {
Token string
TokenValues string
}

func (r resultToken) Equal(received resultToken) bool {
return r.Token == received.Token && r.TokenValues == received.TokenValues
}

type resultTokens []resultToken

func (r resultTokens) Equal(received resultTokens) bool {
if len(r) != len(received) {
return false
}
for id, expectedToken := range r {
if !expectedToken.Equal(received[id]) {
return false
}
}
return true
}

func (r resultTokens) Diff(received resultTokens) string {
var out []string
maxIdx := len(r)
if maxIdx < len(received) {
maxIdx = len(received)
}
var expected, found *resultToken
for idx := 0; idx < maxIdx; idx++ {
if idx < len(r) {
expected = &r[idx]
} else {
expected = &resultToken{}
}

if idx < len(received) {
found = &received[idx]
} else {
found = &resultToken{}
}

out = testutils.AppendIfNotEmpty(out, testutils.GetErrorMsgIfDifferent(
expected.TokenValues, found.TokenValues, " error: value stmt.ValuesWithToken.Token expected and received are different:"))
out = testutils.AppendIfNotEmpty(out, testutils.GetErrorMsgIfDifferent(
expected.TokenValues, found.TokenValues, " error: value stmt.ValuesWithToken.Value expected and received are different:"))
}
return strings.Join(out, "\n")
}

type result struct {
Query string
Names string
Values string
Types string
QueryType string
TokenValues resultTokens
}

func (r *result) Equal(t *result) bool {
Expand All @@ -44,15 +94,17 @@ func (r *result) Equal(t *result) bool {
if t != nil {
provided = *t
}
return expected == provided
return expected.Query == provided.Query &&
expected.Names == provided.Names &&
expected.Values == provided.Values &&
expected.Types == provided.Types &&
expected.QueryType == provided.QueryType &&
expected.TokenValues.Equal(provided.TokenValues)
}

func (r *result) Diff(received *result) string {
var out []string
out = testutils.AppendIfNotEmpty(out, testutils.GetErrorMsgIfDifferent(
r.Token, received.Token, " error: value stmt.ValuesWithToken.Token expected and received are different:"))
out = testutils.AppendIfNotEmpty(out, testutils.GetErrorMsgIfDifferent(
r.TokenValues, received.TokenValues, " error: value stmt.ValuesWithToken.Value expected and received are different:"))
out = testutils.AppendIfNotEmpty(out, r.TokenValues.Diff(received.TokenValues))
out = testutils.AppendIfNotEmpty(out, testutils.GetErrorMsgIfDifferent(
r.Query, received.Query, " error: value stmt.Query.ToCql().stmt expected and received are different:"))
out = testutils.AppendIfNotEmpty(out, testutils.GetErrorMsgIfDifferent(
Expand Down Expand Up @@ -118,15 +170,16 @@ func convertStmtToResults(stmt *typedef.Stmt) *result {
types = fmt.Sprintf("%s %s", types, stmt.Types[idx].Name())
}
query, names := stmt.Query.ToCql()
token := ""
tokenValues := ""
if stmt.ValuesWithToken != nil {
token = fmt.Sprintf("%v", stmt.ValuesWithToken.Token)
tokenValues = strings.TrimSpace(fmt.Sprintf("%v", stmt.ValuesWithToken.Value))
var tokens []resultToken
for _, valueToken := range stmt.ValuesWithToken {
tokens = append(tokens, resultToken{
Token: fmt.Sprintf("%v", valueToken.Token),
TokenValues: strings.TrimSpace(fmt.Sprintf("%v", valueToken.Value)),
})
}

return &result{
Token: token,
TokenValues: tokenValues,
TokenValues: tokens,
Query: strings.TrimSpace(query),
Names: strings.TrimSpace(fmt.Sprintf("%s", names)),
Values: strings.TrimSpace(fmt.Sprintf("%v", stmt.Values)),
Expand Down
22 changes: 8 additions & 14 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,12 @@ func validationJob(
logger.Info("Validation. No statement generated from GenCheckStmt.")
continue
}

err := validation(ctx, schemaConfig, table, s, stmt, g, globalStatus, logger)
err := validation(ctx, schemaConfig, table, s, stmt, logger)
if stmt.ValuesWithToken != nil {
for _, token := range stmt.ValuesWithToken {
g.ReleaseToken(token.Token)
}
}
switch {
case err == nil:
globalStatus.ReadOps.Add(1)
Expand Down Expand Up @@ -374,11 +378,7 @@ func mutation(
}
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values
if mutateStmt.ValuesWithToken != nil {
defer func() {
g.GiveOld(mutateStmt.ValuesWithToken)
}()
}

if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
}
Expand All @@ -394,6 +394,7 @@ func mutation(
})
} else {
globalStatus.WriteOps.Add(1)
g.GiveOlds(mutateStmt.ValuesWithToken)
}
return nil
}
Expand All @@ -404,15 +405,8 @@ func validation(
table *typedef.Table,
s store.Store,
stmt *typedef.Stmt,
g *generators.Generator,
_ *status.GlobalStatus,
logger *zap.Logger,
) error {
if stmt.ValuesWithToken != nil {
defer func() {
g.ReleaseToken(stmt.ValuesWithToken.Token)
}()
}
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
w.Write(zap.String("pretty_cql", stmt.PrettyCQL()))
}
Expand Down
Loading

0 comments on commit 37b5a51

Please sign in to comment.