Skip to content

Commit

Permalink
Merge 'schema: Support for materialized views' from Henrik
Browse files Browse the repository at this point in the history
"Initial support for materialized views. Not all possible permutations
 are implemented but a good bunch of common cases should be covered.

 Fixes: #5"

* origin/materialized_views:
  schema: Support for materialized views.
  • Loading branch information
penberg committed May 3, 2019
2 parents db54ff5 + 85a4c6f commit 0748cd5
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- Gemini version is now available in the resulting output.
- Materialized Views support.
- Improved error handling in validation code.
- Avoiding small double booking of write ops in case of mutation errors.
- Printing executable CQL statements when logging errors or in verbose mode.
Expand Down
177 changes: 141 additions & 36 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"strings"
)

Expand Down Expand Up @@ -69,12 +70,19 @@ func (cs Columns) ToJSONMap(values map[string]interface{}, p *PartitionRange) ma
}

type Table struct {
Name string `json:"name"`
PartitionKeys Columns `json:"partition_keys"`
ClusteringKeys Columns `json:"clustering_keys"`
Columns Columns `json:"columns"`
Indexes []IndexDef `json:"indexes"`
KnownIssues map[string]bool `json:"known_issues"`
Name string `json:"name"`
PartitionKeys Columns `json:"partition_keys"`
ClusteringKeys Columns `json:"clustering_keys"`
Columns Columns `json:"columns"`
Indexes []IndexDef `json:"indexes"`
MaterializedViews []MaterializedView `json:"materialized_views"`
KnownIssues map[string]bool `json:"known_issues"`
}

// MaterializedView describes a materialized view generated for a table:
// the view's name plus the partition and clustering key columns that make
// up the view's primary key. It is serialized as part of the schema JSON
// (see the Table.MaterializedViews field).
type MaterializedView struct {
// Name is the view's identifier, derived from the base table name
// (e.g. "table1_mv_0").
Name string `json:"name"`
// PartitionKeys are the view's partition key columns.
PartitionKeys Columns `json:"partition_keys"`
// ClusteringKeys are the view's clustering key columns.
ClusteringKeys Columns `json:"clustering_keys"`
}

type Stmt struct {
Expand Down Expand Up @@ -151,12 +159,43 @@ func GenSchema() *Schema {
}
}
}
validMVColumn := func() ColumnDef {
validCols := make([]ColumnDef, 0, len(columns))
for _, col := range columns {
valid := false
for _, pkType := range pkTypes {
if col.Type.Name() == pkType.Name() {
valid = true
break
}
}
if valid {
validCols = append(validCols, col)
}
}
return validCols[rand.Intn(len(validCols))]
}
var mvs []MaterializedView
numMvs := 1
for i := 0; i < numMvs; i++ {
cols := []ColumnDef{
validMVColumn(),
}
mv := MaterializedView{
Name: "table1_mv_" + strconv.Itoa(i),
PartitionKeys: append(cols, partitionKeys...),
ClusteringKeys: clusteringKeys,
}
mvs = append(mvs, mv)
}

table := Table{
Name: "table1",
PartitionKeys: partitionKeys,
ClusteringKeys: clusteringKeys,
Columns: columns,
Indexes: indexes,
Name: "table1",
PartitionKeys: partitionKeys,
ClusteringKeys: clusteringKeys,
Columns: columns,
MaterializedViews: mvs,
Indexes: indexes,
KnownIssues: map[string]bool{
KnownIssuesJsonWithTuples: true,
},
Expand Down Expand Up @@ -209,6 +248,40 @@ func (s *Schema) GetCreateSchema() []string {
for _, idef := range t.Indexes {
stmts = append(stmts, fmt.Sprintf("CREATE INDEX %s ON %s.%s (%s)", idef.Name, s.Keyspace.Name, t.Name, idef.Column))
}
for _, mv := range t.MaterializedViews {
var (
mvPartitionKeys []string
mvPrimaryKeysNotNull []string
mvClusteringKeys []string
)
for _, pk := range mv.PartitionKeys {
mvPartitionKeys = append(mvPartitionKeys, pk.Name)
mvPrimaryKeysNotNull = append(mvPrimaryKeysNotNull, fmt.Sprintf("%s IS NOT NULL", pk.Name))
}
for _, ck := range mv.ClusteringKeys {
mvClusteringKeys = append(mvClusteringKeys, ck.Name)
mvPrimaryKeysNotNull = append(mvPrimaryKeysNotNull, fmt.Sprintf("%s IS NOT NULL", ck.Name))
}
var createMaterializedView string
if len(mv.PartitionKeys) == 1 {
createMaterializedView = "CREATE MATERIALIZED VIEW %s.%s AS SELECT * FROM %s.%s WHERE %s PRIMARY KEY (%s"
} else {
createMaterializedView = "CREATE MATERIALIZED VIEW %s.%s AS SELECT * FROM %s.%s WHERE %s PRIMARY KEY ((%s)"
}
if len(mvClusteringKeys) > 0 {
createMaterializedView = createMaterializedView + ",%s)"
stmts = append(stmts, fmt.Sprintf(createMaterializedView,
s.Keyspace.Name, mv.Name, s.Keyspace.Name, t.Name,
strings.Join(mvPrimaryKeysNotNull, " AND "),
strings.Join(mvPartitionKeys, ","), strings.Join(clusteringKeys, ",")))
} else {
createMaterializedView = createMaterializedView + ")"
stmts = append(stmts, fmt.Sprintf(createMaterializedView,
s.Keyspace.Name, mv.Name, s.Keyspace.Name, t.Name,
strings.Join(mvPrimaryKeysNotNull, " AND "),
strings.Join(mvPartitionKeys, ",")))
}
}
}
return stmts
}
Expand Down Expand Up @@ -336,15 +409,22 @@ func (s *Schema) GenCheckStmt(t Table, p *PartitionRange) *Stmt {
}

func (s *Schema) genSinglePartitionQuery(t Table, p *PartitionRange) *Stmt {
tableName := t.Name
partitionKeys := t.PartitionKeys
if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 {
view := p.Rand.Intn(len(t.MaterializedViews))
tableName = t.MaterializedViews[view].Name
partitionKeys = t.MaterializedViews[view].PartitionKeys
}
var relations []string
values := make([]interface{}, 0)
typs := make([]Type, 0, 10)
for _, pk := range t.PartitionKeys {
for _, pk := range partitionKeys {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, tableName, strings.Join(relations, " AND "))
return &Stmt{
Query: query,
Values: func() []interface{} {
Expand All @@ -360,18 +440,25 @@ func (s *Schema) genMultiplePartitionQuery(t Table, p *PartitionRange) *Stmt {
values []interface{}
typs []Type
)
pkNum := p.Rand.Intn(len(t.PartitionKeys))
tableName := t.Name
partitionKeys := t.PartitionKeys
if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 {
view := p.Rand.Intn(len(t.MaterializedViews))
tableName = t.MaterializedViews[view].Name
partitionKeys = t.MaterializedViews[view].PartitionKeys
}
pkNum := p.Rand.Intn(len(partitionKeys))
if pkNum == 0 {
pkNum = 1
}
for _, pk := range t.PartitionKeys {
for _, pk := range partitionKeys {
relations = append(relations, fmt.Sprintf("%s IN (%s)", pk.Name, strings.TrimRight(strings.Repeat("?,", pkNum), ",")))
for i := 0; i < pkNum; i++ {
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
}
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, tableName, strings.Join(relations, " AND "))
return &Stmt{
Query: query,
Values: func() []interface{} {
Expand All @@ -387,24 +474,33 @@ func (s *Schema) genClusteringRangeQuery(t Table, p *PartitionRange) *Stmt {
values []interface{}
typs []Type
)
for _, pk := range t.PartitionKeys {
tableName := t.Name
partitionKeys := t.PartitionKeys
clusteringKeys := t.ClusteringKeys
view := p.Rand.Intn(len(t.MaterializedViews))
if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 {
tableName = t.MaterializedViews[view].Name
partitionKeys = t.MaterializedViews[view].PartitionKeys
clusteringKeys = t.MaterializedViews[view].ClusteringKeys
}
for _, pk := range partitionKeys {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
maxClusteringRels := 0
if len(t.ClusteringKeys) > 1 {
maxClusteringRels = p.Rand.Intn(len(t.ClusteringKeys) - 1)
if len(clusteringKeys) > 1 {
maxClusteringRels = p.Rand.Intn(len(clusteringKeys) - 1)
for i := 0; i < maxClusteringRels; i++ {
relations = append(relations, fmt.Sprintf("%s = ?", t.ClusteringKeys[i].Name))
values = appendValue(t.ClusteringKeys[i].Type, p, values)
typs = append(typs, t.ClusteringKeys[i].Type)
relations = append(relations, fmt.Sprintf("%s = ?", clusteringKeys[i].Name))
values = appendValue(clusteringKeys[i].Type, p, values)
typs = append(typs, clusteringKeys[i].Type)
}
}
relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", t.ClusteringKeys[maxClusteringRels].Name, t.ClusteringKeys[maxClusteringRels].Name))
relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", clusteringKeys[maxClusteringRels].Name, clusteringKeys[maxClusteringRels].Name))
values = appendValueRange(t.ClusteringKeys[maxClusteringRels].Type, p, values)
typs = append(typs, t.ClusteringKeys[maxClusteringRels].Type, t.ClusteringKeys[maxClusteringRels].Type)
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
typs = append(typs, clusteringKeys[maxClusteringRels].Type, clusteringKeys[maxClusteringRels].Type)
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, tableName, strings.Join(relations, " AND "))
return &Stmt{
Query: query,
Values: func() []interface{} {
Expand All @@ -420,30 +516,39 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t Table, p *PartitionR
values []interface{}
typs []Type
)
pkNum := p.Rand.Intn(len(t.PartitionKeys))
tableName := t.Name
partitionKeys := t.PartitionKeys
clusteringKeys := t.ClusteringKeys
view := p.Rand.Intn(len(t.MaterializedViews))
if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 {
tableName = t.MaterializedViews[view].Name
partitionKeys = t.MaterializedViews[view].PartitionKeys
clusteringKeys = t.MaterializedViews[view].ClusteringKeys
}
pkNum := p.Rand.Intn(len(partitionKeys))
if pkNum == 0 {
pkNum = 1
}
for _, pk := range t.PartitionKeys {
for _, pk := range partitionKeys {
relations = append(relations, fmt.Sprintf("%s IN (%s)", pk.Name, strings.TrimRight(strings.Repeat("?,", pkNum), ",")))
for i := 0; i < pkNum; i++ {
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
}
maxClusteringRels := 0
if len(t.ClusteringKeys) > 1 {
maxClusteringRels = p.Rand.Intn(len(t.ClusteringKeys) - 1)
if len(clusteringKeys) > 1 {
maxClusteringRels = p.Rand.Intn(len(clusteringKeys) - 1)
for i := 0; i < maxClusteringRels; i++ {
relations = append(relations, fmt.Sprintf("%s = ?", t.ClusteringKeys[i].Name))
values = appendValue(t.ClusteringKeys[i].Type, p, values)
typs = append(typs, t.ClusteringKeys[i].Type)
relations = append(relations, fmt.Sprintf("%s = ?", clusteringKeys[i].Name))
values = appendValue(clusteringKeys[i].Type, p, values)
typs = append(typs, clusteringKeys[i].Type)
}
}
relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", t.ClusteringKeys[maxClusteringRels].Name, t.ClusteringKeys[maxClusteringRels].Name))
values = appendValueRange(t.ClusteringKeys[maxClusteringRels].Type, p, values)
typs = append(typs, t.ClusteringKeys[maxClusteringRels].Type, t.ClusteringKeys[maxClusteringRels].Type)
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", clusteringKeys[maxClusteringRels].Name, clusteringKeys[maxClusteringRels].Name))
values = appendValueRange(clusteringKeys[maxClusteringRels].Type, p, values)
typs = append(typs, clusteringKeys[maxClusteringRels].Type, clusteringKeys[maxClusteringRels].Type)
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, tableName, strings.Join(relations, " AND "))
return &Stmt{
Query: query,
Values: func() []interface{} {
Expand Down

0 comments on commit 0748cd5

Please sign in to comment.