Skip to content

Commit

Permalink
A STOMP protocol extension.
Browse files Browse the repository at this point in the history
This allow a client to implement a fix for the hangs described in
issue #25.  This is a second approach to solving that problem.
  • Loading branch information
gmallard committed Aug 14, 2018
1 parent e30c1f9 commit 512e8d8
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 10 deletions.
1 change: 1 addition & 0 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ const (
*/
const (
StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header
StompPlusDrainNow = "sng_drnow" // UNSUBSCRIBE Header
)

var (
Expand Down
60 changes: 50 additions & 10 deletions unsubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package stompngo

//import (
// "fmt"
//)
import (
"strconv"
"time"
)

/*
Unsubscribe from a STOMP subscription.
Expand Down Expand Up @@ -73,9 +74,10 @@ func (c *Connection) Unsubscribe(h Headers) error {
//
shaid := Sha1(h.Value(HK_DESTINATION)) // Special for 1.0
c.subsLock.RLock()
_, p := c.subs[shid]
_, ps := c.subs[shaid]
s1x, p := c.subs[shid]
s10, ps := c.subs[shaid] // Special for 1.0
c.subsLock.RUnlock()
var usesp *subscription
usekey := ""

switch c.Protocol() {
Expand All @@ -89,23 +91,61 @@ func (c *Connection) Unsubscribe(h Headers) error {
return EBADSID // invalid subscription-id
}
usekey = shid
usesp = s1x
case SPL_10:
if !p && !ps {
return EUNODSID
}
usekey = shaid
usesp = s10
default:
panic("unsubscribe version not supported: " + c.Protocol())
}

e = c.transmitCommon(UNSUBSCRIBE, h) // transmitCommon Clones() the headers
if e != nil {
return e
}
sdn, ok := h.Contains(StompPlusDrainNow) // STOMP Protocol Extension

if !ok {
e = c.transmitCommon(UNSUBSCRIBE, h) // transmitCommon Clones() the headers
if e != nil {
return e
}

c.subsLock.Lock()
delete(c.subs, usekey)
c.subsLock.Unlock()
c.log(UNSUBSCRIBE, "end", h)
return nil
}
//
// STOMP Protocol Extension
//
c.log("sngdrnow extension detected")
idn, err := strconv.ParseInt(sdn, 10, 64)
if err != nil {
idn = 100 // 100 milliseconds if bad parameter
}
ival := time.Duration(idn * 1000000)
dmc := 0
forsel:
for {
ticker := time.NewTicker(ival)
select {
case mi, ok := <-usesp.md:
if !ok {
break forsel
}
dmc++
c.log("sngdrnow DROP", dmc, mi.Message.Command, mi.Message.Headers)
case _ = <-ticker.C:
c.log("sngdrnow extension BREAK")
break forsel
}
}
//
c.log("sngdrnow extension at very end")
c.subsLock.Lock()
delete(c.subs, usekey)
c.subsLock.Unlock()
c.log(UNSUBSCRIBE, "end", h)
c.log(UNSUBSCRIBE, "endsngdrnow", h)
return nil
}

0 comments on commit 512e8d8

Please sign in to comment.