Skip to content

Commit

Permalink
Cleanup, preparation to start:
Browse files Browse the repository at this point in the history
actual work on stompngo issue #25.
  • Loading branch information
gmallard committed May 21, 2015
1 parent 0c372bb commit e6c7532
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
6 changes: 6 additions & 0 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ type subscription struct {
id string // Subscription 'id' header
am string // Subscription 'ack' keader
dst string // Subscription 'destination' header
// Controlled by the "subdrain" header value for SUBSCRIBE
df bool // Drain control flag, drain or not. Default is false.
dfd bool // Drain already done flag
// Also controlled by the "subdrain" header value for SUBSCRIBE. See
// code and comments in subscribe.go for more information.
dfw string // Drain when indicator: "before" or "after" unsubscribe
}

/*
Expand Down
2 changes: 2 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package stompngo

import (
"fmt"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -54,6 +55,7 @@ func (c *Connection) reader() {
if sid, ok := f.Headers.Contains("subscription"); ok {
// This is a read lock
c.subsLock.RLock()
fmt.Println("reader_lock_for", sid, c.subs[sid])
c.subs[sid].md <- d
c.subsLock.RUnlock()
} else {
Expand Down
28 changes: 21 additions & 7 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,34 +100,48 @@ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Hea
}
//

sd := new(subscription)
lam := "auto" // Default ack mode
// sd := new(subscription)
sd := &subscription{md: make(chan MessageData, c.scc),
id: id,
am: "auto",
dst: h.Value("destination"),
df: false,
dfd: false,
dfw: "before",
}

if ham, q := h.Contains("ack"); q {
lam = ham // Reset it
sd.am = ham // Reset it, might still be "auto"
}

// stompngo extension: subscription drain control
if v, q := h.Contains("subdrain"); q {
sd.df = true
if v == "after" {
sd.dfw = v
}
}

// Rewrite the below.
if c.Protocol() == SPL_10 {
if hid { // If 1.0 client wants one, assign it.
sd.md = make(chan MessageData, c.scc)
sd.id = id
sd.am = lam
sd.dst = h.Value("destination")
} else {
sd.md = c.input
sd.am = lam
sd.dst = h.Value("destination")
return sd, nil, h // 1.0 clients with no id take their own chances
}
} else { // 1.1+
if hid { // Client specified id
sd.md = make(chan MessageData, c.scc) // Assign subscription
sd.id = id // Set subscription id
sd.am = lam // Set subscription ack mode
sd.dst = h.Value("destination") // Set subscription destination
} else {
h = h.Add("id", uuid1)
sd.md = make(chan MessageData, c.scc) // Assign subscription
sd.id = uuid1 // Set subscription id
sd.am = lam // Set subscription ack mode
sd.dst = h.Value("destination") // Set subscription destination
}
}
Expand Down
32 changes: 22 additions & 10 deletions unsubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *Connection) Unsubscribe(h Headers) error {

// This is a read lock
c.subsLock.RLock()
_, p := c.subs[hid]
sp, p := c.subs[hid]
c.subsLock.RUnlock()

switch c.Protocol() {
Expand Down Expand Up @@ -97,34 +97,46 @@ func (c *Connection) Unsubscribe(h Headers) error {

if oki {

c.Drain(hid)
// drain *after* the UNSUBSCRIBE is on the wire, and only if
// the client requested it when SUBSCRIBE was sent.
log.Println("unsubDF", sp.df)
if sp.df {
c.Drain(hid, false) // Hard coded false may change one day
}

// This is a write lock
c.subsLock.Lock()
fmt.Println("unsub_delete_for", hid)
delete(c.subs, hid)
c.subsLock.Unlock()
}
c.log(UNSUBSCRIBE, "end", h)
return nil
}

func (c *Connection) Drain(id string) {
func (c *Connection) Drain(id string, nac12 bool) {
// Drain any latent messages inbound for this subscription.
b := false
for {
select {
case m := <-c.subs[id].md: // Drop a MessageData on the floor
if c.Protocol() == SPL_12 {
// Nacks are a 'SHOULD' for 1.2 clients
ai := m.Message.Headers.Value("ack")
nh := Headers{"id", ai}
case md := <-c.subs[id].md: // Drop a MessageData on the floor
log.Println("drainsuc", string(md.Message.Body))
if c.Protocol() == SPL_12 && nac12 {
nh := Headers{"id", md.Message.Headers.Value("ack")}
e := c.Nack(nh)
if e != nil {
log.Fatalln(e)
log.Fatalln("nackerror", e)
}
}
break
case _ = <-time.After(time.Duration(250 * time.Millisecond)): // A guess
case md := <-c.MessageData: // Drop a MessageData on the floor
log.Println("drainmd", string(md.Message.Body))
if md.Error != nil {
log.Fatalln("mderror", md.Message, md.Error)
}
break
case _ = <-time.After(time.Duration(250 * time.Millisecond)):
// Duration value above is a guess
b = true
break
}
Expand Down

0 comments on commit e6c7532

Please sign in to comment.