Skip to content

Commit

Permalink
Check constraint backend (#9)
Browse files Browse the repository at this point in the history
Backend Support for Check Constraint
  • Loading branch information
taherkl authored and akashthawaitcc committed Dec 26, 2024
1 parent cb27339 commit 74239c2
Show file tree
Hide file tree
Showing 23 changed files with 831 additions and 101 deletions.
1 change: 1 addition & 0 deletions internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ const (
ForeignKeyActionNotSupported
NumericPKNotSupported
DefaultValueError
TypeMismatch
)

const (
Expand Down
5 changes: 5 additions & 0 deletions internal/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func GenerateForeignkeyId() string {
func GenerateIndexesId() string {
return GenerateId("i")
}

func GenerateCheckConstrainstId() string {
return GenerateId("ck")
}

func GenerateRuleId() string {
return GenerateId("r")
}
Expand Down
13 changes: 13 additions & 0 deletions internal/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,19 @@ func ToSpannerIndexName(conv *Conv, srcIndexName string) string {
return getSpannerValidName(conv, srcIndexName)
}

// Note that the check constraints names in spanner have to be globally unique
// (across the database). But in some source databases, such as MySQL,
// they only have to be unique for a table. Hence we must map each source
// constraint name to a unique spanner constraint name.
func ToSpannerCheckConstraintName(conv *Conv, srcCheckConstraintName string) string {
return getSpannerValidName(conv, srcCheckConstraintName)
}

func GetSpannerValidExpression(cks []ddl.CheckConstraint) []ddl.CheckConstraint {
// TODO validate the check constraints data with batch verification then send back
return cks
}

// conv.UsedNames tracks Spanner names that have been used for table names, foreign key constraints
// and indexes. We use this to ensure we generate unique names when
// we map from source dbs to Spanner since Spanner requires all these names to be
Expand Down
7 changes: 7 additions & 0 deletions internal/reports/report_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ func buildTableReportBody(conv *internal.Conv, tableId string, issues map[string
Description: fmt.Sprintf("%s for table '%s' column '%s'", IssueDB[i].Brief, conv.SpSchema[tableId].Name, spColName),
}
l = append(l, toAppend)
case internal.TypeMismatch:
toAppend := Issue{
Category: IssueDB[i].Category,
Description: fmt.Sprintf("Table '%s': Type mismatch in '%s'column affecting check constraints. Verify data type compatibility with constraint logic", conv.SpSchema[tableId].Name, conv.SpSchema[tableId].ColDefs[colId].Name),
}
l = append(l, toAppend)

default:
toAppend := Issue{
Category: IssueDB[i].Category,
Expand Down
26 changes: 17 additions & 9 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ import (

// Table represents a database table.
type Table struct {
Name string
Schema string
ColIds []string // List of column Ids (for predictable iteration order e.g. printing).
ColDefs map[string]Column // Details of columns.
ColNameIdMap map[string]string `json:"-"` // Computed every time just after conv is generated or after any column renaming
PrimaryKeys []Key
ForeignKeys []ForeignKey
Indexes []Index
Id string
Name string
Schema string
ColIds []string // List of column Ids (for predictable iteration order e.g. printing).
ColDefs map[string]Column // Details of columns.
ColNameIdMap map[string]string `json:"-"` // Computed every time just after conv is generated or after any column renaming
PrimaryKeys []Key
ForeignKeys []ForeignKey
CheckConstraints []CheckConstraint
Indexes []Index
Id string
}

// Column represents a database column.
Expand Down Expand Up @@ -77,6 +78,13 @@ type ForeignKey struct {
Id string
}

// CheckConstraints represents a check constraint defined in the schema.
type CheckConstraint struct {
Name string
Expr string
Id string
}

// Key respresents a primary key or index key.
type Key struct {
ColId string
Expand Down
23 changes: 12 additions & 11 deletions sources/common/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type InfoSchema interface {
GetColumns(conv *internal.Conv, table SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error)
GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error)
GetRowCount(table SchemaAndName) (int64, error)
GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, map[string][]string, error)
GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, []schema.CheckConstraint, map[string][]string, error)
GetForeignKeys(conv *internal.Conv, table SchemaAndName) (foreignKeys []schema.ForeignKey, err error)
GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
Expand Down Expand Up @@ -187,7 +187,7 @@ func (is *InfoSchemaImpl) processTable(conv *internal.Conv, table SchemaAndName,
var t schema.Table
fmt.Println("processing schema for table", table)
tblId := internal.GenerateTableId()
primaryKeys, constraints, err := infoSchema.GetConstraints(conv, table)
primaryKeys, checkConstraints, constraints, err := infoSchema.GetConstraints(conv, table)
if err != nil {
return t, fmt.Errorf("couldn't get constraints for table %s.%s: %s", table.Schema, table.Name, err)
}
Expand Down Expand Up @@ -217,15 +217,16 @@ func (is *InfoSchemaImpl) processTable(conv *internal.Conv, table SchemaAndName,
schemaPKeys = append(schemaPKeys, schema.Key{ColId: colNameIdMap[k]})
}
t = schema.Table{
Id: tblId,
Name: name,
Schema: table.Schema,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDefs,
PrimaryKeys: schemaPKeys,
Indexes: indexes,
ForeignKeys: foreignKeys}
Id: tblId,
Name: name,
Schema: table.Schema,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDefs,
PrimaryKeys: schemaPKeys,
CheckConstraints: checkConstraints,
Indexes: indexes,
ForeignKeys: foreignKeys}
return t, nil
}

Expand Down
32 changes: 24 additions & 8 deletions sources/common/toddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ func (ss *SchemaToSpannerImpl) SchemaToSpannerDDLHelper(conv *internal.Conv, tod
}
comment := "Spanner schema for source table " + quoteIfNeeded(srcTable.Name)
conv.SpSchema[srcTable.Id] = ddl.CreateTable{
Name: spTableName,
ColIds: spColIds,
ColDefs: spColDef,
PrimaryKeys: cvtPrimaryKeys(srcTable.PrimaryKeys),
ForeignKeys: cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
Indexes: cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
Comment: comment,
Id: srcTable.Id}
Name: spTableName,
ColIds: spColIds,
ColDefs: spColDef,
PrimaryKeys: cvtPrimaryKeys(srcTable.PrimaryKeys),
ForeignKeys: cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
CheckConstraints: cvtCheckConstraint(conv, srcTable.CheckConstraints),
Indexes: cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
Comment: comment,
Id: srcTable.Id}
return nil
}

Expand Down Expand Up @@ -234,6 +235,21 @@ func cvtForeignKeys(conv *internal.Conv, spTableName string, srcTableId string,
return spKeys
}

func cvtCheckConstraint(conv *internal.Conv, srcKeys []schema.CheckConstraint) []ddl.CheckConstraint {
var spcks []ddl.CheckConstraint

for _, cks := range srcKeys {
spcks = append(spcks, ddl.CheckConstraint{
Id: cks.Id,
Name: internal.ToSpannerCheckConstraintName(conv, cks.Name),
Expr: cks.Expr,
})

}

return internal.GetSpannerValidExpression(spcks)
}

func CvtForeignKeysHelper(conv *internal.Conv, spTableName string, srcTableId string, srcKey schema.ForeignKey, isRestore bool) (ddl.Foreignkey, error) {
if len(srcKey.ColIds) != len(srcKey.ReferColumnIds) {
conv.Unexpected(fmt.Sprintf("ConvertForeignKeys: ColIds and referColumns don't have the same lengths: len(columns)=%d, len(referColumns)=%d for source tableId: %s, referenced table: %s", len(srcKey.ColIds), len(srcKey.ReferColumnIds), srcTableId, srcKey.ReferTableId))
Expand Down
29 changes: 29 additions & 0 deletions sources/common/toddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,33 @@ func TestSpannerSchemaApplyExpressions(t *testing.T) {
assert.Equal(t, tc.expectedConv, tc.conv)
})
}
func Test_cvtCheckContraint(t *testing.T) {

conv := internal.MakeConv()
srcSchema := []schema.CheckConstraint{
{
Id: "ck1",
Name: "check_1",
Expr: "age > 0",
},
{
Id: "ck1",
Name: "check_2",
Expr: "age < 99",
},
}
spSchema := []ddl.CheckConstraint{
{
Id: "ck1",
Name: "check_1",
Expr: "age > 0",
},
{
Id: "ck1",
Name: "check_2",
Expr: "age < 99",
},
}
result := cvtCheckConstraint(conv, srcSchema)
assert.Equal(t, spSchema, result)
}
6 changes: 3 additions & 3 deletions sources/dynamodb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,20 @@ func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error)
return *result.Table.ItemCount, err
}

func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, constraints map[string][]string, err error) {
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, checkConstraints []schema.CheckConstraint, constraints map[string][]string, err error) {
input := &dynamodb.DescribeTableInput{
TableName: aws.String(table.Name),
}
result, err := isi.DynamoClient.DescribeTable(input)
if err != nil {
return primaryKeys, constraints, fmt.Errorf("failed to make a DescribeTable API call for table %v: %v", table.Name, err)
return primaryKeys, checkConstraints, constraints, fmt.Errorf("failed to make a DescribeTable API call for table %v: %v", table.Name, err)
}

// Primary keys.
for _, i := range result.Table.KeySchema {
primaryKeys = append(primaryKeys, *i.AttributeName)
}
return primaryKeys, constraints, nil
return primaryKeys, checkConstraints, constraints, nil
}

func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error) {
Expand Down
4 changes: 2 additions & 2 deletions sources/dynamodb/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func TestInfoSchemaImpl_GetConstraints(t *testing.T) {
dySchema := common.SchemaAndName{Name: "test"}
conv := internal.MakeConv()
isi := InfoSchemaImpl{client, nil, 10}
primaryKeys, constraints, err := isi.GetConstraints(conv, dySchema)
primaryKeys, _, constraints, err := isi.GetConstraints(conv, dySchema)
assert.Nil(t, err)

pKeys := []string{"a", "b"}
Expand Down Expand Up @@ -705,7 +705,7 @@ func TestInfoSchemaImpl_GetColumns(t *testing.T) {
client := &mockDynamoClient{
scanOutputs: scanOutputs,
}
dySchema := common.SchemaAndName{Name: "test", Id: "t1"}
dySchema := common.SchemaAndName{Name: "test", Id: "t1"}

isi := InfoSchemaImpl{client, nil, 10}

Expand Down
116 changes: 83 additions & 33 deletions sources/mysql/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,6 @@ func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAnd
continue
}
ignored := schema.Ignored{}
for _, c := range constraints[colName] {
// c can be UNIQUE, PRIMARY KEY, FOREIGN KEY or CHECK
// We've already filtered out PRIMARY KEY.
switch c {
case "CHECK":
ignored.Check = true
case "FOREIGN KEY", "PRIMARY KEY", "UNIQUE":
// Nothing to do here -- these are all handled elsewhere.
}
}
ignored.Default = colDefault.Valid
colId := internal.GenerateColumnId()
if colExtra.String == "auto_increment" {
Expand Down Expand Up @@ -250,38 +240,98 @@ func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAnd
// other constraints. Note: we need to preserve ordinal order of
// columns in primary key constraints.
// Note that foreign key constraints are handled in getForeignKeys.
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) ([]string, map[string][]string, error) {
q := `SELECT k.COLUMN_NAME, t.CONSTRAINT_TYPE
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS t
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS k
ON t.CONSTRAINT_NAME = k.CONSTRAINT_NAME AND t.CONSTRAINT_SCHEMA = k.CONSTRAINT_SCHEMA AND t.TABLE_NAME=k.TABLE_NAME
WHERE k.TABLE_SCHEMA = ? AND k.TABLE_NAME = ? ORDER BY k.ordinal_position;`
rows, err := isi.Db.Query(q, table.Schema, table.Name)
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) ([]string, []schema.CheckConstraint, map[string][]string, error) {
tableExists, err := isi.isCheckConstraintsTablePresent()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

finalQuery := isi.getQuery(tableExists)
rows, err := isi.Db.Query(finalQuery, table.Schema, table.Name)
if err != nil {
return nil, nil, nil, err
}
defer rows.Close()

var primaryKeys []string
var col, constraint string
var checkKeys []schema.CheckConstraint
m := make(map[string][]string)

for rows.Next() {
err := rows.Scan(&col, &constraint)
if err != nil {
conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
continue
}
if col == "" || constraint == "" {
conv.Unexpected(fmt.Sprintf("Got empty col or constraint"))
if err := isi.processRow(rows, tableExists, conv, &primaryKeys, &checkKeys, m); err != nil {
continue
}
switch constraint {
case "PRIMARY KEY":
primaryKeys = append(primaryKeys, col)
default:
m[col] = append(m[col], constraint)
}
}
return primaryKeys, m, nil

return primaryKeys, checkKeys, m, nil
}

// checkCheckConstraintsTableExists checks if the CHECK_CONSTRAINTS table exists.
func (isi InfoSchemaImpl) isCheckConstraintsTablePresent() (bool, error) {
var tableExistsCount int
checkQuery := `SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'INFORMATION_SCHEMA' AND TABLE_NAME = 'CHECK_CONSTRAINTS';`
err := isi.Db.QueryRow(checkQuery).Scan(&tableExistsCount)
if err != nil {
return false, err
}
return tableExistsCount > 0, nil
}

// getQuery returns the appropriate SQL query based on the existence of CHECK_CONSTRAINTS.
func (isi InfoSchemaImpl) getQuery(tableExists bool) string {
if tableExists {
return `SELECT k.COLUMN_NAME, t.CONSTRAINT_TYPE, COALESCE(c.CHECK_CLAUSE, '') AS CHECK_CLAUSE
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS t
LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS k
ON t.CONSTRAINT_NAME = k.CONSTRAINT_NAME
AND t.CONSTRAINT_SCHEMA = k.CONSTRAINT_SCHEMA
AND t.TABLE_NAME = k.TABLE_NAME
LEFT JOIN INFORMATION_SCHEMA.CHECK_CONSTRAINTS AS c
ON t.CONSTRAINT_NAME = c.CONSTRAINT_NAME
WHERE t.TABLE_SCHEMA = ?
AND t.TABLE_NAME = ?
ORDER BY k.ORDINAL_POSITION;`
}
return `SELECT k.COLUMN_NAME, t.CONSTRAINT_TYPE
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS t
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS k
ON t.CONSTRAINT_NAME = k.CONSTRAINT_NAME
AND t.CONSTRAINT_SCHEMA = k.CONSTRAINT_SCHEMA
AND t.TABLE_NAME = k.TABLE_NAME
WHERE t.TABLE_SCHEMA = ?
AND t.TABLE_NAME = ?
ORDER BY k.ORDINAL_POSITION;`
}

// processRow handles scanning and processing of a database row for GetConstraints.
func (isi InfoSchemaImpl) processRow(
rows *sql.Rows, tableExists bool, conv *internal.Conv, primaryKeys *[]string,
checkKeys *[]schema.CheckConstraint, m map[string][]string) error {

var col, constraintType, checkClause string
err := rows.Scan(&col, &constraintType, &checkClause)
if err != nil {
conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
return err
}

if col == "" || constraintType == "" {
conv.Unexpected("Got empty column or constraint type")
return nil
}

switch constraintType {
case "PRIMARY KEY":
*primaryKeys = append(*primaryKeys, col)
case "CHECK":
checkClause = strings.ReplaceAll(checkClause, "_utf8mb4\\", "")
checkClause = strings.ReplaceAll(checkClause, "\\", "")
constraintName := fmt.Sprintf("%s_check", col)
*checkKeys = append(*checkKeys, schema.CheckConstraint{Name: constraintName, Expr: checkClause, Id: internal.GenerateCheckConstrainstId()})
default:
m[col] = append(m[col], constraintType)
}
return nil
}

// GetForeignKeys return list all the foreign keys constraints.
Expand Down
Loading

0 comments on commit 74239c2

Please sign in to comment.