forked from gmallard/stompngo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscribe.go
119 lines (105 loc) · 3.2 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
//
// Copyright © 2011-2014 Guy M. Allard
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package stompngo
import (
"fmt"
)
var _ = fmt.Println
/*
Subscribe to a STOMP subscription.
Headers MUST contain a "destination" header key.
All clients are recommended to supply a unique "id" header on Subscribe.
For STOMP 1.0 clients: if an "id" header is supplied, attempt to use it. If the
"id" header is not unique, return an error. If no "id" header is supplied, send the
SUBSCRIBE frame without an "id" header. Some brokers may respond with an ERROR
frame in this case if the subscription is seen as a duplicate.
For STOMP 1.1+ clients: If any client does not supply an "id" header, attempt to generate
a unique "id". In all cases, do not allow duplicate subscription "id"s in this session.
For details about the returned MessageData channel, see: https://github.com/gmallard/stompngo/wiki/subscribe-and-messagedata
Example:
// Possible additional Header keys: ack, id.
h := stompngo.Headers{"destination", "/queue/myqueue"}
s, e := c.Subscribe(h)
if e != nil {
// Do something sane ...
}
*/
func (c *Connection) Subscribe(h Headers) (<-chan MessageData, error) {
c.log(SUBSCRIBE, "start", h, c.Protocol())
if !c.connected {
return nil, ECONBAD
}
e := checkHeaders(h, c.Protocol())
if e != nil {
return nil, e
}
if _, ok := h.Contains("destination"); !ok {
return nil, EREQDSTSUB
}
ch := h.Clone()
if _, ok := ch.Contains("ack"); !ok {
ch = append(ch, "ack", "auto")
}
s, e, ch := c.establishSubscription(ch)
if e != nil {
return nil, e
}
//
f := Frame{SUBSCRIBE, ch, NULLBUFF}
//
r := make(chan error)
c.output <- wiredata{f, r}
e = <-r
c.log(SUBSCRIBE, "end", ch, c.Protocol())
return s, e
}
/*
Handle subscribe id.
*/
func (c *Connection) establishSubscription(h Headers) (<-chan MessageData, error, Headers) {
c.subsLock.Lock()
defer c.subsLock.Unlock()
//
id, hid := h.Contains("id")
uuid1 := Uuid()
// No duplicates
if hid {
if _, q := c.subs[id]; q {
return nil, EDUPSID, h // Duplicate subscriptions not allowed
}
} else {
if _, q := c.subs[uuid1]; q {
return nil, EDUPSID, h // Duplicate subscriptions not allowed
}
}
//
if c.Protocol() == SPL_10 {
if hid { // If 1.0 client wants one, assign it.
c.subs[id] = make(chan MessageData, c.scc)
} else {
return c.input, nil, h // 1.0 clients with no id take their own chances
}
} else { // 1.1+
if hid { // Client specified id
c.subs[id] = make(chan MessageData, c.scc) // Assign subscription
} else {
h = h.Add("id", uuid1)
c.subs[uuid1] = make(chan MessageData, c.scc) // Assign subscription
id = uuid1 // reset
}
}
return c.subs[id], nil, h
}