Skip to content

Commit

Permalink
Add connectionbroker package
Browse files Browse the repository at this point in the history
This package is a small abstraction on top of Remotes that provides a
gRPC connection to a manager. If running on a manager, it uses the unix
socket, otherwise it will pick a remote manager using Remotes. This
will allow things like agent and certificate renewal to work even on a
single node with no TCP port bound.

Signed-off-by: Aaron Lehmann <[email protected]>
  • Loading branch information
aaronlehmann committed Jan 9, 2017
1 parent 80c66b0 commit e32ca49
Showing 1 changed file with 105 additions and 0 deletions.
105 changes: 105 additions & 0 deletions connectionbroker/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Package connectionbroker is a layer on top of remotes that returns
// a gRPC connection to a manager. The connection may be a local connection
// using a local socket such as a UNIX socket.
package connectionbroker

import (
"sync"

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/remotes"
"google.golang.org/grpc"
)

// Broker is a simple connection broker. It can either return a fresh
// connection to a remote manager selected with weighted randomization, or a
// local gRPC connection to the local manager.
type Broker struct {
mu sync.Mutex
remotes remotes.Remotes
localConn *grpc.ClientConn
}

// New creates a new connection broker.
func New(remotes remotes.Remotes) *Broker {
return &Broker{
remotes: remotes,
}
}

// SetLocalConn changes the local gRPC connection used by the connection broker.
func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) {
b.mu.Lock()
defer b.mu.Unlock()

b.localConn = localConn
}

// Select a manager from the set of available managers, and return a connection.
func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
b.mu.Lock()
localConn := b.localConn
b.mu.Unlock()

if localConn != nil {
return &Conn{
ClientConn: localConn,
isLocal: true,
}, nil
}

return b.SelectRemote(dialOpts...)
}

// SelectRemote chooses a manager from the remotes, and returns a TCP
// connection.
func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
peer, err := b.remotes.Select()
if err != nil {
return nil, err
}

cc, err := grpc.Dial(peer.Addr, dialOpts...)
if err != nil {
b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight)
return nil, err
}

return &Conn{
ClientConn: cc,
remotes: b.remotes,
peer: peer,
}, nil
}

// Remotes returns the remotes interface used by the broker, so the caller
// can make observations or see weights directly.
func (b *Broker) Remotes() remotes.Remotes {
return b.remotes
}

// Conn is a wrapper around a gRPC client connection.
type Conn struct {
*grpc.ClientConn
isLocal bool
remotes remotes.Remotes
peer api.Peer
}

// Close closes the client connection if it is a remote connection. It also
// records a positive experience with the remote peer if success is true,
// otherwise it records a negative experience. If a local connection is in use,
// Close is a noop.
func (c *Conn) Close(success bool) error {
if c.isLocal {
return nil
}

if success {
c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
} else {
c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
}

return c.ClientConn.Close()
}

0 comments on commit e32ca49

Please sign in to comment.