From 85a4c6f0ce6bfc7bc03300ca6e9fe9c0686d1f6e Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Thu, 2 May 2019 15:50:49 +0200 Subject: [PATCH] schema: Support for materialized views. --- CHANGELOG.md | 1 + schema.go | 177 ++++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 142 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e02f46c..942b3b9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Gemini version is now available in the resulting output. +- Materialized Views support. - Improved error handling in validation code. - Avoiding small double booking of write ops in case of mutation errors. - Printing executable CQL statements when logging errors or in verbose mode. diff --git a/schema.go b/schema.go index 975ed416..f132fe59 100644 --- a/schema.go +++ b/schema.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "math/rand" + "strconv" "strings" ) @@ -69,12 +70,19 @@ func (cs Columns) ToJSONMap(values map[string]interface{}, p *PartitionRange) ma } type Table struct { - Name string `json:"name"` - PartitionKeys Columns `json:"partition_keys"` - ClusteringKeys Columns `json:"clustering_keys"` - Columns Columns `json:"columns"` - Indexes []IndexDef `json:"indexes"` - KnownIssues map[string]bool `json:"known_issues"` + Name string `json:"name"` + PartitionKeys Columns `json:"partition_keys"` + ClusteringKeys Columns `json:"clustering_keys"` + Columns Columns `json:"columns"` + Indexes []IndexDef `json:"indexes"` + MaterializedViews []MaterializedView `json:"materialized_views"` + KnownIssues map[string]bool `json:"known_issues"` +} + +type MaterializedView struct { + Name string `json:"name"` + PartitionKeys Columns `json:"partition_keys"` + ClusteringKeys Columns `json:"clustering_keys"` } type Stmt struct { @@ -151,12 +159,43 @@ func GenSchema() *Schema { } } } + validMVColumn := func() ColumnDef { + validCols := make([]ColumnDef, 0, len(columns)) + for _, col := range columns { + valid := false + for _, pkType := range pkTypes { + if col.Type.Name() == pkType.Name() { + valid = true + break + } + } + if valid { + validCols = append(validCols, col) + } + } + return validCols[rand.Intn(len(validCols))] + } + var mvs []MaterializedView + numMvs := 1 + for i := 0; i < numMvs; i++ { + cols := []ColumnDef{ + validMVColumn(), + } + mv := MaterializedView{ + Name: "table1_mv_" + strconv.Itoa(i), + PartitionKeys: append(cols, partitionKeys...), + ClusteringKeys: clusteringKeys, + } + mvs = append(mvs, mv) + } + table := Table{ - Name: "table1", - PartitionKeys: partitionKeys, - ClusteringKeys: clusteringKeys, - Columns: columns, - Indexes: indexes, + Name: "table1", + PartitionKeys: partitionKeys, + ClusteringKeys: clusteringKeys, + Columns: columns, + MaterializedViews: mvs, + Indexes: indexes, KnownIssues: map[string]bool{ KnownIssuesJsonWithTuples: true, }, @@ -209,6 +248,40 @@ func (s *Schema) GetCreateSchema() []string { for _, idef := range t.Indexes { stmts = append(stmts, fmt.Sprintf("CREATE INDEX %s ON %s.%s (%s)", idef.Name, s.Keyspace.Name, t.Name, idef.Column)) } + for _, mv := range t.MaterializedViews { + var ( + mvPartitionKeys []string + mvPrimaryKeysNotNull []string + mvClusteringKeys []string + ) + for _, pk := range mv.PartitionKeys { + mvPartitionKeys = append(mvPartitionKeys, pk.Name) + mvPrimaryKeysNotNull = append(mvPrimaryKeysNotNull, fmt.Sprintf("%s IS NOT NULL", pk.Name)) + } + for _, ck := range mv.ClusteringKeys { + mvClusteringKeys = append(mvClusteringKeys, ck.Name) + mvPrimaryKeysNotNull = append(mvPrimaryKeysNotNull, fmt.Sprintf("%s IS NOT NULL", ck.Name)) + } + var createMaterializedView string + if len(mv.PartitionKeys) == 1 { + createMaterializedView = "CREATE MATERIALIZED VIEW %s.%s AS SELECT * FROM %s.%s WHERE %s PRIMARY KEY (%s" + } else { + createMaterializedView = "CREATE MATERIALIZED VIEW %s.%s AS SELECT * FROM %s.%s WHERE %s PRIMARY KEY ((%s)" + } + if len(mvClusteringKeys) > 0 { + createMaterializedView = createMaterializedView + ",%s)" + stmts = append(stmts, fmt.Sprintf(createMaterializedView, + s.Keyspace.Name, mv.Name, s.Keyspace.Name, t.Name, + strings.Join(mvPrimaryKeysNotNull, " AND "), + strings.Join(mvPartitionKeys, ","), strings.Join(clusteringKeys, ","))) + } else { + createMaterializedView = createMaterializedView + ")" + stmts = append(stmts, fmt.Sprintf(createMaterializedView, + s.Keyspace.Name, mv.Name, s.Keyspace.Name, t.Name, + strings.Join(mvPrimaryKeysNotNull, " AND "), + strings.Join(mvPartitionKeys, ","))) + } + } } return stmts } @@ -336,15 +409,22 @@ func (s *Schema) GenCheckStmt(t Table, p *PartitionRange) *Stmt { } func (s *Schema) genSinglePartitionQuery(t Table, p *PartitionRange) *Stmt { + tableName := t.Name + partitionKeys := t.PartitionKeys + if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 { + view := p.Rand.Intn(len(t.MaterializedViews)) + tableName = t.MaterializedViews[view].Name + partitionKeys = t.MaterializedViews[view].PartitionKeys + } var relations []string values := make([]interface{}, 0) typs := make([]Type, 0, 10) - for _, pk := range t.PartitionKeys { + for _, pk := range partitionKeys { relations = append(relations, fmt.Sprintf("%s = ?", pk.Name)) values = appendValue(pk.Type, p, values) typs = append(typs, pk.Type) } - query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND ")) + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, tableName, strings.Join(relations, " AND ")) return &Stmt{ Query: query, Values: func() []interface{} { @@ -360,18 +440,25 @@ func (s *Schema) genMultiplePartitionQuery(t Table, p *PartitionRange) *Stmt { values []interface{} typs []Type ) - pkNum := p.Rand.Intn(len(t.PartitionKeys)) + tableName := t.Name + partitionKeys := t.PartitionKeys + if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 { + view := p.Rand.Intn(len(t.MaterializedViews)) + tableName = t.MaterializedViews[view].Name + partitionKeys = t.MaterializedViews[view].PartitionKeys + } + pkNum := p.Rand.Intn(len(partitionKeys)) if pkNum == 0 { pkNum = 1 } - for _, pk := range t.PartitionKeys { + for _, pk := range partitionKeys { relations = append(relations, fmt.Sprintf("%s IN (%s)", pk.Name, strings.TrimRight(strings.Repeat("?,", pkNum), ","))) for i := 0; i < pkNum; i++ { values = appendValue(pk.Type, p, values) typs = append(typs, pk.Type) } } - query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND ")) + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, tableName, strings.Join(relations, " AND ")) return &Stmt{ Query: query, Values: func() []interface{} { @@ -387,24 +474,33 @@ func (s *Schema) genClusteringRangeQuery(t Table, p *PartitionRange) *Stmt { values []interface{} typs []Type ) - for _, pk := range t.PartitionKeys { + tableName := t.Name + partitionKeys := t.PartitionKeys + clusteringKeys := t.ClusteringKeys + view := p.Rand.Intn(len(t.MaterializedViews)) + if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 { + tableName = t.MaterializedViews[view].Name + partitionKeys = t.MaterializedViews[view].PartitionKeys + clusteringKeys = t.MaterializedViews[view].ClusteringKeys + } + for _, pk := range partitionKeys { relations = append(relations, fmt.Sprintf("%s = ?", pk.Name)) values = appendValue(pk.Type, p, values) typs = append(typs, pk.Type) } maxClusteringRels := 0 - if len(t.ClusteringKeys) > 1 { - maxClusteringRels = p.Rand.Intn(len(t.ClusteringKeys) - 1) + if len(clusteringKeys) > 1 { + maxClusteringRels = p.Rand.Intn(len(clusteringKeys) - 1) for i := 0; i < maxClusteringRels; i++ { - relations = append(relations, fmt.Sprintf("%s = ?", t.ClusteringKeys[i].Name)) - values = appendValue(t.ClusteringKeys[i].Type, p, values) - typs = append(typs, t.ClusteringKeys[i].Type) + relations = append(relations, fmt.Sprintf("%s = ?", clusteringKeys[i].Name)) + values = appendValue(clusteringKeys[i].Type, p, values) + typs = append(typs, clusteringKeys[i].Type) } } - relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", t.ClusteringKeys[maxClusteringRels].Name, t.ClusteringKeys[maxClusteringRels].Name)) + relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", clusteringKeys[maxClusteringRels].Name, clusteringKeys[maxClusteringRels].Name)) values = appendValueRange(t.ClusteringKeys[maxClusteringRels].Type, p, values) - typs = append(typs, t.ClusteringKeys[maxClusteringRels].Type, t.ClusteringKeys[maxClusteringRels].Type) - query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND ")) + typs = append(typs, clusteringKeys[maxClusteringRels].Type, clusteringKeys[maxClusteringRels].Type) + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, tableName, strings.Join(relations, " AND ")) return &Stmt{ Query: query, Values: func() []interface{} { @@ -420,11 +516,20 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t Table, p *PartitionR values []interface{} typs []Type ) - pkNum := p.Rand.Intn(len(t.PartitionKeys)) + tableName := t.Name + partitionKeys := t.PartitionKeys + clusteringKeys := t.ClusteringKeys + view := p.Rand.Intn(len(t.MaterializedViews)) + if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 { + tableName = t.MaterializedViews[view].Name + partitionKeys = t.MaterializedViews[view].PartitionKeys + clusteringKeys = t.MaterializedViews[view].ClusteringKeys + } + pkNum := p.Rand.Intn(len(partitionKeys)) if pkNum == 0 { pkNum = 1 } - for _, pk := range t.PartitionKeys { + for _, pk := range partitionKeys { relations = append(relations, fmt.Sprintf("%s IN (%s)", pk.Name, strings.TrimRight(strings.Repeat("?,", pkNum), ","))) for i := 0; i < pkNum; i++ { values = appendValue(pk.Type, p, values) @@ -432,18 +537,18 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t Table, p *PartitionR } } maxClusteringRels := 0 - if len(t.ClusteringKeys) > 1 { - maxClusteringRels = p.Rand.Intn(len(t.ClusteringKeys) - 1) + if len(clusteringKeys) > 1 { + maxClusteringRels = p.Rand.Intn(len(clusteringKeys) - 1) for i := 0; i < maxClusteringRels; i++ { - relations = append(relations, fmt.Sprintf("%s = ?", t.ClusteringKeys[i].Name)) - values = appendValue(t.ClusteringKeys[i].Type, p, values) - typs = append(typs, t.ClusteringKeys[i].Type) + relations = append(relations, fmt.Sprintf("%s = ?", clusteringKeys[i].Name)) + values = appendValue(clusteringKeys[i].Type, p, values) + typs = append(typs, clusteringKeys[i].Type) } } - relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", t.ClusteringKeys[maxClusteringRels].Name, t.ClusteringKeys[maxClusteringRels].Name)) - values = appendValueRange(t.ClusteringKeys[maxClusteringRels].Type, p, values) - typs = append(typs, t.ClusteringKeys[maxClusteringRels].Type, t.ClusteringKeys[maxClusteringRels].Type) - query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND ")) + relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", clusteringKeys[maxClusteringRels].Name, clusteringKeys[maxClusteringRels].Name)) + values = appendValueRange(clusteringKeys[maxClusteringRels].Type, p, values) + typs = append(typs, clusteringKeys[maxClusteringRels].Type, clusteringKeys[maxClusteringRels].Type) + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, tableName, strings.Join(relations, " AND ")) return &Stmt{ Query: query, Values: func() []interface{} {