Skip to content

Commit

Permalink
Add progress indicator, better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mercury2269 committed Nov 16, 2018
1 parent 8de932b commit 8076641
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 26 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ require (
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/apex/log v1.1.0
github.com/aws/aws-sdk-go v1.15.76
github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37 // indirect
github.com/fatih/color v1.7.0
github.com/mattn/go-colorable v0.0.9 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/pkg/errors v0.8.0 // indirect
github.com/tj/go-progress v0.0.0-20180508172012-fadc638a53dd // indirect
github.com/tj/go v1.8.6
github.com/tj/go-progress v0.0.0-20180508172012-fadc638a53dd
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/apex/log v1.1.0 h1:J5rld6WVFi6NxA6m8GJ1LJqu3+GiTFIt3mYv27gdQWI=
github.com/apex/log v1.1.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY=
github.com/aws/aws-sdk-go v1.15.76 h1:AZB4clNWIk13YJaTm07kqyrHkj7gZYBQCgyTh/v4Sec=
github.com/aws/aws-sdk-go v1.15.76/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM=
github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37 h1:uxxtrnACqI9zK4ENDMf0WpXfUsHP5V8liuq5QdgDISU=
github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE=
Expand All @@ -16,7 +18,11 @@ github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/tj/go v1.8.6 h1:HZ+XV+wB4vqN5y5VLoZqYUuUJTBF+2kblBru7aUa44E=
github.com/tj/go v1.8.6/go.mod h1:iDIwBG1ZkyeGIOBZLZQfpIztHr5m0gG+YGXrKaUC4yE=
github.com/tj/go-progress v0.0.0-20180508172012-fadc638a53dd h1:vVcJMsELyu9DdEDBEtFfK5PdAhEK+aYjMg0ZFxc1K4U=
github.com/tj/go-progress v0.0.0-20180508172012-fadc638a53dd/go.mod h1:abH8hpo1+c7MbAa0ZCKvvGOgowFNgaoRQEcY0vsRTh4=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8 h1:YoY1wS6JYVRpIfFngRf2HHo9R9dAne3xbkGOQ5rJXjU=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
70 changes: 45 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/fatih/color"
"github.com/tj/go-progress"
"github.com/tj/go/term"
"gopkg.in/alecthomas/kingpin.v2"
"strconv"
)

var (
Expand Down Expand Up @@ -71,10 +74,17 @@ func main() {
AttributeNames: []*string{aws.String("All")},
})

numberOfMessages, _ := strconv.Atoi(*queueAttributes.Attributes["ApproximateNumberOfMessages"])

if err != nil {
log.Error(color.New(color.FgRed).Sprintf("Unable to locate the destination queue with name: %s, check region and name", *sourceQueue))
return
}

log.Info(color.New(color.FgCyan).Sprintf("Approximate number of messages in the source queue: %s",
*queueAttributes.Attributes["ApproximateNumberOfMessages"]))

moveMessages(sourceQueueUrl, destinationQueueUrl, svc)
moveMessages(sourceQueueUrl, destinationQueueUrl, svc, numberOfMessages)

}

Expand Down Expand Up @@ -102,34 +112,46 @@ func convertSuccessfulMessageToBatchRequestEntry(messages []*sqs.Message) []*sqs
return result
}

func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS) {
func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, numberOfMessages int) {
params := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(sourceQueueUrl),
VisibilityTimeout: aws.Int64(1),
WaitTimeSeconds: aws.Int64(1),
VisibilityTimeout: aws.Int64(2),
WaitTimeSeconds: aws.Int64(0),
MaxNumberOfMessages: aws.Int64(10),
}

for {
fmt.Println("Starting new batch")
log.Info(color.New(color.FgCyan).Sprintf("Starting to move messages..."))
fmt.Println()

term.HideCursor()
defer term.ShowCursor()

b := progress.NewInt(numberOfMessages)
b.Width = 40
b.StartDelimiter = color.New(color.FgCyan).Sprint("|")
b.EndDelimiter = color.New(color.FgCyan).Sprint("|")
b.Filled = color.New(color.FgCyan).Sprint("█")
b.Empty = color.New(color.FgCyan).Sprint("░")
b.Template(` {{.Bar}} {{.Text}}{{.Percent | printf "%3.0f"}}%`)

render := term.Renderer()

messagesProcessed := 0

for {
resp, err := svc.ReceiveMessage(params)

if len(resp.Messages) == 0 {
fmt.Println("Batch doesn't have any messages, transfer complete")
fmt.Println()
log.Info(color.New(color.FgCyan).Sprintf("Done. Moved %s messages", strconv.Itoa(numberOfMessages)))
return
}

if err != nil {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
fmt.Println(err.Error())
log.Error(color.New(color.FgRed).Sprint(err.Error()))
return
}

fmt.Println("Messages to transfer:")
fmt.Println(resp.Messages)

batch := &sqs.SendMessageBatchInput{
QueueUrl: aws.String(destinationQueueUrl),
Entries: convertToEntries(resp.Messages),
Expand All @@ -138,20 +160,15 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
sendResp, err := svc.SendMessageBatch(batch)

if err != nil {
fmt.Println("Failed to unqueue messages to the destination queue")
fmt.Println(err.Error())
log.Error(color.New(color.FgRed).Sprintf("Failed to un-queue messages to the destination. Error: %s", err.Error()))
return
}

if len(sendResp.Failed) > 0 {
fmt.Println("Failed to unqueue messages to the destination queue")
fmt.Println(sendResp.Failed)
log.Error(color.New(color.FgRed).Sprint("Failed to un-queue messages to the destination queue."))
return
}

fmt.Println("Unqueued to destination the following: ")
fmt.Println(sendResp.Successful)

if len(sendResp.Successful) == len(resp.Messages) {
deleteMessageBatch := &sqs.DeleteMessageBatchInput{
Entries: convertSuccessfulMessageToBatchRequestEntry(resp.Messages),
Expand All @@ -161,18 +178,21 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
deleteResp, err := svc.DeleteMessageBatch(deleteMessageBatch)

if err != nil {
fmt.Println("Error deleting messages, exiting...")
log.Error(color.New(color.FgRed).Sprint("Error deleting messages, exiting..."))
return
}

if len(deleteResp.Failed) > 0 {
fmt.Println("Error deleting messages, the following were not deleted")
fmt.Println(deleteResp.Failed)
log.Error(color.New(color.FgRed).Sprintf("Error deleting messages, the following were not deleted\n %s", deleteResp.Failed))
return
}

fmt.Printf("Deleted: %d messages \n", len(deleteResp.Successful))
fmt.Println("========================")
messagesProcessed += len(resp.Messages)
}

b.ValueInt(messagesProcessed)
render(b.String())
}


}

0 comments on commit 8076641

Please sign in to comment.