Skip to content

Commit

Permalink
gemini: adds support for consistency levels
Browse files Browse the repository at this point in the history
A new CLI arg `consistency` is added which is then consistently
used throughout the application.
  • Loading branch information
Henrik Johansson committed Jun 11, 2019
1 parent 72cd196 commit d055ac0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## Unreleased

- Support for compation strategies added via a CLI argument `compaction-strategy`
- Support for changing consistency level via a CLI argument `consistency`.
- Support for compaction strategies added via a CLI argument `compaction-strategy`
as a set of string values "stcs", "twcs" or "lcs" which will make Gemini choose
the default values for the properties of the respective compaction strategies.
Alternatively the JSON-like definition of the compaction-strategy can be supplied
Expand Down
22 changes: 21 additions & 1 deletion cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/briandowns/spinner"
"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/scylladb/gemini"
Expand All @@ -44,6 +45,7 @@ var (
bind string
warmup time.Duration
compactionStrategy string
consistency string
)

const (
Expand Down Expand Up @@ -151,6 +153,12 @@ func (cb createBuilder) ToCql() (stmt string, names []string) {

func run(cmd *cobra.Command, args []string) {

cons, err := gocql.ParseConsistencyWrapper(consistency)
if err != nil {
fmt.Printf("Unable parse consistency, error=%s. Falling back on Quorum\n", err)
cons = gocql.Quorum
}

go func() {
http.Handle("/metrics", promhttp.Handler())
_ = http.ListenAndServe(bind, nil)
Expand Down Expand Up @@ -190,7 +198,8 @@ func run(cmd *cobra.Command, args []string) {
jsonSchema, _ := json.MarshalIndent(schema, "", " ")
fmt.Printf("Schema: %v\n", string(jsonSchema))

store := store.New(schema, testClusterHost, oracleClusterHost)
testCluster, oracleCluster := createClusters(cons)
store := store.New(schema, testCluster, oracleCluster)
defer store.Close()

if dropSchema && mode != readMode {
Expand All @@ -217,6 +226,16 @@ func run(cmd *cobra.Command, args []string) {
runJob(Job, schema, store, mode, outFile)
}

func createClusters(consistency gocql.Consistency) (*gocql.ClusterConfig, *gocql.ClusterConfig) {
testCluster := gocql.NewCluster(testClusterHost...)
testCluster.Timeout = 5 * time.Second
testCluster.Consistency = consistency
oracleCluster := gocql.NewCluster(oracleClusterHost...)
oracleCluster.Timeout = 5 * time.Second
oracleCluster.Consistency = consistency
return testCluster, oracleCluster
}

func getCompactionStrategy(cs string) *gemini.CompactionStrategy {
switch cs {
case "stcs":
Expand Down Expand Up @@ -449,6 +468,7 @@ func init() {
rootCmd.Flags().StringVarP(&bind, "bind", "b", ":2112", "Specify the interface and port which to bind prometheus metrics on. Default is ':2112'")
rootCmd.Flags().DurationVarP(&warmup, "warmup", "", 30*time.Second, "Specify the warmup perid as a duration for example 30s or 10h")
rootCmd.Flags().StringVarP(&compactionStrategy, "compaction-strategy", "", "", "Specify the desired CS as either the coded short hand stcs|twcs|lcs to get the default for each type or provide the entire specification in the form {'class':'....'}")
rootCmd.Flags().StringVarP(&consistency, "consistency", "", "QUORUM", "Specify the desired consistency as ANY|ONE|TWO|THREE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|LOCAL_ONE")
}

func printSetup() error {
Expand Down
6 changes: 2 additions & 4 deletions store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, ts time.Time
query, _ := builder.ToCql()
var tsUsec int64 = ts.UnixNano() / 1000
if err := cs.session.Query(query, values...).WithContext(ctx).WithTimestamp(tsUsec).Exec(); !ignore(err) {
return errors.Errorf("%v [cluster = test, query = '%s']", err, query)
return errors.Errorf("%v [cluster = %s, query = '%s']", err, cs.name, query)
}
cs.ops.WithLabelValues(cs.name, opType(builder)).Inc()
return nil
Expand All @@ -47,9 +47,7 @@ func (cs cqlStore) close() error {
return nil
}

func newSession(hosts []string) *gocql.Session {
cluster := gocql.NewCluster(hosts...)
cluster.Timeout = 5 * time.Second
func newSession(cluster *gocql.ClusterConfig) *gocql.Session {
session, err := cluster.CreateSession()
if err != nil {
panic(err)
Expand Down
7 changes: 4 additions & 3 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"time"

"github.com/gocql/gocql"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
Expand Down Expand Up @@ -39,21 +40,21 @@ type Store interface {
Close() error
}

func New(schema *gemini.Schema, testHosts []string, oracleHosts []string) Store {
func New(schema *gemini.Schema, testCluster *gocql.ClusterConfig, oracleCluster *gocql.ClusterConfig) Store {
ops := promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gemini_cql_requests",
Help: "How many CQL requests processed, partitioned by name and CQL query type aka 'method' (batch, delete, insert, update).",
}, []string{"name", "method"},
)
return &delegatingStore{
testStore: &cqlStore{
session: newSession(testHosts),
session: newSession(testCluster),
schema: schema,
name: "test",
ops: ops,
},
oracleStore: &cqlStore{
session: newSession(oracleHosts),
session: newSession(oracleCluster),
schema: schema,
name: "oracle",
ops: ops,
Expand Down

0 comments on commit d055ac0

Please sign in to comment.