From 512e8d8b2df757981dfebf317bbe07825ca759c7 Mon Sep 17 00:00:00 2001 From: "Guy M. Allard" Date: Tue, 14 Aug 2018 12:57:37 -0400 Subject: [PATCH] A STOMP protocol extension. This allow a client to implement a fix for the hangs described in issue #25. This is a second approach to solving that problem. --- data.go | 1 + unsubscribe.go | 60 +++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/data.go b/data.go index 68f46c0..d8d5866 100644 --- a/data.go +++ b/data.go @@ -461,6 +461,7 @@ const ( */ const ( StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header + StompPlusDrainNow = "sng_drnow" // UNSUBSCRIBE Header ) var ( diff --git a/unsubscribe.go b/unsubscribe.go index 8fb5bf4..2840caa 100644 --- a/unsubscribe.go +++ b/unsubscribe.go @@ -16,9 +16,10 @@ package stompngo -//import ( -// "fmt" -//) +import ( + "strconv" + "time" +) /* Unsubscribe from a STOMP subscription. @@ -73,9 +74,10 @@ func (c *Connection) Unsubscribe(h Headers) error { // shaid := Sha1(h.Value(HK_DESTINATION)) // Special for 1.0 c.subsLock.RLock() - _, p := c.subs[shid] - _, ps := c.subs[shaid] + s1x, p := c.subs[shid] + s10, ps := c.subs[shaid] // Special for 1.0 c.subsLock.RUnlock() + var usesp *subscription usekey := "" switch c.Protocol() { @@ -89,23 +91,61 @@ func (c *Connection) Unsubscribe(h Headers) error { return EBADSID // invalid subscription-id } usekey = shid + usesp = s1x case SPL_10: if !p && !ps { return EUNODSID } usekey = shaid + usesp = s10 default: panic("unsubscribe version not supported: " + c.Protocol()) } - e = c.transmitCommon(UNSUBSCRIBE, h) // transmitCommon Clones() the headers - if e != nil { - return e - } + sdn, ok := h.Contains(StompPlusDrainNow) // STOMP Protocol Extension + if !ok { + e = c.transmitCommon(UNSUBSCRIBE, h) // transmitCommon Clones() the headers + if e != nil { + return e + } + + c.subsLock.Lock() + delete(c.subs, usekey) + c.subsLock.Unlock() + c.log(UNSUBSCRIBE, "end", h) + return nil + } + // + // STOMP Protocol Extension + // + c.log("sngdrnow extension detected") + idn, err := strconv.ParseInt(sdn, 10, 64) + if err != nil { + idn = 100 // 100 milliseconds if bad parameter + } + ival := time.Duration(idn * 1000000) + dmc := 0 +forsel: + for { + ticker := time.NewTicker(ival) + select { + case mi, ok := <-usesp.md: + if !ok { + break forsel + } + dmc++ + c.log("sngdrnow DROP", dmc, mi.Message.Command, mi.Message.Headers) + case _ = <-ticker.C: + c.log("sngdrnow extension BREAK") + break forsel + } + } + // + c.log("sngdrnow extension at very end") c.subsLock.Lock() delete(c.subs, usekey) c.subsLock.Unlock() - c.log(UNSUBSCRIBE, "end", h) + c.log(UNSUBSCRIBE, "endsngdrnow", h) return nil }