From 7e74bebea84b098d9d2de0eed2490f352481717a Mon Sep 17 00:00:00 2001 From: Nick Cabatoff Date: Mon, 17 Jan 2022 10:33:03 -0500 Subject: [PATCH] Parallel retry join (#13606) --- changelog/13606.txt | 3 + helper/testhelpers/testhelpers.go | 2 +- vault/external_tests/raftha/raft_ha_test.go | 4 +- vault/raft.go | 400 +++++++++++--------- 4 files changed, 231 insertions(+), 178 deletions(-) create mode 100644 changelog/13606.txt diff --git a/changelog/13606.txt b/changelog/13606.txt new file mode 100644 index 000000000000..c41abbab5952 --- /dev/null +++ b/changelog/13606.txt @@ -0,0 +1,3 @@ +```release-note:improvement +storage/raft: When using retry_join stanzas, join against all of them in parallel. +``` \ No newline at end of file diff --git a/helper/testhelpers/testhelpers.go b/helper/testhelpers/testhelpers.go index bca97f6717e1..5eb84826503b 100644 --- a/helper/testhelpers/testhelpers.go +++ b/helper/testhelpers/testhelpers.go @@ -655,7 +655,7 @@ func VerifyRaftPeers(t testing.T, client *api.Client, expected map[string]bool) // If the collection is non-empty, it means that the peer was not found in // the response. if len(expected) != 0 { - t.Fatalf("failed to read configuration successfully, expected peers no found in configuration list: %v", expected) + t.Fatalf("failed to read configuration successfully, expected peers not found in configuration list: %v", expected) } } diff --git a/vault/external_tests/raftha/raft_ha_test.go b/vault/external_tests/raftha/raft_ha_test.go index 3a8066f5c680..e650b6bbb6d2 100644 --- a/vault/external_tests/raftha/raft_ha_test.go +++ b/vault/external_tests/raftha/raft_ha_test.go @@ -170,7 +170,7 @@ func TestRaft_HA_ExistingCluster(t *testing.T) { haStorage, haCleanup := teststorage.MakeReusableRaftHAStorage(t, logger, opts.NumCores, physBundle) defer haCleanup() - updateCLuster := func(t *testing.T) { + updateCluster := func(t *testing.T) { t.Log("simulating cluster update with raft as HABackend") opts.SkipInit = true @@ -240,5 +240,5 @@ func TestRaft_HA_ExistingCluster(t *testing.T) { }) } - updateCLuster(t) + updateCluster(t) } diff --git a/vault/raft.go b/vault/raft.go index 7ac373700264..7545bd42d24d 100644 --- a/vault/raft.go +++ b/vault/raft.go @@ -710,6 +710,104 @@ func (c *Core) InitiateRetryJoin(ctx context.Context) error { return nil } +// getRaftChallenge is a helper function used by the raft join process for adding a +// node to a cluster: it contacts the given node and initiates the bootstrap +// challenge, returning the result or an error. +func (c *Core) getRaftChallenge(leaderInfo *raft.LeaderJoinInfo) (*raftInformation, error) { + if leaderInfo == nil { + return nil, errors.New("raft leader information is nil") + } + if len(leaderInfo.LeaderAPIAddr) == 0 { + return nil, errors.New("raft leader address not provided") + } + + c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderInfo.LeaderAPIAddr) + + // Create an API client to interact with the leader node + transport := cleanhttp.DefaultPooledTransport() + + var err error + if leaderInfo.TLSConfig == nil && (len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0) { + leaderInfo.TLSConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey)) + if err != nil { + return nil, fmt.Errorf("failed to create TLS config: %w", err) + } + leaderInfo.TLSConfig.ServerName = leaderInfo.LeaderTLSServerName + } + if leaderInfo.TLSConfig == nil && leaderInfo.LeaderTLSServerName != "" { + leaderInfo.TLSConfig, err = tlsutil.SetupTLSConfig(map[string]string{"address": leaderInfo.LeaderTLSServerName}, "") + if err != nil { + return nil, fmt.Errorf("failed to create TLS config: %w", err) + } + } + + if leaderInfo.TLSConfig != nil { + transport.TLSClientConfig = leaderInfo.TLSConfig.Clone() + if err := http2.ConfigureTransport(transport); err != nil { + return nil, fmt.Errorf("failed to configure TLS: %w", err) + } + } + + client := &http.Client{ + Transport: transport, + } + + config := api.DefaultConfig() + if config.Error != nil { + return nil, fmt.Errorf("failed to create api client: %w", config.Error) + } + + config.Address = leaderInfo.LeaderAPIAddr + config.HttpClient = client + config.MaxRetries = 0 + + apiClient, err := api.NewClient(config) + if err != nil { + return nil, fmt.Errorf("failed to create api client: %w", err) + } + + // Attempt to join the leader by requesting for the bootstrap challenge + secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{ + "server_id": c.getRaftBackend().NodeID(), + }) + if err != nil { + return nil, fmt.Errorf("error during raft bootstrap init call: %w", err) + } + if secret == nil { + return nil, errors.New("could not retrieve raft bootstrap package") + } + + var sealConfig SealConfig + err = mapstructure.Decode(secret.Data["seal_config"], &sealConfig) + if err != nil { + return nil, err + } + + if sealConfig.Type != c.seal.BarrierType() { + return nil, fmt.Errorf("mismatching seal types between raft leader (%s) and follower (%s)", sealConfig.Type, c.seal.BarrierType()) + } + + challengeB64, ok := secret.Data["challenge"] + if !ok { + return nil, errors.New("error during raft bootstrap call, no challenge given") + } + challengeRaw, err := base64.StdEncoding.DecodeString(challengeB64.(string)) + if err != nil { + return nil, fmt.Errorf("error decoding raft bootstrap challenge: %w", err) + } + + eBlob := &wrapping.EncryptedBlobInfo{} + if err := proto.Unmarshal(challengeRaw, eBlob); err != nil { + return nil, fmt.Errorf("error decoding raft bootstrap challenge: %w", err) + } + + return &raftInformation{ + challenge: eBlob, + leaderClient: apiClient, + leaderBarrierConfig: &sealConfig, + }, nil +} + func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJoinInfo, nonVoter bool) (bool, error) { raftBackend := c.getRaftBackend() if raftBackend == nil { @@ -790,201 +888,110 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo return false, fmt.Errorf("failed to create auto-join discovery: %w", err) } - join := func(retry bool) error { - joinLeader := func(leaderInfo *raft.LeaderJoinInfo, leaderAddr string) error { - if leaderInfo == nil { - return errors.New("raft leader information is nil") - } - if len(leaderAddr) == 0 { - return errors.New("raft leader address not provided") - } - - init, err := c.InitializedLocally(ctx) - if err != nil { - return fmt.Errorf("failed to check if core is initialized: %w", err) + retryFailures := leaderInfos[0].Retry + // answerChallenge performs the second part of a raft join: after we've issued + // the sys/storage/raft/bootstrap/challenge call to initiate the join, this + // func uses the seal to compute an answer to the challenge and sends it + // back to the server that provided the challenge. + answerChallenge := func(ctx context.Context, raftInfo *raftInformation) error { + // If we're using Shamir and using raft for both physical and HA, we + // need to block until the node is unsealed, unless retry is set to + // false. + if c.seal.BarrierType() == wrapping.Shamir && !c.isRaftHAOnly() { + c.raftInfo = raftInfo + if err := c.seal.SetBarrierConfig(ctx, raftInfo.leaderBarrierConfig); err != nil { + return err } - if init && !isRaftHAOnly { - c.logger.Info("returning from raft join as the node is initialized") + if !retryFailures { return nil } - c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderAddr) - - // Create an API client to interact with the leader node - transport := cleanhttp.DefaultPooledTransport() - - if leaderInfo.TLSConfig == nil && (len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0) { - leaderInfo.TLSConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey)) - if err != nil { - return fmt.Errorf("failed to create TLS config: %w", err) - } - leaderInfo.TLSConfig.ServerName = leaderInfo.LeaderTLSServerName - } - if leaderInfo.TLSConfig == nil && leaderInfo.LeaderTLSServerName != "" { - leaderInfo.TLSConfig, err = tlsutil.SetupTLSConfig(map[string]string{"address": leaderInfo.LeaderTLSServerName}, "") - if err != nil { - return fmt.Errorf("failed to create TLS config: %w", err) - } - } - - if leaderInfo.TLSConfig != nil { - transport.TLSClientConfig = leaderInfo.TLSConfig.Clone() - if err := http2.ConfigureTransport(transport); err != nil { - return fmt.Errorf("failed to configure TLS: %w", err) - } - } - - client := &http.Client{ - Transport: transport, - } - - config := api.DefaultConfig() - if config.Error != nil { - return fmt.Errorf("failed to create api client: %w", config.Error) + // Wait until unseal keys are supplied + c.raftInfo.joinInProgress = true + if atomic.LoadUint32(c.postUnsealStarted) != 1 { + return errors.New("waiting for unseal keys to be supplied") } + } - config.Address = leaderAddr - config.HttpClient = client - config.MaxRetries = 0 + raftInfo.nonVoter = nonVoter + if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil { + return fmt.Errorf("failed to send answer to raft leader node: %w", err) + } - apiClient, err := api.NewClient(config) - if err != nil { - return fmt.Errorf("failed to create api client: %w", err) - } + if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly { + // Reset the state + c.raftInfo = nil - // Attempt to join the leader by requesting for the bootstrap challenge - secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{ - "server_id": raftBackend.NodeID(), - }) - if err != nil { - return fmt.Errorf("error during raft bootstrap init call: %w", err) - } - if secret == nil { - return errors.New("could not retrieve raft bootstrap package") - } - - var sealConfig SealConfig - err = mapstructure.Decode(secret.Data["seal_config"], &sealConfig) - if err != nil { - return err - } + // In case of Shamir unsealing, inform the unseal process that raft join is completed + close(c.raftJoinDoneCh) + } - if sealConfig.Type != c.seal.BarrierType() { - return fmt.Errorf("mismatching seal types between raft leader (%s) and follower (%s)", sealConfig.Type, c.seal.BarrierType()) - } + c.logger.Info("successfully joined the raft cluster", "leader_addr", raftInfo.leaderClient.Address()) + return nil + } - challengeB64, ok := secret.Data["challenge"] - if !ok { - return errors.New("error during raft bootstrap call, no challenge given") - } - challengeRaw, err := base64.StdEncoding.DecodeString(challengeB64.(string)) + // join attempts to join to any of the leaders defined in leaderInfos, + // using the first one that returns a challenge to our request. If shamir + // seal is in use, we must wait to get enough unseal keys to solve the + // challenge. If we're unable to get a challenge from any leader, or if + // we fail to answer the challenge successfully, or if ctx times out, + // an error is returned. + join := func() error { + init, err := c.InitializedLocally(ctx) + if err != nil { + return fmt.Errorf("failed to check if core is initialized: %w", err) + } + if init && !isRaftHAOnly { + c.logger.Info("returning from raft join as the node is initialized") + return nil + } + challengeCh := make(chan *raftInformation) + var expandedJoinInfos []*raft.LeaderJoinInfo + for _, leaderInfo := range leaderInfos { + joinInfos, err := c.raftLeaderInfo(leaderInfo, disco) if err != nil { - return fmt.Errorf("error decoding raft bootstrap challenge: %w", err) - } - - eBlob := &wrapping.EncryptedBlobInfo{} - if err := proto.Unmarshal(challengeRaw, eBlob); err != nil { - return fmt.Errorf("error decoding raft bootstrap challenge: %w", err) + c.logger.Error("error in retry_join stanza, will not use it for raft join", "error", err, + "leader_api_addr", leaderInfo.LeaderAPIAddr, "auto_join", leaderInfo.AutoJoin != "") + continue } - - raftInfo := &raftInformation{ - challenge: eBlob, - leaderClient: apiClient, - leaderBarrierConfig: &sealConfig, - nonVoter: nonVoter, - } - - // If we're using Shamir and using raft for both physical and HA, we - // need to block until the node is unsealed, unless retry is set to - // false. - if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly { - c.raftInfo = raftInfo - if err := c.seal.SetBarrierConfig(ctx, &sealConfig); err != nil { - return err - } - - if !retry { - return nil - } - - // Wait until unseal keys are supplied - c.raftInfo.joinInProgress = true - if atomic.LoadUint32(c.postUnsealStarted) != 1 { - return errors.New("waiting for unseal keys to be supplied") + expandedJoinInfos = append(expandedJoinInfos, joinInfos...) + } + if err != nil { + return err + } + var wg sync.WaitGroup + for i := range leaderInfos { + wg.Add(1) + go func(joinInfo *raft.LeaderJoinInfo) { + defer wg.Done() + raftInfo, err := c.getRaftChallenge(joinInfo) + if err != nil { + c.Logger().Trace("failed to get raft challenge", "leader_addr", joinInfo.LeaderAPIAddr, "error", err) + return } - } - - if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil { - return fmt.Errorf("failed to send answer to raft leader node: %w", err) - } - - if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly { - // Reset the state - c.raftInfo = nil - - // In case of Shamir unsealing, inform the unseal process that raft join is completed - close(c.raftJoinDoneCh) - } - - c.logger.Info("successfully joined the raft cluster", "leader_addr", leaderInfo.LeaderAPIAddr) - return nil + challengeCh <- raftInfo + }(leaderInfos[i]) } + go func() { + wg.Wait() + close(challengeCh) + }() - // Each join try goes through all the possible leader nodes and attempts to join - // them, until one of the attempt succeeds. - for _, leaderInfo := range leaderInfos { - switch { - case leaderInfo.LeaderAPIAddr != "" && leaderInfo.AutoJoin != "": - c.logger.Error("join attempt failed", "error", errors.New("cannot provide both leader address and auto-join metadata")) - - case leaderInfo.LeaderAPIAddr != "": - if err := joinLeader(leaderInfo, leaderInfo.LeaderAPIAddr); err != nil { - c.logger.Warn("join attempt failed", "error", err) - } else { - // successfully joined leader + select { + case <-ctx.Done(): + case raftInfo := <-challengeCh: + if raftInfo != nil { + err = answerChallenge(ctx, raftInfo) + if err == nil { return nil } - - case leaderInfo.AutoJoin != "": - scheme := leaderInfo.AutoJoinScheme - if scheme == "" { - // default to HTTPS when no scheme is provided - scheme = "https" - } - port := leaderInfo.AutoJoinPort - if port == 0 { - // default to 8200 when no port is provided - port = 8200 - } - // Addrs returns either IPv4 or IPv6 address sans scheme or port - clusterIPs, err := disco.Addrs(leaderInfo.AutoJoin, c.logger.StandardLogger(nil)) - if err != nil { - c.logger.Error("failed to parse addresses from auto-join metadata", "error", err) - } - for _, ip := range clusterIPs { - if strings.Count(ip, ":") >= 2 && !strings.HasPrefix(ip, "["){ - // An IPv6 address in implicit form, however we need it in explicit form to use in a URL. - ip = fmt.Sprintf("[%s]", ip) - } - u := fmt.Sprintf("%s://%s:%d", scheme, ip, port) - if err := joinLeader(leaderInfo, u); err != nil { - c.logger.Warn("join attempt failed", "error", err) - } else { - // successfully joined leader - return nil - } - } - - default: - c.logger.Error("join attempt failed", "error", errors.New("must provide leader address or auto-join metadata")) } } - - return errors.New("failed to join any raft leader node") + return fmt.Errorf("timed out on raft join: %w", ctx.Err()) } - switch leaderInfos[0].Retry { + switch retryFailures { case true: go func() { for { @@ -993,7 +1000,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo return default: } - err := join(true) + err := join() if err == nil { return } @@ -1005,7 +1012,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo // Backgrounded so return false return false, nil default: - if err := join(false); err != nil { + if err := join(); err != nil { c.logger.Error("failed to join raft cluster", "error", err) return false, fmt.Errorf("failed to join raft cluster: %w", err) } @@ -1014,6 +1021,49 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo return true, nil } +// raftLeaderInfo uses go-discover to expand leaderInfo to include any auto-join results +func (c *Core) raftLeaderInfo(leaderInfo *raft.LeaderJoinInfo, disco *discover.Discover) ([]*raft.LeaderJoinInfo, error) { + var ret []*raft.LeaderJoinInfo + switch { + case leaderInfo.LeaderAPIAddr != "" && leaderInfo.AutoJoin != "": + return nil, errors.New("cannot provide both leader address and auto-join metadata") + + case leaderInfo.LeaderAPIAddr != "": + ret = append(ret, leaderInfo) + + case leaderInfo.AutoJoin != "": + scheme := leaderInfo.AutoJoinScheme + if scheme == "" { + // default to HTTPS when no scheme is provided + scheme = "https" + } + port := leaderInfo.AutoJoinPort + if port == 0 { + // default to 8200 when no port is provided + port = 8200 + } + // Addrs returns either IPv4 or IPv6 address, without scheme or port + clusterIPs, err := disco.Addrs(leaderInfo.AutoJoin, c.logger.StandardLogger(nil)) + if err != nil { + return nil, fmt.Errorf("failed to parse addresses from auto-join metadata: %w", err) + } + for _, ip := range clusterIPs { + if strings.Count(ip, ":") >= 2 && !strings.HasPrefix(ip, "[") { + // An IPv6 address in implicit form, however we need it in explicit form to use in a URL. + ip = fmt.Sprintf("[%s]", ip) + } + u := fmt.Sprintf("%s://%s:%d", scheme, ip, port) + info := *leaderInfo + info.LeaderAPIAddr = u + ret = append(ret, &info) + } + + default: + return nil, errors.New("must provide leader address or auto-join metadata") + } + return ret, nil +} + // getRaftBackend returns the RaftBackend from the HA or physical backend, // in that order of preference, or nil if not of type RaftBackend. func (c *Core) getRaftBackend() *raft.RaftBackend {