Skip to content

Commit

Permalink
feat(spanner/spannertest): add support for adding and dropping Foreig…
Browse files Browse the repository at this point in the history
…n Keys (#6608)

Co-authored-by: rahul2393 <[email protected]>
  • Loading branch information
harshachinta and rahul2393 authored Sep 3, 2022
1 parent bff16a7 commit ccd3614
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 6 deletions.
72 changes: 66 additions & 6 deletions spanner/spannertest/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ type table struct {

// Information about the table columns.
// They are reordered on table creation so the primary key columns come first.
cols []colInfo
colIndex map[spansql.ID]int // col name to index
origIndex map[spansql.ID]int // original index of each column upon construction
pkCols int // number of primary key columns (may be 0)
pkDesc []bool // whether each primary key column is in descending order
rdw *spansql.RowDeletionPolicy // RowDeletionPolicy of this table (may be nil)
cols []colInfo
colIndex map[spansql.ID]int // col name to index
origIndex map[spansql.ID]int // original index of each column upon construction
pkCols int // number of primary key columns (may be 0)
pkDesc []bool // whether each primary key column is in descending order
constraints []constraintInfo // constraints information of this table
rdw *spansql.RowDeletionPolicy // RowDeletionPolicy of this table (may be nil)

// Rows are stored in primary key order.
rows []row
Expand All @@ -76,6 +77,12 @@ type colInfo struct {
Alias spansql.PathExp // an alternate name for this column (result sets only)
}

// constraintInfo represents information about a constraint in a table
type constraintInfo struct {
Name spansql.ID
Constraint spansql.Constraint
}

// commitTimestampSentinel is a sentinel value for TIMESTAMP fields with allow_commit_timestamp=true.
// It is accepted, but never stored.
var commitTimestampSentinel = &struct{}{}
Expand Down Expand Up @@ -303,6 +310,11 @@ func (d *database) ApplyDDL(stmt spansql.DDLStmt) *status.Status {
return status.Newf(codes.InvalidArgument, "primary key column %q not in table", col)
}
}
for _, constraint := range stmt.Constraints {
if st := t.addConstraint(constraint); st.Code() != codes.OK {
return st
}
}
t.rdw = stmt.RowDeletionPolicy
d.tables[stmt.Name] = t
return nil
Expand Down Expand Up @@ -377,6 +389,17 @@ func (d *database) ApplyDDL(stmt spansql.DDLStmt) *status.Status {
return st
}
return nil
case spansql.AddConstraint:
// We do not validate if the referenced table and column exists.
if st := t.addConstraint(alt.Constraint); st.Code() != codes.OK {
return st
}
return nil
case spansql.DropConstraint:
if st := t.dropConstraint(alt); st.Code() != codes.OK {
return st
}
return nil
}
}

Expand Down Expand Up @@ -736,6 +759,24 @@ func (t *table) addColumn(cd spansql.ColumnDef, newTable bool) *status.Status {
return nil
}

func (t *table) addConstraint(alt spansql.TableConstraint) *status.Status {
t.mu.Lock()
defer t.mu.Unlock()

for _, constraint := range t.constraints {
if constraint.Name == alt.Name {
return status.Newf(codes.AlreadyExists, "constraint name %s already exists", alt.Name)
}
}

t.constraints = append(t.constraints, constraintInfo{
Name: alt.Name,
Constraint: alt.Constraint,
})

return nil
}

func (t *table) dropColumn(name spansql.ID) *status.Status {
// Only permit dropping non-key columns that aren't part of a secondary index.
// We don't support indexes, so only check that it isn't part of the primary key.
Expand Down Expand Up @@ -881,6 +922,25 @@ func (t *table) dropRowDeletionPolicy(ard spansql.DropRowDeletionPolicy) *status
return nil
}

func (t *table) dropConstraint(alt spansql.DropConstraint) *status.Status {
t.mu.Lock()
defer t.mu.Unlock()

var ci int = -1 // index of the constraint in t.constraints: constraint index
for i, constraint := range t.constraints {
if constraint.Name == alt.Name {
ci = i
}
}

if ci == -1 {
return status.Newf(codes.InvalidArgument, "unknown constraint name %q", alt.Name)
}

t.constraints = append(t.constraints[:ci], t.constraints[ci+1:]...)
return nil
}

func (t *table) insertRow(rowNum int, r row) {
t.rows = append(t.rows, nil)
copy(t.rows[rowNum+1:], t.rows[rowNum:])
Expand Down
43 changes: 43 additions & 0 deletions spanner/spannertest/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,3 +671,46 @@ func TestKeyRange(t *testing.T) {
}
}
}

func TestForeignKeyAddAndAlterConstraint(t *testing.T) {
sql := `CREATE TABLE Orders (
OrderID INT64 NOT NULL,
CustomerID INT64 NOT NULL,
Quantity INT64 NOT NULL,
ProductID INT64 NOT NULL,
CONSTRAINT FK_CustomerOrder FOREIGN KEY (CustomerID) REFERENCES Customers (CustomerID)
) PRIMARY KEY (OrderID);
ALTER TABLE Orders DROP CONSTRAINT FK_CustomerOrder;`
var db database

ddl, err := spansql.ParseDDL("filename", sql)
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}
for _, stmt := range ddl.List {
if st := db.ApplyDDL(stmt); st.Code() != codes.OK {
t.Fatalf("ApplyDDL failed: %v", st)
}
}

altersql := `CREATE TABLE Orders (
OrderID INT64 NOT NULL,
CustomerID INT64 NOT NULL,
Quantity INT64 NOT NULL,
ProductID INT64 NOT NULL,
CONSTRAINT FK_ProductOrder FOREIGN KEY (ProductID) REFERENCES Product (ProductID)
) PRIMARY KEY (OrderID);
ALTER TABLE Orders ADD CONSTRAINT FK_CustomerOrder FOREIGN KEY (CustomerID) REFERENCES Customers (CustomerID);
ALTER TABLE Orders DROP CONSTRAINT FK_ProductOrder;`
var db1 database

ddl1, err1 := spansql.ParseDDL("filename", altersql)
if err1 != nil {
t.Fatalf("%s: Bad DDL", err1)
}
for _, stmt := range ddl1.List {
if st := db1.ApplyDDL(stmt); st.Code() != codes.OK {
t.Fatalf("ApplyDDL failed: %v", st)
}
}
}

0 comments on commit ccd3614

Please sign in to comment.