Skip to content
This repository has been archived by the owner on May 1, 2020. It is now read-only.

Commit

Permalink
Implement the autoconnect feature
Browse files Browse the repository at this point in the history
Autoconnect is an optional feature, enabled by default,
that makes client.Request, client.TimedRequest and
client.RestoreSession automatically try to establish
a connection if there's none until the timeout is reached.
Also make client.Signal automatically try to connect
without retrials directly returning a DisconnectedErr
if the connection couldn't be established at the first trial.

Add a new server-side hook: BeforeUpgrade to be able
to intercept connection attempts and either prevent, delay
or monitor them. This hook is rather useful for testing.
  • Loading branch information
romshark committed Mar 21, 2018
1 parent 37e0a86 commit aad55e7
Show file tree
Hide file tree
Showing 15 changed files with 297 additions and 151 deletions.
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (clt *Client) write(data []byte) error {
defer clt.connLock.Unlock()
if !clt.connected {
return DisconnectedErr{
msg: "Can't write to a disconnected client agent",
cause: fmt.Errorf("Can't write to a disconnected client agent"),
}
}
return clt.conn.WriteMessage(websocket.BinaryMessage, data)
Expand Down Expand Up @@ -115,7 +115,7 @@ func (clt *Client) CreateSession(attachment interface{}) error {
if !clt.connected {
clt.connLock.RUnlock()
return DisconnectedErr{
msg: "Can't create session on disconnected client agent",
cause: fmt.Errorf("Can't create session on disconnected client agent"),
}
}
clt.connLock.RUnlock()
Expand Down
124 changes: 36 additions & 88 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"fmt"
"log"
"net/url"
"sync"
"time"

Expand All @@ -19,10 +18,12 @@ const supportedProtocolVersion = "1.2"

// Client represents an instance of one of the servers clients
type Client struct {
serverAddr string
isConnected int32
defaultTimeout time.Duration
hooks Hooks
serverAddr string
isConnected int32
defaultReqTimeout time.Duration
reconnInterval time.Duration
autoconnect bool
hooks Hooks

sessionLock sync.RWMutex
session *webwire.Session
Expand All @@ -32,9 +33,10 @@ type Client struct {
// because performing multiple requests and/or signals simultaneously is fine.
// The Connect, RestoreSession, CloseSession and Close methods are locked exclusively
// because they should temporarily block any other interaction with this client instance.
apiLock sync.RWMutex
connLock sync.Mutex
conn *websocket.Conn
apiLock sync.RWMutex
connectLock sync.Mutex
connLock sync.Mutex
conn *websocket.Conn

requestManager reqman.RequestManager

Expand All @@ -47,17 +49,25 @@ type Client struct {
func NewClient(serverAddress string, opts Options) *Client {
opts.SetDefaults()

autoconnect := true
if opts.Autoconnect == OptDisabled {
autoconnect = false
}

return &Client{
serverAddress,
0,
opts.DefaultRequestTimeout,
opts.ReconnectionInterval,
autoconnect,
opts.Hooks,

sync.RWMutex{},
nil,

sync.RWMutex{},
sync.Mutex{},
sync.Mutex{},
nil,

reqman.NewRequestManager(),
Expand All @@ -83,82 +93,8 @@ func (clt *Client) IsConnected() bool {
// Connect connects the client to the configured server and
// returns an error in case of a connection failure.
// Automatically tries to restore the previous session
func (clt *Client) Connect() (err error) {
clt.apiLock.Lock()
defer clt.apiLock.Unlock()

if atomic.LoadInt32(&clt.isConnected) > 0 {
return nil
}

if err := clt.verifyProtocolVersion(); err != nil {
return err
}

connURL := url.URL{Scheme: "ws", Host: clt.serverAddr, Path: "/"}

clt.connLock.Lock()
clt.conn, _, err = websocket.DefaultDialer.Dial(connURL.String(), nil)
if err != nil {
return webwire.NewConnDialErr(err)
}
clt.connLock.Unlock()

// Setup reader thread
go func() {
defer clt.close()
for {
_, message, err := clt.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
) {
// Error while reading message
clt.errorLog.Print("Failed reading message:", err)
break
} else {
// Shutdown client due to clean disconnection
break
}
}
// Try to handle the message
if err = clt.handleMessage(message); err != nil {
clt.warningLog.Print("Failed handling message:", err)
}
}
}()

atomic.StoreInt32(&clt.isConnected, 1)

// Read the current sessions key if there is any
clt.sessionLock.RLock()
if clt.session == nil {
clt.sessionLock.RUnlock()
return nil
}
sessionKey := clt.session.Key
clt.sessionLock.RUnlock()

// Try to restore session if necessary
restoredSession, err := clt.requestSessionRestoration([]byte(sessionKey))
if err != nil {
// Just log a warning and still return nil, even if session restoration failed,
// because we only care about the connection establishment in this method
clt.warningLog.Printf("Couldn't restore session on reconnection: %s", err)

// Reset the session
clt.sessionLock.Lock()
clt.session = nil
clt.sessionLock.Unlock()
return nil
}

clt.sessionLock.Lock()
clt.session = restoredSession
clt.sessionLock.Unlock()
return nil
func (clt *Client) Connect() error {
return clt.connect()
}

// Request sends a request containing the given payload to the server
Expand All @@ -172,14 +108,18 @@ func (clt *Client) Request(
clt.apiLock.RLock()
defer clt.apiLock.RUnlock()

if err := clt.tryAutoconnect(clt.defaultReqTimeout); err != nil {
return webwire.Payload{}, err
}

reqType := webwire.MsgRequestBinary
switch payload.Encoding {
case webwire.EncodingUtf8:
reqType = webwire.MsgRequestUtf8
case webwire.EncodingUtf16:
reqType = webwire.MsgRequestUtf16
}
return clt.sendRequest(reqType, name, payload, clt.defaultTimeout)
return clt.sendRequest(reqType, name, payload, clt.defaultReqTimeout)
}

// TimedRequest sends a request containing the given payload to the server
Expand All @@ -195,6 +135,10 @@ func (clt *Client) TimedRequest(
clt.apiLock.RLock()
defer clt.apiLock.RUnlock()

if err := clt.tryAutoconnect(timeout); err != nil {
return webwire.Payload{}, err
}

reqType := webwire.MsgRequestBinary
switch payload.Encoding {
case webwire.EncodingUtf8:
Expand All @@ -210,8 +154,8 @@ func (clt *Client) Signal(name string, payload webwire.Payload) error {
clt.apiLock.RLock()
defer clt.apiLock.RUnlock()

if atomic.LoadInt32(&clt.isConnected) < 1 {
return DisconnectedErr{}
if err := clt.connect(); err != nil {
return err
}

msgBytes := webwire.NewSignalMessage(name, payload)
Expand Down Expand Up @@ -269,6 +213,10 @@ func (clt *Client) RestoreSession(sessionKey []byte) error {
}
clt.sessionLock.RUnlock()

if err := clt.tryAutoconnect(clt.defaultReqTimeout); err != nil {
return err
}

restoredSession, err := clt.requestSessionRestoration(sessionKey)
if err != nil {
return err
Expand Down Expand Up @@ -300,7 +248,7 @@ func (clt *Client) CloseSession() error {
if _, err := clt.sendNamelessRequest(
webwire.MsgCloseSession,
webwire.Payload{},
clt.defaultTimeout,
clt.defaultReqTimeout,
); err != nil {
return err
}
Expand Down
87 changes: 87 additions & 0 deletions client/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package client

import (
"fmt"
"net/url"
"sync/atomic"

"github.com/gorilla/websocket"
webwire "github.com/qbeon/webwire-go"
)

func (clt *Client) connect() (err error) {
clt.connectLock.Lock()
defer clt.connectLock.Unlock()
if atomic.LoadInt32(&clt.isConnected) > 0 {
return nil
}

if err := clt.verifyProtocolVersion(); err != nil {
return err
}

connURL := url.URL{Scheme: "ws", Host: clt.serverAddr, Path: "/"}

clt.connLock.Lock()
clt.conn, _, err = websocket.DefaultDialer.Dial(connURL.String(), nil)
if err != nil {
return webwire.NewDisconnectedErr(fmt.Errorf("Dial failure: %s", err))
}
clt.connLock.Unlock()

// Setup reader thread
go func() {
defer clt.close()
for {
_, message, err := clt.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
) {
// Error while reading message
clt.errorLog.Print("Failed reading message:", err)
break
} else {
// Shutdown client due to clean disconnection
break
}
}
// Try to handle the message
if err = clt.handleMessage(message); err != nil {
clt.warningLog.Print("Failed handling message:", err)
}
}
}()

atomic.StoreInt32(&clt.isConnected, 1)

// Read the current sessions key if there is any
clt.sessionLock.RLock()
if clt.session == nil {
clt.sessionLock.RUnlock()
return nil
}
sessionKey := clt.session.Key
clt.sessionLock.RUnlock()

// Try to restore session if necessary
restoredSession, err := clt.requestSessionRestoration([]byte(sessionKey))
if err != nil {
// Just log a warning and still return nil, even if session restoration failed,
// because we only care about the connection establishment in this method
clt.warningLog.Printf("Couldn't restore session on reconnection: %s", err)

// Reset the session
clt.sessionLock.Lock()
clt.session = nil
clt.sessionLock.Unlock()
return nil
}

clt.sessionLock.Lock()
clt.session = restoredSession
clt.sessionLock.Unlock()
return nil
}
8 changes: 0 additions & 8 deletions client/errors.go

This file was deleted.

43 changes: 40 additions & 3 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,59 @@ import (
"time"
)

// OptionToggle represents the value of a togglable option
type OptionToggle int

const (
// OptUnset defines unset togglable options
OptUnset OptionToggle = iota

// OptDisabled defines disabled togglable options
OptDisabled

// OptEnabled defines enabled togglable options
OptEnabled
)

// Options represents the options used during the creation a new client instance
type Options struct {
Hooks Hooks
// Hooks define the callback hook functions provided by the user to define behavior
// on certain events
Hooks Hooks

// DefaultRequestTimeout defines the default request timeout duration used in client.Request
DefaultRequestTimeout time.Duration
WarnLog io.Writer
ErrorLog io.Writer

// ReconnectionInterval defines the interval at which autoconnect should poll for a connection.
// If undefined then the default value of 2 seconds is applied
ReconnectionInterval time.Duration

// If autoconnect is enabled, client.Request, client.TimedRequest and client.RestoreSession
// won't immediately return a disconnected error if there's no active connection to the server,
// instead they will automatically try to reestablish the connection
// before the timeout is triggered and a timeout error is returned.
// Autoconnect is enabled by default
Autoconnect OptionToggle
WarnLog io.Writer
ErrorLog io.Writer
}

// SetDefaults sets default values for undefined required options
func (opts *Options) SetDefaults() {
opts.Hooks.SetDefaults()

if opts.Autoconnect == OptUnset {
opts.Autoconnect = OptEnabled
}

if opts.DefaultRequestTimeout < 1 {
opts.DefaultRequestTimeout = 60 * time.Second
}

if opts.ReconnectionInterval < 1 {
opts.ReconnectionInterval = 2 * time.Second
}

if opts.WarnLog == nil {
opts.WarnLog = os.Stdout
}
Expand Down
2 changes: 1 addition & 1 deletion client/requestSessionRestoration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (clt *Client) requestSessionRestoration(sessionKey []byte) (*webwire.Sessio
Encoding: webwire.EncodingBinary,
Data: sessionKey,
},
clt.defaultTimeout,
clt.defaultReqTimeout,
)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit aad55e7

Please sign in to comment.