From 26d80504c4093b7062edd8f98e4d6708efe88478 Mon Sep 17 00:00:00 2001 From: gmallard Date: Thu, 21 Jul 2016 21:07:19 -0400 Subject: [PATCH 01/11] Start next dev branch. --- version.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/version.go b/version.go index f823619..b9041ad 100644 --- a/version.go +++ b/version.go @@ -35,9 +35,9 @@ var ( minor = "0" // Minor - patch = "3" // Patch + //patch = "3" // Patch - //patch = "3.plvl.001" // Patch + patch = "3.plvl.001" // Patch ) func Version() string { From f7f2c00aac2356607aca1fdac4eed4de25bf2c7a Mon Sep 17 00:00:00 2001 From: "Guy M. Allard" Date: Sat, 30 Jul 2016 00:21:16 -0400 Subject: [PATCH 02/11] Modify how version display works. --- cmd/stompngo/main.go | 36 ++++++++++++++++++++++++++++++++++++ version.go | 2 +- 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 cmd/stompngo/main.go diff --git a/cmd/stompngo/main.go b/cmd/stompngo/main.go new file mode 100644 index 0000000..b68c7f8 --- /dev/null +++ b/cmd/stompngo/main.go @@ -0,0 +1,36 @@ +// +// Copyright © 2016 Guy M. Allard +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package main + +/* + Provide package version information. A nod to the concept of semver. + + Example: + fmt.Println("current stompngo version", stompngo.Version()) + +*/ + +import ( + "fmt" + // + "github.com/gmallard/stompngo" +) + +func main() { + fmt.Println(stompngo.Version()) + return +} diff --git a/version.go b/version.go index b9041ad..823c531 100644 --- a/version.go +++ b/version.go @@ -37,7 +37,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.001" // Patch + patch = "3.plvl.002" // Patch ) func Version() string { From 934e70426f4b7e099e3c6d2523cdc5f81e6ed23d Mon Sep 17 00:00:00 2001 From: gmallard Date: Sat, 30 Jul 2016 19:12:25 -0400 Subject: [PATCH 03/11] Adjust heartbeat test timing. --- hb_test.go | 15 ++++++++++----- version.go | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/hb_test.go b/hb_test.go index 6714f13..311dcd3 100644 --- a/hb_test.go +++ b/hb_test.go @@ -194,9 +194,11 @@ func TestHB11NoSend(t *testing.T) { t.Skip("TestHB11NoSend norun, set STOMP_HB11LONG") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "0,6000") // No sending + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -209,7 +211,6 @@ func TestHB11NoSend(t *testing.T) { t.Errorf("Receive Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11NoSend connect response", conn.ConnectResponse.Command, @@ -242,9 +243,11 @@ func TestHB11NoReceive(t *testing.T) { t.Skip("TestHB11NoReceive norun, set STOMP_HB11LONG") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "10000,0") // No Receiving + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -257,7 +260,6 @@ func TestHB11NoReceive(t *testing.T) { t.Errorf("Send Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11NoReceive start sleep") @@ -287,9 +289,11 @@ func TestHB11SendReceive(t *testing.T) { t.Skip("TestHB11SendReceive norun, set STOMP_HB11LONG") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "10000,6000") + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -305,7 +309,6 @@ func TestHB11SendReceive(t *testing.T) { t.Errorf("Send Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11SendReceive start sleep") @@ -337,9 +340,11 @@ func TestHB11SendReceiveApollo(t *testing.T) { t.Skip("TestHB11SendReceiveApollo norun, set STOMP_HB11LONG") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "10000,100") + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -355,7 +360,6 @@ func TestHB11SendReceiveApollo(t *testing.T) { t.Errorf("Send Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11SendReceiveApollo start sleep") @@ -391,9 +395,11 @@ func TestHB11SendReceiveApolloRev(t *testing.T) { t.Skip("TestHB11SendReceiveApolloRev norun, skip AMQ11+") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "100,10000") + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -409,7 +415,6 @@ func TestHB11SendReceiveApolloRev(t *testing.T) { t.Errorf("Send Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11SendReceiveApolloRev start sleep") diff --git a/version.go b/version.go index 823c531..adf93f2 100644 --- a/version.go +++ b/version.go @@ -37,7 +37,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.002" // Patch + patch = "3.plvl.003" // Patch ) func Version() string { From 0dc146cbbfd343b8f8e4edce2998916466b121ab Mon Sep 17 00:00:00 2001 From: gmallard Date: Sun, 31 Jul 2016 21:24:47 -0400 Subject: [PATCH 04/11] Shorten time the subscription map lock is held. --- subscribe.go | 8 ++++---- version.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/subscribe.go b/subscribe.go index 072cfb9..f23eb3a 100644 --- a/subscribe.go +++ b/subscribe.go @@ -82,9 +82,6 @@ func (c *Connection) Subscribe(h Headers) (<-chan MessageData, error) { Handle subscribe id. */ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Headers) { - // This is a write lock - c.subsLock.Lock() - defer c.subsLock.Unlock() // id, hid := h.Contains("id") uuid1 := Uuid() @@ -128,6 +125,9 @@ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Hea sd.id = uuid1 // Set subscription ID to that } } + // This is a write lock + c.subsLock.Lock() c.subs[sd.id] = sd // Add subscription to the connection subscription map - return sd, nil, h // Return the subscription pointer + c.subsLock.Unlock() + return sd, nil, h // Return the subscription pointer } diff --git a/version.go b/version.go index adf93f2..4ae0308 100644 --- a/version.go +++ b/version.go @@ -37,7 +37,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.003" // Patch + patch = "3.plvl.004" // Patch ) func Version() string { From c40969a4e5956d8db38cb7addeffe5cc272001c2 Mon Sep 17 00:00:00 2001 From: gmallard Date: Mon, 1 Aug 2016 18:37:47 -0400 Subject: [PATCH 05/11] Use subscription read lock when appropriate. --- sub_test.go | 27 +++++++++++++++++++-------- subscribe.go | 7 +++++++ version.go | 2 +- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/sub_test.go b/sub_test.go index 3f1f550..f03b569 100644 --- a/sub_test.go +++ b/sub_test.go @@ -17,7 +17,7 @@ package stompngo import ( - //"log" + "log" "os" "testing" "time" @@ -81,6 +81,7 @@ func TestSubNoIdTwice10(t *testing.T) { t.Skip("TestSubNoIdTwice10 norun, need 1.0") } + t.Log("TestSubNoIdTwice10", "starts") n, _ := openConn(t) ch := check11(TEST_HEADERS) conn, _ := Connect(n, ch) @@ -157,45 +158,55 @@ func TestSubNoIdTwice11p(t *testing.T) { t.Skip("TestSubNoIdTwice11p norun, need 1.1+") } + t.Log("TestSubNoIdTwice11p", "starts") n, _ := openConn(t) ch := check11(TEST_HEADERS) conn, _ := Connect(n, ch) + l := log.New(os.Stdout, "TSNI211P ", log.Ldate|log.Lmicroseconds) + conn.SetLogger(l) d := tdest("/queue/subdup.p11.01") id := "TestSubNoIdTwice11p" sbh := Headers{"destination", d, "id", id} // First time + t.Logf("%s\n", "INFO TestSubNoIdTwice11p - start 1st SUBSCRIBE") sc, e := conn.Subscribe(sbh) + t.Logf("%s\n", "INFO TestSubNoIdTwice11p - end 1st SUBSCRIBE") if e != nil { - t.Errorf("Expected no subscribe error (T1), got [%v]\n", e) + t.Errorf("ERROR Expected no subscribe error (T1), got [%v]\n", e) } if sc == nil { - t.Errorf("Expected subscribe channel (T1), got [nil]\n") + t.Errorf("ERROR Expected subscribe channel (T2), got [nil]\n") } time.Sleep(500 * time.Millisecond) // give a broker a break select { case v := <-sc: - t.Logf("Unexpected frame received (T1), got [%v]\n", v) + t.Errorf("ERROR Unexpected frame received (T3), got [%v]\n", v) case v := <-conn.MessageData: - t.Logf("Unexpected frame received (T1), got [%v]\n", v) + t.Errorf("ERROR Unexpected frame received (T4), got [%v]\n", v) default: } // Second time. The stompngo package maintains a list of all current // subscription ids. An attempt to subscribe using an existing id is // immediately rejected by the package (never actually sent to the broker). + t.Logf("%s\n", "INFO TestSubNoIdTwice11p - start 2nd SUBSCRIBE") sc, e = conn.Subscribe(sbh) + t.Logf("%s\n", "INFO TestSubNoIdTwice11p - end 2nd SUBSCRIBE") if e == nil { - t.Errorf("Expected subscribe error (T2), got [nil]\n") + t.Errorf("ERROR Expected subscribe error (T5), got [nil]\n") } if e != EDUPSID { - t.Errorf("Expected subscribe error (T2), [%v] got [%v]\n", EDUPSID, e) + t.Errorf("ERROR Expected subscribe error (T6), [%v] got [%v]\n", EDUPSID, e) + } else { + t.Logf("INFO wanted/got actual (T7), [%v]\n", e) } if sc != nil { - t.Errorf("Expected nil subscribe channel (T1), got [%v]\n", sc) + t.Errorf("ERROR Expected nil subscribe channel (T8), got [%v]\n", sc) } _ = conn.Disconnect(empty_headers) _ = closeConn(t, n) + t.Log("TestSubNoIdTwice11p", "ends") } /* diff --git a/subscribe.go b/subscribe.go index f23eb3a..4eedbcb 100644 --- a/subscribe.go +++ b/subscribe.go @@ -82,19 +82,25 @@ func (c *Connection) Subscribe(h Headers) (<-chan MessageData, error) { Handle subscribe id. */ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Headers) { + // c.log(SUBSCRIBE, "start establishSubscription") // id, hid := h.Contains("id") uuid1 := Uuid() + // + c.subsLock.RLock() // Acquire Read lock // No duplicates if hid { if _, q := c.subs[id]; q { + c.subsLock.RUnlock() // Release Read lock return nil, EDUPSID, h // Duplicate subscriptions not allowed } } else { if _, q := c.subs[uuid1]; q { + c.subsLock.RUnlock() // Release Read lock return nil, EDUPSID, h // Duplicate subscriptions not allowed } } + c.subsLock.RUnlock() // Release Read lock // sd := new(subscription) // New subscription data @@ -129,5 +135,6 @@ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Hea c.subsLock.Lock() c.subs[sd.id] = sd // Add subscription to the connection subscription map c.subsLock.Unlock() + //c.log(SUBSCRIBE, "end establishSubscription") return sd, nil, h // Return the subscription pointer } diff --git a/version.go b/version.go index 4ae0308..0de66b1 100644 --- a/version.go +++ b/version.go @@ -37,7 +37,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.004" // Patch + patch = "3.plvl.005" // Patch ) func Version() string { From 8c06617eda158f38e69c69baf1ce2784b2fd7e95 Mon Sep 17 00:00:00 2001 From: gmallard Date: Mon, 1 Aug 2016 18:39:38 -0400 Subject: [PATCH 06/11] Make reader logging more useful. --- reader.go | 4 +--- version.go | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/reader.go b/reader.go index ed56cf9..289b46d 100644 --- a/reader.go +++ b/reader.go @@ -34,7 +34,7 @@ func (c *Connection) reader() { for { f, e := c.readFrame() - c.log("RDR_READFRAME", f, "RDR_RF_ERR", e) + c.log("RDR_RECEIVE_FRAME", f, "RDR_RECEIVE_ERR", e) if e != nil { f.Headers = append(f.Headers, "connection_read_error", e.Error()) md := MessageData{Message(f), e} @@ -71,8 +71,6 @@ func (c *Connection) reader() { c.input <- md } - c.log("RDR_RECEIVE", m.Command, m.Headers) - select { case q = <-c.rsd: default: diff --git a/version.go b/version.go index 0de66b1..438363b 100644 --- a/version.go +++ b/version.go @@ -37,7 +37,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.005" // Patch + patch = "3.plvl.006" // Patch ) func Version() string { From f7daea8a142481bb7aa5a6271ce11a0959ae558f Mon Sep 17 00:00:00 2001 From: gmallard Date: Mon, 1 Aug 2016 21:23:46 -0400 Subject: [PATCH 07/11] Correct already disconnected check in disconnect.go. --- conndisc_test.go | 6 +++--- data.go | 5 +---- disconnect.go | 5 +---- version.go | 2 +- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/conndisc_test.go b/conndisc_test.go index 9f363f9..c8defb5 100644 --- a/conndisc_test.go +++ b/conndisc_test.go @@ -285,7 +285,7 @@ func TestConnEconBad(t *testing.T) { } /* - ConnDisc Test: EDISCPC + ConnDisc Test: ECONBAD */ func TestConnEconDiscDone(t *testing.T) { n, _ := openConn(t) @@ -295,8 +295,8 @@ func TestConnEconDiscDone(t *testing.T) { _ = closeConn(t, n) // e = conn.Disconnect(empty_headers) - if e != EDISCPC { - t.Errorf("Previous disconnect expected [%v] got [%v]\n", EDISCPC, e) + if e != ECONBAD { + t.Errorf("Previous disconnect expected [%v] got [%v]\n", ECONBAD, e) } } diff --git a/data.go b/data.go index a61c5bf..111eaad 100644 --- a/data.go +++ b/data.go @@ -166,7 +166,7 @@ const ( EBDYDATA = Error("body data not allowed") // Not connected. - ECONBAD = Error("no current connection") + ECONBAD = Error("no current connection or DISCONNECT previously completed") // Destination required EREQDSTSND = Error("destination required, SEND") @@ -210,9 +210,6 @@ const ( // Invalid broker command EINVBCMD = Error("invalid broker command") - - // DISCONNET Already completed - EDISCPC = Error("disconnect previously completed") ) /* diff --git a/disconnect.go b/disconnect.go index 608d632..84a1427 100644 --- a/disconnect.go +++ b/disconnect.go @@ -49,13 +49,10 @@ func (c *Connection) Disconnect(h Headers) error { c.discLock.Lock() defer c.discLock.Unlock() // - if !c.connected { - return EDISCPC - } - c.log(DISCONNECT, "start", h) if !c.connected { return ECONBAD } + c.log(DISCONNECT, "start", h) e := checkHeaders(h, c.Protocol()) if e != nil { return e diff --git a/version.go b/version.go index 438363b..a2b4856 100644 --- a/version.go +++ b/version.go @@ -37,7 +37,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.006" // Patch + patch = "3.plvl.007" // Patch ) func Version() string { From 1f14299065c7f217f22a2452f3eb05d80cb565fb Mon Sep 17 00:00:00 2001 From: gmallard Date: Tue, 2 Aug 2016 07:17:39 -0400 Subject: [PATCH 08/11] Make network protocol name a constant. --- connbv_test.go | 6 +++--- connect.go | 4 ++-- data.go | 4 ++++ doc.go | 2 +- utils_test.go | 2 +- 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/connbv_test.go b/connbv_test.go index 716f997..3d11fb2 100644 --- a/connbv_test.go +++ b/connbv_test.go @@ -30,7 +30,7 @@ func TestConnBadVer10One(t *testing.T) { t.Skip("TestConnBadVer10One no 1.0 only servers available") } h, p := badVerHostAndPort() - n, e := net.Dial("tcp", net.JoinHostPort(h, p)) + n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p)) ch := TEST_HEADERS other_headers := Headers{"accept-version", "1.1,2.0,3.14159", "host", h} ch = ch.AddHeaders(other_headers) @@ -53,7 +53,7 @@ func TestConnBadVer10Two(t *testing.T) { t.Skip("TestConnBadVer10Two norun, set STOMP_TEST11p") } h, p := badVerHostAndPort() - n, e := net.Dial("tcp", net.JoinHostPort(h, p)) + n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p)) ch := TEST_HEADERS other_headers := Headers{"accept-version", "2.0,1.0,3.14159", "host", h} ch = ch.AddHeaders(other_headers) @@ -79,7 +79,7 @@ func TestConnBadVer10Three(t *testing.T) { t.Skip("TestConnBadVer10Three norun, set STOMP_TEST11p") } h, p := badVerHostAndPort() - n, e := net.Dial("tcp", net.JoinHostPort(h, p)) + n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p)) ch := TEST_HEADERS other_headers := Headers{"accept-version", "4.5,3.14159", "host", h} ch = ch.AddHeaders(other_headers) diff --git a/connect.go b/connect.go index 26dc0f2..b2c4455 100644 --- a/connect.go +++ b/connect.go @@ -31,7 +31,7 @@ import ( Example: // Obtain a network connection - n, e := net.Dial("tcp", "localhost:61613") + n, e := net.Dial(NetProtoTCP, "localhost:61613") if e != nil { // Do something sane ... } @@ -44,7 +44,7 @@ import ( Example: // Obtain a network connection - n, e := net.Dial("tcp", "localhost:61613") + n, e := net.Dial(NetProtoTCP, "localhost:61613") if e != nil { // Do something sane ... } diff --git a/data.go b/data.go index 111eaad..d973716 100644 --- a/data.go +++ b/data.go @@ -287,3 +287,7 @@ type metrics struct { var validCmds = map[string]bool{MESSAGE: true, ERROR: true, RECEIPT: true} var logLock sync.Mutex + +const ( + NetProtoTCP = "tcp" // Protocol Name +) diff --git a/doc.go b/doc.go index 3eea8b6..cb01ac7 100644 --- a/doc.go +++ b/doc.go @@ -31,7 +31,7 @@ h := "localhost" p := "61613" - n, err := net.Dial("tcp", net.JoinHostPort(h, p)) + n, err := net.Dial(stompngo.NetProtoTCP, net.JoinHostPort(h, p)) if err != nil { // Do something sane ... } diff --git a/utils_test.go b/utils_test.go index 79ca7f8..39a6b0d 100644 --- a/utils_test.go +++ b/utils_test.go @@ -140,7 +140,7 @@ func getMessageData(sc <-chan MessageData, conn *Connection, t *testing.T) (md M func openConn(t *testing.T) (net.Conn, error) { h, p := senv.HostAndPort() hap := net.JoinHostPort(h, p) - n, err := net.Dial("tcp", hap) + n, err := net.Dial(NetProtoTCP, hap) if err != nil { t.Errorf("Unexpected net.Dial error: %v\n", err) } From f5baeb0896090bb6691ad5067a70bdefa40291d5 Mon Sep 17 00:00:00 2001 From: gmallard Date: Tue, 2 Aug 2016 13:01:00 -0400 Subject: [PATCH 09/11] Correct alphabetical ordering of environment variables. --- SENV.md | 18 +++++++++--------- version.go | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/SENV.md b/SENV.md index 8a307fe..f1b82df 100644 --- a/SENV.md +++ b/SENV.md @@ -48,31 +48,31 @@ Default: /queue/sng.sample.stomp.destination -STOMP_LOGIN +STOMP_HEARTBEATS -The login to be used by the client in the CONNECT frame.
-Default: guest +For protocol 1.1+, the heart-beat value to be used by the client in the CONNECT frame.
+Default: 0,0 -STOMP_HEARTBEATS +STOMP_HOST -For protocol 1.1+, the heart-beat value to be used by the client in the CONNECT frame.
-Default: 0,0 +The broker host to connect to.
+Default: localhost -STOMP_HOST +STOMP_LOGIN -The broker host to connect to.
-Default: localhost +The login to be used by the client in the CONNECT frame.
+Default: guest diff --git a/version.go b/version.go index a2b4856..d8e7e79 100644 --- a/version.go +++ b/version.go @@ -37,7 +37,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.007" // Patch + patch = "3.plvl.008" // Patch ) func Version() string { From 9bc315808a1a9811d013d94f3439251893421dc6 Mon Sep 17 00:00:00 2001 From: gmallard Date: Wed, 3 Aug 2016 19:02:48 -0400 Subject: [PATCH 10/11] Part 1 of ? for subscription extenstion POC: - The idea is to let clients request that received messages be stopped after a specified number. - Certainly an extenstion to STOMP protocol, specific to this package. --- data.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++---- disconnect.go | 3 ++- hb_test.go | 2 +- reader.go | 20 ++++++++++++++++++-- subscribe.go | 17 +++++++++++++++++ version.go | 2 +- 6 files changed, 86 insertions(+), 9 deletions(-) diff --git a/data.go b/data.go index d973716..023ec6a 100644 --- a/data.go +++ b/data.go @@ -132,10 +132,13 @@ type Connection struct { } type subscription struct { - md chan MessageData // Subscription specific MessageData channel - id string // Subscription id (unique, self reference) - am string // ACK mode for this subscription - cs bool // Closed during shutdown + md chan MessageData // Subscription specific MessageData channel + id string // Subscription id (unique, self reference) + am string // ACK mode for this subscription + cs bool // Closed during shutdown + drav bool // Drain After value validity + dra uint // Start draining after # messages (MESSAGE frames) + drmc uint // Current drain count if draining } /* @@ -291,3 +294,43 @@ var logLock sync.Mutex const ( NetProtoTCP = "tcp" // Protocol Name ) + +/* + Commom Header keys +*/ +const ( + HK_ACCEPT_VERSION = "accept-version" + HK_ACK = "ack" + HK_CONTENT_TYPE = "content-type" + HK_CONTENT_LENGTH = "content-length" + HK_DEST = "destination" + HK_HEART_BEAT = "heart-beat" + HK_HOST = "host" + HK_ID = "id" + HK_LOGIN = "logon" + HK_MESSAGE = "message" + HK_MESSAGE_ID = "message-id" + HK_PASSCODE = "passcode" + HK_RECEIPT = "receipt" + HK_RECEIPT_ID = "receipt-id" + HK_SESSION = "session" + HK_SERVER = "server" + HK_TRANSACTION = "transaction" + HK_VERSION = "version" +) + +/* + ACK Modes +*/ +const ( + AckModeAuto = "auto" + AckModeClient = "client" + AckModeClientIndividual = "client-individual" +) + +/* + Extensions to STOMP protocol. +*/ +const ( + StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header +) diff --git a/disconnect.go b/disconnect.go index 84a1427..e27b7e2 100644 --- a/disconnect.go +++ b/disconnect.go @@ -80,8 +80,9 @@ func (c *Connection) Disconnect(h Headers) error { if !cwr && e == nil { // Receipt c.DisconnectReceipt = <-c.input - c.log(DISCONNECT, "end", ch, c.DisconnectReceipt) + c.log(DISCONNECT, "dr", ch, c.DisconnectReceipt) } + c.log(DISCONNECT, "ends", ch) c.rsd <- true return e } diff --git a/hb_test.go b/hb_test.go index 311dcd3..90075b3 100644 --- a/hb_test.go +++ b/hb_test.go @@ -24,7 +24,7 @@ import ( ) const ( - hbs = 30 + hbs = 45 ) /* diff --git a/reader.go b/reader.go index 289b46d..8e56b41 100644 --- a/reader.go +++ b/reader.go @@ -51,7 +51,9 @@ func (c *Connection) reader() { // Headers already decoded c.mets.tbr += m.Size(false) // Total bytes read md := MessageData{m, e} - // TODO ? Maybe ? Rethink this logic. + + // TODO START - can this be simplified ? Look cleaner ? + if sid, ok := f.Headers.Contains("subscription"); ok { // This is a read lock c.subsLock.RLock() @@ -61,16 +63,30 @@ func (c *Connection) reader() { if c.subs[sid].cs { c.log("RDR_CLSUB", sid, md) } else { - c.subs[sid].md <- md + if c.subs[sid].drav { + c.subs[sid].drmc++ + if c.subs[sid].drmc > c.subs[sid].dra { + c.log("RDR_DROPM", c.subs[sid].drmc, sid, md) + } else { + c.subs[sid].md <- md + } + } else { + c.subs[sid].md <- md + } } } else { c.log("RDR_NOSUB", sid, md) } c.subsLock.RUnlock() } else { + // RECEIPTs and ERRORs are never drained. They actually cannot + // be drained in any logical manner because they do not have a + // 'subscription' header. c.input <- md } + // TODO END + select { case q = <-c.rsd: default: diff --git a/subscribe.go b/subscribe.go index 4eedbcb..3240f40 100644 --- a/subscribe.go +++ b/subscribe.go @@ -18,6 +18,8 @@ package stompngo import ( "fmt" + "log" + "strconv" ) var _ = fmt.Println @@ -105,6 +107,9 @@ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Hea sd := new(subscription) // New subscription data sd.cs = false // No shutdown yet + sd.drav = false // Drain after value validity + sd.dra = 0 // Never drain MESSAGE frames + sd.drmc = 0 // Current drain count lam := "auto" // Default/used ACK mode if ham, ok := h.Contains("ack"); ok { lam = ham // Reset (possible) used ack mode @@ -131,6 +136,18 @@ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Hea sd.id = uuid1 // Set subscription ID to that } } + + // STOMP Protocol Enhancement + if dc, okda := h.Contains(StompPlusDrainAfter); okda { + n, e := strconv.ParseInt(dc, 10, 0) + if e != nil { + log.Printf("sng_drafter conversion error: %v\n", e) + } else { + sd.drav = true // Drain after value is OK + sd.dra = uint(n) // Drain after count + } + } + // This is a write lock c.subsLock.Lock() c.subs[sd.id] = sd // Add subscription to the connection subscription map diff --git a/version.go b/version.go index d8e7e79..d78be62 100644 --- a/version.go +++ b/version.go @@ -37,7 +37,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.008" // Patch + patch = "3.plvl.009" // Patch ) func Version() string { From 2d02c6e2d4e9d3a1549657643661ceeb709f942d Mon Sep 17 00:00:00 2001 From: gmallard Date: Wed, 3 Aug 2016 21:11:59 -0400 Subject: [PATCH 11/11] Version 1.0.4 --- version.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/version.go b/version.go index d78be62..7f8aacf 100644 --- a/version.go +++ b/version.go @@ -35,9 +35,9 @@ var ( minor = "0" // Minor - //patch = "3" // Patch + patch = "4" // Patch - patch = "3.plvl.009" // Patch + //patch = "4.plvl.001" // Patch ) func Version() string {