From 8076641be704f94aac665adc5c9b52c6cbce5f25 Mon Sep 17 00:00:00 2001 From: Sergey Maskalik Date: Thu, 15 Nov 2018 18:44:05 -0800 Subject: [PATCH] Add progress indicator, better logging --- go.mod | 5 ++++- go.sum | 6 +++++ main.go | 70 ++++++++++++++++++++++++++++++++++++--------------------- 3 files changed, 55 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index f8748a7..14e3840 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,13 @@ require ( github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect github.com/apex/log v1.1.0 github.com/aws/aws-sdk-go v1.15.76 + github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37 // indirect github.com/fatih/color v1.7.0 github.com/mattn/go-colorable v0.0.9 // indirect github.com/mattn/go-isatty v0.0.4 // indirect github.com/pkg/errors v0.8.0 // indirect - github.com/tj/go-progress v0.0.0-20180508172012-fadc638a53dd // indirect + github.com/tj/go v1.8.6 + github.com/tj/go-progress v0.0.0-20180508172012-fadc638a53dd + golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8 // indirect gopkg.in/alecthomas/kingpin.v2 v2.2.6 ) diff --git a/go.sum b/go.sum index 803e6ee..5f95bea 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/apex/log v1.1.0 h1:J5rld6WVFi6NxA6m8GJ1LJqu3+GiTFIt3mYv27gdQWI= github.com/apex/log v1.1.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY= github.com/aws/aws-sdk-go v1.15.76 h1:AZB4clNWIk13YJaTm07kqyrHkj7gZYBQCgyTh/v4Sec= github.com/aws/aws-sdk-go v1.15.76/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM= +github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37 h1:uxxtrnACqI9zK4ENDMf0WpXfUsHP5V8liuq5QdgDISU= +github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= @@ -16,7 +18,11 @@ github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/tj/go v1.8.6 h1:HZ+XV+wB4vqN5y5VLoZqYUuUJTBF+2kblBru7aUa44E= +github.com/tj/go v1.8.6/go.mod h1:iDIwBG1ZkyeGIOBZLZQfpIztHr5m0gG+YGXrKaUC4yE= github.com/tj/go-progress v0.0.0-20180508172012-fadc638a53dd h1:vVcJMsELyu9DdEDBEtFfK5PdAhEK+aYjMg0ZFxc1K4U= github.com/tj/go-progress v0.0.0-20180508172012-fadc638a53dd/go.mod h1:abH8hpo1+c7MbAa0ZCKvvGOgowFNgaoRQEcY0vsRTh4= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8 h1:YoY1wS6JYVRpIfFngRf2HHo9R9dAne3xbkGOQ5rJXjU= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/main.go b/main.go index eb6b9b5..f40122f 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,10 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "github.com/fatih/color" + "github.com/tj/go-progress" + "github.com/tj/go/term" "gopkg.in/alecthomas/kingpin.v2" + "strconv" ) var ( @@ -71,10 +74,17 @@ func main() { AttributeNames: []*string{aws.String("All")}, }) + numberOfMessages, _ := strconv.Atoi(*queueAttributes.Attributes["ApproximateNumberOfMessages"]) + + if err != nil { + log.Error(color.New(color.FgRed).Sprintf("Unable to locate the destination queue with name: %s, check region and name", *sourceQueue)) + return + } + log.Info(color.New(color.FgCyan).Sprintf("Approximate number of messages in the source queue: %s", *queueAttributes.Attributes["ApproximateNumberOfMessages"])) - moveMessages(sourceQueueUrl, destinationQueueUrl, svc) + moveMessages(sourceQueueUrl, destinationQueueUrl, svc, numberOfMessages) } @@ -102,34 +112,46 @@ func convertSuccessfulMessageToBatchRequestEntry(messages []*sqs.Message) []*sqs return result } -func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS) { +func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, numberOfMessages int) { params := &sqs.ReceiveMessageInput{ QueueUrl: aws.String(sourceQueueUrl), - VisibilityTimeout: aws.Int64(1), - WaitTimeSeconds: aws.Int64(1), + VisibilityTimeout: aws.Int64(2), + WaitTimeSeconds: aws.Int64(0), MaxNumberOfMessages: aws.Int64(10), } - for { - fmt.Println("Starting new batch") + log.Info(color.New(color.FgCyan).Sprintf("Starting to move messages...")) + fmt.Println() + + term.HideCursor() + defer term.ShowCursor() + b := progress.NewInt(numberOfMessages) + b.Width = 40 + b.StartDelimiter = color.New(color.FgCyan).Sprint("|") + b.EndDelimiter = color.New(color.FgCyan).Sprint("|") + b.Filled = color.New(color.FgCyan).Sprint("█") + b.Empty = color.New(color.FgCyan).Sprint("░") + b.Template(` {{.Bar}} {{.Text}}{{.Percent | printf "%3.0f"}}%`) + + render := term.Renderer() + + messagesProcessed := 0 + + for { resp, err := svc.ReceiveMessage(params) if len(resp.Messages) == 0 { - fmt.Println("Batch doesn't have any messages, transfer complete") + fmt.Println() + log.Info(color.New(color.FgCyan).Sprintf("Done. Moved %s messages", strconv.Itoa(numberOfMessages))) return } if err != nil { - // Print the error, cast err to awserr.Error to get the Code and - // Message from an error. - fmt.Println(err.Error()) + log.Error(color.New(color.FgRed).Sprint(err.Error())) return } - fmt.Println("Messages to transfer:") - fmt.Println(resp.Messages) - batch := &sqs.SendMessageBatchInput{ QueueUrl: aws.String(destinationQueueUrl), Entries: convertToEntries(resp.Messages), @@ -138,20 +160,15 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ sendResp, err := svc.SendMessageBatch(batch) if err != nil { - fmt.Println("Failed to unqueue messages to the destination queue") - fmt.Println(err.Error()) + log.Error(color.New(color.FgRed).Sprintf("Failed to un-queue messages to the destination. Error: %s", err.Error())) return } if len(sendResp.Failed) > 0 { - fmt.Println("Failed to unqueue messages to the destination queue") - fmt.Println(sendResp.Failed) + log.Error(color.New(color.FgRed).Sprint("Failed to un-queue messages to the destination queue.")) return } - fmt.Println("Unqueued to destination the following: ") - fmt.Println(sendResp.Successful) - if len(sendResp.Successful) == len(resp.Messages) { deleteMessageBatch := &sqs.DeleteMessageBatchInput{ Entries: convertSuccessfulMessageToBatchRequestEntry(resp.Messages), @@ -161,18 +178,21 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ deleteResp, err := svc.DeleteMessageBatch(deleteMessageBatch) if err != nil { - fmt.Println("Error deleting messages, exiting...") + log.Error(color.New(color.FgRed).Sprint("Error deleting messages, exiting...")) return } if len(deleteResp.Failed) > 0 { - fmt.Println("Error deleting messages, the following were not deleted") - fmt.Println(deleteResp.Failed) + log.Error(color.New(color.FgRed).Sprintf("Error deleting messages, the following were not deleted\n %s", deleteResp.Failed)) return } - fmt.Printf("Deleted: %d messages \n", len(deleteResp.Successful)) - fmt.Println("========================") + messagesProcessed += len(resp.Messages) } + + b.ValueInt(messagesProcessed) + render(b.String()) } + + }