Skip to content

Commit

Permalink
Merge pull request scylladb#274 from aleksbykov/gemini-issues-fixes
Browse files Browse the repository at this point in the history
Gemini issues fixes
  • Loading branch information
roydahan authored Sep 1, 2022
2 parents c3d3858 + dcd270a commit 8bcf95d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 9 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
# Changelog
## [1.7.7] - 2022-08-24

- Fix issue: max-partition-keys should align with Scylla's --max-partition-key-restrictions-per-query and --max-clustering-key-restrictions-per-query configuration options([#271](https://github.com/scylladb/gemini/issues/271))
- Fix issue: wrong number of values passed to prepared select query ([272](https://github.com/scylladb/gemini/issues/272))
- Fix issue: Gemini issues a query involved with data filtering without using ALLOW FILTERING and thus may have unpredictable performance ([273](https://github.com/scylladb/gemini/issues/273))
- Support request and connection timeout. Could be set via cli parameters

## [1.7.6] - 2022-08-03

### Fixed

- fixed error handling in load function


## [1.7.6] - 2022-08-03

Expand Down Expand Up @@ -76,7 +89,7 @@
- Gemini ensures that material views can be created in the default case by simply
creating enough keys and columns.

## 1.4.2
## 1.4.2

- Reused primary keys does no longer block the caller if none are available.
- Primary key generation no longer blocks if the targeted source is full.
Expand Down
13 changes: 9 additions & 4 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ var (
testClusterHostSelectionPolicy string
oracleClusterHostSelectionPolicy string
useServerSideTimestamps bool
requestTimeout time.Duration
connectTimeout time.Duration
)

const (
Expand Down Expand Up @@ -394,11 +396,12 @@ func createLogger(level string) *zap.Logger {
func createClusters(consistency gocql.Consistency, testHostSelectionPolicy gocql.HostSelectionPolicy, oracleHostSelectionPolicy gocql.HostSelectionPolicy, logger *zap.Logger) (*gocql.ClusterConfig, *gocql.ClusterConfig) {
retryPolicy := &gocql.ExponentialBackoffRetryPolicy{
Min: time.Second,
Max: 10 * time.Second,
Max: 60 * time.Second,
NumRetries: 5,
}
testCluster := gocql.NewCluster(testClusterHost...)
testCluster.Timeout = 5 * time.Second
testCluster.Timeout = requestTimeout
testCluster.ConnectTimeout = connectTimeout
testCluster.RetryPolicy = retryPolicy
testCluster.Consistency = consistency
testCluster.PoolConfig.HostSelectionPolicy = testHostSelectionPolicy
Expand All @@ -411,7 +414,7 @@ func createClusters(consistency gocql.Consistency, testHostSelectionPolicy gocql
return testCluster, nil
}
oracleCluster := gocql.NewCluster(oracleClusterHost...)
oracleCluster.Timeout = 5 * time.Second
oracleCluster.Timeout = 120 * time.Second
oracleCluster.RetryPolicy = retryPolicy
oracleCluster.Consistency = consistency
oracleCluster.PoolConfig.HostSelectionPolicy = oracleHostSelectionPolicy
Expand Down Expand Up @@ -534,7 +537,9 @@ func init() {
rootCmd.Flags().StringVarP(&oracleClusterHostSelectionPolicy, "oracle-host-selection-policy", "", "round-robin", "Host selection policy used by the driver for the oracle cluster: round-robin|host-pool|token-aware")
rootCmd.Flags().StringVarP(&testClusterHostSelectionPolicy, "test-host-selection-policy", "", "round-robin", "Host selection policy used by the driver for the test cluster: round-robin|host-pool|token-aware")
rootCmd.Flags().BoolVarP(&useServerSideTimestamps, "use-server-timestamps", "", false, "Use server-side generated timestamps for writes")
}
rootCmd.Flags().DurationVarP(&requestTimeout, "request-timeout", "", 30*time.Second, "Duration of waiting request execution")
rootCmd.Flags().DurationVarP(&connectTimeout, "connect-timeout", "", 30*time.Second, "Duration of waiting connection established")
}

func printSetup() error {
tw := new(tabwriter.Writer)
Expand Down
38 changes: 34 additions & 4 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gemini
import (
"encoding/json"
"fmt"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -351,6 +352,7 @@ type MaterializedView struct {
Name string `json:"name"`
PartitionKeys Columns `json:"partition_keys"`
ClusteringKeys Columns `json:"clustering_keys"`
NonPrimaryKey ColumnDef
}

type Stmt struct {
Expand Down Expand Up @@ -544,6 +546,7 @@ func createMaterializedViews(table Table, partitionKeys []ColumnDef, clusteringK
Name: fmt.Sprintf("%s_mv_%d", table.Name, i),
PartitionKeys: append(cols, partitionKeys...),
ClusteringKeys: clusteringKeys,
NonPrimaryKey: col,
}
mvs = append(mvs, mv)
}
Expand Down Expand Up @@ -847,13 +850,18 @@ func (s *Schema) GenCheckStmt(t *Table, g *Generator, r *rand.Rand, p PartitionR
func (s *Schema) genSinglePartitionQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt {
t.mu.RLock()
defer t.mu.RUnlock()
var (
mv_col ColumnDef
mv_values []interface{}
)

tableName := t.Name
partitionKeys := t.PartitionKeys
if len(t.MaterializedViews) > 0 && r.Int()%2 == 0 {
view := r.Intn(len(t.MaterializedViews))
tableName = t.MaterializedViews[view].Name
partitionKeys = t.MaterializedViews[view].PartitionKeys
mv_col = t.MaterializedViews[view].NonPrimaryKey
}
builder := qb.Select(s.Keyspace.Name + "." + tableName)
typs := make([]Type, 0, 10)
Expand All @@ -865,6 +873,11 @@ func (s *Schema) genSinglePartitionQuery(t *Table, g *Generator, r *rand.Rand, p
if !ok {
return nil
}
if (ColumnDef{}) != mv_col {
mv_values = appendValue(mv_col.Type, r, p, mv_values)
values.Value = append(mv_values, values.Value...)
}

return &Stmt{
Query: builder,
Values: func() (uint64, []interface{}) {
Expand Down Expand Up @@ -894,6 +907,11 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, g *Generator, r *rand.Rand,
if numQueryPKs == 0 {
numQueryPKs = 1
}
multiplier := int(math.Pow(float64(numQueryPKs), float64(len(partitionKeys))))
if multiplier > 100 {
numQueryPKs = 1
}

builder := qb.Select(s.Keyspace.Name + "." + tableName)
for i, pk := range partitionKeys {
builder = builder.Where(qb.InTuple(pk.Name, numQueryPKs))
Expand Down Expand Up @@ -928,6 +946,8 @@ func (s *Schema) genClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p

var (
typs []Type
mv_col ColumnDef
mv_values []interface{}
)
tableName := t.Name
partitionKeys := t.PartitionKeys
Expand All @@ -937,6 +957,7 @@ func (s *Schema) genClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p
tableName = t.MaterializedViews[view].Name
partitionKeys = t.MaterializedViews[view].PartitionKeys
clusteringKeys = t.MaterializedViews[view].ClusteringKeys
mv_col = t.MaterializedViews[view].NonPrimaryKey
}
builder := qb.Select(s.Keyspace.Name + "." + tableName)
vs, ok := g.GetOld()
Expand All @@ -949,6 +970,10 @@ func (s *Schema) genClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p
builder = builder.Where(qb.Eq(pk.Name))
typs = append(typs, pk.Type)
}
if (ColumnDef{}) != mv_col {
mv_values = appendValue(mv_col.Type, r, p, mv_values)
values = append(mv_values, values...)
}
if len(clusteringKeys) > 0 {
maxClusteringRels := r.Intn(len(clusteringKeys))
for i := 0; i < maxClusteringRels; i++ {
Expand Down Expand Up @@ -992,6 +1017,10 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, g *Generator
if numQueryPKs == 0 {
numQueryPKs = 1
}
multiplier := int(math.Pow(float64(numQueryPKs), float64(len(partitionKeys))))
if multiplier > 100 {
numQueryPKs = 1
}
builder := qb.Select(s.Keyspace.Name + "." + tableName)
for i, pk := range partitionKeys {
builder = builder.Where(qb.InTuple(pk.Name, numQueryPKs))
Expand Down Expand Up @@ -1045,18 +1074,19 @@ func (s *Schema) genSingleIndexQuery(t *Table, g *Generator, r *rand.Rand, p Par
return nil
}

/* Once we have ALLOW FILTERING SUPPORT this can be applied
pkNum := p.Rand.Intn(len(t.PartitionKeys))
pkNum := r.Intn(len(t.Indexes))
if pkNum == 0 {
pkNum = 1
}
*/
indexes := t.Indexes[:pkNum]
builder := qb.Select(s.Keyspace.Name + "." + t.Name)
for _, idx := range t.Indexes {
builder.AllowFiltering()
for _, idx := range indexes {
builder = builder.Where(qb.Eq(idx.Column))
values = appendValue(t.Columns[idx.ColumnIdx].Type, r, p, values)
typs = append(typs, t.Columns[idx.ColumnIdx].Type)
}

return &Stmt{
Query: builder,
Values: func() (uint64, []interface{}) {
Expand Down
4 changes: 4 additions & 0 deletions types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ func TestMarshalUnmarshal(t *testing.T) {
Type: genUDTType(sc),
},
},
NonPrimaryKey: ColumnDef{
Name: "",
Type: SimpleType(""),
},
},
},
},
Expand Down

0 comments on commit 8bcf95d

Please sign in to comment.