-
Notifications
You must be signed in to change notification settings - Fork 10
/
integration_test.go
86 lines (69 loc) · 2.08 KB
/
integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// +build integration
package mq_test
import (
	"context"
	"os"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
	mq "github.com/remind101/mq-go"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
// TestRouterIntegration checks end to end that a message sent to a queue is
// dispatched by an mq.Router to the handler registered for the message's route.
//
// It creates (and purges) the "jobs" queue, sends one message routed to
// "worker.1", starts an mq.Server on that queue and waits for the handler to
// process and delete the message. The wait is bounded so a message that is
// never delivered fails the test instead of hanging it.
func TestRouterIntegration(t *testing.T) {
	// Upper bound on how long to wait for the handler to run.
	const timeout = 30 * time.Second

	c := newClient()

	// Create a new queue.
	queueURL := createQueue(t, c, "jobs")

	// Send a message.
	sendMessage(t, c, queueURL, aws.String("worker.1"), aws.String("do some work"))

	// done is closed by the handler once it has finished with the message.
	done := make(chan struct{})

	// Init handler.
	r := mq.NewRouter()

	// Route certain messages to a specific handler.
	r.Handle("worker.1", mq.HandlerFunc(func(m *mq.Message) error {
		// Signal completion last, after the delete below has been checked.
		defer close(done)

		assert.Equal(t, "do some work", aws.StringValue(m.SQSMessage.Body))

		// The handler's result is the outcome of deleting the message.
		err := m.Delete()
		assert.NoError(t, err)
		return err
	}))

	// Run server until message is received.
	s := mq.NewServer(*queueURL, r, mq.WithClient(c))
	s.Start()
	defer s.Shutdown(context.Background())

	select {
	case <-done:
	case <-time.After(timeout):
		// t.Fatalf still runs the deferred Shutdown above.
		t.Fatalf("timed out after %s waiting for the message to be handled", timeout)
	}
}
// newClient returns an SQS client for the integration tests.
//
// The client targets the endpoint given by the ELASTICMQ_URL environment
// variable, uses the region "local" and fixed placeholder static credentials.
// It panics if the underlying AWS session cannot be created, so a broken
// configuration surfaces immediately rather than on the first API call.
func newClient() sqsiface.SQSAPI {
	config := aws.NewConfig().
		WithEndpoint(os.Getenv("ELASTICMQ_URL")).
		WithRegion("local").
		WithCredentials(credentials.NewStaticCredentials("id", "secret", "token"))

	// session.New is deprecated; NewSession reports creation errors and Must
	// turns such an error into a panic.
	return sqs.New(session.Must(session.NewSession(config)))
}
// createQueue creates the queue called name using c, purges it, and returns
// the queue URL reported by the CreateQueue call.
//
// The test is failed immediately (via require) if either the create or the
// purge call returns an error.
func createQueue(t *testing.T, c sqsiface.SQSAPI, name string) *string {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	out, err := c.CreateQueue(&sqs.CreateQueueInput{
		QueueName: aws.String(name),
	})
	require.NoError(t, err)

	// Purge right after creation; presumably this clears messages left behind
	// by an earlier run that used the same queue name.
	_, err = c.PurgeQueue(&sqs.PurgeQueueInput{QueueUrl: out.QueueUrl})
	require.NoError(t, err)

	return out.QueueUrl
}
// sendMessage sends a message with the given body to the queue at qURL using c.
//
// route is attached as a "String" message attribute under the key
// mq.MessageAttributeNameRoute. The test is failed immediately (via require)
// if the send returns an error.
func sendMessage(t *testing.T, c sqsiface.SQSAPI, qURL, route, body *string) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	_, err := c.SendMessage(&sqs.SendMessageInput{
		QueueUrl: qURL,
		MessageAttributes: map[string]*sqs.MessageAttributeValue{
			mq.MessageAttributeNameRoute: {
				DataType:    aws.String("String"),
				StringValue: route,
			},
		},
		MessageBody: body,
	})
	require.NoError(t, err)
}