Skip to content

Commit

Permalink
Add leader cluster address to status/leader output. (#3061)
Browse files Browse the repository at this point in the history
* Add leader cluster address to status/leader output. This helps in
identifying a particular node when all share the same redirect address.

Fixes #3042
  • Loading branch information
jefferai authored Jul 31, 2017
1 parent b55d13c commit 95ce578
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 52 deletions.
7 changes: 4 additions & 3 deletions api/sys_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ func (c *Sys) Leader() (*LeaderResponse, error) {
}

type LeaderResponse struct {
HAEnabled bool `json:"ha_enabled"`
IsSelf bool `json:"is_self"`
LeaderAddress string `json:"leader_address"`
HAEnabled bool `json:"ha_enabled"`
IsSelf bool `json:"is_self"`
LeaderAddress string `json:"leader_address"`
LeaderClusterAddress string `json:"leader_cluster_address"`
}
6 changes: 3 additions & 3 deletions command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ CLUSTER_SYNTHESIS_COMPLETE:
sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery)
if ok {
activeFunc := func() bool {
if isLeader, _, err := core.Leader(); err == nil {
if isLeader, _, _, err := core.Leader(); err == nil {
return isLeader
}
return false
Expand Down Expand Up @@ -689,7 +689,7 @@ func (c *ServerCommand) enableDev(core *vault.Core, coreConfig *vault.CoreConfig
return nil, fmt.Errorf("failed to unseal Vault for dev mode")
}

isLeader, _, err := core.Leader()
isLeader, _, _, err := core.Leader()
if err != nil && err != vault.ErrHANotEnabled {
return nil, fmt.Errorf("failed to check active status: %v", err)
}
Expand All @@ -702,7 +702,7 @@ func (c *ServerCommand) enableDev(core *vault.Core, coreConfig *vault.CoreConfig
return nil, fmt.Errorf("failed to get active status after five seconds; call stack is\n%s\n", buf)
}
time.Sleep(1 * time.Second)
isLeader, _, err = core.Leader()
isLeader, _, _, err = core.Leader()
if err != nil {
return nil, fmt.Errorf("failed to check active status: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion command/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func (c *StatusCommand) Run(args []string) int {
if leaderStatus.LeaderAddress == "" {
leaderStatus.LeaderAddress = "<none>"
}
c.Ui.Output(fmt.Sprintf("\tLeader: %s", leaderStatus.LeaderAddress))
if leaderStatus.LeaderClusterAddress == "" {
leaderStatus.LeaderClusterAddress = "<none>"
}
c.Ui.Output(fmt.Sprintf("\tLeader Cluster Address: %s", leaderStatus.LeaderClusterAddress))
}
}

Expand Down
6 changes: 3 additions & 3 deletions http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func handleRequestForwarding(core *vault.Core, handler http.Handler) http.Handle
// Note: in an HA setup, this call will also ensure that connections to
// the leader are set up, as that happens once the advertised cluster
// values are read during this function
isLeader, leaderAddr, err := core.Leader()
isLeader, leaderAddr, _, err := core.Leader()
if err != nil {
if err == vault.ErrHANotEnabled {
// Standalone node, serve request normally
Expand All @@ -171,7 +171,7 @@ func handleRequestForwarding(core *vault.Core, handler http.Handler) http.Handle
return
}
if leaderAddr == "" {
respondError(w, http.StatusInternalServerError, fmt.Errorf("node not active but active node not found"))
respondError(w, http.StatusInternalServerError, fmt.Errorf("local node not active but active cluster node not found"))
return
}

Expand Down Expand Up @@ -223,7 +223,7 @@ func request(core *vault.Core, w http.ResponseWriter, rawReq *http.Request, r *l
// respondStandby is used to trigger a redirect in the case that this Vault is currently a hot standby
func respondStandby(core *vault.Core, w http.ResponseWriter, reqURL *url.URL) {
// Request the leader address
_, redirectAddr, err := core.Leader()
_, redirectAddr, _, err := core.Leader()
if err != nil {
respondError(w, http.StatusInternalServerError, err)
return
Expand Down
16 changes: 9 additions & 7 deletions http/sys_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func handleSysLeader(core *vault.Core) http.Handler {

func handleSysLeaderGet(core *vault.Core, w http.ResponseWriter, r *http.Request) {
haEnabled := true
isLeader, address, err := core.Leader()
isLeader, address, clusterAddr, err := core.Leader()
if errwrap.Contains(err, vault.ErrHANotEnabled.Error()) {
haEnabled = false
err = nil
Expand All @@ -31,14 +31,16 @@ func handleSysLeaderGet(core *vault.Core, w http.ResponseWriter, r *http.Request
}

respondOk(w, &LeaderResponse{
HAEnabled: haEnabled,
IsSelf: isLeader,
LeaderAddress: address,
HAEnabled: haEnabled,
IsSelf: isLeader,
LeaderAddress: address,
LeaderClusterAddress: clusterAddr,
})
}

type LeaderResponse struct {
HAEnabled bool `json:"ha_enabled"`
IsSelf bool `json:"is_self"`
LeaderAddress string `json:"leader_address"`
HAEnabled bool `json:"ha_enabled"`
IsSelf bool `json:"is_self"`
LeaderAddress string `json:"leader_address"`
LeaderClusterAddress string `json:"leader_cluster_address"`
}
7 changes: 4 additions & 3 deletions http/sys_leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ func TestSysLeader_get(t *testing.T) {

var actual map[string]interface{}
expected := map[string]interface{}{
"ha_enabled": false,
"is_self": false,
"leader_address": "",
"ha_enabled": false,
"is_self": false,
"leader_address": "",
"leader_cluster_address": "",
}
testResponseStatus(t, resp, 200)
testResponseBody(t, resp, &actual)
Expand Down
2 changes: 1 addition & 1 deletion vault/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func testCluster_ForwardRequests(t *testing.T, c *TestClusterCore, rootToken, re
}

// We need to call Leader as that refreshes the connection info
isLeader, _, err := c.Leader()
isLeader, _, _, err := c.Leader()
if err != nil {
panic(err.Error())
t.Fatal(err)
Expand Down
32 changes: 18 additions & 14 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ type Core struct {
clusterLeaderUUID string
// Most recent leader redirect addr
clusterLeaderRedirectAddr string
// Most recent leader cluster addr
clusterLeaderClusterAddr string
// Lock for the cluster leader values
clusterLeaderParamsLock sync.RWMutex
// Info on cluster members
Expand Down Expand Up @@ -715,49 +717,50 @@ func (c *Core) Standby() (bool, error) {
}

// Leader is used to get the current active leader
func (c *Core) Leader() (isLeader bool, leaderAddr string, err error) {
func (c *Core) Leader() (isLeader bool, leaderAddr, clusterAddr string, err error) {
c.stateLock.RLock()
defer c.stateLock.RUnlock()

// Check if sealed
if c.sealed {
return false, "", consts.ErrSealed
return false, "", "", consts.ErrSealed
}

// Check if HA enabled
if c.ha == nil {
return false, "", ErrHANotEnabled
return false, "", "", ErrHANotEnabled
}

// Check if we are the leader
if !c.standby {
return true, c.redirectAddr, nil
return true, c.redirectAddr, c.clusterAddr, nil
}

// Initialize a lock
lock, err := c.ha.LockWith(coreLockPath, "read")
if err != nil {
return false, "", err
return false, "", "", err
}

// Read the value
held, leaderUUID, err := lock.Value()
if err != nil {
return false, "", err
return false, "", "", err
}
if !held {
return false, "", nil
return false, "", "", nil
}

c.clusterLeaderParamsLock.RLock()
localLeaderUUID := c.clusterLeaderUUID
localRedirAddr := c.clusterLeaderRedirectAddr
localClusterAddr := c.clusterLeaderClusterAddr
c.clusterLeaderParamsLock.RUnlock()

// If the leader hasn't changed, return the cached value; nothing changes
// mid-leadership, and the barrier caches anyways
if leaderUUID == localLeaderUUID && localRedirAddr != "" {
return false, localRedirAddr, nil
return false, localRedirAddr, localClusterAddr, nil
}

c.logger.Trace("core: found new active node information, refreshing")
Expand All @@ -767,16 +770,16 @@ func (c *Core) Leader() (isLeader bool, leaderAddr string, err error) {

// Validate base conditions again
if leaderUUID == c.clusterLeaderUUID && c.clusterLeaderRedirectAddr != "" {
return false, localRedirAddr, nil
return false, localRedirAddr, localClusterAddr, nil
}

key := coreLeaderPrefix + leaderUUID
entry, err := c.barrier.Get(key)
if err != nil {
return false, "", err
return false, "", "", err
}
if entry == nil {
return false, "", nil
return false, "", "", nil
}

var oldAdv bool
Expand All @@ -796,23 +799,24 @@ func (c *Core) Leader() (isLeader bool, leaderAddr string, err error) {
// Ensure we are using current values
err = c.loadLocalClusterTLS(adv)
if err != nil {
return false, "", err
return false, "", "", err
}

// This will ensure that we both have a connection at the ready and that
// the address is the current known value
err = c.refreshRequestForwardingConnection(adv.ClusterAddr)
if err != nil {
return false, "", err
return false, "", "", err
}
}

// Don't set these until everything has been parsed successfully or we'll
// never try again
c.clusterLeaderRedirectAddr = adv.RedirectAddr
c.clusterLeaderClusterAddr = adv.ClusterAddr
c.clusterLeaderUUID = leaderUUID

return false, adv.RedirectAddr, nil
return false, adv.RedirectAddr, adv.ClusterAddr, nil
}

// SecretProgress returns the number of keys provided so far
Expand Down
28 changes: 14 additions & 14 deletions vault/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func TestCore_Standby_Seal(t *testing.T) {
TestWaitActive(t, core)

// Check the leader is local
isLeader, advertise, err := core.Leader()
isLeader, advertise, _, err := core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1053,7 +1053,7 @@ func TestCore_Standby_Seal(t *testing.T) {
}

// Check the leader is not local
isLeader, advertise, err = core2.Leader()
isLeader, advertise, _, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1117,7 +1117,7 @@ func TestCore_StepDown(t *testing.T) {
TestWaitActive(t, core)

// Check the leader is local
isLeader, advertise, err := core.Leader()
isLeader, advertise, _, err := core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1164,7 +1164,7 @@ func TestCore_StepDown(t *testing.T) {
}

// Check the leader is not local
isLeader, advertise, err = core2.Leader()
isLeader, advertise, _, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1205,7 +1205,7 @@ func TestCore_StepDown(t *testing.T) {
}

// Check the leader is core2
isLeader, advertise, err = core2.Leader()
isLeader, advertise, _, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -1217,7 +1217,7 @@ func TestCore_StepDown(t *testing.T) {
}

// Check the leader is not local
isLeader, advertise, err = core.Leader()
isLeader, advertise, _, err = core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1248,7 +1248,7 @@ func TestCore_StepDown(t *testing.T) {
}

// Check the leader is core1
isLeader, advertise, err = core.Leader()
isLeader, advertise, _, err = core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -1260,7 +1260,7 @@ func TestCore_StepDown(t *testing.T) {
}

// Check the leader is not local
isLeader, advertise, err = core2.Leader()
isLeader, advertise, _, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1335,7 +1335,7 @@ func TestCore_CleanLeaderPrefix(t *testing.T) {
}

// Check the leader is local
isLeader, advertise, err := core.Leader()
isLeader, advertise, _, err := core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1382,7 +1382,7 @@ func TestCore_CleanLeaderPrefix(t *testing.T) {
}

// Check the leader is not local
isLeader, advertise, err = core2.Leader()
isLeader, advertise, _, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1412,7 +1412,7 @@ func TestCore_CleanLeaderPrefix(t *testing.T) {
TestWaitActive(t, core2)

// Check the leader is local
isLeader, advertise, err = core2.Leader()
isLeader, advertise, _, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1494,7 +1494,7 @@ func testCore_Standby_Common(t *testing.T, inm physical.Backend, inmha physical.
}

// Check the leader is local
isLeader, advertise, err := core.Leader()
isLeader, advertise, _, err := core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1547,7 +1547,7 @@ func testCore_Standby_Common(t *testing.T, inm physical.Backend, inmha physical.
}

// Check the leader is not local
isLeader, advertise, err = core2.Leader()
isLeader, advertise, _, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1593,7 +1593,7 @@ func testCore_Standby_Common(t *testing.T, inm physical.Backend, inmha physical.
}

// Check the leader is local
isLeader, advertise, err = core2.Leader()
isLeader, advertise, _, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions vault/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,14 +1197,14 @@ func NewTestCluster(t testing.TB, base *CoreConfig, opts *TestClusterOptions) *T
time.Sleep(2 * time.Second)

// Ensure cluster connection info is populated
isLeader, _, err := c2.Leader()
isLeader, _, _, err := c2.Leader()
if err != nil {
t.Fatal(err)
}
if isLeader {
t.Fatal("c2 should not be leader")
}
isLeader, _, err = c3.Leader()
isLeader, _, _, err = c3.Leader()
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 95ce578

Please sign in to comment.