Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Establish tls without negotiating when protocol scheme is amqps
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Menzhinsky authored and vcabbage committed Jan 28, 2018
1 parent 9d0e0f3 commit 746c1d6
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 31 deletions.
46 changes: 26 additions & 20 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package amqp
import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"fmt"
"math"
Expand All @@ -24,49 +25,54 @@ type Client struct {
// Dial connects to an AMQP server.
//
// If the addr includes a scheme, it must be "amqp" or "amqps".
// TLS will be negotiated when the scheme is "amqps".
//
// If no port is provided, 5672 will be used.
// If no port is provided, 5672 will be used for "amqp" and 5671 for "amqps".
func Dial(addr string, opts ...ConnOption) (*Client, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}

host, port, err := net.SplitHostPort(u.Host)
if err != nil {
host = u.Host
port = "5672" // use default AMQP if parse fails
}

switch u.Scheme {
case "amqp", "amqps", "":
default:
return nil, errorErrorf("unsupported scheme %q", u.Scheme)
}

conn, err := net.Dial("tcp", host+":"+port)
if err != nil {
return nil, err
port = "5672" // use default port values if parse fails
if u.Scheme == "amqps" {
port = "5671"
}
}

// append default options so user specified can overwrite
opts = append([]ConnOption{
ConnServerHostname(host),
ConnTLS(u.Scheme == "amqps"),
}, opts...)

c, err := New(conn, opts...)
c, err := newConn(nil, opts...)
if err != nil {
return nil, err
}

return c, err
switch u.Scheme {
case "amqp", "":
c.net, err = net.Dial("tcp", host+":"+port)
case "amqps":
c.initTLSConfig()
c.tlsNegotiation = false
c.net, err = tls.Dial("tcp", host+":"+port, c.tlsConfig)
default:
return nil, errorErrorf("unsupported scheme %q", u.Scheme)
}
if err != nil {
return nil, err
}
err = c.start()
return &Client{conn: c}, err
}

// New establishes an AMQP client connection over conn.
func New(conn net.Conn, opts ...ConnOption) (*Client, error) {
c, err := newConn(conn, opts...)
if err != nil {
return nil, err
}
err = c.start()
return &Client{conn: c}, err
}

Expand Down
29 changes: 18 additions & 11 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,22 @@ func newConn(netConn net.Conn, opts ...ConnOption) (*conn, error) {
return nil, err
}
}
return c, nil
}

func (c *conn) initTLSConfig() {
// create a new config if not already set
if c.tlsConfig == nil {
c.tlsConfig = new(tls.Config)
}

// TLS config must have ServerName or InsecureSkipVerify set
if c.tlsConfig.ServerName == "" && !c.tlsConfig.InsecureSkipVerify {
c.tlsConfig.ServerName = c.hostname
}
}

func (c *conn) start() error {
// start reader
go c.connReader()

Expand All @@ -220,14 +235,14 @@ func newConn(netConn net.Conn, opts ...ConnOption) (*conn, error) {
if c.err != nil {
close(c.txDone) // close here since connWriter hasn't been started yet
c.Close()
return nil, c.err
return c.err
}

// start multiplexor and writer
go c.mux()
go c.connWriter()

return c, nil
return nil
}

func (c *conn) Close() error {
Expand Down Expand Up @@ -684,15 +699,7 @@ func (c *conn) readProtoHeader() (protoHeader, error) {

// startTLS wraps the conn with TLS and returns to Client.negotiateProto
func (c *conn) startTLS() stateFunc {
// create a new config if not already set
if c.tlsConfig == nil {
c.tlsConfig = new(tls.Config)
}

// TLS config must have ServerName or InsecureSkipVerify set
if c.tlsConfig.ServerName == "" && !c.tlsConfig.InsecureSkipVerify {
c.tlsConfig.ServerName = c.hostname
}
c.initTLSConfig()

// convoluted method to pause connReader, explorer simpler alternatives
c.resumeRead = make(chan struct{}) // 1. create channel
Expand Down

0 comments on commit 746c1d6

Please sign in to comment.