-
Notifications
You must be signed in to change notification settings - Fork 10
/
publisher_test.go
96 lines (80 loc) · 2.37 KB
/
publisher_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package mq_test
import (
"context"
"fmt"
"log"
"os"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/aws/aws-sdk-go/service/sqs"
mq "github.com/remind101/mq-go"
"github.com/remind101/mq-go/pkg/memsqs"
"github.com/stretchr/testify/assert"
)
func TestPublisherFlushesOnShutdown(t *testing.T) {
qURL := "jobs"
c := memsqs.New()
p := newPublisher(t, qURL, c)
p.Start()
p.Publish(&sqs.SendMessageBatchRequestEntry{
MessageBody: aws.String("job1"),
})
shutdownPublisher(t, p) // Wait for server to shutdown
assert.Equal(t, 1, len(c.Queue(qURL)))
}
func TestPublisherSendsWhenBatchMaxReached(t *testing.T) {
qURL := "jobs"
c := memsqs.New()
p := newPublisher(t, qURL, c)
p.Start()
defer shutdownPublisher(t, p)
// Push enough messages into the queue to trigger a batch send call.
for i := 0; i < mq.DefaultMaxNumberOfMessages; i++ {
p.Publish(&sqs.SendMessageBatchRequestEntry{
MessageBody: aws.String(fmt.Sprintf("job%d", i)),
})
}
// Assert queue receives messages before mq.DefaultPublishInterval is reached.
eventually(t, 500*time.Millisecond, func() bool {
return len(c.Queue(qURL)) == mq.DefaultMaxNumberOfMessages
}, "expected messages to be batch sent")
}
func TestPublisherSendsWhenIntervalIsReached(t *testing.T) {
qURL := "jobs"
c := memsqs.New()
p := newPublisher(t, qURL, c)
p.PublishInterval = 10 * time.Millisecond
p.Start()
defer shutdownPublisher(t, p)
p.Publish(&sqs.SendMessageBatchRequestEntry{
MessageBody: aws.String("job1"),
})
p.Publish(&sqs.SendMessageBatchRequestEntry{
MessageBody: aws.String("job1"),
})
// Assert message is in queue after at least 2 intervals.
eventually(t, 20*time.Millisecond, func() bool {
return len(c.Queue(qURL)) == 2
}, "expected messages to be batch sent after the PublishInterval")
}
func newPublisher(t *testing.T, qURL string, c sqsiface.SQSAPI) *mq.Publisher {
return mq.NewPublisher(qURL, func(p *mq.Publisher) {
p.Client = c
p.OutputHandler = func(out *sqs.SendMessageBatchOutput, err error) {
if err != nil {
t.Fatal(err)
}
}
if os.Getenv("DEBUG") == "true" {
p.Logger = log.New(os.Stderr, "publisher - ", 0)
}
})
}
func shutdownPublisher(t *testing.T, s *mq.Publisher) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
assert.NoError(t, s.Shutdown(ctx))
}