From ff28d191fae132764a8bfffbc07c4c61fab2f9b3 Mon Sep 17 00:00:00 2001 From: illia-li Date: Sun, 25 Jun 2023 17:55:46 -0400 Subject: [PATCH] add(store): add syncer for identical queries on test and oracle store --- cmd/gemini/root.go | 4 +- pkg/jobs/jobs.go | 21 ++++--- pkg/store/cqlstore.go | 8 +-- pkg/store/store.go | 42 +++++++++----- pkg/store/syncer.go | 125 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 171 insertions(+), 29 deletions(-) create mode 100644 pkg/store/syncer.go diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index ba4c76cc..24d1f29b 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -235,7 +235,7 @@ func run(_ *cobra.Command, _ []string) error { if dropSchema && mode != jobs.ReadMode { for _, stmt := range generators.GetDropSchema(schema) { logger.Debug(stmt) - if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil { + if err = st.Create(context.Background(), createBuilder{stmt: stmt}, createBuilder{stmt: stmt}); err != nil { return errors.Wrap(err, "unable to drop schema") } } @@ -248,7 +248,7 @@ func run(_ *cobra.Command, _ []string) error { for _, stmt := range generators.GetCreateSchema(schema) { logger.Debug(stmt) - if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil { + if err = st.Create(context.Background(), createBuilder{stmt: stmt}, createBuilder{stmt: stmt}); err != nil { return errors.Wrap(err, "unable to create schema") } } diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index a7b90fac..3bff2979 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -170,6 +170,7 @@ func mutationJob( defer func() { logger.Info("ending mutation loop") }() + syncer := s.InitMutateSyncer(ctx) for { if stopFlag.IsHardOrSoft() { return nil @@ -183,9 +184,9 @@ func mutationJob( } ind := r.Intn(1000000) if ind%100000 == 0 { - _ = ddl(ctx, schema, schemaConfig, table, s, r, p, globalStatus, logger, verbose) + _ = ddl(ctx, schema, schemaConfig, table, s, r, p, globalStatus, 
logger, verbose, &syncer) } else { - _ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, true, logger) + _ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, true, logger, &syncer) } if failFast && globalStatus.HasErrors() { stopFlag.SetSoft(true) @@ -217,7 +218,7 @@ func validationJob( defer func() { logger.Info("ending validation loop") }() - + syncer := s.InitCheckSyncer(ctx) for { if stopFlag.IsHardOrSoft() { return nil @@ -234,7 +235,7 @@ func validationJob( continue } - err := validation(ctx, schemaConfig, table, s, stmt, g, globalStatus, logger) + err := validation(ctx, schemaConfig, table, s, stmt, g, globalStatus, logger, &syncer) switch { case err == nil: globalStatus.ReadOps.Add(1) @@ -279,13 +280,14 @@ func warmupJob( defer func() { logger.Info("ending warmup loop") }() + syncer := s.InitMutateSyncer(ctx) for { if stopFlag.IsHardOrSoft() { logger.Debug("warmup job terminated") return nil } // Do we care about errors during warmup? - _ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger) + _ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger, &syncer) if failFast && globalStatus.HasErrors() { stopFlag.SetSoft(true) return nil @@ -304,6 +306,7 @@ func ddl( globalStatus *status.GlobalStatus, logger *zap.Logger, verbose bool, + syncer *store.MutateSyncer, ) error { if sc.CQLFeature != typedef.CQL_FEATURE_ALL { logger.Debug("ddl statements disabled") @@ -331,7 +334,7 @@ func ddl( if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil { w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL())) } - if err = s.Mutate(ctx, ddlStmt.Query); err != nil { + if err = s.Mutate(ctx, syncer, ddlStmt.Query, nil); err != nil { if errors.Is(err, context.Canceled) { return nil } @@ -365,6 +368,7 @@ func mutation( globalStatus *status.GlobalStatus, deletes bool, logger *zap.Logger, + syncer *store.MutateSyncer, ) error { mutateStmt, err := GenMutateStmt(schema, 
table, g, r, p, deletes) if err != nil { @@ -388,7 +392,7 @@ func mutation( if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil { w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL())) } - if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil { + if err = s.Mutate(ctx, syncer, mutateQuery, mutateValues); err != nil { if errors.Is(err, context.Canceled) { return nil } @@ -413,6 +417,7 @@ func validation( g *generators.Generator, _ *status.GlobalStatus, logger *zap.Logger, + syncer *store.CheckSyncer, ) error { if stmt.ValuesWithToken != nil { defer func() { @@ -437,7 +442,7 @@ func validation( attempt := 1 for { lastErr = err - err = s.Check(ctx, table, stmt.Query, stmt.Values...) + err = s.Check(ctx, syncer, table, stmt.Query, stmt.Values) if err == nil { if attempt > 1 { diff --git a/pkg/store/cqlstore.go b/pkg/store/cqlstore.go index 82a99868..27a0ea06 100644 --- a/pkg/store/cqlstore.go +++ b/pkg/store/cqlstore.go @@ -45,12 +45,12 @@ func (cs *cqlStore) name() string { return cs.system } -func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...interface{}) (err error) { +func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values typedef.Values) (err error) { var i int for i = 0; i < cs.maxRetriesMutate; i++ { // retry with new timestamp as list modification with the same ts // will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937 - err = cs.doMutate(ctx, builder, time.Now(), values...) 
+ err = cs.doMutate(ctx, builder, time.Now(), values) if err == nil { cs.ops.WithLabelValues(cs.system, opType(builder)).Inc() return nil @@ -67,7 +67,7 @@ func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...in return err } -func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error { +func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values typedef.Values) error { queryBody, _ := builder.ToCql() query := cs.session.Query(queryBody, values...).WithContext(ctx) @@ -90,7 +90,7 @@ func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Ti return nil } -func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) { +func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values typedef.Values) (result []map[string]interface{}, err error) { query, _ := builder.ToCql() iter := cs.session.Query(query, values...).WithContext(ctx).Iter() cs.ops.WithLabelValues(cs.system, opType(builder)).Inc() diff --git a/pkg/store/store.go b/pkg/store/store.go index 267c6d50..c79618da 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -39,11 +39,11 @@ import ( ) type loader interface { - load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error) + load(context.Context, qb.Builder, typedef.Values) ([]map[string]interface{}, error) } type storer interface { - mutate(context.Context, qb.Builder, ...interface{}) error + mutate(context.Context, qb.Builder, typedef.Values) error } type storeLoader interface { @@ -55,8 +55,10 @@ type storeLoader interface { type Store interface { Create(context.Context, qb.Builder, qb.Builder) error - Mutate(context.Context, qb.Builder, ...interface{}) error - Check(context.Context, *typedef.Table, qb.Builder, ...interface{}) error + Mutate(context.Context, *MutateSyncer, qb.Builder, typedef.Values) error + 
Check(context.Context, *CheckSyncer, *typedef.Table, qb.Builder, typedef.Values) error + InitMutateSyncer(context.Context) MutateSyncer + InitCheckSyncer(context.Context) CheckSyncer Close() error } @@ -123,11 +125,11 @@ type noOpStore struct { system string } -func (n *noOpStore) mutate(context.Context, qb.Builder, ...interface{}) error { +func (n *noOpStore) mutate(context.Context, qb.Builder, typedef.Values) error { return nil } -func (n *noOpStore) load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error) { +func (n *noOpStore) load(context.Context, qb.Builder, typedef.Values) ([]map[string]interface{}, error) { return nil, nil } @@ -151,37 +153,47 @@ type delegatingStore struct { } func (ds delegatingStore) Create(ctx context.Context, testBuilder, oracleBuilder qb.Builder) error { - if err := mutate(ctx, ds.oracleStore, oracleBuilder, []interface{}{}); err != nil { + if err := mutate(ctx, ds.oracleStore, oracleBuilder, typedef.Values{}); err != nil { return errors.Wrap(err, "oracle failed store creation") } - if err := mutate(ctx, ds.testStore, testBuilder, []interface{}{}); err != nil { + if err := mutate(ctx, ds.testStore, testBuilder, typedef.Values{}); err != nil { return errors.Wrap(err, "test failed store creation") } return nil } -func (ds delegatingStore) Mutate(ctx context.Context, builder qb.Builder, values ...interface{}) error { - if err := mutate(ctx, ds.oracleStore, builder, values...); err != nil { +func (ds delegatingStore) Mutate(ctx context.Context, syncer *MutateSyncer, builder qb.Builder, values typedef.Values) error { + syncer.oracleSend(ctx, builder, values) + + if err := ds.testStore.mutate(ctx, builder, values); err != nil { + // Oracle failed, transition cannot take place + ds.logger.Info("test store failed mutation, transition to next state impossible so continuing with next mutation", zap.Error(err)) + return nil + } + err := syncer.oracleGet() + if err != nil { // Oracle failed, transition cannot take place 
ds.logger.Info("oracle failed mutation, transition to next state impossible so continuing with next mutation", zap.Error(err)) return nil } - return mutate(ctx, ds.testStore, builder, values...) + return nil } -func mutate(ctx context.Context, s storeLoader, builder qb.Builder, values ...interface{}) error { - if err := s.mutate(ctx, builder, values...); err != nil { +func mutate(ctx context.Context, s storeLoader, builder qb.Builder, values typedef.Values) error { + if err := s.mutate(ctx, builder, values); err != nil { return errors.Wrapf(err, "unable to apply mutations to the %s store", s.name()) } return nil } -func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, builder qb.Builder, values ...interface{}) error { +func (ds delegatingStore) Check(ctx context.Context, syncer *CheckSyncer, table *typedef.Table, builder qb.Builder, values typedef.Values) error { + syncer.oracleSend(ctx, builder, values) + testRows, err := ds.testStore.load(ctx, builder, values) if err != nil { return errors.Wrapf(err, "unable to load check data from the test store") } - oracleRows, err := ds.oracleStore.load(ctx, builder, values) + oracleRows, err := syncer.oracleGet() if err != nil { return errors.Wrapf(err, "unable to load check data from the oracle store") } diff --git a/pkg/store/syncer.go b/pkg/store/syncer.go new file mode 100644 index 00000000..f2b61c8b --- /dev/null +++ b/pkg/store/syncer.go @@ -0,0 +1,125 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"context"
+	"errors"
+
+	"github.com/scylladb/gocqlx/v2/qb"
+
+	"github.com/scylladb/gemini/pkg/typedef"
+)
+
+// errNoOracleRequest is returned by oracleGet when it is called without a preceding oracleSend.
+var errNoOracleRequest = errors.New("no oracle request in flight")
+
+// CheckSyncer helps to synchronize start time of identical check queries on oracle store and on test store.
+//
+// A CheckSyncer is owned by one goroutine: oracleSend and oracleGet must not be called concurrently.
+type CheckSyncer struct {
+	// msgChan carries requests to the worker. It is unbuffered, so oracleSend returns
+	// at the moment the worker picks the request up.
+	msgChan chan checkMsg
+	// resChan carries responses back. It has capacity 1 and at most one request is in
+	// flight, so the worker never blocks on it. Only the worker closes it, on exit.
+	resChan chan checkMsg
+	// done is the Done channel of the context given to InitCheckSyncer.
+	done <-chan struct{}
+	// sendErr explains why the last oracleSend did not hand its request over.
+	sendErr error
+	// pending is true while a response is owed for an accepted request.
+	pending bool
+}
+
+type checkMsg struct {
+	builder qb.Builder
+	ctx     context.Context
+	err     error
+	values  typedef.Values
+	rows    []map[string]interface{}
+}
+
+// InitCheckSyncer creates CheckSyncer. CheckSyncer should be created for each 'validationJob' goroutine.
+//
+// It starts one worker goroutine that runs the oracle side of every check. The worker stops when ctx is done.
+func (ds delegatingStore) InitCheckSyncer(ctx context.Context) CheckSyncer {
+	requests := make(chan checkMsg)
+	responses := make(chan checkMsg, 1)
+	go func() {
+		// The worker is the only sender on responses, so it is the one that closes it.
+		// A closed responses channel tells oracleGet that the worker is gone.
+		defer close(responses)
+		for {
+			select {
+			case req := <-requests:
+				rows, err := ds.oracleStore.load(req.ctx, req.builder, req.values)
+				// Never blocks: the buffer slot is free because the caller collects
+				// or discards the previous response before sending a new request.
+				responses <- checkMsg{rows: rows, err: err}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return CheckSyncer{
+		msgChan: requests,
+		resChan: responses,
+		done:    ctx.Done(),
+	}
+}
+
+// oracleSend starts a parallel check request on the oracle store.
+//
+// If ctx or the syncer context is done before the worker accepts the request,
+// nothing is sent and the following oracleGet reports that error.
+func (s *CheckSyncer) oracleSend(ctx context.Context, builder qb.Builder, values typedef.Values) {
+	if s.pending {
+		// The previous response was never collected (the caller returned early).
+		// Drop it so it cannot be mistaken for the answer to this request.
+		<-s.resChan
+		s.pending = false
+	}
+	s.sendErr = nil
+	select {
+	case s.msgChan <- checkMsg{ctx: ctx, builder: builder, values: values}:
+		s.pending = true
+	case <-ctx.Done():
+		s.sendErr = ctx.Err()
+	case <-s.done:
+		s.sendErr = context.Canceled
+	}
+}
+
+// oracleGet returns the result of the check request started by the last oracleSend.
+//
+// It returns an error without blocking when no request is in flight, and
+// context.Canceled when the worker stopped before producing a response.
+func (s *CheckSyncer) oracleGet() ([]map[string]interface{}, error) {
+	if !s.pending {
+		if s.sendErr != nil {
+			return nil, s.sendErr
+		}
+		return nil, errNoOracleRequest
+	}
+	s.pending = false
+	res, ok := <-s.resChan
+	if !ok {
+		return nil, context.Canceled
+	}
+	return res.rows, res.err
+}
+
+// MutateSyncer helps to synchronize start time of identical mutate queries on oracle store and on test store.
+//
+// A MutateSyncer is owned by one goroutine: oracleSend and oracleGet must not be called concurrently.
+type MutateSyncer struct {
+	// msgChan carries requests to the worker. It is unbuffered, so oracleSend returns
+	// at the moment the worker picks the request up.
+	msgChan chan mutateMsg
+	// resChan carries responses back. It has capacity 1 and at most one request is in
+	// flight, so the worker never blocks on it. Only the worker closes it, on exit.
+	resChan chan mutateMsg
+	// done is the Done channel of the context given to InitMutateSyncer.
+	done <-chan struct{}
+	// sendErr explains why the last oracleSend did not hand its request over.
+	sendErr error
+	// pending is true while a response is owed for an accepted request.
+	pending bool
+}
+
+type mutateMsg struct {
+	builder qb.Builder
+	ctx     context.Context
+	err     error
+	values  typedef.Values
+}
+
+// InitMutateSyncer creates MutateSyncer. MutateSyncer should be created for each 'mutationJob' and 'warmupJob' goroutine.
+//
+// It starts one worker goroutine that runs the oracle side of every mutation. The worker stops when ctx is done.
+func (ds delegatingStore) InitMutateSyncer(ctx context.Context) MutateSyncer {
+	requests := make(chan mutateMsg)
+	responses := make(chan mutateMsg, 1)
+	go func() {
+		// The worker is the only sender on responses, so it is the one that closes it.
+		// A closed responses channel tells oracleGet that the worker is gone.
+		defer close(responses)
+		for {
+			select {
+			case req := <-requests:
+				err := ds.oracleStore.mutate(req.ctx, req.builder, req.values)
+				// Never blocks: the buffer slot is free because the caller collects
+				// or discards the previous response before sending a new request.
+				responses <- mutateMsg{err: err}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return MutateSyncer{
+		msgChan: requests,
+		resChan: responses,
+		done:    ctx.Done(),
+	}
+}
+
+// oracleSend starts a parallel mutate request on the oracle store.
+//
+// If ctx or the syncer context is done before the worker accepts the request,
+// nothing is sent and the following oracleGet reports that error.
+func (s *MutateSyncer) oracleSend(ctx context.Context, builder qb.Builder, values typedef.Values) {
+	if s.pending {
+		// The previous response was never collected (the caller returned early).
+		// Drop it so it cannot be mistaken for the answer to this request.
+		<-s.resChan
+		s.pending = false
+	}
+	s.sendErr = nil
+	select {
+	case s.msgChan <- mutateMsg{ctx: ctx, builder: builder, values: values}:
+		s.pending = true
+	case <-ctx.Done():
+		s.sendErr = ctx.Err()
+	case <-s.done:
+		s.sendErr = context.Canceled
+	}
+}
+
+// oracleGet returns the result of the mutate request started by the last oracleSend.
+//
+// It returns an error without blocking when no request is in flight, and
+// context.Canceled when the worker stopped before producing a response.
+func (s *MutateSyncer) oracleGet() error {
+	if !s.pending {
+		if s.sendErr != nil {
+			return s.sendErr
+		}
+		return errNoOracleRequest
+	}
+	s.pending = false
+	res, ok := <-s.resChan
+	if !ok {
+		return context.Canceled
+	}
+	return res.err
+}