Skip to content

Commit

Permalink
On node, if a Join Address is specified, track the number of sessions…
Browse files Browse the repository at this point in the history
… established.

If a session has never been established, and we get an x509 error, the make sure the
agent does not rebuild new sessions and just exists.  Make sure that this error gets
propagated back from the node.

Signed-off-by: Ying Li <[email protected]>
  • Loading branch information
cyli committed May 30, 2017
1 parent 542a7bf commit 8884a3c
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 12 deletions.
4 changes: 2 additions & 2 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (c *testCluster) SetNodeRole(id string, role api.NodeRole) error {
}

// Starts a node from a stopped state
func (c *testCluster) StartNode(id string) error {
func (c *testCluster) StartNode(id string, waitTimeout time.Duration) error {
n, ok := c.nodes[id]
if !ok {
return fmt.Errorf("set node role: node %s not found", id)
Expand Down Expand Up @@ -364,7 +364,7 @@ func (c *testCluster) StartNode(id string) error {
case <-n.node.Ready():
case err := <-done:
return err
case <-time.After(opsTimeout):
case <-time.After(waitTimeout):
return fmt.Errorf("node did not ready in time")
}
if n.node.NodeID() != id {
Expand Down
86 changes: 80 additions & 6 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
cautils "github.com/docker/swarmkit/ca/testutils"
"github.com/docker/swarmkit/manager"
"github.com/docker/swarmkit/testutils"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -438,7 +439,7 @@ func TestDemoteDownedManager(t *testing.T) {
}, opsTimeout))

// start it back up again
require.NoError(t, cl.StartNode(nodeID))
require.NoError(t, cl.StartNode(nodeID, opsTimeout))

// wait to become worker
require.NoError(t, testutils.PollFuncWithTimeout(nil, func() error {
Expand Down Expand Up @@ -486,7 +487,7 @@ func TestRestartLeader(t *testing.T) {
return nil
}, opsTimeout))

require.NoError(t, cl.StartNode(origLeaderID))
require.NoError(t, cl.StartNode(origLeaderID, opsTimeout))

pollClusterReady(t, cl, numWorker, numManager)
}
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestForceNewCluster(t *testing.T) {
nodeID := leader.node.NodeID()
require.NoError(t, leader.Pause(true))
require.NoError(t, ioutil.WriteFile(managerCertFile, expiredCertPEM, 0644))
require.NoError(t, cl.StartNode(nodeID))
require.NoError(t, cl.StartNode(nodeID, opsTimeout))
pollClusterReady(t, cl, numWorker, numManager)
pollServiceReady(t, cl, sid, 2)

Expand All @@ -551,7 +552,7 @@ func TestForceNewCluster(t *testing.T) {
// restart node with an expired certificate without forcing a new cluster - it should error on start
require.NoError(t, leader.Pause(true))
require.NoError(t, ioutil.WriteFile(managerCertFile, expiredCertPEM, 0644))
require.Error(t, cl.StartNode(nodeID))
require.Error(t, cl.StartNode(nodeID, opsTimeout))
}

func pollRootRotationDone(t *testing.T, cl *testCluster) {
Expand Down Expand Up @@ -629,8 +630,8 @@ func TestSuccessfulRootRotation(t *testing.T) {

// Bring the other manager back. Also bring one worker back, kill the other worker,
// and add a new worker - show that we can converge on a root rotation.
require.NoError(t, cl.StartNode(downManagerID))
require.NoError(t, cl.StartNode(downWorkerIDs[0]))
require.NoError(t, cl.StartNode(downManagerID, opsTimeout))
require.NoError(t, cl.StartNode(downWorkerIDs[0], opsTimeout))
require.NoError(t, cl.RemoveNode(downWorkerIDs[1], false))
require.NoError(t, cl.AddAgent())

Expand Down Expand Up @@ -718,3 +719,76 @@ func TestRepeatedRootRotation(t *testing.T) {
return nil
}, opsTimeout))
}

func TestNodeRejoins(t *testing.T) {
t.Parallel()
numWorker, numManager := 1, 3
cl := newCluster(t, numWorker, numManager)
defer func() {
require.NoError(t, cl.Stop())
}()
pollClusterReady(t, cl, numWorker, numManager)

clusterInfo, err := cl.GetClusterInfo()
require.NoError(t, err)

leader, err := cl.Leader()
require.NoError(t, err)

// Find a manager (not the leader) and the worker to shut down.
getNonLeaderAndWorker := func() map[string]*testNode {
results := make(map[string]*testNode)
for _, n := range cl.nodes {
nodeID := n.node.NodeID()
if n.IsManager() {
if nodeID != leader.node.NodeID() {
results[ca.ManagerRole] = n
}
} else {
results[ca.WorkerRole] = n
}
}
return results
}

// rejoining succeeds - (both because the certs are correct, and because node.Pause sets the JoinAddr to "")
for _, n := range getNonLeaderAndWorker() {
nodeID := n.node.NodeID()
require.NoError(t, n.Pause(false))
require.NoError(t, cl.StartNode(nodeID, opsTimeout))
}
pollClusterReady(t, cl, numWorker, numManager)

// rejoining if the certs are wrong will fail fast so long as the join address is passed, but will keep retrying
// forever if the join address is not passed
leader, err = cl.Leader() // in case leadership changed
require.NoError(t, err)
for role, n := range getNonLeaderAndWorker() {
nodeID := n.node.NodeID()
require.NoError(t, n.Pause(false))

// generate new certs with the same node ID, role, and cluster ID, but with the wrong CA
paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, "certificates"))
newRootCA, err := ca.CreateRootCA("bad root CA")
require.NoError(t, err)
ca.SaveRootCA(newRootCA, paths.RootCA)
krw := ca.NewKeyReadWriter(paths.Node, nil, &manager.RaftDEKData{}) // make sure the key headers are preserved
_, _, err = krw.Read()
require.NoError(t, err)
_, _, err = newRootCA.IssueAndSaveNewCertificates(krw, nodeID, role, clusterInfo.ID)
require.NoError(t, err)

n.config.JoinAddr, err = leader.node.RemoteAPIAddr()
require.NoError(t, err)
err = cl.StartNode(nodeID, 5*time.Second)
require.Error(t, err)
require.Contains(t, err.Error(), "certificate signed by unknown authority")

// don't wait forever - 5 seconds should be sufficient
require.NoError(t, n.Pause(false))
n.config.JoinAddr = ""
err = cl.StartNode(nodeID, 5*time.Second)
require.Error(t, err)
require.Contains(t, err.Error(), "node did not ready in time")
}
}
45 changes: 41 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync"
"time"

"strings"

"github.com/Sirupsen/logrus"
"github.com/boltdb/bolt"
"github.com/docker/docker/pkg/plugingetter"
Expand All @@ -33,6 +35,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
)

Expand Down Expand Up @@ -432,10 +435,10 @@ func (n *Node) run(ctx context.Context) (err error) {
}()

wg.Wait()
if managerErr != nil && managerErr != context.Canceled {
if managerErr != nil && errors.Cause(managerErr) != context.Canceled {
return managerErr
}
if agentErr != nil && agentErr != context.Canceled {
if agentErr != nil && errors.Cause(agentErr) != context.Canceled {
return agentErr
}
return err
Expand Down Expand Up @@ -516,7 +519,7 @@ waitPeer:
rootCA := securityConfig.RootCA()
issuer := securityConfig.IssuerInfo()

a, err := agent.New(&agent.Config{
agentConfig := &agent.Config{
Hostname: n.config.Hostname,
ConnBroker: n.connBroker,
Executor: n.config.Executor,
Expand All @@ -529,7 +532,14 @@ waitPeer:
CertIssuerPublicKey: issuer.PublicKey,
CertIssuerSubject: issuer.Subject,
},
})
}
// if a join address has been specified, then if the agent fails to connect due to a TLS error, fail fast - don't
// keep re-trying to join
if n.config.JoinAddr != "" {
agentConfig.SessionTracker = &firstSessionErrorTracker{}
}

a, err := agent.New(agentConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -1055,3 +1065,30 @@ func (sp sortablePeers) Less(i, j int) bool { return sp[i].NodeID < sp[j].NodeID
func (sp sortablePeers) Len() int { return len(sp) }

func (sp sortablePeers) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }

// firstSessionErrorTracker is a utility that helps determine whether the agent should exit after
// a TLS failure on establishing the first session. This should only happen if a join address
// is specified. If establishing the first session succeeds, but later on some session fails
// because of a TLS error, we don't want to exit the agent because a previously successful
// session indicates that the TLS error may be a transient issue.
type firstSessionErrorTracker struct {
pastFirstSession bool
err error
}

func (fs *firstSessionErrorTracker) OnSessionEstablished() {
fs.pastFirstSession = true
}

func (fs *firstSessionErrorTracker) OnSessionError(err error) {
fs.err = err
}

func (fs *firstSessionErrorTracker) OnSessionClosed() error {
// unfortunately grpc connection errors are type grpc.rpcError, which are not exposed, and we can't get at the underlying error type
if !fs.pastFirstSession && grpc.Code(fs.err) == codes.Internal &&
strings.HasPrefix(grpc.ErrorDesc(fs.err), "connection error") && strings.Contains(grpc.ErrorDesc(fs.err), "transport: x509:") {
return fs.err
}
return nil
}

0 comments on commit 8884a3c

Please sign in to comment.