Skip to content

Commit

Permalink
add(store): add syncer for identical queries on test and oracle store
Browse files Browse the repository at this point in the history
  • Loading branch information
illia-li committed Jun 26, 2023
1 parent ee08726 commit 9cfe361
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 29 deletions.
4 changes: 2 additions & 2 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func run(_ *cobra.Command, _ []string) error {
if dropSchema && mode != jobs.ReadMode {
for _, stmt := range generators.GetDropSchema(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
if err = st.Create(context.Background(), createBuilder{stmt: stmt}, createBuilder{stmt: stmt}); err != nil {
return errors.Wrap(err, "unable to drop schema")
}
}
Expand All @@ -248,7 +248,7 @@ func run(_ *cobra.Command, _ []string) error {

for _, stmt := range generators.GetCreateSchema(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
if err = st.Create(context.Background(), createBuilder{stmt: stmt}, createBuilder{stmt: stmt}); err != nil {
return errors.Wrap(err, "unable to create schema")
}
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func mutationJob(
defer func() {
logger.Info("ending mutation loop")
}()
syncer := s.InitMutateSyncer(ctx)
for {
if stopFlag.IsHardOrSoft() {
return nil
Expand All @@ -184,9 +185,9 @@ func mutationJob(
}
ind := r.Intn(1000000)
if ind%100000 == 0 {
_ = ddl(ctx, schema, schemaConfig, table, s, r, p, globalStatus, logger, verbose)
_ = ddl(ctx, schema, schemaConfig, table, s, r, p, globalStatus, logger, verbose, &syncer)
} else {
_ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, true, logger)
_ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, true, logger, &syncer)
}
if failFast && globalStatus.HasErrors() {
stopFlag.SetSoft()
Expand Down Expand Up @@ -218,7 +219,7 @@ func validationJob(
defer func() {
logger.Info("ending validation loop")
}()

syncer := s.InitCheckSyncer(ctx)
for {
if stopFlag.IsHardOrSoft() {
return nil
Expand All @@ -235,7 +236,7 @@ func validationJob(
continue
}

err := validation(ctx, schemaConfig, table, s, stmt, g, globalStatus, logger)
err := validation(ctx, schemaConfig, table, s, stmt, g, globalStatus, logger, &syncer)
switch {
case err == nil:
globalStatus.ReadOps.Add(1)
Expand Down Expand Up @@ -280,6 +281,7 @@ func warmupJob(
defer func() {
logger.Info("ending warmup loop")
}()
syncer := s.InitMutateSyncer(ctx)
for {
if stopFlag.IsHardOrSoft() {
return nil
Expand All @@ -291,7 +293,7 @@ func warmupJob(
default:
}
// Do we care about errors during warmup?
_ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger)
_ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger, &syncer)
if failFast && globalStatus.HasErrors() {
stopFlag.SetSoft()
return nil
Expand All @@ -310,6 +312,7 @@ func ddl(
globalStatus *status.GlobalStatus,
logger *zap.Logger,
verbose bool,
syncer *store.MutateSyncer,
) error {
if sc.CQLFeature != typedef.CQL_FEATURE_ALL {
logger.Debug("ddl statements disabled")
Expand Down Expand Up @@ -337,7 +340,7 @@ func ddl(
if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil {
w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, ddlStmt.Query); err != nil {
if err = s.Mutate(ctx, syncer, ddlStmt.Query, nil); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
Expand Down Expand Up @@ -371,6 +374,7 @@ func mutation(
globalStatus *status.GlobalStatus,
deletes bool,
logger *zap.Logger,
syncer *store.MutateSyncer,
) error {
mutateStmt, err := GenMutateStmt(schema, table, g, r, p, deletes)
if err != nil {
Expand All @@ -394,7 +398,7 @@ func mutation(
if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
if err = s.Mutate(ctx, syncer, mutateQuery, mutateValues); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
Expand All @@ -419,6 +423,7 @@ func validation(
g *generators.Generator,
_ *status.GlobalStatus,
logger *zap.Logger,
syncer *store.CheckSyncer,
) error {
if stmt.ValuesWithToken != nil {
defer func() {
Expand All @@ -443,7 +448,7 @@ func validation(
attempt := 1
for {
lastErr = err
err = s.Check(ctx, table, stmt.Query, stmt.Values...)
err = s.Check(ctx, syncer, table, stmt.Query, stmt.Values)

if err == nil {
if attempt > 1 {
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ func (cs *cqlStore) name() string {
return cs.system
}

func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...interface{}) (err error) {
func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values typedef.Values) (err error) {
var i int
for i = 0; i < cs.maxRetriesMutate; i++ {
// retry with new timestamp as list modification with the same ts
// will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937
err = cs.doMutate(ctx, builder, time.Now(), values...)
err = cs.doMutate(ctx, builder, time.Now(), values)
if err == nil {
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
return nil
Expand All @@ -67,7 +67,7 @@ func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...in
return err
}

func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error {
func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values typedef.Values) error {
queryBody, _ := builder.ToCql()

query := cs.session.Query(queryBody, values...).WithContext(ctx)
Expand All @@ -90,7 +90,7 @@ func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Ti
return nil
}

func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) {
func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values typedef.Values) (result []map[string]interface{}, err error) {
query, _ := builder.ToCql()
iter := cs.session.Query(query, values...).WithContext(ctx).Iter()
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
Expand Down
42 changes: 27 additions & 15 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ import (
)

type loader interface {
load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error)
load(context.Context, qb.Builder, typedef.Values) ([]map[string]interface{}, error)
}

type storer interface {
mutate(context.Context, qb.Builder, ...interface{}) error
mutate(context.Context, qb.Builder, typedef.Values) error
}

type storeLoader interface {
Expand All @@ -55,8 +55,10 @@ type storeLoader interface {

type Store interface {
Create(context.Context, qb.Builder, qb.Builder) error
Mutate(context.Context, qb.Builder, ...interface{}) error
Check(context.Context, *typedef.Table, qb.Builder, ...interface{}) error
Mutate(context.Context, *MutateSyncer, qb.Builder, typedef.Values) error
Check(context.Context, *CheckSyncer, *typedef.Table, qb.Builder, typedef.Values) error
InitMutateSyncer(context.Context) MutateSyncer
InitCheckSyncer(context.Context) CheckSyncer
Close() error
}

Expand Down Expand Up @@ -123,11 +125,11 @@ type noOpStore struct {
system string
}

func (n *noOpStore) mutate(context.Context, qb.Builder, ...interface{}) error {
func (n *noOpStore) mutate(context.Context, qb.Builder, typedef.Values) error {
return nil
}

func (n *noOpStore) load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error) {
func (n *noOpStore) load(context.Context, qb.Builder, typedef.Values) ([]map[string]interface{}, error) {
return nil, nil
}

Expand All @@ -151,37 +153,47 @@ type delegatingStore struct {
}

func (ds delegatingStore) Create(ctx context.Context, testBuilder, oracleBuilder qb.Builder) error {
if err := mutate(ctx, ds.oracleStore, oracleBuilder, []interface{}{}); err != nil {
if err := mutate(ctx, ds.oracleStore, oracleBuilder, typedef.Values{}); err != nil {
return errors.Wrap(err, "oracle failed store creation")
}
if err := mutate(ctx, ds.testStore, testBuilder, []interface{}{}); err != nil {
if err := mutate(ctx, ds.testStore, testBuilder, typedef.Values{}); err != nil {
return errors.Wrap(err, "test failed store creation")
}
return nil
}

func (ds delegatingStore) Mutate(ctx context.Context, builder qb.Builder, values ...interface{}) error {
if err := mutate(ctx, ds.oracleStore, builder, values...); err != nil {
func (ds delegatingStore) Mutate(ctx context.Context, syncer *MutateSyncer, builder qb.Builder, values typedef.Values) error {
syncer.oracleSend(ctx, builder, values)

if err := ds.testStore.mutate(ctx, builder, values); err != nil {
// Oracle failed, transition cannot take place
ds.logger.Info("test store failed mutation, transition to next state impossible so continuing with next mutation", zap.Error(err))
return nil
}
err := syncer.oracleGet()
if err != nil {
// Oracle failed, transition cannot take place
ds.logger.Info("oracle failed mutation, transition to next state impossible so continuing with next mutation", zap.Error(err))
return nil
}
return mutate(ctx, ds.testStore, builder, values...)
return nil
}

func mutate(ctx context.Context, s storeLoader, builder qb.Builder, values ...interface{}) error {
if err := s.mutate(ctx, builder, values...); err != nil {
func mutate(ctx context.Context, s storeLoader, builder qb.Builder, values typedef.Values) error {
if err := s.mutate(ctx, builder, values); err != nil {
return errors.Wrapf(err, "unable to apply mutations to the %s store", s.name())
}
return nil
}

func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, builder qb.Builder, values ...interface{}) error {
func (ds delegatingStore) Check(ctx context.Context, syncer *CheckSyncer, table *typedef.Table, builder qb.Builder, values typedef.Values) error {
syncer.oracleSend(ctx, builder, values)

testRows, err := ds.testStore.load(ctx, builder, values)
if err != nil {
return errors.Wrapf(err, "unable to load check data from the test store")
}
oracleRows, err := ds.oracleStore.load(ctx, builder, values)
oracleRows, err := syncer.oracleGet()
if err != nil {
return errors.Wrapf(err, "unable to load check data from the oracle store")
}
Expand Down
123 changes: 123 additions & 0 deletions pkg/store/syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2019 ScyllaDB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package store

import (
"context"

"github.com/scylladb/gocqlx/v2/qb"

"github.com/scylladb/gemini/pkg/typedef"
)

// CheckSyncer helps to synchronize start time of identical check queries on oracle store and on test store
type CheckSyncer struct {
msgChan chan checkMsg
}

type checkMsg struct {
builder qb.Builder
ctx context.Context
values *typedef.Values
err error
rows *[]map[string]interface{}
}

// InitCheckSyncer creates CheckSyncer. CheckSyncer should be created for each 'validationJob' goroutine.
func (ds delegatingStore) InitCheckSyncer(ctx context.Context) CheckSyncer {
syncer := CheckSyncer{
msgChan: make(chan checkMsg),
}
go func() {
var msg checkMsg
exit:
for {
select {
case msg = <-syncer.msgChan:
msg.rows = &[]map[string]interface{}{}
*msg.rows, msg.err = ds.oracleStore.load(msg.ctx, msg.builder, *msg.values)
msg.values = nil
syncer.msgChan <- msg
case <-ctx.Done():
continue exit
}
}
}()
return syncer
}

// oracleSend send parallel check request on oracle store
func (s *CheckSyncer) oracleSend(ctx context.Context, builder qb.Builder, values typedef.Values) {
s.msgChan <- checkMsg{
ctx: ctx,
builder: builder,
values: &values,
}
}

// oracleGet get response from ran parallel check request on oracle store by oracleSend
func (s *CheckSyncer) oracleGet() ([]map[string]interface{}, error) {
get := <-s.msgChan
return *get.rows, get.err
}

// MutateSyncer helps to synchronize start time of identical mutate queries on oracle store and on test store
type MutateSyncer struct {
msgChan chan mutateMsg
}

type mutateMsg struct {
builder qb.Builder
ctx context.Context
values *typedef.Values
err error
}

// InitMutateSyncer creates MutateSyncer. MutateSyncer should be created for each 'mutationJob' and 'warmupJob' goroutine.
func (ds delegatingStore) InitMutateSyncer(ctx context.Context) MutateSyncer {
syncer := MutateSyncer{
msgChan: make(chan mutateMsg),
}
go func() {
var msg mutateMsg
exit:
for {
select {
case msg = <-syncer.msgChan:
msg.err = ds.oracleStore.mutate(msg.ctx, msg.builder, *msg.values)
msg.values = nil
syncer.msgChan <- msg
case <-ctx.Done():
continue exit
}
}
}()
return syncer
}

// oracleSend send parallel mutate request on oracle store
func (s *MutateSyncer) oracleSend(ctx context.Context, builder qb.Builder, values typedef.Values) {
s.msgChan <- mutateMsg{
ctx: ctx,
builder: builder,
values: &values,
}
}

// oracleGet get response from ran parallel mutate request on oracle store by oracleSend
func (s *MutateSyncer) oracleGet() error {
msg := <-s.msgChan
return msg.err
}

0 comments on commit 9cfe361

Please sign in to comment.