Skip to content

Commit

Permalink
Merge branch 'dev', Version 1.0.4:
Browse files Browse the repository at this point in the history
* dev:
  Version 1.0.4
  Part 1 of ? for subscription extenstion POC:
  Correct alphabetical ordering of environment variables.
  Make network protocol name a constant.
  Correct already disconnected check in disconnect.go.
  Make reader logging more useful.
  Use subscription read lock when appropriate.
  Shorten time the subscription map lock is held.
  Adjust heartbeat test timing.
  Modify how version display works.
  Start next dev branch.
  • Loading branch information
gmallard committed Aug 4, 2016
2 parents 548bb9a + 2d02c6e commit 0f0c6f1
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 56 deletions.
18 changes: 9 additions & 9 deletions SENV.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,31 +48,31 @@ Default: /queue/sng.sample.stomp.destination

<tr>
<td style="border: 1px solid black;padding-left: 10px;" >
STOMP_LOGIN
STOMP_HEARTBEATS
</td>
<td style="border: 1px solid black;padding-left: 10px;" >
The login to be used by the client in the CONNECT frame.<br />
Default: guest
For protocol 1.1+, the heart-beat value to be used by the client in the CONNECT frame.<br />
Default: 0,0
</td>
</tr>

<tr>
<td style="border: 1px solid black;padding-left: 10px;" >
STOMP_HEARTBEATS
STOMP_HOST
</td>
<td style="border: 1px solid black;padding-left: 10px;" >
For protocol 1.1+, the heart-beat value to be used by the client in the CONNECT frame.<br />
Default: 0,0
The broker host to connect to.<br />
Default: localhost
</td>
</tr>

<tr>
<td style="border: 1px solid black;padding-left: 10px;" >
STOMP_HOST
STOMP_LOGIN
</td>
<td style="border: 1px solid black;padding-left: 10px;" >
The broker host to connect to.<br />
Default: localhost
The login to be used by the client in the CONNECT frame.<br />
Default: guest
</td>
</tr>

Expand Down
36 changes: 36 additions & 0 deletions cmd/stompngo/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// Copyright © 2016 Guy M. Allard
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package main

/*
Provide package version information. A nod to the concept of semver.
Example:
fmt.Println("current stompngo version", stompngo.Version())
*/

import (
"fmt"
//
"github.com/gmallard/stompngo"
)

func main() {
fmt.Println(stompngo.Version())
return
}
6 changes: 3 additions & 3 deletions connbv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestConnBadVer10One(t *testing.T) {
t.Skip("TestConnBadVer10One no 1.0 only servers available")
}
h, p := badVerHostAndPort()
n, e := net.Dial("tcp", net.JoinHostPort(h, p))
n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p))
ch := TEST_HEADERS
other_headers := Headers{"accept-version", "1.1,2.0,3.14159", "host", h}
ch = ch.AddHeaders(other_headers)
Expand All @@ -53,7 +53,7 @@ func TestConnBadVer10Two(t *testing.T) {
t.Skip("TestConnBadVer10Two norun, set STOMP_TEST11p")
}
h, p := badVerHostAndPort()
n, e := net.Dial("tcp", net.JoinHostPort(h, p))
n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p))
ch := TEST_HEADERS
other_headers := Headers{"accept-version", "2.0,1.0,3.14159", "host", h}
ch = ch.AddHeaders(other_headers)
Expand All @@ -79,7 +79,7 @@ func TestConnBadVer10Three(t *testing.T) {
t.Skip("TestConnBadVer10Three norun, set STOMP_TEST11p")
}
h, p := badVerHostAndPort()
n, e := net.Dial("tcp", net.JoinHostPort(h, p))
n, e := net.Dial(NetProtoTCP, net.JoinHostPort(h, p))
ch := TEST_HEADERS
other_headers := Headers{"accept-version", "4.5,3.14159", "host", h}
ch = ch.AddHeaders(other_headers)
Expand Down
6 changes: 3 additions & 3 deletions conndisc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestConnEconBad(t *testing.T) {
}

/*
ConnDisc Test: EDISCPC
ConnDisc Test: ECONBAD
*/
func TestConnEconDiscDone(t *testing.T) {
n, _ := openConn(t)
Expand All @@ -295,8 +295,8 @@ func TestConnEconDiscDone(t *testing.T) {
_ = closeConn(t, n)
//
e = conn.Disconnect(empty_headers)
if e != EDISCPC {
t.Errorf("Previous disconnect expected [%v] got [%v]\n", EDISCPC, e)
if e != ECONBAD {
t.Errorf("Previous disconnect expected [%v] got [%v]\n", ECONBAD, e)
}
}

Expand Down
4 changes: 2 additions & 2 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
Example:
// Obtain a network connection
n, e := net.Dial("tcp", "localhost:61613")
n, e := net.Dial(NetProtoTCP, "localhost:61613")
if e != nil {
// Do something sane ...
}
Expand All @@ -44,7 +44,7 @@ import (
Example:
// Obtain a network connection
n, e := net.Dial("tcp", "localhost:61613")
n, e := net.Dial(NetProtoTCP, "localhost:61613")
if e != nil {
// Do something sane ...
}
Expand Down
60 changes: 52 additions & 8 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ type Connection struct {
}

type subscription struct {
md chan MessageData // Subscription specific MessageData channel
id string // Subscription id (unique, self reference)
am string // ACK mode for this subscription
cs bool // Closed during shutdown
md chan MessageData // Subscription specific MessageData channel
id string // Subscription id (unique, self reference)
am string // ACK mode for this subscription
cs bool // Closed during shutdown
drav bool // Drain After value validity
dra uint // Start draining after # messages (MESSAGE frames)
drmc uint // Current drain count if draining
}

/*
Expand Down Expand Up @@ -166,7 +169,7 @@ const (
EBDYDATA = Error("body data not allowed")

// Not connected.
ECONBAD = Error("no current connection")
ECONBAD = Error("no current connection or DISCONNECT previously completed")

// Destination required
EREQDSTSND = Error("destination required, SEND")
Expand Down Expand Up @@ -210,9 +213,6 @@ const (

// Invalid broker command
EINVBCMD = Error("invalid broker command")

// DISCONNET Already completed
EDISCPC = Error("disconnect previously completed")
)

/*
Expand Down Expand Up @@ -290,3 +290,47 @@ type metrics struct {
var validCmds = map[string]bool{MESSAGE: true, ERROR: true, RECEIPT: true}

var logLock sync.Mutex

const (
NetProtoTCP = "tcp" // Protocol Name
)

/*
Commom Header keys
*/
const (
HK_ACCEPT_VERSION = "accept-version"
HK_ACK = "ack"
HK_CONTENT_TYPE = "content-type"
HK_CONTENT_LENGTH = "content-length"
HK_DEST = "destination"
HK_HEART_BEAT = "heart-beat"
HK_HOST = "host"
HK_ID = "id"
HK_LOGIN = "logon"
HK_MESSAGE = "message"
HK_MESSAGE_ID = "message-id"
HK_PASSCODE = "passcode"
HK_RECEIPT = "receipt"
HK_RECEIPT_ID = "receipt-id"
HK_SESSION = "session"
HK_SERVER = "server"
HK_TRANSACTION = "transaction"
HK_VERSION = "version"
)

/*
ACK Modes
*/
const (
AckModeAuto = "auto"
AckModeClient = "client"
AckModeClientIndividual = "client-individual"
)

/*
Extensions to STOMP protocol.
*/
const (
StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header
)
8 changes: 3 additions & 5 deletions disconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,10 @@ func (c *Connection) Disconnect(h Headers) error {
c.discLock.Lock()
defer c.discLock.Unlock()
//
if !c.connected {
return EDISCPC
}
c.log(DISCONNECT, "start", h)
if !c.connected {
return ECONBAD
}
c.log(DISCONNECT, "start", h)
e := checkHeaders(h, c.Protocol())
if e != nil {
return e
Expand Down Expand Up @@ -83,8 +80,9 @@ func (c *Connection) Disconnect(h Headers) error {
if !cwr && e == nil {
// Receipt
c.DisconnectReceipt = <-c.input
c.log(DISCONNECT, "end", ch, c.DisconnectReceipt)
c.log(DISCONNECT, "dr", ch, c.DisconnectReceipt)
}
c.log(DISCONNECT, "ends", ch)
c.rsd <- true
return e
}
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
h := "localhost"
p := "61613"
n, err := net.Dial("tcp", net.JoinHostPort(h, p))
n, err := net.Dial(stompngo.NetProtoTCP, net.JoinHostPort(h, p))
if err != nil {
// Do something sane ...
}
Expand Down
17 changes: 11 additions & 6 deletions hb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

const (
hbs = 30
hbs = 45
)

/*
Expand Down Expand Up @@ -194,9 +194,11 @@ func TestHB11NoSend(t *testing.T) {
t.Skip("TestHB11NoSend norun, set STOMP_HB11LONG")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
n, _ := openConn(t)
ch := check11(TEST_HEADERS)
ch = ch.Add("heart-beat", "0,6000") // No sending
l.Printf("ConnHeaders: %v\n", ch)
conn, e := Connect(n, ch)
// Error checks
if e != nil {
Expand All @@ -209,7 +211,6 @@ func TestHB11NoSend(t *testing.T) {
t.Errorf("Receive Ticker is zero.")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
conn.SetLogger(l)
//
conn.log("TestHB11NoSend connect response", conn.ConnectResponse.Command,
Expand Down Expand Up @@ -242,9 +243,11 @@ func TestHB11NoReceive(t *testing.T) {
t.Skip("TestHB11NoReceive norun, set STOMP_HB11LONG")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
n, _ := openConn(t)
ch := check11(TEST_HEADERS)
ch = ch.Add("heart-beat", "10000,0") // No Receiving
l.Printf("ConnHeaders: %v\n", ch)
conn, e := Connect(n, ch)
// Error checks
if e != nil {
Expand All @@ -257,7 +260,6 @@ func TestHB11NoReceive(t *testing.T) {
t.Errorf("Send Ticker is zero.")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
conn.SetLogger(l)
//
conn.log("TestHB11NoReceive start sleep")
Expand Down Expand Up @@ -287,9 +289,11 @@ func TestHB11SendReceive(t *testing.T) {
t.Skip("TestHB11SendReceive norun, set STOMP_HB11LONG")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
n, _ := openConn(t)
ch := check11(TEST_HEADERS)
ch = ch.Add("heart-beat", "10000,6000")
l.Printf("ConnHeaders: %v\n", ch)
conn, e := Connect(n, ch)
// Error checks
if e != nil {
Expand All @@ -305,7 +309,6 @@ func TestHB11SendReceive(t *testing.T) {
t.Errorf("Send Ticker is zero.")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
conn.SetLogger(l)
//
conn.log("TestHB11SendReceive start sleep")
Expand Down Expand Up @@ -337,9 +340,11 @@ func TestHB11SendReceiveApollo(t *testing.T) {
t.Skip("TestHB11SendReceiveApollo norun, set STOMP_HB11LONG")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
n, _ := openConn(t)
ch := check11(TEST_HEADERS)
ch = ch.Add("heart-beat", "10000,100")
l.Printf("ConnHeaders: %v\n", ch)
conn, e := Connect(n, ch)
// Error checks
if e != nil {
Expand All @@ -355,7 +360,6 @@ func TestHB11SendReceiveApollo(t *testing.T) {
t.Errorf("Send Ticker is zero.")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
conn.SetLogger(l)
//
conn.log("TestHB11SendReceiveApollo start sleep")
Expand Down Expand Up @@ -391,9 +395,11 @@ func TestHB11SendReceiveApolloRev(t *testing.T) {
t.Skip("TestHB11SendReceiveApolloRev norun, skip AMQ11+")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
n, _ := openConn(t)
ch := check11(TEST_HEADERS)
ch = ch.Add("heart-beat", "100,10000")
l.Printf("ConnHeaders: %v\n", ch)
conn, e := Connect(n, ch)
// Error checks
if e != nil {
Expand All @@ -409,7 +415,6 @@ func TestHB11SendReceiveApolloRev(t *testing.T) {
t.Errorf("Send Ticker is zero.")
}
//
l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
conn.SetLogger(l)
//
conn.log("TestHB11SendReceiveApolloRev start sleep")
Expand Down
Loading

0 comments on commit 0f0c6f1

Please sign in to comment.