Skip to content

Commit

Permalink
finish implementing mssql db and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-c committed May 19, 2019
1 parent 50e9729 commit ae500db
Show file tree
Hide file tree
Showing 18 changed files with 238 additions and 139 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ cli/cli
cli/migrate
.coverage
.godoc.pid
vendor/
vendor/
.vscode/
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE users (
user_id integer unique,
name varchar(40),
email varchar(40)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE users DROP COLUMN IF EXISTS city;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE users ADD city varchar(100);


Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX IF EXISTS users_email_index;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE UNIQUE INDEX users_email_index ON users (email);

-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS books;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE books (
user_id integer,
name varchar(40),
author varchar(40)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS movies;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE movies (
user_id integer,
name varchar(40),
director varchar(40)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
185 changes: 48 additions & 137 deletions database/mssql/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"io/ioutil"
nurl "net/url"

_ "github.com/denisenkom/go-mssqldb" // mssql support
mssql "github.com/denisenkom/go-mssqldb" // mssql support
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
)

func init() {
Expand All @@ -29,6 +30,13 @@ var (
ErrDatabaseDirty = fmt.Errorf("database is dirty")
)

var lockErrorMap = map[int]string{
-1: "The lock request timed out.",
-2: "The lock request was canceled.",
-3: "The lock request was chosen as a deadlock victim.",
-999: "Parameter validation or other call error.",
}

// Config for database
type Config struct {
MigrationsTable string
Expand All @@ -47,8 +55,7 @@ type MSSQL struct {
config *Config
}

// WithInstance returns a database instance from an already create database connection
// TODO: WithInstance double check that docs are correct for this function
// WithInstance returns a database instance from an already created database connection
func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
if config == nil {
return nil, ErrNilConfig
Expand Down Expand Up @@ -105,10 +112,6 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
return ss, nil
}

func dbConnectionString(host, port string) string {
return fmt.Sprintf("postgres://postgres@%s:%s/postgres?sslmode=disable", host, port)
}

// Open a connection to the database
func (ss *MSSQL) Open(url string) (database.Driver, error) {
purl, err := nurl.Parse(url)
Expand All @@ -122,14 +125,12 @@ func (ss *MSSQL) Open(url string) (database.Driver, error) {
}

migrationsTable := purl.Query().Get("x-migrations-table")
if len(migrationsTable) == 0 {
migrationsTable = DefaultMigrationsTable
}

px, err := WithInstance(db, &Config{
DatabaseName: purl.Path,
MigrationsTable: migrationsTable,
})

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,22 +159,20 @@ func (ss *MSSQL) Lock() error {
return err
}

// start a transaction
query := "BEGIN TRANSACTION"
if _, err := ss.conn.ExecContext(context.Background(), query); err != nil {
return &database.Error{OrigErr: err, Err: "get lock transaction", Query: []byte(query)}
}

// This will either obtain the lock immediately and return true,
// or return false if the lock cannot be acquired immediately.
// MS Docs: sp_getapplock: https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-getapplock-transact-sql?view=sql-server-2017
query = `EXEC sp_getapplock @Resource = ?, @LockMode = 'Update'`
if _, err := ss.conn.ExecContext(context.Background(), query, aid); err != nil {
query := `EXEC sp_getapplock @Resource = ?, @LockMode = 'Update', @LockOwner = 'Session', @LockTimeout = 0`

var status mssql.ReturnStatus
if _, err = ss.conn.ExecContext(context.Background(), query, aid, &status); err == nil && status > -1 {
ss.isLocked = true
return nil
} else if err != nil {
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
} else {
return &database.Error{Err: fmt.Sprintf("try lock failed with error %v", lockErrorMap[int(status)]), Query: []byte(query)}
}

ss.isLocked = true
return nil
}

// Unlock froms the migration lock from the database
Expand All @@ -188,18 +187,12 @@ func (ss *MSSQL) Unlock() error {
}

// MS Docs: sp_releaseapplock: https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-releaseapplock-transact-sql?view=sql-server-2017
query := `EXEC sp_releaseapplock @Resource = ?`
query := `EXEC sp_releaseapplock @Resource = ?, @LockOwner = 'Session'`
if _, err := ss.conn.ExecContext(context.Background(), query, aid); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
ss.isLocked = false

// end lock transaction
query = "COMMIT"
if _, err := ss.conn.ExecContext(context.Background(), query); err != nil {
return &database.Error{OrigErr: err, Err: "commit lock transaction", Query: []byte(query)}
}

return nil
}

Expand All @@ -213,78 +206,25 @@ func (ss *MSSQL) Run(migration io.Reader) error {
// run migration
query := string(migr[:])
if _, err := ss.conn.ExecContext(context.Background(), query); err != nil {
// // FIXME: check for mssql error here
// if pgErr, ok := err.(*pq.Error); ok {
// var line uint
// var col uint
// var lineColOK bool
// if pgErr.Position != "" {
// if pos, err := strconv.ParseUint(pgErr.Position, 10, 64); err == nil {
// line, col, lineColOK = computeLineFromPos(query, int(pos))
// }
// }
// message := fmt.Sprintf("migration failed: %s", pgErr.Message)
// if lineColOK {
// message = fmt.Sprintf("%s (column %d)", message, col)
// }
// if pgErr.Detail != "" {
// message = fmt.Sprintf("%s, %s", message, pgErr.Detail)
// }
// return database.Error{OrigErr: err, Err: message, Query: migr, Line: line}
// }
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
}

return nil
}

// func computeLineFromPos(s string, pos int) (line uint, col uint, ok bool) {
// // replace crlf with lf
// s = strings.Replace(s, "\r\n", "\n", -1)
// // pg docs: pos uses index 1 for the first character, and positions are measured in characters not bytes
// runes := []rune(s)
// if pos > len(runes) {
// return 0, 0, false
// }
// sel := runes[:pos]
// line = uint(runesCount(sel, newLine) + 1)
// col = uint(pos - 1 - runesLastIndex(sel, newLine))
// return line, col, true
// }

const newLine = '\n'

func runesCount(input []rune, target rune) int {
var count int
for _, r := range input {
if r == target {
count++
}
}
return count
}

func runesLastIndex(input []rune, target rune) int {
for i := len(input) - 1; i >= 0; i-- {
if input[i] == target {
return i
}
}
return -1
}

// SetVersion for the current database
func (ss *MSSQL) SetVersion(version int, dirty bool) error {

tx := ss.db
// tx, err := ss.conn.BeginTx(context.Background(), &sql.TxOptions{})
// if err != nil {
// return &database.Error{OrigErr: err, Err: "transaction start failed"}
// }
tx, err := ss.conn.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return &database.Error{OrigErr: err, Err: "transaction start failed"}
}

query := `TRUNCATE TABLE "` + ss.config.MigrationsTable + `"`
if _, err := tx.Exec(query); err != nil {
// tx.Rollback()
if errRollback := tx.Rollback(); errRollback != nil {
err = multierror.Append(err, errRollback)
}
return &database.Error{OrigErr: err, Query: []byte(query)}
}

Expand All @@ -295,14 +235,16 @@ func (ss *MSSQL) SetVersion(version int, dirty bool) error {
}
query = `INSERT INTO "` + ss.config.MigrationsTable + `" (version, dirty) VALUES ($1, $2)`
if _, err := tx.Exec(query, version, dirtyBit); err != nil {
// tx.Rollback()
if errRollback := tx.Rollback(); errRollback != nil {
err = multierror.Append(err, errRollback)
}
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}

// if err := tx.Commit(); err != nil {
// return &database.Error{OrigErr: err, Err: "transaction commit failed"}
// }
if err := tx.Commit(); err != nil {
return &database.Error{OrigErr: err, Err: "transaction commit failed"}
}

return nil
}
Expand All @@ -317,11 +259,6 @@ func (ss *MSSQL) Version() (version int, dirty bool, err error) {

case err != nil:
// FIXME: convert to MSSQL error
// if e, ok := err.(*pq.Error); ok {
// if e.Code.Name() == "undefined_table" {
// return database.NilVersion, false, nil
// }
// }
return 0, false, &database.Error{OrigErr: err, Query: []byte(query)}

default:
Expand Down Expand Up @@ -361,50 +298,24 @@ func (ss *MSSQL) Drop() error {
return &database.Error{OrigErr: err, Query: []byte(query)}
}

if err := ss.ensureVersionTable(); err != nil {
return err
}

return nil

// // select all tables in current schema
// query := `SELECT t.name FROM sys.tables AS t INNER JOIN sys.schemas AS s ON t.[schema_id] = s.[schema_id] WHERE s.name = N'dbo';`
// tables, err := ss.conn.QueryContext(context.Background(), query)
// if err != nil {
// return &database.Error{OrigErr: err, Query: []byte(query)}
// }
// defer tables.Close()

// // delete one table after another
// tableNames := make([]string, 0)
// for tables.Next() {
// var tableName string
// if err := tables.Scan(&tableName); err != nil {
// return err
// }
// if len(tableName) > 0 {
// tableNames = append(tableNames, tableName)
// }
// }

// if len(tableNames) > 0 {
// // delete one by one ...
// for _, t := range tableNames {
// query = `DROP TABLE IF EXISTS "` + t + `"`
// if _, err := ss.conn.ExecContext(context.Background(), query); err != nil {
// return &database.Error{OrigErr: err, Query: []byte(query)}
// }
// }

// if err := ss.ensureVersionTable(); err != nil {
// return err
// }
// }

// return nil
}

func (ss *MSSQL) ensureVersionTable() (err error) {
if err = ss.Lock(); err != nil {
return err
}

defer func() {
if e := ss.Unlock(); e != nil {
if err == nil {
err = e
} else {
err = multierror.Append(err, e)
}
}
}()

query := `IF NOT EXISTS
(SELECT *
FROM sysobjects
Expand Down
Loading

0 comments on commit ae500db

Please sign in to comment.