Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/merge strategies #10

Merged
merged 4 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 2 additions & 52 deletions clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package datapasta
import (
"context"
"fmt"
"log"
"strings"
)

Expand Down Expand Up @@ -147,61 +146,12 @@ func Download(ctx context.Context, db Database, startTable, startColumn string,
return nil, debugging, err
}
}

return cloneInOrder, debugging, nil
}

// Upload uploads, in naive order, every record in a dump.
// It mutates the elements of `dump`, so you can track changes (for example new primary keys).
func Upload(ctx context.Context, db Database, dump DatabaseDump) error {
fkm := NewForeignKeyMapper(db)
return db.Insert(fkm, dump...)
}

type ForeignKeyMapper func(row map[string]any) func()

// NewForeignKeyMapper returns a function that will update foreign key references in a row to their new values.
// each update returns a function that must be called after the row has been updated with new primary keys.
func NewForeignKeyMapper(db Database) ForeignKeyMapper {
changes := make(map[string]map[any]any)

for _, fk := range db.ForeignKeys() {
changes[fk.BaseTable+"."+fk.BaseCol] = map[any]any{}
}

return func(row map[string]any) func() {
table := row[DumpTableKey].(string)
for k, v := range row {
for _, fk := range db.ForeignKeys() {
if fk.ReferencingTable != table || fk.ReferencingCol != k || v == nil || changes[fk.BaseTable+`.`+fk.BaseCol] == nil {
continue
}

newID, ok := changes[fk.BaseTable+`.`+fk.BaseCol][v]
if !ok {
log.Printf("unable to find mapped id for %s[%s]=%v in %s", table, k, v, fk.BaseTable)
} else {
row[k] = newID
}
}
}

copy := make(map[string]any, len(row))
for k, v := range row {
// does anyone care about this value?
if changes[table+`.`+k] == nil {
continue
}
copy[k] = v
}

return func() {
table := row[DumpTableKey].(string)
for k, v := range row {
if changes[table+"."+k] == nil {
continue
}
changes[table+"."+k][copy[k]] = v
}
}
}
return db.Insert(dump...)
}
20 changes: 15 additions & 5 deletions clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,38 @@ func (d testDB) SelectMatchingRows(tname string, conds map[string][]any) ([]map[
return nil, fmt.Errorf("no mock for %s where %#v", tname, conds)
}

func (d testDB) PrimaryKeys() map[string]string {
return nil
}

func (d testDB) InsertRecord(map[string]any) (any, error) { return nil, nil }

// apply the updates from the cols to the row
func (d testDB) Update(id datapasta.RecordID, cols map[string]any) error { return nil }

// delete the row
func (d testDB) Delete(id datapasta.RecordID) error { return nil }

func (d testDB) Mapping() ([]datapasta.Mapping, error) { return nil, nil }

// upload a batch of records
func (d testDB) Insert(fkm datapasta.ForeignKeyMapper, records ...map[string]any) error {
func (d testDB) Insert(records ...map[string]any) error {
for _, m := range records {
finish := fkm(m)
d.Logf("inserting %#v", m)

if m[datapasta.DumpTableKey] == "company" && m["id"] == 10 {
if m["api_key"] != "obfuscated" {
d.Errorf("didn't obfuscated company 9's api key, got %s", m["api_key"])
}
m["id"] = 11
finish()
continue
}
if m[datapasta.DumpTableKey] == "factory" && m["id"] == 23 {
m["id"] = 12
finish()
continue
}
if m[datapasta.DumpTableKey] == "product" && m["id"] == 5 {
m["id"] = 13
finish()
continue
}
return fmt.Errorf("unexpected insert: %#v", m)
Expand Down
4 changes: 1 addition & 3 deletions integrations/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ func TestDatabaseImplementation(t *testing.T, db datapasta.Database, startTable,
old[k] = v
}

fkm := datapasta.NewForeignKeyMapper(db)
if err := db.Insert(fkm, found[0]); err != nil {
if err := db.Insert(found[0]); err != nil {
t.Fatalf("error inserting row: %s", err.Error())
return
}
Expand Down Expand Up @@ -62,4 +61,3 @@ func TestDatabaseImplementation(t *testing.T, db datapasta.Database, startTable,
return
}
}

61 changes: 54 additions & 7 deletions interface.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,77 @@
package datapasta

import (
"fmt"
"log"
)

// Database is the abstraction between the cloning tool and the database.
// The NewPostgres.NewClient method gives you an implementation for Postgres.
type Database interface {

// SelectMatchingRows must return unseen records.
// a Database can't be reused between clones, because it must do internal deduping.
// `conds` will be a map of columns and the values they can have.
SelectMatchingRows(tname string, conds map[string][]any) ([]map[string]any, error)


// insert one record, returning the new id
InsertRecord(record map[string]any) (any, error)

// apply the updates from the cols to the row
Update(id RecordID, cols map[string]any) error

// delete the row
Delete(id RecordID) error

// Insert uploads a batch of records.
// any changes to the records (such as newly generated primary keys) should mutate the record map directly.
// a Destination can't generally be reused between clones, as it may be inside a transaction.
// it's recommended that callers use a Database that wraps a transaction.
Insert(mapper ForeignKeyMapper, records ...map[string]any) error

//
// the records will have primary keys which must be handled.
// the Database is responsible for exposing the resulting primary key mapping in some manner.
Insert(records ...map[string]any) error

// Mapping must return whatever mapping has been created by prior Inserts.
// the implementation may internally choose to track this in the database or in memory.
Mapping() ([]Mapping, error)

// get foriegn key mapping
ForeignKeys() []ForeignKey

// get primary key mapping
PrimaryKeys() map[string]string
}

// ForeignKey contains every RERENCING column and the BASE column it refers to.
// This is used to recurse the database as a graph.
// This is used to recurse the database as a graph.
// Database implementations must provide a complete list of references.
type ForeignKey struct {
BaseTable string `json:"base_table"`
BaseCol string `json:"base_col"`
ReferencingTable string `json:"referencing_table"`
ReferencingCol string `json:"referencing_col"`
}
}

type RecordID struct {
Table string
PrimaryKey any
}

func (r RecordID) String() string {
return fmt.Sprintf(`%s(%v)`, r.Table, r.PrimaryKey)
}

func GetRowIdentifier(pks map[string]string, row map[string]any) RecordID {
table := row[DumpTableKey].(string)
pk, ok := row[pks[table]]
if !ok {
panic("unable to get row identifier")
}
return RecordID{Table: table, PrimaryKey: pk}
}

type Mapping struct {
RecordID
OriginalID any
}

var LogFunc = log.Printf
Loading