Skip to content

Commit

Permalink
Merge pull request #1851 from aaronlehmann/use-connectionbroker
Browse files Browse the repository at this point in the history
Convert code to use connectionbroker package
  • Loading branch information
aaronlehmann authored Jan 10, 2017
2 parents c80ff0c + d15f8ca commit 69ea950
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 146 deletions.
10 changes: 4 additions & 6 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,10 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
seen := map[api.Peer]struct{}{}
for _, manager := range message.Managers {
if manager.Peer.Addr == "" {
log.G(ctx).WithField("manager.addr", manager.Peer.Addr).
Warnf("skipping bad manager address")
continue
}

a.config.Managers.Observe(*manager.Peer, int(manager.Weight))
a.config.ConnBroker.Remotes().Observe(*manager.Peer, int(manager.Weight))
seen[*manager.Peer] = struct{}{}
}

Expand All @@ -358,9 +356,9 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
}

// prune managers not in list.
for peer := range a.config.Managers.Weights() {
for peer := range a.config.ConnBroker.Remotes().Weights() {
if _, ok := seen[peer]; !ok {
a.config.Managers.Remove(peer)
a.config.ConnBroker.Remotes().Remove(peer)
}
}

Expand Down Expand Up @@ -468,7 +466,7 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
)

err = a.withSession(ctx, func(session *session) error {
publisher, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
publisher, err = api.NewLogBrokerClient(session.conn.ClientConn).PublishLogs(ctx)
return err
})
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/ca/testutils"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/remotes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestAgentStartStop(t *testing.T) {

agent, err := New(&Config{
Executor: &NoopExecutor{},
Managers: remotes,
ConnBroker: connectionbroker.New(remotes),
Credentials: agentSecurityConfig.ClientTLSCreds,
DB: db,
})
Expand Down Expand Up @@ -147,7 +148,7 @@ func agentTestEnv(t *testing.T) (*Agent, func()) {

agent, err := New(&Config{
Executor: &NoopExecutor{},
Managers: remotes,
ConnBroker: connectionbroker.New(remotes),
Credentials: agentSecurityConfig.ClientTLSCreds,
DB: db,
})
Expand Down
8 changes: 4 additions & 4 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/boltdb/bolt"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/connectionbroker"
"github.com/pkg/errors"
"google.golang.org/grpc/credentials"
)
Expand All @@ -14,9 +14,9 @@ type Config struct {
// Hostname the name of host for agent instance.
Hostname string

// Managers provides the manager backend used by the agent. It will be
// updated with managers weights as observed by the agent.
Managers remotes.Remotes
// ConnBroker provides a connection broker for retrieving gRPC
// connections to managers.
ConnBroker *connectionbroker.Broker

// Executor specifies the executor to use for the agent.
Executor exec.Executor
Expand Down
4 changes: 2 additions & 2 deletions agent/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ResourceAllocator interface {
func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string, addresses []string) (string, error) {
var taskID string
if err := r.agent.withSession(ctx, func(session *session) error {
client := api.NewResourceAllocatorClient(session.conn)
client := api.NewResourceAllocatorClient(session.conn.ClientConn)
r, err := client.AttachNetwork(ctx, &api.AttachNetworkRequest{
Config: &api.NetworkAttachmentConfig{
Target: target,
Expand All @@ -53,7 +53,7 @@ func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string
// DetachNetwork deletes a network attachment.
func (r *resourceAllocator) DetachNetwork(ctx context.Context, aID string) error {
return r.agent.withSession(ctx, func(session *session) error {
client := api.NewResourceAllocatorClient(session.conn)
client := api.NewResourceAllocatorClient(session.conn.ClientConn)
_, err := client.DetachNetwork(ctx, &api.DetachNetworkRequest{
AttachmentID: aID,
})
Expand Down
28 changes: 10 additions & 18 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/remotes"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -30,8 +30,7 @@ var (
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
conn *grpc.ClientConn
addr string
conn *connectionbroker.Conn

agent *Agent
sessionID string
Expand Down Expand Up @@ -61,20 +60,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
// TODO(stevvooe): Need to move connection management up a level or create
// independent connection for log broker client.

peer, err := agent.config.Managers.Select()
if err != nil {
s.errs <- err
return s
}
cc, err := grpc.Dial(peer.Addr,
cc, err := agent.config.ConnBroker.Select(
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
if err != nil {
s.errs <- err
return s
}
s.addr = peer.Addr
s.conn = cc

go s.run(ctx, delay, description)
Expand Down Expand Up @@ -127,7 +120,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)

stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
Expand Down Expand Up @@ -160,7 +153,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e

func (s *session) heartbeat(ctx context.Context) error {
log.G(ctx).Debugf("(*session).heartbeat")
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()

Expand Down Expand Up @@ -224,7 +217,7 @@ func (s *session) logSubscriptions(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
log.Debugf("")

client := api.NewLogBrokerClient(s.conn)
client := api.NewLogBrokerClient(s.conn.ClientConn)
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
if err != nil {
return err
Expand Down Expand Up @@ -269,7 +262,7 @@ func (s *session) watch(ctx context.Context) error {
err error
)

client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
for {
// If this is the first time we're running the loop, or there was a reference mismatch
// attempt to get the assignmentWatch
Expand Down Expand Up @@ -344,7 +337,7 @@ func (s *session) watch(ctx context.Context) error {

// sendTaskStatus uses the current session to send the status of a single task.
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
Expand Down Expand Up @@ -385,7 +378,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa
return updates, ctx.Err()
}

client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
n := batchSize

if len(updates) < n {
Expand Down Expand Up @@ -416,8 +409,7 @@ func (s *session) sendError(err error) {
func (s *session) close() error {
s.closeOnce.Do(func() {
if s.conn != nil {
s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -remotes.DefaultObservationWeight)
s.conn.Close()
s.conn.Close(false)
}

close(s.closed)
Expand Down
72 changes: 36 additions & 36 deletions ca/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"github.com/cloudflare/cfssl/signer/local"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/ioutils"
"github.com/docker/swarmkit/remotes"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
Expand Down Expand Up @@ -169,6 +169,15 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
if err == nil {
break
}

// If the first attempt fails, we should try a remote
// connection. The local node may be a manager that was
// demoted, so the local connection (which is preferred) may
// not work. If we are successful in renewing the certificate,
// the local connection will not be returned by the connection
// broker anymore.
config.ForceRemote = true

}
if err != nil {
return nil, err
Expand Down Expand Up @@ -202,7 +211,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit

var kekUpdate *KEKData
for i := 0; i < 5; i++ {
kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.Remotes)
kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.ConnBroker)
if err == nil {
break
}
Expand All @@ -218,7 +227,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
return &tlsKeyPair, nil
}

func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, r remotes.Remotes) (*KEKData, error) {
func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, connBroker *connectionbroker.Broker) (*KEKData, error) {
var managerRole bool
for _, ou := range cert.Subject.OrganizationalUnit {
if ou == ManagerRole {
Expand All @@ -229,25 +238,25 @@ func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, key

if managerRole {
mtlsCreds := credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rca.Pool, Certificates: []tls.Certificate{keypair}})
conn, peer, err := getGRPCConnection(mtlsCreds, r)
conn, err := getGRPCConnection(mtlsCreds, connBroker, false)
if err != nil {
return nil, err
}
defer conn.Close()

client := api.NewCAClient(conn)
client := api.NewCAClient(conn.ClientConn)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
response, err := client.GetUnlockKey(ctx, &api.GetUnlockKeyRequest{})
if err != nil {
if grpc.Code(err) == codes.Unimplemented { // if the server does not support keks, return as if no encryption key was specified
conn.Close(true)
return &KEKData{}, nil
}

r.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}
r.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(true)
return &KEKData{KEK: response.UnlockKey, Version: response.Version.Index}, nil
}

Expand Down Expand Up @@ -440,45 +449,33 @@ func GetLocalRootCA(paths CertPaths) (RootCA, error) {
return NewRootCA(cert, key, DefaultNodeCertExpiration)
}

func getGRPCConnection(creds credentials.TransportCredentials, r remotes.Remotes) (*grpc.ClientConn, api.Peer, error) {
peer, err := r.Select()
if err != nil {
return nil, api.Peer{}, err
}

opts := []grpc.DialOption{
func getGRPCConnection(creds credentials.TransportCredentials, connBroker *connectionbroker.Broker, forceRemote bool) (*connectionbroker.Conn, error) {
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithTimeout(5 * time.Second),
grpc.WithBackoffMaxDelay(5 * time.Second),
}

conn, err := grpc.Dial(peer.Addr, opts...)
if err != nil {
return nil, api.Peer{}, err
if forceRemote {
return connBroker.SelectRemote(dialOpts...)
}
return conn, peer, nil
return connBroker.Select(dialOpts...)
}

// GetRemoteCA returns the remote endpoint's CA certificate
func GetRemoteCA(ctx context.Context, d digest.Digest, r remotes.Remotes) (RootCA, error) {
func GetRemoteCA(ctx context.Context, d digest.Digest, connBroker *connectionbroker.Broker) (RootCA, error) {
// This TLS Config is intentionally using InsecureSkipVerify. We use the
// digest instead to check the integrity of the CA certificate.
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, peer, err := getGRPCConnection(insecureCreds, r)
conn, err := getGRPCConnection(insecureCreds, connBroker, false)
if err != nil {
return RootCA{}, err
}
defer conn.Close()

client := api.NewCAClient(conn)
client := api.NewCAClient(conn.ClientConn)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
defer func() {
if err != nil {
r.Observe(peer, -remotes.DefaultObservationWeight)
return
}
r.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(err == nil)
}()
response, err := client.GetRootCACertificate(ctx, &api.GetRootCACertificateRequest{})
if err != nil {
Expand Down Expand Up @@ -558,20 +555,22 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool})
}

conn, peer, err := getGRPCConnection(creds, config.Remotes)
conn, err := getGRPCConnection(creds, config.ConnBroker, config.ForceRemote)
if err != nil {
return nil, err
}
defer conn.Close()

// Create a CAClient to retrieve a new Certificate
caClient := api.NewNodeCAClient(conn)
caClient := api.NewNodeCAClient(conn.ClientConn)

issueCtx, issueCancel := context.WithTimeout(ctx, 5*time.Second)
defer issueCancel()

// Send the Request and retrieve the request token
issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: config.Token, Availability: config.Availability}
issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest)
issueResponse, err := caClient.IssueNodeCertificate(issueCtx, issueRequest)
if err != nil {
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}

Expand All @@ -589,13 +588,14 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
defer cancel()
statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest)
if err != nil {
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}

// If the certificate was issued, return
if statusResponse.Status.State == api.IssuanceStateIssued {
if statusResponse.Certificate == nil {
conn.Close(false)
return nil, errors.New("no certificate in CertificateStatus response")
}

Expand All @@ -605,7 +605,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
// retry until the certificate gets updated per our
// current request.
if bytes.Equal(statusResponse.Certificate.CSR, csr) {
config.Remotes.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(true)
return statusResponse.Certificate.Certificate, nil
}
}
Expand Down
Loading

0 comments on commit 69ea950

Please sign in to comment.