Skip to content

Commit

Permalink
Add batch limit option, use default region from config
Browse files Browse the repository at this point in the history
  • Loading branch information
mercury2269 committed May 2, 2020
1 parent bd73c60 commit c842c2c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 23 deletions.
29 changes: 21 additions & 8 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

## Features

* Reliable delivery. Messages are only deleted from the source queue if they were enqueued to the destination.
* Messages are sent and received in batches for faster processing.
* Reliable delivery. SQS Mover will only delete messages from the source queue after they were enqueued to the destination.
* Receives and sends messages in batches for faster processing.
* Progress indicator.
* User friendly info and error messages.
* Queue name resolution. For ease of use, you only need to provide a queue name and not the full `arn` address.
* Message Attributes are copied over.
* Message attributes copy.
* Support for FIFO queues. MessageGroupId and MessageDeduplicationId are copied over to the destination messages.
* Optional flag to limit the number of messages to move.
* An optional flag to limit the number of messages to move.

## Getting Started

Expand All @@ -38,6 +38,12 @@ aws_secret_access_key = <YOUR_SECRET_ACCESS_KEY>

The [default] heading defines credentials for the default profile, which the SQSMover will use unless you configure it to use another profile.

Optionally you can configure default region in `~/.aws/config`
```
[default]
region=us-west-2
```

#### Environment variables

As an alternative, you can setup AWS credentials in the environment variables.
Expand Down Expand Up @@ -117,11 +123,12 @@ usage: sqsmover --source=SOURCE --destination=DESTINATION [<flags>]
Flags:
-h, --help Show context-sensitive help (also try
--help-long and --help-man).
-s, --source=SOURCE Source queue name to move messages from.
-d, --destination=DESTINATION Destination queue name to move messages to.
-r, --region="us-west-2" AWS region for source and destination queues.
-s, --source=SOURCE The source queue name to move messages from.
-d, --destination=DESTINATION The destination queue name to move messages to.
-r, --region="us-west-2" The AWS region for source and destination queues.
-p, --profile="" Use a specific profile from AWS credentials file.
-l, --limit=0 Limits number of messages moved. No limit is set by default.
-l, --limit=0 Limits total number of messages moved. No limit is set by default.
-b, --batch=10 The maximum number of messages to move at a time.
-v, --version Show application version.
```

Expand All @@ -148,3 +155,9 @@ Limit number of moved messages to 10
sqsmover -s my_source_queue_name -d my_destination_queuename -l 10
```

By default, `sqsmover` will try to move 10 messages at a time. However, if the total size of messages
in a batch exceeds 256kb (262,144 bytes) you will receive an error: `Batch requests cannot be longer than 262144 bytes. You have sent x bytes.`
To resolve, reduce the batch size by setting `-b` flag.
```
sqsmover -s my_source_queue_name -d my_destination_queuename -b 3
```
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/mercury2269/sqsmover

go 1.14

require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
Expand Down
33 changes: 18 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ var (
)

var (
sourceQueue = kingpin.Flag("source", "Source queue name to move messages from.").Short('s').Required().String()
destinationQueue = kingpin.Flag("destination", "Destination queue name to move messages to.").Short('d').Required().String()
region = kingpin.Flag("region", "AWS region for source and destination queues.").Short('r').Default("us-west-2").String()
profile = kingpin.Flag("profile", "Use a specific profile from AWS credentials file.").Short('p').Default("").String()
limit = kingpin.Flag("limit", "Limits number of messages moved. No limit is set by default.").Short('l').Default("0").Int()
sourceQueue = kingpin.Flag("source", "The source queue name to move messages from.").Short('s').Required().String()
destinationQueue = kingpin.Flag("destination", "The destination queue name to move messages to.").Short('d').Required().String()
region = kingpin.Flag("region", "The AWS region for source and destination queues.").Short('r').Default("").String()
profile = kingpin.Flag("profile", "Use a specific profile from AWS credentials file.").Short('p').String()
limit = kingpin.Flag("limit", "Limits total number of messages moved. No limit is set by default.").Short('l').Default("0").Int()
maxBatchSize = kingpin.Flag("batch", "The maximum number of messages to move at a time").Short('b').Default("10").Int64()
)

func main() {
Expand All @@ -45,13 +46,16 @@ func main() {

kingpin.Parse()

sess, err := session.NewSessionWithOptions(
session.Options{
Config: aws.Config{Region: aws.String(*region)},
Profile: *profile,
SharedConfigState: session.SharedConfigEnable,
},
)
options := session.Options{
Profile: *profile,
SharedConfigState: session.SharedConfigEnable,
}

if region != nil {
options.Config = aws.Config{Region: aws.String(*region)}
}

sess, err := session.NewSessionWithOptions(options)

if err != nil {
log.Error(color.New(color.FgRed).Sprintf("Unable to create AWS session for region \r\n", *region))
Expand Down Expand Up @@ -163,17 +167,16 @@ func convertSuccessfulMessageToBatchRequestEntry(messages []*sqs.Message) []*sqs
}

func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, totalMessages int) {
params := &sqs.ReceiveMessageInput{
var params = &sqs.ReceiveMessageInput{
QueueUrl: aws.String(sourceQueueUrl),
VisibilityTimeout: aws.Int64(2),
WaitTimeSeconds: aws.Int64(0),
MaxNumberOfMessages: aws.Int64(10),
MaxNumberOfMessages: aws.Int64(*maxBatchSize),
MessageAttributeNames: []*string{aws.String(sqs.QueueAttributeNameAll)},
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameMessageGroupId),
aws.String(sqs.MessageSystemAttributeNameMessageDeduplicationId)},
}

log.Info(color.New(color.FgCyan).Sprintf("Starting to move messages..."))
fmt.Println()

Expand Down

0 comments on commit c842c2c

Please sign in to comment.