-
Notifications
You must be signed in to change notification settings - Fork 0
/
mq.go
90 lines (74 loc) · 2 KB
/
mq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package fluffle
import (
"github.com/rabbitmq/amqp091-go"
)
type MQ struct {
QueueProperties
channels chan *rabbitChannel
deliveryChannel chan amqp091.Delivery
}
type IRabbitTrans interface {
Commit() error
Rollback() error
Publish(bty []byte) error
}
func (b *MQ) PublishIdempotent(idempotencyKey, idempotencyValue string, bty []byte) error {
return b.publish(bty, amqp091.Table{
idempotencyKey: idempotencyValue,
})
}
func (b *MQ) Publish(bty []byte) error {
return b.publish(bty, nil)
}
func (b *MQ) publish(bty []byte, publishingHeader amqp091.Table) error {
ch := getChannel()
defer ch.amqpChan.Close()
if err := CreateQueue(ch.amqpChan, b.QueueProperties); err != nil {
logger.Error("failed to initialize queue while publishing", err, nil)
return err
}
err := ch.amqpChan.Publish(
"", // exchange
b.QueueProperties.Name, // routing key
false, // mandatory
false, // immediate
amqp091.Publishing{
Headers: publishingHeader,
ContentType: "application/json",
Body: bty,
DeliveryMode: amqp091.Persistent,
})
if err != nil {
logger.Error("RabbitMQ", err, map[string]interface{}{"message": "Error while sending message"})
}
return err
}
func (b MQ) Retry(delivery amqp091.Delivery) {
defer func() {
recover()
}()
err := delivery.Nack(false, false)
if err != nil {
logger.Error("MQ Retry", err, map[string]interface{}{
"message": "Error while nack ",
})
}
}
func (b *MQ) Consume() <-chan amqp091.Delivery {
// lazy loading. delivery channel won't be created if there are no consumers
if b.deliveryChannel == nil {
b.deliveryChannel = make(chan amqp091.Delivery)
}
go subscribe(b.deliveryChannel, b.QueueProperties)
return b.deliveryChannel
}
func new(name string, prefetch int) *MQ {
if !poolStarted {
panic("connections not initiated")
}
b := MQ{channels: make(chan *rabbitChannel)}
b.Name = name
b.PrefetchCount = prefetch
go publish(b.channels, b.QueueProperties)
return &b
}