diff --git a/SENV.md b/SENV.md index 8a307fe..f1b82df 100644 --- a/SENV.md +++ b/SENV.md @@ -48,31 +48,31 @@ Default: /queue/sng.sample.stomp.destination -STOMP_LOGIN +STOMP_HEARTBEATS -The login to be used by the client in the CONNECT frame.
-Default: guest +For protocol 1.1+, the heart-beat value to be used by the client in the CONNECT frame.
+Default: 0,0 -STOMP_HEARTBEATS +STOMP_HOST -For protocol 1.1+, the heart-beat value to be used by the client in the CONNECT frame.
-Default: 0,0 +The broker host to connect to.
+Default: localhost -STOMP_HOST +STOMP_LOGIN -The broker host to connect to.
-Default: localhost +The login to be used by the client in the CONNECT frame.
+Default: guest diff --git a/cmd/stompngo/main.go b/cmd/stompngo/main.go new file mode 100644 index 0000000..b68c7f8 --- /dev/null +++ b/cmd/stompngo/main.go @@ -0,0 +1,36 @@ +// +// Copyright © 2016 Guy M. Allard +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package main + +/* + Provide package version information. A nod to the concept of semver. + + Example: + fmt.Println("current stompngo version", stompngo.Version()) + +*/ + +import ( + "fmt" + // + "github.com/gmallard/stompngo" +) + +func main() { + fmt.Println(stompngo.Version()) + return +} diff --git a/connbv_test.go b/connbv_test.go index 716f997..3d11fb2 100644 --- a/connbv_test.go +++ b/connbv_test.go @@ -30,7 +30,7 @@ func TestConnBadVer10One(t *testing.T) { t.Skip("TestConnBadVer10One no 1.0 only servers available") } h, p := badVerHostAndPort() - n, e := net.Dial("tcp", net.JoinHostPort(h, p)) + n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p)) ch := TEST_HEADERS other_headers := Headers{"accept-version", "1.1,2.0,3.14159", "host", h} ch = ch.AddHeaders(other_headers) @@ -53,7 +53,7 @@ func TestConnBadVer10Two(t *testing.T) { t.Skip("TestConnBadVer10Two norun, set STOMP_TEST11p") } h, p := badVerHostAndPort() - n, e := net.Dial("tcp", net.JoinHostPort(h, p)) + n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p)) ch := TEST_HEADERS other_headers := Headers{"accept-version", "2.0,1.0,3.14159", "host", h} ch = ch.AddHeaders(other_headers) @@ -79,7 +79,7 @@ func TestConnBadVer10Three(t *testing.T) { t.Skip("TestConnBadVer10Three norun, set STOMP_TEST11p") } h, p := badVerHostAndPort() - n, e := net.Dial("tcp", net.JoinHostPort(h, p)) + n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p)) ch := TEST_HEADERS other_headers := Headers{"accept-version", "4.5,3.14159", "host", h} ch = ch.AddHeaders(other_headers) diff --git a/conndisc_test.go b/conndisc_test.go index 9f363f9..c8defb5 100644 --- a/conndisc_test.go +++ b/conndisc_test.go @@ -285,7 +285,7 @@ func TestConnEconBad(t *testing.T) { } /* - ConnDisc Test: EDISCPC + ConnDisc Test: ECONBAD */ func TestConnEconDiscDone(t *testing.T) { n, _ := openConn(t) @@ -295,8 +295,8 @@ func TestConnEconDiscDone(t *testing.T) { _ = closeConn(t, n) // e = conn.Disconnect(empty_headers) - if e != EDISCPC { - t.Errorf("Previous disconnect expected [%v] got [%v]\n", EDISCPC, e) + if e != ECONBAD { + t.Errorf("Previous disconnect expected [%v] got [%v]\n", ECONBAD, e) } } diff --git a/connect.go b/connect.go index 26dc0f2..b2c4455 100644 --- a/connect.go +++ b/connect.go @@ -31,7 +31,7 @@ import ( Example: // Obtain a network connection - n, e := net.Dial("tcp", "localhost:61613") + n, e := net.Dial(NetProtoTCP, "localhost:61613") if e != nil { // Do something sane ... } @@ -44,7 +44,7 @@ import ( Example: // Obtain a network connection - n, e := net.Dial("tcp", "localhost:61613") + n, e := net.Dial(NetProtoTCP, "localhost:61613") if e != nil { // Do something sane ... } diff --git a/data.go b/data.go index a61c5bf..023ec6a 100644 --- a/data.go +++ b/data.go @@ -132,10 +132,13 @@ type Connection struct { } type subscription struct { - md chan MessageData // Subscription specific MessageData channel - id string // Subscription id (unique, self reference) - am string // ACK mode for this subscription - cs bool // Closed during shutdown + md chan MessageData // Subscription specific MessageData channel + id string // Subscription id (unique, self reference) + am string // ACK mode for this subscription + cs bool // Closed during shutdown + drav bool // Drain After value validity + dra uint // Start draining after # messages (MESSAGE frames) + drmc uint // Current drain count if draining } /* @@ -166,7 +169,7 @@ const ( EBDYDATA = Error("body data not allowed") // Not connected. - ECONBAD = Error("no current connection") + ECONBAD = Error("no current connection or DISCONNECT previously completed") // Destination required EREQDSTSND = Error("destination required, SEND") @@ -210,9 +213,6 @@ const ( // Invalid broker command EINVBCMD = Error("invalid broker command") - - // DISCONNET Already completed - EDISCPC = Error("disconnect previously completed") ) /* @@ -290,3 +290,47 @@ type metrics struct { var validCmds = map[string]bool{MESSAGE: true, ERROR: true, RECEIPT: true} var logLock sync.Mutex + +const ( + NetProtoTCP = "tcp" // Protocol Name +) + +/* + Commom Header keys +*/ +const ( + HK_ACCEPT_VERSION = "accept-version" + HK_ACK = "ack" + HK_CONTENT_TYPE = "content-type" + HK_CONTENT_LENGTH = "content-length" + HK_DEST = "destination" + HK_HEART_BEAT = "heart-beat" + HK_HOST = "host" + HK_ID = "id" + HK_LOGIN = "logon" + HK_MESSAGE = "message" + HK_MESSAGE_ID = "message-id" + HK_PASSCODE = "passcode" + HK_RECEIPT = "receipt" + HK_RECEIPT_ID = "receipt-id" + HK_SESSION = "session" + HK_SERVER = "server" + HK_TRANSACTION = "transaction" + HK_VERSION = "version" +) + +/* + ACK Modes +*/ +const ( + AckModeAuto = "auto" + AckModeClient = "client" + AckModeClientIndividual = "client-individual" +) + +/* + Extensions to STOMP protocol. +*/ +const ( + StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header +) diff --git a/disconnect.go b/disconnect.go index 608d632..e27b7e2 100644 --- a/disconnect.go +++ b/disconnect.go @@ -49,13 +49,10 @@ func (c *Connection) Disconnect(h Headers) error { c.discLock.Lock() defer c.discLock.Unlock() // - if !c.connected { - return EDISCPC - } - c.log(DISCONNECT, "start", h) if !c.connected { return ECONBAD } + c.log(DISCONNECT, "start", h) e := checkHeaders(h, c.Protocol()) if e != nil { return e @@ -83,8 +80,9 @@ func (c *Connection) Disconnect(h Headers) error { if !cwr && e == nil { // Receipt c.DisconnectReceipt = <-c.input - c.log(DISCONNECT, "end", ch, c.DisconnectReceipt) + c.log(DISCONNECT, "dr", ch, c.DisconnectReceipt) } + c.log(DISCONNECT, "ends", ch) c.rsd <- true return e } diff --git a/doc.go b/doc.go index 3eea8b6..cb01ac7 100644 --- a/doc.go +++ b/doc.go @@ -31,7 +31,7 @@ h := "localhost" p := "61613" - n, err := net.Dial("tcp", net.JoinHostPort(h, p)) + n, err := net.Dial(stompngo.NetProtoTCP, net.JoinHostPort(h, p)) if err != nil { // Do something sane ... } diff --git a/hb_test.go b/hb_test.go index 6714f13..90075b3 100644 --- a/hb_test.go +++ b/hb_test.go @@ -24,7 +24,7 @@ import ( ) const ( - hbs = 30 + hbs = 45 ) /* @@ -194,9 +194,11 @@ func TestHB11NoSend(t *testing.T) { t.Skip("TestHB11NoSend norun, set STOMP_HB11LONG") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "0,6000") // No sending + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -209,7 +211,6 @@ func TestHB11NoSend(t *testing.T) { t.Errorf("Receive Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11NoSend connect response", conn.ConnectResponse.Command, @@ -242,9 +243,11 @@ func TestHB11NoReceive(t *testing.T) { t.Skip("TestHB11NoReceive norun, set STOMP_HB11LONG") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "10000,0") // No Receiving + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -257,7 +260,6 @@ func TestHB11NoReceive(t *testing.T) { t.Errorf("Send Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11NoReceive start sleep") @@ -287,9 +289,11 @@ func TestHB11SendReceive(t *testing.T) { t.Skip("TestHB11SendReceive norun, set STOMP_HB11LONG") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "10000,6000") + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -305,7 +309,6 @@ func TestHB11SendReceive(t *testing.T) { t.Errorf("Send Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11SendReceive start sleep") @@ -337,9 +340,11 @@ func TestHB11SendReceiveApollo(t *testing.T) { t.Skip("TestHB11SendReceiveApollo norun, set STOMP_HB11LONG") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "10000,100") + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -355,7 +360,6 @@ func TestHB11SendReceiveApollo(t *testing.T) { t.Errorf("Send Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11SendReceiveApollo start sleep") @@ -391,9 +395,11 @@ func TestHB11SendReceiveApolloRev(t *testing.T) { t.Skip("TestHB11SendReceiveApolloRev norun, skip AMQ11+") } // + l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) n, _ := openConn(t) ch := check11(TEST_HEADERS) ch = ch.Add("heart-beat", "100,10000") + l.Printf("ConnHeaders: %v\n", ch) conn, e := Connect(n, ch) // Error checks if e != nil { @@ -409,7 +415,6 @@ func TestHB11SendReceiveApolloRev(t *testing.T) { t.Errorf("Send Ticker is zero.") } // - l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) conn.SetLogger(l) // conn.log("TestHB11SendReceiveApolloRev start sleep") diff --git a/reader.go b/reader.go index ed56cf9..8e56b41 100644 --- a/reader.go +++ b/reader.go @@ -34,7 +34,7 @@ func (c *Connection) reader() { for { f, e := c.readFrame() - c.log("RDR_READFRAME", f, "RDR_RF_ERR", e) + c.log("RDR_RECEIVE_FRAME", f, "RDR_RECEIVE_ERR", e) if e != nil { f.Headers = append(f.Headers, "connection_read_error", e.Error()) md := MessageData{Message(f), e} @@ -51,7 +51,9 @@ func (c *Connection) reader() { // Headers already decoded c.mets.tbr += m.Size(false) // Total bytes read md := MessageData{m, e} - // TODO ? Maybe ? Rethink this logic. + + // TODO START - can this be simplified ? Look cleaner ? + if sid, ok := f.Headers.Contains("subscription"); ok { // This is a read lock c.subsLock.RLock() @@ -61,17 +63,29 @@ func (c *Connection) reader() { if c.subs[sid].cs { c.log("RDR_CLSUB", sid, md) } else { - c.subs[sid].md <- md + if c.subs[sid].drav { + c.subs[sid].drmc++ + if c.subs[sid].drmc > c.subs[sid].dra { + c.log("RDR_DROPM", c.subs[sid].drmc, sid, md) + } else { + c.subs[sid].md <- md + } + } else { + c.subs[sid].md <- md + } } } else { c.log("RDR_NOSUB", sid, md) } c.subsLock.RUnlock() } else { + // RECEIPTs and ERRORs are never drained. They actually cannot + // be drained in any logical manner because they do not have a + // 'subscription' header. c.input <- md } - c.log("RDR_RECEIVE", m.Command, m.Headers) + // TODO END select { case q = <-c.rsd: diff --git a/sub_test.go b/sub_test.go index 3f1f550..f03b569 100644 --- a/sub_test.go +++ b/sub_test.go @@ -17,7 +17,7 @@ package stompngo import ( - //"log" + "log" "os" "testing" "time" @@ -81,6 +81,7 @@ func TestSubNoIdTwice10(t *testing.T) { t.Skip("TestSubNoIdTwice10 norun, need 1.0") } + t.Log("TestSubNoIdTwice10", "starts") n, _ := openConn(t) ch := check11(TEST_HEADERS) conn, _ := Connect(n, ch) @@ -157,45 +158,55 @@ func TestSubNoIdTwice11p(t *testing.T) { t.Skip("TestSubNoIdTwice11p norun, need 1.1+") } + t.Log("TestSubNoIdTwice11p", "starts") n, _ := openConn(t) ch := check11(TEST_HEADERS) conn, _ := Connect(n, ch) + l := log.New(os.Stdout, "TSNI211P ", log.Ldate|log.Lmicroseconds) + conn.SetLogger(l) d := tdest("/queue/subdup.p11.01") id := "TestSubNoIdTwice11p" sbh := Headers{"destination", d, "id", id} // First time + t.Logf("%s\n", "INFO TestSubNoIdTwice11p - start 1st SUBSCRIBE") sc, e := conn.Subscribe(sbh) + t.Logf("%s\n", "INFO TestSubNoIdTwice11p - end 1st SUBSCRIBE") if e != nil { - t.Errorf("Expected no subscribe error (T1), got [%v]\n", e) + t.Errorf("ERROR Expected no subscribe error (T1), got [%v]\n", e) } if sc == nil { - t.Errorf("Expected subscribe channel (T1), got [nil]\n") + t.Errorf("ERROR Expected subscribe channel (T2), got [nil]\n") } time.Sleep(500 * time.Millisecond) // give a broker a break select { case v := <-sc: - t.Logf("Unexpected frame received (T1), got [%v]\n", v) + t.Errorf("ERROR Unexpected frame received (T3), got [%v]\n", v) case v := <-conn.MessageData: - t.Logf("Unexpected frame received (T1), got [%v]\n", v) + t.Errorf("ERROR Unexpected frame received (T4), got [%v]\n", v) default: } // Second time. The stompngo package maintains a list of all current // subscription ids. An attempt to subscribe using an existing id is // immediately rejected by the package (never actually sent to the broker). + t.Logf("%s\n", "INFO TestSubNoIdTwice11p - start 2nd SUBSCRIBE") sc, e = conn.Subscribe(sbh) + t.Logf("%s\n", "INFO TestSubNoIdTwice11p - end 2nd SUBSCRIBE") if e == nil { - t.Errorf("Expected subscribe error (T2), got [nil]\n") + t.Errorf("ERROR Expected subscribe error (T5), got [nil]\n") } if e != EDUPSID { - t.Errorf("Expected subscribe error (T2), [%v] got [%v]\n", EDUPSID, e) + t.Errorf("ERROR Expected subscribe error (T6), [%v] got [%v]\n", EDUPSID, e) + } else { + t.Logf("INFO wanted/got actual (T7), [%v]\n", e) } if sc != nil { - t.Errorf("Expected nil subscribe channel (T1), got [%v]\n", sc) + t.Errorf("ERROR Expected nil subscribe channel (T8), got [%v]\n", sc) } _ = conn.Disconnect(empty_headers) _ = closeConn(t, n) + t.Log("TestSubNoIdTwice11p", "ends") } /* diff --git a/subscribe.go b/subscribe.go index 072cfb9..3240f40 100644 --- a/subscribe.go +++ b/subscribe.go @@ -18,6 +18,8 @@ package stompngo import ( "fmt" + "log" + "strconv" ) var _ = fmt.Println @@ -82,26 +84,32 @@ func (c *Connection) Subscribe(h Headers) (<-chan MessageData, error) { Handle subscribe id. */ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Headers) { - // This is a write lock - c.subsLock.Lock() - defer c.subsLock.Unlock() + // c.log(SUBSCRIBE, "start establishSubscription") // id, hid := h.Contains("id") uuid1 := Uuid() + // + c.subsLock.RLock() // Acquire Read lock // No duplicates if hid { if _, q := c.subs[id]; q { + c.subsLock.RUnlock() // Release Read lock return nil, EDUPSID, h // Duplicate subscriptions not allowed } } else { if _, q := c.subs[uuid1]; q { + c.subsLock.RUnlock() // Release Read lock return nil, EDUPSID, h // Duplicate subscriptions not allowed } } + c.subsLock.RUnlock() // Release Read lock // sd := new(subscription) // New subscription data sd.cs = false // No shutdown yet + sd.drav = false // Drain after value validity + sd.dra = 0 // Never drain MESSAGE frames + sd.drmc = 0 // Current drain count lam := "auto" // Default/used ACK mode if ham, ok := h.Contains("ack"); ok { lam = ham // Reset (possible) used ack mode @@ -128,6 +136,22 @@ func (c *Connection) establishSubscription(h Headers) (*subscription, error, Hea sd.id = uuid1 // Set subscription ID to that } } + + // STOMP Protocol Enhancement + if dc, okda := h.Contains(StompPlusDrainAfter); okda { + n, e := strconv.ParseInt(dc, 10, 0) + if e != nil { + log.Printf("sng_drafter conversion error: %v\n", e) + } else { + sd.drav = true // Drain after value is OK + sd.dra = uint(n) // Drain after count + } + } + + // This is a write lock + c.subsLock.Lock() c.subs[sd.id] = sd // Add subscription to the connection subscription map - return sd, nil, h // Return the subscription pointer + c.subsLock.Unlock() + //c.log(SUBSCRIBE, "end establishSubscription") + return sd, nil, h // Return the subscription pointer } diff --git a/utils_test.go b/utils_test.go index 79ca7f8..39a6b0d 100644 --- a/utils_test.go +++ b/utils_test.go @@ -140,7 +140,7 @@ func getMessageData(sc <-chan MessageData, conn *Connection, t *testing.T) (md M func openConn(t *testing.T) (net.Conn, error) { h, p := senv.HostAndPort() hap := net.JoinHostPort(h, p) - n, err := net.Dial("tcp", hap) + n, err := net.Dial(NetProtoTCP, hap) if err != nil { t.Errorf("Unexpected net.Dial error: %v\n", err) } diff --git a/version.go b/version.go index f823619..7f8aacf 100644 --- a/version.go +++ b/version.go @@ -35,9 +35,9 @@ var ( minor = "0" // Minor - patch = "3" // Patch + patch = "4" // Patch - //patch = "3.plvl.001" // Patch + //patch = "4.plvl.001" // Patch ) func Version() string {