Skip to content

Commit

Permalink
rename signal to interrupt and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mattes committed Aug 13, 2014
1 parent 69df0f6 commit e245a46
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 28 deletions.
3 changes: 0 additions & 3 deletions driver/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/mattes/migrate/file"
"github.com/mattes/migrate/migrate/direction"
"strconv"
"time"
)

type Driver struct {
Expand Down Expand Up @@ -52,8 +51,6 @@ func (driver *Driver) Migrate(f file.File, pipe chan interface{}) {

pipe <- f

time.Sleep(3 * time.Second)

tx, err := driver.db.Begin()
if err != nil {
pipe <- err
Expand Down
6 changes: 1 addition & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/mattes/migrate/migrate/direction"
pipep "github.com/mattes/migrate/pipe"
"os"
// "os/signal"
"strconv"
"time"
)
Expand Down Expand Up @@ -106,12 +105,9 @@ func writePipe(pipe chan interface{}) {
}
fmt.Printf(" %s\n", f.FileName)

case os.Signal:
fmt.Println("signal", item)

default:
text := fmt.Sprint(item)
fmt.Println("TEXT", text)
fmt.Println(text)
}
}
}
Expand Down
34 changes: 22 additions & 12 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Up(pipe chan interface{}, url, migrationsPath string) {
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
break
}
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func Down(pipe chan interface{}, url, migrationsPath string) {
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
break
}
}
Expand All @@ -95,7 +95,7 @@ func DownSync(url, migrationsPath string) (err []error, ok bool) {
func Redo(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New()
go Migrate(pipe1, url, migrationsPath, -1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
go pipep.Close(pipe, nil)
return
} else {
Expand All @@ -115,7 +115,7 @@ func RedoSync(url, migrationsPath string) (err []error, ok bool) {
func Reset(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New()
go Down(pipe1, url, migrationsPath)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
go pipep.Close(pipe, nil)
return
} else {
Expand Down Expand Up @@ -149,7 +149,7 @@ func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) {
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
break
}
}
Expand Down Expand Up @@ -256,18 +256,28 @@ func NewPipe() chan interface{} {
return pipep.New()
}

var handleSignals = true
// interrupts is an internal variable that holds the state of
// interrupt handling
var interrupts = true

func EnableSignals() {
handleSignals = true
// Graceful enables interrupts checking. Once the first ^C is received
// it will finish the currently running migration and abort execution
// of the next migration. If ^C is received twice, it will stop
// execution immediately.
func Graceful() {
interrupts = true
}

func DisableSignals() {
handleSignals = false
// NonGraceful disables interrupts checking. The first received ^C will
// stop execution immediately.
func NonGraceful() {
interrupts = false
}

func handleSignal() chan os.Signal {
if handleSignals {
// interrupts returns a signal channel if interrupts checking is
// enabled. nil otherwise.
func handleInterrupts() chan os.Signal {
if interrupts {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
return c
Expand Down
20 changes: 12 additions & 8 deletions pipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,29 @@ func Close(pipe chan interface{}, err error) {
close(pipe)
}

func WaitAndRedirect(pipe, redirectPipe chan interface{}, signal chan os.Signal) (ok bool) {
// WaitAndRedirect waits for pipe to be closed and
// redirects all messages from pipe to redirectPipe
// while it waits. It also checks if there was an
// interrupt send and will quit gracefully if yes.
func WaitAndRedirect(pipe, redirectPipe chan interface{}, interrupt chan os.Signal) (ok bool) {
errorReceived := false
signalsReceived := 0

interruptsReceived := 0
if pipe != nil && redirectPipe != nil {
for {
select {

case <-signal:
signalsReceived += 1
if signalsReceived > 1 {
case <-interrupt:
interruptsReceived += 1
if interruptsReceived > 1 {
os.Exit(5)
} else {
// add white space at beginning for ^C splitting
redirectPipe <- " Aborting after this migration ..."
}

case item, ok := <-pipe:
if !ok {
return !errorReceived && signalsReceived == 0
return !errorReceived && interruptsReceived == 0
} else {
redirectPipe <- item
switch item.(type) {
Expand All @@ -47,7 +51,7 @@ func WaitAndRedirect(pipe, redirectPipe chan interface{}, signal chan os.Signal)
}
}
}
return !errorReceived && signalsReceived == 0
return !errorReceived && interruptsReceived == 0
}

// ReadErrors selects all received errors and returns them.
Expand Down

0 comments on commit e245a46

Please sign in to comment.