Skip to content

Commit

Permalink
fix(gemini): make it properly work for UDTs
Browse files Browse the repository at this point in the history
  • Loading branch information
illia-li authored and Dmitry Kropachev committed Jun 15, 2023
1 parent ad3f856 commit 1825598
Show file tree
Hide file tree
Showing 16 changed files with 123 additions and 60 deletions.
8 changes: 4 additions & 4 deletions cmd/gemini/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
{
"name": "col0",
"type": {
"types": {
"type_name": "udt_672245080",
"frozen": true,
"coltypes": {
"udt_672245080_0": "ascii",
"udt_672245080_1": "boolean",
"udt_672245080_2": "bigint",
"udt_672245080_3": "blob"
},
"type_name": "udt_672245080",
"frozen": true
}
}
},
{
Expand Down
1 change: 1 addition & 0 deletions pkg/joberror/joberror.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type JobError struct {
Timestamp time.Time `json:"timestamp"`
Message string `json:"message"`
Query string `json:"query"`
StmtType string `json:"stmt-type"`
}

type ErrorList struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/jobs/gen_ddl_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func genAddColumnStmt(t *typedef.Table, keyspace string, column *typedef.ColumnD
},
})
return &typedef.Stmts{
List: stmts,
List: stmts,
QueryType: typedef.AddColumnStatementType,
PostStmtHook: func() {
t.Columns = append(t.Columns, column)
t.ResetQueryCache()
Expand Down Expand Up @@ -124,7 +125,8 @@ func genDropColumnStmt(t *typedef.Table, keyspace string, column *typedef.Column
},
})
return &typedef.Stmts{
List: stmts,
List: stmts,
QueryType: typedef.DropColumnStatementType,
PostStmtHook: func() {
t.Columns = t.Columns.Remove(column)
t.ResetQueryCache()
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/gen_mutate_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func genInsertJSONStmt(
StmtCache: &typedef.StmtCache{
Query: builder,
Types: []typedef.Type{typedef.TYPE_TEXT},
QueryType: typedef.InsertStatement,
QueryType: typedef.InsertJSONStatementType,
},
ValuesWithToken: valuesWithToken,
Values: []interface{}{string(jsonString)},
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func validationJob(
if err := validation(ctx, schemaConfig, table, s, stmt, g, globalStatus, logger); err != nil {
globalStatus.AddReadError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: stmt.QueryType.ToString(),
Message: "Validation failed: " + err.Error(),
Query: stmt.PrettyCQL(),
})
Expand Down Expand Up @@ -341,6 +342,7 @@ func ddl(
if err = s.Mutate(ctx, ddlStmt.Query); err != nil {
globalStatus.AddWriteError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: ddlStmts.QueryType.ToString(),
Message: "DDL failed: " + err.Error(),
Query: ddlStmt.PrettyCQL(),
})
Expand Down Expand Up @@ -394,6 +396,7 @@ func mutation(
if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
globalStatus.AddWriteError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: mutateStmt.QueryType.ToString(),
Message: "Mutation failed: " + err.Error(),
Query: mutateStmt.PrettyCQL(),
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/querycache/querycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func genInsertStmtCache(
return &typedef.StmtCache{
Query: builder,
Types: allTypes,
QueryType: typedef.InsertStatement,
QueryType: typedef.InsertStatementType,
}
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func genUpdateStmtCache(s *typedef.Schema, t *typedef.Table) *typedef.StmtCache
return &typedef.StmtCache{
Query: builder,
Types: allTypes,
QueryType: typedef.Updatetatement,
QueryType: typedef.UpdateStatementType,
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func ignore(err error) bool {
return true
}
//nolint:errorlint
switch err {
case context.Canceled, context.DeadlineExceeded:
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
return true
default:
return false
Expand Down
9 changes: 9 additions & 0 deletions pkg/typedef/bag.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ func (ct *BagType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{}
return []interface{}{out}
}

func (ct *BagType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface{} {
count := r.Intn(9) + 1
out := make([]interface{}, count)
for i := 0; i < count; i++ {
out[i] = ct.Type.GenJSONValue(r, p)
}
return out
}

func (ct *BagType) LenValue() int {
return 1
}
Expand Down
27 changes: 1 addition & 26 deletions pkg/typedef/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package typedef

import (
"encoding/json"
"fmt"

"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
Expand Down Expand Up @@ -96,31 +95,7 @@ func (c Columns) Remove(column *ColumnDef) Columns {

func (c Columns) ToJSONMap(values map[string]interface{}, r *rand.Rand, p *PartitionRangeConfig) map[string]interface{} {
for _, k := range c {
switch t := k.Type.(type) {
case SimpleType:
if t != TYPE_BLOB {
values[k.Name] = t.GenValue(r, p)[0]
continue
}
v, ok := t.GenValue(r, p)[0].(string)
if ok {
values[k.Name] = "0x" + v
}
case *TupleType:
vv := t.GenValue(r, p)
for i, val := range vv {
if t.Types[i] == TYPE_BLOB {
v, ok := val.(string)
if ok {
v = "0x" + v
}
vv[i] = v
}
}
values[k.Name] = vv
default:
panic(fmt.Sprintf("unknown type: %s", t.Name()))
}
values[k.Name] = k.Type.GenJSONValue(r, p)
}
return values
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/typedef/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ const (
SelectByIndexStatementType
SelectFromMaterializedViewStatementType
DeleteStatementType
InsertStatement
Updatetatement
InsertStatementType
InsertJSONStatementType
UpdateStatementType
AlterColumnStatementType
DropColumnStatementType
AddColumnStatementType
)

//nolint:revive
Expand Down
1 change: 1 addition & 0 deletions pkg/typedef/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Type interface {
CQLHolder() string
CQLPretty(string, []interface{}) (string, int)
GenValue(*rand.Rand, *PartitionRangeConfig) []interface{}
GenJSONValue(*rand.Rand, *PartitionRangeConfig) interface{}
LenValue() int
Indexable() bool
CQLType() gocql.TypeInfo
Expand Down
50 changes: 29 additions & 21 deletions pkg/typedef/simple_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,49 +156,57 @@ func (st SimpleType) Indexable() bool {
return st != TYPE_DURATION
}

func (st SimpleType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface{} {
if st == TYPE_BLOB {
ln := r.Intn(p.MaxBlobLength) + p.MinBlobLength
return "0x" + hex.EncodeToString([]byte(utils.RandString(r, ln)))
}
return st.genValue(r, p)
}

func (st SimpleType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} {
var val interface{}
return []interface{}{st.genValue(r, p)}
}

func (st SimpleType) genValue(r *rand.Rand, p *PartitionRangeConfig) interface{} {
switch st {
case TYPE_ASCII, TYPE_TEXT, TYPE_VARCHAR:
ln := r.Intn(p.MaxStringLength) + p.MinStringLength
val = utils.RandString(r, ln)
return utils.RandString(r, ln)
case TYPE_BLOB:
ln := r.Intn(p.MaxBlobLength) + p.MinBlobLength
val = hex.EncodeToString([]byte(utils.RandString(r, ln)))
return hex.EncodeToString([]byte(utils.RandString(r, ln)))
case TYPE_BIGINT:
val = r.Int63()
return r.Int63()
case TYPE_BOOLEAN:
val = r.Int()%2 == 0
return r.Int()%2 == 0
case TYPE_DATE:
val = utils.RandDate(r)
return utils.RandDate(r)
case TYPE_TIME:
val = utils.RandTime(r).UnixNano()
return utils.RandTime(r).UnixNano()
case TYPE_TIMESTAMP:
val = utils.RandTime(r)
return utils.RandTime(r)
case TYPE_DECIMAL:
val = inf.NewDec(r.Int63(), 3)
return inf.NewDec(r.Int63(), 3)
case TYPE_DOUBLE:
val = r.Float64()
return r.Float64()
case TYPE_DURATION:
val = (time.Minute * time.Duration(r.Intn(100))).String()
return (time.Minute * time.Duration(r.Intn(100))).String()
case TYPE_FLOAT:
val = r.Float32()
return r.Float32()
case TYPE_INET:
val = net.ParseIP(utils.RandIPV4Address(r, r.Intn(255), 2)).String()
return net.ParseIP(utils.RandIPV4Address(r, r.Intn(255), 2)).String()
case TYPE_INT:
val = r.Int31()
return r.Int31()
case TYPE_SMALLINT:
val = int16(r.Int31())
return int16(r.Int31())
case TYPE_TIMEUUID, TYPE_UUID:
val = utils.UUIDFromTime(r)
return utils.UUIDFromTime(r)
case TYPE_TINYINT:
val = int8(r.Int31())
return int8(r.Int31())
case TYPE_VARINT:
val = big.NewInt(r.Int63())
return big.NewInt(r.Int63())
default:
panic(fmt.Sprintf("generate value: not supported type %s", st))
}
return []interface{}{
val,
}
}
8 changes: 8 additions & 0 deletions pkg/typedef/tuple.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ func (t *TupleType) Indexable() bool {
return true
}

func (t *TupleType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface{} {
out := make([]interface{}, 0, len(t.Types))
for _, tp := range t.Types {
out = append(out, tp.GenJSONValue(r, p))
}
return out
}

func (t *TupleType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} {
out := make([]interface{}, 0, len(t.Types))
for _, tp := range t.Types {
Expand Down
30 changes: 30 additions & 0 deletions pkg/typedef/typedef.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type (
type Stmts struct {
PostStmtHook func()
List []*Stmt
QueryType StatementType
}

type StmtCache struct {
Expand Down Expand Up @@ -87,6 +88,35 @@ func (s *Stmt) PrettyCQL() string {

type StatementType uint8

func (st StatementType) ToString() string {
switch st {
case SelectStatementType:
return "SelectStatement"
case SelectRangeStatementType:
return "SelectRangeStatement"
case SelectByIndexStatementType:
return "SelectByIndexStatement"
case SelectFromMaterializedViewStatementType:
return "SelectFromMaterializedViewStatement"
case DeleteStatementType:
return "DeleteStatement"
case InsertStatementType:
return "InsertStatement"
case InsertJSONStatementType:
return "InsertJSONStatement"
case UpdateStatementType:
return "UpdateStatement"
case AlterColumnStatementType:
return "AlterColumnStatement"
case DropColumnStatementType:
return "DropColumnStatement"
case AddColumnStatementType:
return "AddColumnStatement"
default:
panic(fmt.Sprintf("unknown statement type %d", st))
}
}

func (st StatementType) PossibleAsyncOperation() bool {
switch st {
case SelectByIndexStatementType, SelectFromMaterializedViewStatementType:
Expand Down
16 changes: 16 additions & 0 deletions pkg/typedef/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ func (mt *MapType) CQLPretty(query string, value []interface{}) (string, int) {
return strings.Replace(query, "?", vv, 1), 1
}

func (mt *MapType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface{} {
count := r.Intn(9) + 1
vals := make(map[interface{}]interface{})
for i := 0; i < count; i++ {
vals[mt.KeyType.GenJSONValue(r, p)] = mt.ValueType.GenJSONValue(r, p)
}
return vals
}

func (mt *MapType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} {
count := r.Intn(9) + 1
vals := make(map[interface{}]interface{})
Expand Down Expand Up @@ -179,6 +188,13 @@ func (ct *CounterType) CQLPretty(query string, value []interface{}) (string, int
return strings.Replace(query, "?", fmt.Sprintf("%d", value[0]), 1), 1
}

func (ct *CounterType) GenJSONValue(r *rand.Rand, _ *PartitionRangeConfig) interface{} {
if utils.UnderTest {
return r.Int63()
}
return atomic.AddInt64(&ct.Value, 1)
}

func (ct *CounterType) GenValue(r *rand.Rand, _ *PartitionRangeConfig) []interface{} {
if utils.UnderTest {
return []interface{}{r.Int63()}
Expand Down
8 changes: 8 additions & 0 deletions pkg/typedef/udt.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ func (t *UDTType) Indexable() bool {
return true
}

func (t *UDTType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface{} {
vals := make(map[string]interface{})
for name, typ := range t.Types {
vals[name] = typ.GenJSONValue(r, p)
}
return vals
}

func (t *UDTType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} {
vals := make(map[string]interface{})
for name, typ := range t.Types {
Expand Down

0 comments on commit 1825598

Please sign in to comment.