From f2dc2f636e47f2427acfc5ba003191b9f51ba38e Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Fri, 8 Apr 2016 11:55:20 -0700 Subject: [PATCH 01/21] Comment nits --- command/server.go | 2 +- command/server/config.go | 4 ++-- website/source/docs/config/index.html.md | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/command/server.go b/command/server.go index 602981a7fc24..b13f8d710386 100644 --- a/command/server.go +++ b/command/server.go @@ -205,7 +205,7 @@ func (c *ServerCommand) Run(args []string) int { coreConfig.AdvertiseAddr = envAA } - // Attempt to detect the advertise address possible + // Attempt to detect the advertise address, if possible var detect physical.AdvertiseDetect if coreConfig.HAPhysical != nil { detect, ok = coreConfig.HAPhysical.(physical.AdvertiseDetect) diff --git a/command/server/config.go b/command/server/config.go index 450f640f8be4..1945040f4585 100644 --- a/command/server/config.go +++ b/command/server/config.go @@ -365,7 +365,7 @@ func parseBackends(result *Config, list *ast.ObjectList) error { return multierror.Prefix(err, fmt.Sprintf("backend.%s:", key)) } - // Pull out the advertise address since it's commong to all backends + // Pull out the advertise address since it's common to all backends var advertiseAddr string if v, ok := m["advertise_addr"]; ok { advertiseAddr = v @@ -398,7 +398,7 @@ func parseHABackends(result *Config, list *ast.ObjectList) error { return multierror.Prefix(err, fmt.Sprintf("ha_backend.%s:", key)) } - // Pull out the advertise address since it's commong to all backends + // Pull out the advertise address since it's common to all backends var advertiseAddr string if v, ok := m["advertise_addr"]; ok { advertiseAddr = v diff --git a/website/source/docs/config/index.html.md b/website/source/docs/config/index.html.md index 52226a9f218b..237efd1dba35 100644 --- a/website/source/docs/config/index.html.md +++ b/website/source/docs/config/index.html.md @@ -185,7 +185,7 @@ All backends 
support the following options: nodes to when A is the active node and B and C are standby nodes. This may be the same address across nodes if using a load balancer or service discovery. Most HA backends will attempt to determine the advertise address - if not provided. This can also be set via the `VAULT_ADVERTISE_ADDR` + if not provided. This can also be overridden via the `VAULT_ADVERTISE_ADDR` environment variable. #### Backend Reference: Consul From bd5305e470d63ddd0b3490029ce3d6c1f891e00f Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Fri, 22 Apr 2016 19:55:17 -0700 Subject: [PATCH 02/21] Stub out service discovery functionality Hook asynchronous notifications into Core to change the status of vault based on its active/standby, and sealed/unsealed status. --- command/server.go | 11 +++++++++++ physical/physical.go | 25 ++++++++++++++++++++++++ vault/core.go | 46 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/command/server.go b/command/server.go index b13f8d710386..0af41cddd266 100644 --- a/command/server.go +++ b/command/server.go @@ -286,6 +286,17 @@ func (c *ServerCommand) Run(args []string) int { } } + // If the backend supports service discovery, run service discovery + if coreConfig.HAPhysical != nil { + sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) + if ok { + if err := sd.RunServiceDiscovery(c.ShutdownCh); err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) + return 1 + } + } + } + // Initialize the listeners lns := make([]net.Listener, 0, len(config.Listeners)) for i, lnConfig := range config.Listeners { diff --git a/physical/physical.go b/physical/physical.go index 8f28f00a7cfc..881d9eae9645 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -4,6 +4,9 @@ import "fmt" const DefaultParallelOperations = 128 +// ShutdownSignal +type ShutdownChannel chan struct{} + // Backend is the interface required for a physical // backend. 
A physical backend is used to durably store // data outside of Vault. As such, it is completely untrusted, @@ -40,6 +43,28 @@ type HABackend interface { type AdvertiseDetect interface { // DetectHostAddr is used to detect the host address DetectHostAddr() (string, error) + + // UpdateAdvertiseAddr allows for a non-Running backend to update the + // advertise address. HABackends may want to present a different + // address that wasn't available when a Backend was created. + UpdateAdvertiseAddr(addr string) error +} + +// ServiceDiscovery is an optional interface that an HABackend can implement. +// If they do, the state of a backend is advertised to the service discovery +// network. +type ServiceDiscovery interface { + // AdvertiseActive is used to reflect whether or not a backend is in + // an active or standby state. + AdvertiseActive(bool) error + + // AdvertiseSealed is used to reflect whether or not a backend is in + // a sealed state or not. + AdvertiseSealed(bool) error + + // Run executes any background service discovery tasks until the + // shutdown channel is closed. + RunServiceDiscovery(ShutdownChannel) error } type Lock interface { diff --git a/vault/core.go b/vault/core.go index 873ad4f7581d..4590ae9d39d6 100644 --- a/vault/core.go +++ b/vault/core.go @@ -942,6 +942,16 @@ func (c *Core) Unseal(key []byte) (bool, error) { // Success! 
c.sealed = false + if c.ha != nil { + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseSealed(c.sealed); err != nil { + c.logger.Printf("[WARN] core: failed to advertise unsealed status: %v", err) + } + }() + } + } return true, nil } @@ -1087,6 +1097,17 @@ func (c *Core) sealInternal() error { } c.logger.Printf("[INFO] core: vault is sealed") + if c.ha != nil { + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseSealed(c.sealed); err != nil { + c.logger.Printf("[WARN] core: failed to advertise sealed status: %v", err) + } + }() + } + } + return nil } @@ -1203,6 +1224,16 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) { defer close(manualStepDownCh) c.logger.Printf("[INFO] core: entering standby mode") + // Advertise ourselves as a standby + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseActive(false); err != nil { + c.logger.Printf("[WARN] core: failed to advertise standby status: %v", err) + } + }() + } + // Monitor for key rotation keyRotateDone := make(chan struct{}) keyRotateStop := make(chan struct{}) @@ -1396,7 +1427,20 @@ func (c *Core) advertiseLeader(uuid string, leaderLostCh <-chan struct{}) error Key: coreLeaderPrefix + uuid, Value: []byte(c.advertiseAddr), } - return c.barrier.Put(ent) + err := c.barrier.Put(ent) + if err != nil { + return err + } + + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseActive(true); err != nil { + c.logger.Printf("[WARN] core: failed to advertise active status: %v", err) + } + }() + } + return nil } func (c *Core) cleanLeaderPrefix(uuid string, leaderLostCh <-chan struct{}) { From 0d3ce59542207eaf2dd6c5df6b4150294468ff3f Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 12:33:55 -0700 Subject: [PATCH 03/21] Update vendor'ed version of hashicorp/consul/lib Note: Godeps.json not updated --- 
.../hashicorp/consul/lib/cluster.go | 36 +++++++++++++++++++ .../github.com/hashicorp/consul/lib/math.go | 22 ++++++++++++ .../github.com/hashicorp/consul/lib/rand.go | 18 ++++++++++ .../github.com/hashicorp/consul/lib/string.go | 11 ++++++ 4 files changed, 87 insertions(+) create mode 100644 vendor/github.com/hashicorp/consul/lib/cluster.go create mode 100644 vendor/github.com/hashicorp/consul/lib/math.go create mode 100644 vendor/github.com/hashicorp/consul/lib/rand.go create mode 100644 vendor/github.com/hashicorp/consul/lib/string.go diff --git a/vendor/github.com/hashicorp/consul/lib/cluster.go b/vendor/github.com/hashicorp/consul/lib/cluster.go new file mode 100644 index 000000000000..5df39251d8b4 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/cluster.go @@ -0,0 +1,36 @@ +package lib + +import ( + "math/rand" + "time" +) + +// DurationMinusBuffer returns a duration, minus a buffer and jitter +// subtracted from the duration. This function is used primarily for +// servicing Consul TTL Checks in advance of the TTL. +func DurationMinusBuffer(intv time.Duration, buffer time.Duration, jitter int64) time.Duration { + d := intv - buffer + d -= RandomStagger(time.Duration(int64(d) / jitter)) + return d +} + +// Returns a random stagger interval between 0 and the duration +func RandomStagger(intv time.Duration) time.Duration { + return time.Duration(uint64(rand.Int63()) % uint64(intv)) +} + +// RateScaledInterval is used to choose an interval to perform an action in +// order to target an aggregate number of actions per second across the whole +// cluster. 
+func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration { + const minRate = 1 / 86400 // 1/(1 * time.Day) + if rate <= minRate { + return min + } + interval := time.Duration(float64(time.Second) * float64(n) / rate) + if interval < min { + return min + } + + return interval +} diff --git a/vendor/github.com/hashicorp/consul/lib/math.go b/vendor/github.com/hashicorp/consul/lib/math.go new file mode 100644 index 000000000000..1d0b6dc0f6b5 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/math.go @@ -0,0 +1,22 @@ +package lib + +func AbsInt(a int) int { + if a > 0 { + return a + } + return a * -1 +} + +func MaxInt(a, b int) int { + if a > b { + return a + } + return b +} + +func MinInt(a, b int) int { + if a > b { + return b + } + return a +} diff --git a/vendor/github.com/hashicorp/consul/lib/rand.go b/vendor/github.com/hashicorp/consul/lib/rand.go new file mode 100644 index 000000000000..48307e63f324 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/rand.go @@ -0,0 +1,18 @@ +package lib + +import ( + "math/rand" + "sync" + "time" +) + +var ( + once sync.Once +) + +// SeedMathRand provides weak, but guaranteed seeding, which is better than +// running with Go's default seed of 1. A call to SeedMathRand() is expected +// to be called via init(), but never a second time. 
+func SeedMathRand() { + once.Do(func() { rand.Seed(time.Now().UTC().UnixNano()) }) +} diff --git a/vendor/github.com/hashicorp/consul/lib/string.go b/vendor/github.com/hashicorp/consul/lib/string.go new file mode 100644 index 000000000000..0780abb632c2 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/string.go @@ -0,0 +1,11 @@ +package lib + +// StrContains checks if a list contains a string +func StrContains(l []string, s string) bool { + for _, v := range l { + if v == s { + return true + } + } + return false +} From c0bbeba5adf837bbdab4bdb954f071a2524cf549 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 17:15:05 -0700 Subject: [PATCH 04/21] Teach Vault how to register with Consul Vault will now register itself with Consul. The active node can be found using `active.vault.service.consul`. All standby vaults are available via `standby.vault.service.consul`. All unsealed vaults are considered healthy and available via `vault.service.consul`. Change in status and registration is event driven and should happen at the speed of a write to Consul (~network RTT + ~1x fsync(2)). 
Healthy/active: ``` curl -X GET 'http://127.0.0.1:8500/v1/health/service/vault?pretty' && echo; [ { "Node": { "Node": "vm1", "Address": "127.0.0.1", "TaggedAddresses": { "wan": "127.0.0.1" }, "CreateIndex": 3, "ModifyIndex": 20 }, "Service": { "ID": "vault:127.0.0.1:8200", "Service": "vault", "Tags": [ "active" ], "Address": "127.0.0.1", "Port": 8200, "EnableTagOverride": false, "CreateIndex": 17, "ModifyIndex": 20 }, "Checks": [ { "Node": "vm1", "CheckID": "serfHealth", "Name": "Serf Health Status", "Status": "passing", "Notes": "", "Output": "Agent alive and reachable", "ServiceID": "", "ServiceName": "", "CreateIndex": 3, "ModifyIndex": 3 }, { "Node": "vm1", "CheckID": "vault-sealed-check", "Name": "Vault Sealed Status", "Status": "passing", "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", "Output": "", "ServiceID": "vault:127.0.0.1:8200", "ServiceName": "vault", "CreateIndex": 19, "ModifyIndex": 19 } ] } ] ``` Healthy/standby: ``` [snip] "Service": { "ID": "vault:127.0.0.2:8200", "Service": "vault", "Tags": [ "standby" ], "Address": "127.0.0.2", "Port": 8200, "EnableTagOverride": false, "CreateIndex": 17, "ModifyIndex": 20 }, "Checks": [ { "Node": "vm2", "CheckID": "serfHealth", "Name": "Serf Health Status", "Status": "passing", "Notes": "", "Output": "Agent alive and reachable", "ServiceID": "", "ServiceName": "", "CreateIndex": 3, "ModifyIndex": 3 }, { "Node": "vm2", "CheckID": "vault-sealed-check", "Name": "Vault Sealed Status", "Status": "passing", "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", "Output": "", "ServiceID": "vault:127.0.0.2:8200", "ServiceName": "vault", "CreateIndex": 19, "ModifyIndex": 19 } ] } ] ``` Sealed: ``` "Checks": [ { "Node": "vm2", "CheckID": "serfHealth", "Name": "Serf Health Status", "Status": "passing", "Notes": "", "Output": "Agent alive and reachable", "ServiceID": "", "ServiceName": "", "CreateIndex": 
3, "ModifyIndex": 3 }, { "Node": "vm2", "CheckID": "vault-sealed-check", "Name": "Vault Sealed Status", "Status": "critical", "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", "Output": "Vault Sealed", "ServiceID": "vault:127.0.0.2:8200", "ServiceName": "vault", "CreateIndex": 19, "ModifyIndex": 38 } ] ``` --- command/server.go | 6 + physical/consul.go | 293 +++++++++++++- physical/consul_test.go | 361 ++++++++++++++++++ .../hashicorp/consul/lib/cluster.go | 22 +- 4 files changed, 673 insertions(+), 9 deletions(-) diff --git a/command/server.go b/command/server.go index 0af41cddd266..87b55750196b 100644 --- a/command/server.go +++ b/command/server.go @@ -203,6 +203,9 @@ func (c *ServerCommand) Run(args []string) int { if envAA := os.Getenv("VAULT_ADVERTISE_ADDR"); envAA != "" { coreConfig.AdvertiseAddr = envAA + if consulBackend, ok := (backend).(*physical.ConsulBackend); ok { + consulBackend.UpdateAdvertiseAddr(envAA) + } } // Attempt to detect the advertise address, if possible @@ -220,6 +223,9 @@ func (c *ServerCommand) Run(args []string) int { c.Ui.Error("Failed to detect advertise address.") } else { coreConfig.AdvertiseAddr = advertise + if consulBackend, ok := (backend).(*physical.ConsulBackend); ok { + consulBackend.UpdateAdvertiseAddr(advertise) + } } } diff --git a/physical/consul.go b/physical/consul.go index 00a59af3b598..4a639a982d09 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -3,8 +3,11 @@ package physical import ( "fmt" "io/ioutil" + "net" + "net/url" "strconv" "strings" + "sync" "time" "crypto/tls" @@ -12,18 +15,50 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/errwrap" "github.com/hashicorp/go-cleanhttp" ) +const ( + // checkJitterFactor specifies the jitter factor used to stagger checks + checkJitterFactor = 16 + + // checkMinBuffer specifies provides a guarantee that a check will not + 
// be executed too close to the TTL check timeout + checkMinBuffer = 100 * time.Millisecond + + // defaultCheckTimeout changes the timeout of TTL checks + defaultCheckTimeout = 5 * time.Second + + // defaultCheckInterval specifies the default interval used to send + // checks + defaultCheckInterval = 4 * time.Second + + // defaultServiceName is the default Consul service name used when + // advertising a Vault instance. + defaultServiceName = "vault" +) + // ConsulBackend is a physical backend that stores data at specific // prefix within Consul. It is used for most production situations as // it allows Vault to run on multiple machines in a highly-available manner. type ConsulBackend struct { - path string - client *api.Client - kv *api.KV - permitPool *PermitPool + path string + client *api.Client + kv *api.KV + permitPool *PermitPool + serviceLock sync.RWMutex + service *api.AgentServiceRegistration + sealedCheck *api.AgentCheckRegistration + advertiseAddr string + consulClientConf *api.Config + serviceName string + running bool + active bool + sealed bool + checkTimeout time.Duration + checkTimer *time.Timer } // newConsulBackend constructs a Consul backend using the given API client @@ -43,6 +78,28 @@ func newConsulBackend(conf map[string]string) (Backend, error) { path = strings.TrimPrefix(path, "/") } + // Get the service name to advertise in Consul + service, ok := conf["service"] + if !ok { + service = defaultServiceName + } + + checkTimeout := defaultCheckTimeout + checkTimeoutStr, ok := conf["check_timeout"] + if ok { + d, err := time.ParseDuration(checkTimeoutStr) + if err != nil { + return nil, err + } + + min, _ := lib.DurationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor) + if d < min { + return nil, fmt.Errorf("Consul check_timeout must be greater than %v", min) + } + + checkTimeout = d + } + // Configure the client consulConf := api.DefaultConfig() @@ -84,14 +141,234 @@ func newConsulBackend(conf map[string]string) (Backend, error) { // 
Setup the backend c := &ConsulBackend{ - path: path, - client: client, - kv: client.KV(), - permitPool: NewPermitPool(maxParInt), + path: path, + client: client, + kv: client.KV(), + permitPool: NewPermitPool(maxParInt), + consulClientConf: consulConf, + serviceName: service, + checkTimeout: checkTimeout, + checkTimer: time.NewTimer(checkTimeout), } return c, nil } +// UpdateAdvertiseAddr provides a pre-initialization hook for updating +// Consul's advertise address. +func (c *ConsulBackend) UpdateAdvertiseAddr(addr string) error { + if c.running { + return fmt.Errorf("service registration unable to update advertise address, backend already running") + } + + url, err := url.Parse(addr) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse URL "%v": {{err}}`, addr), err) + } + + _, portStr, err := net.SplitHostPort(url.Host) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + } + _, err = strconv.ParseInt(portStr, 10, 0) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse port "%v": {{err}}`, portStr), err) + } + + c.advertiseAddr = addr + return nil +} + +// serviceTags returns all of the relevant tags for Consul. Assumes +// c.serviceLock held for writing. 
+func serviceTags(active bool) []string { + activeTag := "standby" + if active { + activeTag = "active" + } + return []string{activeTag} +} + +func (c *ConsulBackend) AdvertiseActive(active bool) error { + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + + // Vault is still bootstrapping + if c.service == nil { + return nil + } + + c.service.Tags = serviceTags(active) + agent := c.client.Agent() + if err := agent.ServiceRegister(c.service); err != nil { + return errwrap.Wrapf("service registration failed: {{err}}", err) + } + + // Save a cached copy of the active state: no way to query Core + c.active = active + + return nil +} + +func (c *ConsulBackend) AdvertiseSealed(sealed bool) error { + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + c.sealed = sealed + + // Vault is still bootstrapping + if c.service == nil { + return nil + } + + // Push a TTL check immediately to update the state + c.runCheck() + + return nil +} + +func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err error) { + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + + if c.running { + return fmt.Errorf("service registration routine already running") + } + + url, err := url.Parse(c.advertiseAddr) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err) + } + + host, portStr, err := net.SplitHostPort(url.Host) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + } + port, err := strconv.ParseInt(portStr, 10, 0) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err) + } + + serviceID, err := c.serviceID() + if err != nil { + return err + } + + c.service = &api.AgentServiceRegistration{ + ID: serviceID, + Name: c.serviceName, + Tags: serviceTags(c.active), + Port: int(port), + Address: host, + EnableTagOverride: 
false, + } + + checkStatus := "failing" + if !c.sealed { + checkStatus = "passing" + } + + c.sealedCheck = &api.AgentCheckRegistration{ + ID: c.checkID(), + Name: "Vault Sealed Status", + Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", + ServiceID: serviceID, + AgentServiceCheck: api.AgentServiceCheck{ + TTL: c.checkTimeout.String(), + Status: checkStatus, + }, + } + + agent := c.client.Agent() + if err := agent.ServiceRegister(c.service); err != nil { + return errwrap.Wrapf("service registration failed: {{err}}", err) + } + + if err := agent.CheckRegister(c.sealedCheck); err != nil { + return errwrap.Wrapf("service registration check registration failed: {{err}}", err) + } + + go c.checkRunner(shutdownCh) + c.running = true + + // Deregister upon shutdown + go func() { + shutdown: + for { + select { + case <-shutdownCh: + // wtb logger: log.Printf("[DEBUG]: Shutting down consul backend") + break shutdown + } + } + + if err := agent.ServiceDeregister(serviceID); err != nil { + // wtb logger: log.Printf("[WARNING]: service deregistration failed: {{err}}", err) + } + c.running = false + }() + + return nil +} + +// checkRunner periodically runs TTL checks +func (c *ConsulBackend) checkRunner(shutdownCh ShutdownChannel) { + defer c.checkTimer.Stop() + + for { + select { + case <-c.checkTimer.C: + go func() { + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + c.runCheck() + }() + case <-shutdownCh: + return + } + } +} + +// runCheck immediately pushes a TTL check. Assumes c.serviceLock is held +// exclusively. +func (c *ConsulBackend) runCheck() { + // Reset timer before calling run check in order to not slide the + // window of the next check. 
+ c.checkTimer.Reset(lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor)) + + // Run a TTL check + agent := c.client.Agent() + if !c.sealed { + agent.UpdateTTL(c.checkID(), "Vault Unsealed", api.HealthPassing) + } else { + agent.UpdateTTL(c.checkID(), "Vault Sealed", api.HealthCritical) + } +} + +// checkID returns the ID used for a Consul Check. Assume at least a read +// lock is held. +func (c *ConsulBackend) checkID() string { + return "vault-sealed-check" +} + +// serviceID returns the Vault ServiceID for use in Consul. Assume at least +// a read lock is held. +func (c *ConsulBackend) serviceID() (string, error) { + url, err := url.Parse(c.advertiseAddr) + if err != nil { + return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err) + } + + host, portStr, err := net.SplitHostPort(url.Host) + if err != nil { + return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + } + port, err := strconv.ParseInt(portStr, 10, 0) + if err != nil { + return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err) + } + + return fmt.Sprintf("%s:%s:%d", c.serviceName, host, int(port)), nil +} + func setupTLSConfig(conf map[string]string) (*tls.Config, error) { serverName := strings.Split(conf["address"], ":") diff --git a/physical/consul_test.go b/physical/consul_test.go index b4d5d6bb9d00..b343de035600 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -3,12 +3,373 @@ package physical import ( "fmt" "os" + "reflect" "testing" "time" "github.com/hashicorp/consul/api" ) +type consulConf map[string]string + +var ( + addrCount int = 0 +) + +func testHostIP() string { + a := addrCount + addrCount++ + return fmt.Sprintf("127.0.0.%d", a) +} + +func testConsulBackend(t *testing.T) *ConsulBackend { + return testConsulBackendConfig(t, &consulConf{}) +} + +func 
testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend { + const serviceID = "vaultTestService" + be, err := newConsulBackend(*conf) + if err != nil { + t.Fatalf("Expected Consul to initialize: %v", err) + } + + c, ok := be.(*ConsulBackend) + if !ok { + t.Fatalf("Expected ConsulBackend") + } + + c.service = &api.AgentServiceRegistration{ + ID: serviceID, + Name: c.serviceName, + Tags: serviceTags(c.active), + Port: 8200, + Address: testHostIP(), + EnableTagOverride: false, + } + + c.sealedCheck = &api.AgentCheckRegistration{ + ID: c.checkID(), + Name: "Vault Sealed Status", + Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", + ServiceID: serviceID, + AgentServiceCheck: api.AgentServiceCheck{ + TTL: c.checkTimeout.String(), + Status: api.HealthPassing, + }, + } + + return c +} + +func testConsul_testConsulBackend(t *testing.T) { + c := testConsulBackend(t) + if c == nil { + t.Fatalf("bad") + } + + if c.active != false { + t.Fatalf("bad") + } + + if c.sealed != false { + t.Fatalf("bad") + } + + if c.service == nil { + t.Fatalf("bad") + } + + if c.sealedCheck == nil { + t.Fatalf("bad") + } +} + +func TestConsul_newConsulBackend(t *testing.T) { + tests := []struct { + Name string + Config map[string]string + Fail bool + checkTimeout time.Duration + path string + service string + address string + scheme string + token string + max_parallel int + }{ + { + Name: "Valid default config", + Config: map[string]string{}, + checkTimeout: 5 * time.Second, + path: "vault", + service: "vault", + address: "127.0.0.1", + scheme: "http", + token: "", + max_parallel: 4, + }, + { + Name: "Valid modified config", + Config: map[string]string{ + "path": "seaTech/", + "service": "astronomy", + "check_timeout": "6s", + "address": "127.0.0.2", + "scheme": "https", + "token": "deadbeef-cafeefac-deadc0de-feedface", + "max_parallel": "4", + }, + checkTimeout: 6 * time.Second, + path: "seaTech/", + service: "astronomy", + 
address: "127.0.0.2", + scheme: "https", + token: "deadbeef-cafeefac-deadc0de-feedface", + max_parallel: 4, + }, + { + Config: map[string]string{ + "check_timeout": "99ms", + }, + Fail: true, + }, + } + + for _, test := range tests { + be, err := newConsulBackend(test.Config) + if test.Fail && err == nil { + t.Fatalf("Expected config %s to fail", test.Name) + } else if !test.Fail && err != nil { + t.Fatalf("Expected config %s to not fail: %v", test.Name, err) + } + + c, ok := be.(*ConsulBackend) + if !ok { + t.Fatalf("Expected ConsulBackend") + } + + if test.checkTimeout != c.checkTimeout { + t.Errorf("bad: %v != %v", test.checkTimeout, c.checkTimeout) + } + + if test.path != c.path { + t.Errorf("bad: %v != %v", test.path, c.path) + } + + if test.service != c.serviceName { + t.Errorf("bad: %v != %v", test.service, c.serviceName) + } + + if test.address != c.consulClientConf.Address { + t.Errorf("bad: %v != %v", test.address, c.consulClientConf.Address) + } + + if test.scheme != c.consulClientConf.Scheme { + t.Errorf("bad: %v != %v", test.scheme, c.consulClientConf.Scheme) + } + + if test.token != c.consulClientConf.Token { + t.Errorf("bad: %v != %v", test.token, c.consulClientConf.Token) + } + + // FIXME(sean@): Unable to test max_parallel + // if test.max_parallel != cap(c.permitPool) { + // t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool)) + // } + } +} + +func TestConsul_serviceTags(t *testing.T) { + tests := []struct { + active bool + tags []string + }{ + { + active: true, + tags: []string{"active"}, + }, + { + active: false, + tags: []string{"standby"}, + }, + } + + for _, test := range tests { + tags := serviceTags(test.active) + if !reflect.DeepEqual(tags[:], test.tags[:]) { + t.Errorf("Bad %v: %v %v", test.active, tags, test.tags) + } + } +} + +func TestConsul_UpdateAdvertiseAddr(t *testing.T) { + tests := []struct { + addr string + pass bool + }{ + { + addr: "http://127.0.0.1:8200/", + pass: true, + }, + { + addr: "http://127.0.0.1:8200", + 
pass: true, + }, + { + addr: "127.0.0.1:8200", + pass: false, + }, + { + addr: "127.0.0.1", + pass: false, + }, + } + for _, test := range tests { + c := testConsulBackend(t) + if c == nil { + t.Fatalf("bad") + } + + err := c.UpdateAdvertiseAddr(test.addr) + if test.pass { + if err != nil { + t.Fatalf("bad: %v", err) + } + } else { + if err == nil { + t.Fatalf("bad, expected fail") + } else { + continue + } + } + + if c.advertiseAddr != test.addr { + t.Fatalf("bad: %v != %v", c.advertiseAddr, test.addr) + } + } +} + +func TestConsul_AdvertiseActive(t *testing.T) { + c := testConsulBackend(t) + + if c.active != false { + t.Fatalf("bad") + } + + if err := c.AdvertiseActive(true); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := c.AdvertiseActive(true); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := c.AdvertiseActive(false); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := c.AdvertiseActive(false); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := c.AdvertiseActive(true); err != nil { + t.Fatalf("bad: %v", err) + } +} + +func TestConsul_AdvertiseSealed(t *testing.T) { + c := testConsulBackend(t) + + if c.sealed != false { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(true); err != nil { + t.Fatalf("bad: %v", err) + } + if c.sealed != true { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(true); err != nil { + t.Fatalf("bad: %v", err) + } + if c.sealed != true { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(false); err != nil { + t.Fatalf("bad: %v", err) + } + if c.sealed != false { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(false); err != nil { + t.Fatalf("bad: %v", err) + } + if c.sealed != false { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(true); err != nil { + t.Fatalf("bad: %v", err) + } + if c.sealed != true { + t.Fatalf("bad") + } +} + +func TestConsul_checkID(t *testing.T) { + c := testConsulBackend(t) + if c.checkID() != "vault-sealed-check" { + t.Errorf("bad") + } +} + +func 
TestConsul_serviceID(t *testing.T) { + passingTests := []struct { + advertiseAddr string + serviceName string + expected string + }{ + { + advertiseAddr: "http://127.0.0.1:8200", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + }, + { + advertiseAddr: "http://127.0.0.1:8200/", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + }, + { + advertiseAddr: "https://127.0.0.1:8200/", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + }, + } + + for _, test := range passingTests { + c := testConsulBackendConfig(t, &consulConf{ + "service": test.serviceName, + }) + + if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil { + t.Fatalf("bad: %v", err) + } + + serviceID, err := c.serviceID() + if err != nil { + t.Fatalf("bad: %v", err) + } + + if serviceID != test.expected { + t.Fatalf("bad: %v != %v", serviceID, test.expected) + } + } +} + func TestConsulBackend(t *testing.T) { addr := os.Getenv("CONSUL_ADDR") if addr == "" { diff --git a/vendor/github.com/hashicorp/consul/lib/cluster.go b/vendor/github.com/hashicorp/consul/lib/cluster.go index 5df39251d8b4..a95232c5737b 100644 --- a/vendor/github.com/hashicorp/consul/lib/cluster.go +++ b/vendor/github.com/hashicorp/consul/lib/cluster.go @@ -10,12 +10,32 @@ import ( // servicing Consul TTL Checks in advance of the TTL. func DurationMinusBuffer(intv time.Duration, buffer time.Duration, jitter int64) time.Duration { d := intv - buffer - d -= RandomStagger(time.Duration(int64(d) / jitter)) + if jitter == 0 { + d -= RandomStagger(d) + } else { + d -= RandomStagger(time.Duration(int64(d) / jitter)) + } return d } +// DurationMinusBufferDomain returns the domain of valid durations from a +// call to DurationMinusBuffer. This function is used to check user +// specified input values to DurationMinusBuffer. 
+func DurationMinusBufferDomain(intv time.Duration, buffer time.Duration, jitter int64) (min time.Duration, max time.Duration) { + max = intv - buffer + if jitter == 0 { + min = max + } else { + min = max - time.Duration(int64(max)/jitter) + } + return min, max +} + // Returns a random stagger interval between 0 and the duration func RandomStagger(intv time.Duration) time.Duration { + if intv == 0 { + return 0 + } return time.Duration(uint64(rand.Int63()) % uint64(intv)) } From afa6c22fec2fa55389f984a867da9020bae2a2a4 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 17:16:00 -0700 Subject: [PATCH 05/21] `go fmt` the PostgreSQL backend --- physical/postgresql.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/physical/postgresql.go b/physical/postgresql.go index 94d77294399b..e753e007d651 100644 --- a/physical/postgresql.go +++ b/physical/postgresql.go @@ -69,8 +69,8 @@ func newPostgreSQLBackend(conf map[string]string) (Backend, error) { "put": put_statement, "get": "SELECT value FROM " + quoted_table + " WHERE path = $1 AND key = $2", "delete": "DELETE FROM " + quoted_table + " WHERE path = $1 AND key = $2", - "list": "SELECT key FROM " + quoted_table + " WHERE path = $1" + - "UNION SELECT substr(path, length($1)+1) FROM " + quoted_table + "WHERE parent_path = $1", + "list": "SELECT key FROM " + quoted_table + " WHERE path = $1" + + "UNION SELECT substr(path, length($1)+1) FROM " + quoted_table + "WHERE parent_path = $1", } for name, query := range statements { if err := m.prepare(name, query); err != nil { @@ -97,18 +97,18 @@ func (m *PostgreSQLBackend) splitKey(fullPath string) (string, string, string) { var path string pieces := strings.Split(fullPath, "/") - depth := len(pieces) - key := pieces[depth-1] - + depth := len(pieces) + key := pieces[depth-1] + if depth == 1 { parentPath = "" - path = "/" + path = "/" } else if depth == 2 { parentPath = "/" - path = "/" + pieces[0] + "/" + path = "/" + pieces[0] 
+ "/" } else { parentPath = "/" + strings.Join(pieces[:depth-2], "/") + "/" - path = "/" + strings.Join(pieces[:depth-1], "/") + "/" + path = "/" + strings.Join(pieces[:depth-1], "/") + "/" } return parentPath, path, key From c92f9cb9ab55ea919dd97078313f560af3eb1537 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 17:16:36 -0700 Subject: [PATCH 06/21] Don't export the builtin backends --- physical/physical.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/physical/physical.go b/physical/physical.go index 881d9eae9645..5702ce3cdc1c 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -91,9 +91,9 @@ type Entry struct { type Factory func(map[string]string) (Backend, error) // NewBackend returns a new backend with the given type and configuration. -// The backend is looked up in the BuiltinBackends variable. +// The backend is looked up in the builtinBackends variable. func NewBackend(t string, conf map[string]string) (Backend, error) { - f, ok := BuiltinBackends[t] + f, ok := builtinBackends[t] if !ok { return nil, fmt.Errorf("unknown physical backend type: %s", t) } @@ -102,7 +102,7 @@ func NewBackend(t string, conf map[string]string) (Backend, error) { // BuiltinBackends is the list of built-in physical backends that can // be used with NewBackend. 
-var BuiltinBackends = map[string]Factory{ +var builtinBackends = map[string]Factory{ "inmem": func(map[string]string) (Backend, error) { return NewInmem(), nil }, From e54c990f50b02521616a573bbbf160d54b87c93b Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 17:17:07 -0700 Subject: [PATCH 07/21] Detect type conversion failure --- physical/consul.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/physical/consul.go b/physical/consul.go index 4a639a982d09..1ef63c3c11d6 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -511,7 +511,10 @@ func (c *ConsulBackend) DetectHostAddr() (string, error) { if err != nil { return "", err } - addr := self["Member"]["Addr"].(string) + addr, ok := self["Member"]["Addr"].(string) + if !ok { + return "", fmt.Errorf("Unable to convert an address to string") + } return addr, nil } From 9a2115181b211fa8c9fb0e86172b6412b061b037 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 17:18:18 -0700 Subject: [PATCH 08/21] Improve error handling re: homedir expansion Useful if the HOME envvar is not set because `vault` was launched in a clean environment (e.g. `env -i vault ...`). 
--- command/config.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/command/config.go b/command/config.go index ffe7903b9d63..ee866faade0b 100644 --- a/command/config.go +++ b/command/config.go @@ -53,9 +53,10 @@ func LoadConfig(path string) (*DefaultConfig, error) { path = v } + // NOTE: requires HOME env var to be set path, err := homedir.Expand(path) if err != nil { - return nil, fmt.Errorf("Error expanding config path: %s", err) + return nil, fmt.Errorf("Error expanding config path %s: %s", path, err) } contents, err := ioutil.ReadFile(path) From 40a3c534f47b476399c4ab7285243749797054be Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 18:05:56 -0700 Subject: [PATCH 09/21] Compare the correct values when validating check_timeout --- physical/consul.go | 2 +- physical/consul_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/physical/consul.go b/physical/consul.go index 1ef63c3c11d6..5b957407dd05 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -93,7 +93,7 @@ func newConsulBackend(conf map[string]string) (Backend, error) { } min, _ := lib.DurationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor) - if d < min { + if min < checkMinBuffer { return nil, fmt.Errorf("Consul check_timeout must be greater than %v", min) } diff --git a/physical/consul_test.go b/physical/consul_test.go index b343de035600..6fb516844330 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -128,10 +128,11 @@ func TestConsul_newConsulBackend(t *testing.T) { max_parallel: 4, }, { + Name: "check timeout too short", + Fail: true, Config: map[string]string{ "check_timeout": "99ms", }, - Fail: true, }, } From 53dd43650e7736d03817dac5848f62e3e0665d60 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 19:53:21 -0700 Subject: [PATCH 10/21] Various refactoring to clean up code organization Brought to you by: Dept of 2nd thoughts before pushing enter on `git push` --- 
command/server.go | 11 +- physical/consul.go | 146 ++++++++++++----------- physical/consul_test.go | 132 +++++++++++--------- physical/physical.go | 10 +- website/source/docs/config/index.html.md | 3 + 5 files changed, 167 insertions(+), 135 deletions(-) diff --git a/command/server.go b/command/server.go index 87b55750196b..ae26b534d100 100644 --- a/command/server.go +++ b/command/server.go @@ -203,9 +203,6 @@ func (c *ServerCommand) Run(args []string) int { if envAA := os.Getenv("VAULT_ADVERTISE_ADDR"); envAA != "" { coreConfig.AdvertiseAddr = envAA - if consulBackend, ok := (backend).(*physical.ConsulBackend); ok { - consulBackend.UpdateAdvertiseAddr(envAA) - } } // Attempt to detect the advertise address, if possible @@ -223,9 +220,6 @@ func (c *ServerCommand) Run(args []string) int { c.Ui.Error("Failed to detect advertise address.") } else { coreConfig.AdvertiseAddr = advertise - if consulBackend, ok := (backend).(*physical.ConsulBackend); ok { - consulBackend.UpdateAdvertiseAddr(advertise) - } } } @@ -296,6 +290,11 @@ func (c *ServerCommand) Run(args []string) int { if coreConfig.HAPhysical != nil { sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) if ok { + if err := sd.UpdateAdvertiseAddr(coreConfig.AdvertiseAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error configuring service discovery: %v", err)) + return 1 + } + if err := sd.RunServiceDiscovery(c.ShutdownCh); err != nil { c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) return 1 diff --git a/physical/consul.go b/physical/consul.go index 5b957407dd05..e2f81c39171d 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -44,21 +44,23 @@ const ( // prefix within Consul. It is used for most production situations as // it allows Vault to run on multiple machines in a highly-available manner. 
type ConsulBackend struct { - path string - client *api.Client - kv *api.KV - permitPool *PermitPool - serviceLock sync.RWMutex - service *api.AgentServiceRegistration - sealedCheck *api.AgentCheckRegistration - advertiseAddr string - consulClientConf *api.Config - serviceName string - running bool - active bool - sealed bool - checkTimeout time.Duration - checkTimer *time.Timer + path string + client *api.Client + kv *api.KV + permitPool *PermitPool + serviceLock sync.RWMutex + service *api.AgentServiceRegistration + sealedCheck *api.AgentCheckRegistration + advertiseHost string + advertisePort int + consulClientConf *api.Config + serviceName string + running bool + active bool + sealed bool + disableRegistration bool + checkTimeout time.Duration + checkTimer *time.Timer } // newConsulBackend constructs a Consul backend using the given API client @@ -78,6 +80,17 @@ func newConsulBackend(conf map[string]string) (Backend, error) { path = strings.TrimPrefix(path, "/") } + // Allow admins to disable consul integration + disableReg, ok := conf["disable_registration"] + var disableRegistration bool + if ok && disableReg != "" { + b, err := strconv.ParseBool(disableReg) + if err != nil { + return nil, errwrap.Wrapf("failed parsing disable_registration parameter: {{err}}", err) + } + disableRegistration = b + } + // Get the service name to advertise in Consul service, ok := conf["service"] if !ok { @@ -141,14 +154,15 @@ func newConsulBackend(conf map[string]string) (Backend, error) { // Setup the backend c := &ConsulBackend{ - path: path, - client: client, - kv: client.KV(), - permitPool: NewPermitPool(maxParInt), - consulClientConf: consulConf, - serviceName: service, - checkTimeout: checkTimeout, - checkTimer: time.NewTimer(checkTimeout), + path: path, + client: client, + kv: client.KV(), + permitPool: NewPermitPool(maxParInt), + consulClientConf: consulConf, + serviceName: service, + checkTimeout: checkTimeout, + checkTimer: time.NewTimer(checkTimeout), + 
disableRegistration: disableRegistration, } return c, nil } @@ -160,21 +174,13 @@ func (c *ConsulBackend) UpdateAdvertiseAddr(addr string) error { return fmt.Errorf("service registration unable to update advertise address, backend already running") } - url, err := url.Parse(addr) + host, port, err := parseAdvertiseAddr(addr) if err != nil { - return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse URL "%v": {{err}}`, addr), err) + return errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise address "%v": {{err}}`, addr), err) } - _, portStr, err := net.SplitHostPort(url.Host) - if err != nil { - return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) - } - _, err = strconv.ParseInt(portStr, 10, 0) - if err != nil { - return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse port "%v": {{err}}`, portStr), err) - } - - c.advertiseAddr = addr + c.advertiseHost = host + c.advertisePort = int(port) return nil } @@ -197,10 +203,12 @@ func (c *ConsulBackend) AdvertiseActive(active bool) error { return nil } - c.service.Tags = serviceTags(active) - agent := c.client.Agent() - if err := agent.ServiceRegister(c.service); err != nil { - return errwrap.Wrapf("service registration failed: {{err}}", err) + if !c.disableRegistration { + c.service.Tags = serviceTags(active) + agent := c.client.Agent() + if err := agent.ServiceRegister(c.service); err != nil { + return errwrap.Wrapf("service registration failed: {{err}}", err) + } } // Save a cached copy of the active state: no way to query Core @@ -219,8 +227,10 @@ func (c *ConsulBackend) AdvertiseSealed(sealed bool) error { return nil } - // Push a TTL check immediately to update the state - c.runCheck() + if !c.disableRegistration { + // Push a TTL check immediately to update the state + c.runCheck() + } return nil } @@ -229,35 +239,22 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownCh 
ShutdownChannel) (err err c.serviceLock.Lock() defer c.serviceLock.Unlock() - if c.running { - return fmt.Errorf("service registration routine already running") - } - - url, err := url.Parse(c.advertiseAddr) - if err != nil { - return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err) + if c.disableRegistration { + return nil } - host, portStr, err := net.SplitHostPort(url.Host) - if err != nil { - return errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) - } - port, err := strconv.ParseInt(portStr, 10, 0) - if err != nil { - return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err) + if c.running { + return fmt.Errorf("service registration routine already running") } - serviceID, err := c.serviceID() - if err != nil { - return err - } + serviceID := c.serviceID() c.service = &api.AgentServiceRegistration{ ID: serviceID, Name: c.serviceName, Tags: serviceTags(c.active), - Port: int(port), - Address: host, + Port: c.advertisePort, + Address: c.advertiseHost, EnableTagOverride: false, } @@ -351,22 +348,31 @@ func (c *ConsulBackend) checkID() string { // serviceID returns the Vault ServiceID for use in Consul. Assume at least // a read lock is held. 
-func (c *ConsulBackend) serviceID() (string, error) { - url, err := url.Parse(c.advertiseAddr) - if err != nil { - return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err) +func (c *ConsulBackend) serviceID() string { + return fmt.Sprintf("%s:%s:%d", c.serviceName, c.advertiseHost, c.advertisePort) +} + +func parseAdvertiseAddr(addr string) (host string, port int, err error) { + if addr == "" { + return "", -1, fmt.Errorf("advertise address must not be empty") } - host, portStr, err := net.SplitHostPort(url.Host) + url, err := url.Parse(addr) if err != nil { - return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + return "", -2, errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise URL "%v": {{err}}`, addr), err) } - port, err := strconv.ParseInt(portStr, 10, 0) + + var portStr string + host, portStr, err = net.SplitHostPort(url.Host) if err != nil { - return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err) + return "", -3, errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + } + portNum, err := strconv.ParseInt(portStr, 10, 0) + if err != nil || portNum < 1 || portNum > 65535 { + return "", -4, errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err) } - return fmt.Sprintf("%s:%s:%d", c.serviceName, host, int(port)), nil + return host, int(portNum), nil } func setupTLSConfig(conf map[string]string) (*tls.Config, error) { diff --git a/physical/consul_test.go b/physical/consul_test.go index 6fb516844330..2c9e77d89dbf 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -86,67 +86,81 @@ func testConsul_testConsulBackend(t *testing.T) { func TestConsul_newConsulBackend(t *testing.T) { tests := []struct { - Name string - Config map[string]string - Fail bool - checkTimeout 
time.Duration - path string - service string - address string - scheme string - token string - max_parallel int + name string + consulConfig map[string]string + fail bool + advertiseAddr string + checkTimeout time.Duration + path string + service string + address string + scheme string + token string + max_parallel int + disableReg bool }{ { - Name: "Valid default config", - Config: map[string]string{}, - checkTimeout: 5 * time.Second, - path: "vault", - service: "vault", - address: "127.0.0.1", - scheme: "http", - token: "", - max_parallel: 4, + name: "Valid default config", + consulConfig: map[string]string{}, + checkTimeout: 5 * time.Second, + advertiseAddr: "http://127.0.0.1:8200", + path: "vault/", + service: "vault", + address: "127.0.0.1:8500", + scheme: "http", + token: "", + max_parallel: 4, + disableReg: false, }, { - Name: "Valid modified config", - Config: map[string]string{ - "path": "seaTech/", - "service": "astronomy", - "check_timeout": "6s", - "address": "127.0.0.2", - "scheme": "https", - "token": "deadbeef-cafeefac-deadc0de-feedface", - "max_parallel": "4", + name: "Valid modified config", + consulConfig: map[string]string{ + "path": "seaTech/", + "service": "astronomy", + "advertiseAddr": "http://127.0.0.2:8200", + "check_timeout": "6s", + "address": "127.0.0.2", + "scheme": "https", + "token": "deadbeef-cafeefac-deadc0de-feedface", + "max_parallel": "4", + "disable_registration": "false", }, - checkTimeout: 6 * time.Second, - path: "seaTech/", - service: "astronomy", - address: "127.0.0.2", - scheme: "https", - token: "deadbeef-cafeefac-deadc0de-feedface", - max_parallel: 4, + checkTimeout: 6 * time.Second, + path: "seaTech/", + service: "astronomy", + advertiseAddr: "http://127.0.0.2:8200", + address: "127.0.0.2", + scheme: "https", + token: "deadbeef-cafeefac-deadc0de-feedface", + max_parallel: 4, }, { - Name: "check timeout too short", - Fail: true, - Config: map[string]string{ + name: "check timeout too short", + fail: true, + consulConfig: 
map[string]string{ "check_timeout": "99ms", }, }, } for _, test := range tests { - be, err := newConsulBackend(test.Config) - if test.Fail && err == nil { - t.Fatalf("Expected config %s to fail", test.Name) - } else if !test.Fail && err != nil { - t.Fatalf("Expected config %s to not fail: %v", test.Name, err) + be, err := newConsulBackend(test.consulConfig) + if test.fail { + if err == nil { + t.Fatalf(`Expected config "%s" to fail`, test.name) + } else { + continue + } + } else if !test.fail && err != nil { + t.Fatalf("Expected config %s to not fail: %v", test.name, err) } c, ok := be.(*ConsulBackend) if !ok { - t.Fatalf("Expected ConsulBackend") + t.Fatalf("Expected ConsulBackend: %s", test.name) + } + if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil { + t.Fatalf("bad: %v", err) } if test.checkTimeout != c.checkTimeout { @@ -154,7 +168,7 @@ func TestConsul_newConsulBackend(t *testing.T) { } if test.path != c.path { - t.Errorf("bad: %v != %v", test.path, c.path) + t.Errorf("bad: %s %v != %v", test.name, test.path, c.path) } if test.service != c.serviceName { @@ -162,7 +176,7 @@ func TestConsul_newConsulBackend(t *testing.T) { } if test.address != c.consulClientConf.Address { - t.Errorf("bad: %v != %v", test.address, c.consulClientConf.Address) + t.Errorf("bad: %s %v != %v", test.name, test.address, c.consulClientConf.Address) } if test.scheme != c.consulClientConf.Scheme { @@ -206,14 +220,20 @@ func TestConsul_serviceTags(t *testing.T) { func TestConsul_UpdateAdvertiseAddr(t *testing.T) { tests := []struct { addr string + host string + port int pass bool }{ { addr: "http://127.0.0.1:8200/", + host: "127.0.0.1", + port: 8200, pass: true, }, { addr: "http://127.0.0.1:8200", + host: "127.0.0.1", + port: 8200, pass: true, }, { @@ -244,8 +264,12 @@ func TestConsul_UpdateAdvertiseAddr(t *testing.T) { } } - if c.advertiseAddr != test.addr { - t.Fatalf("bad: %v != %v", c.advertiseAddr, test.addr) + if c.advertiseHost != test.host { + t.Fatalf("bad: %v != 
%v", c.advertiseHost, test.host) + } + + if c.advertisePort != test.port { + t.Fatalf("bad: %v != %v", c.advertisePort, test.port) } } } @@ -330,21 +354,25 @@ func TestConsul_checkID(t *testing.T) { func TestConsul_serviceID(t *testing.T) { passingTests := []struct { + name string advertiseAddr string serviceName string expected string }{ { + name: "valid host w/o slash", advertiseAddr: "http://127.0.0.1:8200", serviceName: "sea-tech-astronomy", expected: "sea-tech-astronomy:127.0.0.1:8200", }, { + name: "valid host w/ slash", advertiseAddr: "http://127.0.0.1:8200/", serviceName: "sea-tech-astronomy", expected: "sea-tech-astronomy:127.0.0.1:8200", }, { + name: "valid https host w/ slash", advertiseAddr: "https://127.0.0.1:8200/", serviceName: "sea-tech-astronomy", expected: "sea-tech-astronomy:127.0.0.1:8200", @@ -357,14 +385,10 @@ func TestConsul_serviceID(t *testing.T) { }) if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil { - t.Fatalf("bad: %v", err) - } - - serviceID, err := c.serviceID() - if err != nil { - t.Fatalf("bad: %v", err) + t.Fatalf("bad: %s %v", test.name, err) } + serviceID := c.serviceID() if serviceID != test.expected { t.Fatalf("bad: %v != %v", serviceID, test.expected) } diff --git a/physical/physical.go b/physical/physical.go index 5702ce3cdc1c..c2242c067c71 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -43,11 +43,6 @@ type HABackend interface { type AdvertiseDetect interface { // DetectHostAddr is used to detect the host address DetectHostAddr() (string, error) - - // UpdateAdvertiseAddr allows for a non-Running backend to update the - // advertise address. HABackends may want to present a different - // address that wasn't available when a Backend was created. - UpdateAdvertiseAddr(addr string) error } // ServiceDiscovery is an optional interface that an HABackend can implement. 
@@ -65,6 +60,11 @@ type ServiceDiscovery interface { // Run executes any background service discovery tasks until the // shutdown channel is closed. RunServiceDiscovery(ShutdownChannel) error + + // UpdateAdvertiseAddr allows for a non-Running backend to update the + // advertise address. HABackends may want to present a different + // address that wasn't available when a Backend was created. + UpdateAdvertiseAddr(addr string) error } type Lock interface { diff --git a/website/source/docs/config/index.html.md b/website/source/docs/config/index.html.md index 237efd1dba35..62a495947e83 100644 --- a/website/source/docs/config/index.html.md +++ b/website/source/docs/config/index.html.md @@ -200,6 +200,9 @@ For Consul, the following options are supported: * `scheme` (optional) - "http" or "https" for talking to Consul. + * `disable_registration` (optional) - If true, then Vault will not register + itself with Vault. Defaults to "false". + * `token` (optional) - An access token to use to write data to Consul. 
* `max_parallel` (optional) - The maximum number of connections to Consul; From 3e43da258a87620d51b3927b6c1ee88ac429bc5c Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 19:53:51 -0700 Subject: [PATCH 11/21] Use spaces in tests to be consistent The rest of the tests here use spaces, not tabs --- command/server_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/command/server_test.go b/command/server_test.go index 5c7a58dc9eb9..9ace8611bfeb 100644 --- a/command/server_test.go +++ b/command/server_test.go @@ -32,13 +32,13 @@ listener "tcp" { consulhcl = ` backend "consul" { prefix = "foo/" - advertise_addr = "http://127.0.0.1:8200" + advertise_addr = "http://127.0.0.1:8200" } ` haconsulhcl = ` ha_backend "consul" { prefix = "bar/" - advertise_addr = "http://127.0.0.1:8200" + advertise_addr = "http://127.0.0.1:8200" } ` From 1601508e523980502c4f09186f5d2a5599e15958 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 19:54:46 -0700 Subject: [PATCH 12/21] Consistently skip Consul checks Hide all Consul checks behind `CONSUL_HTTP_ADDR` env vs `CONSUL_ADDR` which is non-standard. 
--- physical/consul_test.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/physical/consul_test.go b/physical/consul_test.go index 2c9e77d89dbf..a25bd1129f7b 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -275,6 +275,11 @@ func TestConsul_UpdateAdvertiseAddr(t *testing.T) { } func TestConsul_AdvertiseActive(t *testing.T) { + addr := os.Getenv("CONSUL_HTTP_ADDR") + if addr == "" { + t.Skipf("No consul process running, skipping test") + } + c := testConsulBackend(t) if c.active != false { @@ -303,6 +308,11 @@ func TestConsul_AdvertiseActive(t *testing.T) { } func TestConsul_AdvertiseSealed(t *testing.T) { + addr := os.Getenv("CONSUL_HTTP_ADDR") + if addr == "" { + t.Skipf("No consul process running, skipping test") + } + c := testConsulBackend(t) if c.sealed != false { @@ -396,9 +406,9 @@ func TestConsul_serviceID(t *testing.T) { } func TestConsulBackend(t *testing.T) { - addr := os.Getenv("CONSUL_ADDR") + addr := os.Getenv("CONSUL_HTTP_ADDR") if addr == "" { - t.SkipNow() + t.Skipf("No consul process running, skipping test") } conf := api.DefaultConfig() @@ -427,9 +437,9 @@ func TestConsulBackend(t *testing.T) { } func TestConsulHABackend(t *testing.T) { - addr := os.Getenv("CONSUL_ADDR") + addr := os.Getenv("CONSUL_HTTP_ADDR") if addr == "" { - t.SkipNow() + t.Skipf("No consul process running, skipping test") } conf := api.DefaultConfig() From 529f3e50c4fce1fc8e5db2a72d9e67275ebbf91f Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 19:56:28 -0700 Subject: [PATCH 13/21] Provide documentation and example output --- website/source/docs/config/index.html.md | 86 ++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/website/source/docs/config/index.html.md b/website/source/docs/config/index.html.md index 62a495947e83..7b7f0b2cfb6c 100644 --- a/website/source/docs/config/index.html.md +++ b/website/source/docs/config/index.html.md @@ -200,9 +200,15 @@ For Consul, the 
following options are supported: * `scheme` (optional) - "http" or "https" for talking to Consul. + * `check_timeout` (optional) - The check interval used to send health check + information to Consul. Defaults to "5s". + * `disable_registration` (optional) - If true, then Vault will not register itself with Vault. Defaults to "false". + * `service` (optional) - The name of the service to register with Consul. + Defaults to "vault". + * `token` (optional) - An access token to use to write data to Consul. * `max_parallel` (optional) - The maximum number of connections to Consul; @@ -230,6 +236,86 @@ settings](https://www.consul.io/docs/agent/encryption.html): [key_file](https://www.consul.io/docs/agent/options.html#key_file) setting in Consul. +``` +// Sample Consul Backend configuration with local Consul Agent +backend "consul" { + // address MUST match Consul's `addresses.http` config value (or + // `addresses.https` depending on the scheme provided below). + address = "127.0.0.1:8500" + #address = "unix:///tmp/.consul.http.sock" + + // scheme defaults to "http" (suitable for loopback and UNIX sockets), but + // should be "https" when Consul exists on a remote node (a non-standard + // deployment). All decryption happens within Vault so this value does not + // change Vault's Threat Model. + scheme = "http" + + // token is a Consul ACL Token that has write privileges to the path + // specified below. Use of a Consul ACL Token is a best practice. + token = "[redacted]" // Vault's Consul ACL Token + + // path must be writable by the Consul ACL Token + path = "vault/" +} +``` + +Once properly configured, an unsealed Vault installation should be available +on the network at `active.vault.service.consul`. Unsealed Vault instances in +the standby state are available at `standby.vault.service.consul`. All +unsealed Vault instances are available as healthy in the +`vault.service.consul` pool.
Sealed Vault instances will mark themselves as +critical to avoid showing up by default in Consul's service discovery. + +``` +% dig active.vault.service.consul srv +; <<>> DiG 9.8.3-P1 <<>> active.vault.service.consul srv +; (1 server found) +;; global options: +cmd +;; Got answer: +;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 11331 +;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1 +;; WARNING: recursion requested but not available + +;; QUESTION SECTION: +;active.vault.service.consul. IN SRV + +;; ANSWER SECTION: +active.vault.service.consul. 0 IN SRV 1 1 8200 vault1.node.dc1.consul. + +;; ADDITIONAL SECTION: +vault1.node.dc1.consul. 0 IN A 172.17.33.46 + +;; Query time: 0 msec +;; SERVER: 127.0.0.1#53(127.0.0.1) +;; WHEN: Sat Apr 23 17:33:14 2016 +;; MSG SIZE rcvd: 172 +% dig +short standby.vault.service.consul srv +1 1 8200 vault3.node.dc1.consul. +1 1 8200 vault2.node.dc1.consul. +% dig +short vault.service.consul srv +1 1 8200 vault3.node.dc1.consul. +1 1 8200 vault1.node.dc1.consul. +1 1 8200 vault2.node.dc1.consul. +% dig +short vault.service.consul a +172.17.33.46 +172.17.34.32 +172.17.35.29 +vault1% vault seal +% dig +short vault.service.consul srv +1 1 8200 vault3.node.dc1.consul. +1 1 8200 vault2.node.dc1.consul. +vault1% vault unseal +Key (will be hidden): +Sealed: false +Key Shares: 5 +Key Threshold: 3 +Unseal Progress: 0 +% dig +short vault.service.consul srv +1 1 8200 vault1.node.dc1.consul. +1 1 8200 vault3.node.dc1.consul. +1 1 8200 vault2.node.dc1.consul. 
+``` + #### Backend Reference: etcd (Community-Supported) For etcd, the following options are supported: From 38a3ea3978faf2b2ebe4e49e1814d5649c53f1c1 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 20:32:06 -0700 Subject: [PATCH 14/21] Disable service registration for consul HA tests --- command/server_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/command/server_test.go b/command/server_test.go index 9ace8611bfeb..5834750882ab 100644 --- a/command/server_test.go +++ b/command/server_test.go @@ -33,12 +33,14 @@ listener "tcp" { backend "consul" { prefix = "foo/" advertise_addr = "http://127.0.0.1:8200" + disable_registration = "true" } ` haconsulhcl = ` ha_backend "consul" { prefix = "bar/" advertise_addr = "http://127.0.0.1:8200" + disable_registration = "true" } ` From f1c170e003ecd7e724262c400e6181fe1555ad21 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sat, 23 Apr 2016 20:48:22 -0700 Subject: [PATCH 15/21] Add a small bit of wording re: `disable_registration` Consul service registration for Vault requires Consul 0.6.4. --- website/source/docs/config/index.html.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/website/source/docs/config/index.html.md b/website/source/docs/config/index.html.md index 7b7f0b2cfb6c..a4ba86bed4ef 100644 --- a/website/source/docs/config/index.html.md +++ b/website/source/docs/config/index.html.md @@ -204,7 +204,9 @@ For Consul, the following options are supported: information to Consul. Defaults to "5s". * `disable_registration` (optional) - If true, then Vault will not register - itself with Vault. Defaults to "false". + itself with Consul. If the Consul Agent for Vault and the Consul Servers + are older than `0.6.4` it is required to set this to "true" due to API + incompatibilities. Defaults to "false". * `service` (optional) - The name of the service to register with Consul. Defaults to "vault".
From f4e1594ae96d6c97af506162d433aca6fdfcc276 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sun, 24 Apr 2016 07:04:51 -0700 Subject: [PATCH 16/21] Persistently retry to update service registration If the local Consul agent is not available while attempting to step down from active or up to active, retry once a second. Allow for concurrent changes to the state with a single registration updater. Fix standby initialization. --- physical/consul.go | 56 +++++++++++++++++++++++++++++++++------------- vault/core.go | 28 ++++++++++++----------- 2 files changed, 55 insertions(+), 29 deletions(-) diff --git a/physical/consul.go b/physical/consul.go index e2f81c39171d..8851bac6e8ec 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "crypto/tls" @@ -38,6 +39,10 @@ const ( // defaultServiceName is the default Consul service name used when // advertising a Vault instance. defaultServiceName = "vault" + + // registrationRetryInterval specifies the retry duration to use when + // a registration to the Consul agent fails. + registrationRetryInterval = 1 * time.Second ) // ConsulBackend is a physical backend that stores data at specific @@ -51,13 +56,14 @@ type ConsulBackend struct { serviceLock sync.RWMutex service *api.AgentServiceRegistration sealedCheck *api.AgentCheckRegistration + registrationLock int64 advertiseHost string advertisePort int consulClientConf *api.Config serviceName string running bool active bool - sealed bool + unsealed bool disableRegistration bool checkTimeout time.Duration checkTimer *time.Timer @@ -184,8 +190,7 @@ func (c *ConsulBackend) UpdateAdvertiseAddr(addr string) error { return nil } -// serviceTags returns all of the relevant tags for Consul. Assumes -// c.serviceLock held for writing. +// serviceTags returns all of the relevant tags for Consul. 
func serviceTags(active bool) []string { activeTag := "standby" if active { @@ -203,24 +208,43 @@ func (c *ConsulBackend) AdvertiseActive(active bool) error { return nil } - if !c.disableRegistration { - c.service.Tags = serviceTags(active) - agent := c.client.Agent() - if err := agent.ServiceRegister(c.service); err != nil { - return errwrap.Wrapf("service registration failed: {{err}}", err) - } - } - // Save a cached copy of the active state: no way to query Core c.active = active + // Ensure serial registration to the Consul agent. Allow for + // concurrent calls to update active status while a single task + // attempts, until successful, to update the Consul Agent. + if !c.disableRegistration && atomic.CompareAndSwapInt64(&c.registrationLock, 0, 1) { + defer atomic.CompareAndSwapInt64(&c.registrationLock, 1, 0) + + // Retry agent registration until successful + registration_complete: + for { + c.service.Tags = serviceTags(c.active) + agent := c.client.Agent() + err := agent.ServiceRegister(c.service) + if err == nil { + break registration_complete + } + + // wtb logger c.logger.Printf("[WARN] service registration failed: %v", err) + c.serviceLock.Unlock() + time.Sleep(registrationRetryInterval) + c.serviceLock.Lock() + + if !c.running { + return err + } + } + } + return nil } func (c *ConsulBackend) AdvertiseSealed(sealed bool) error { c.serviceLock.Lock() defer c.serviceLock.Unlock() - c.sealed = sealed + c.unsealed = !sealed // Vault is still bootstrapping if c.service == nil { @@ -258,9 +282,9 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err err EnableTagOverride: false, } - checkStatus := "failing" - if !c.sealed { - checkStatus = "passing" + checkStatus := api.HealthCritical + if c.unsealed { + checkStatus = api.HealthPassing } c.sealedCheck = &api.AgentCheckRegistration{ @@ -333,7 +357,7 @@ func (c *ConsulBackend) runCheck() { // Run a TTL check agent := c.client.Agent() - if !c.sealed { + if c.unsealed { 
agent.UpdateTTL(c.checkID(), "Vault Unsealed", api.HealthPassing) } else { agent.UpdateTTL(c.checkID(), "Vault Sealed", api.HealthCritical) diff --git a/vault/core.go b/vault/core.go index 4590ae9d39d6..4c209e06651f 100644 --- a/vault/core.go +++ b/vault/core.go @@ -946,7 +946,7 @@ func (c *Core) Unseal(key []byte) (bool, error) { sd, ok := c.ha.(physical.ServiceDiscovery) if ok { go func() { - if err := sd.AdvertiseSealed(c.sealed); err != nil { + if err := sd.AdvertiseSealed(false); err != nil { c.logger.Printf("[WARN] core: failed to advertise unsealed status: %v", err) } }() @@ -1101,7 +1101,7 @@ func (c *Core) sealInternal() error { sd, ok := c.ha.(physical.ServiceDiscovery) if ok { go func() { - if err := sd.AdvertiseSealed(c.sealed); err != nil { + if err := sd.AdvertiseSealed(true); err != nil { c.logger.Printf("[WARN] core: failed to advertise sealed status: %v", err) } }() @@ -1224,16 +1224,6 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) { defer close(manualStepDownCh) c.logger.Printf("[INFO] core: entering standby mode") - // Advertise ourselves as a standby - sd, ok := c.ha.(physical.ServiceDiscovery) - if ok { - go func() { - if err := sd.AdvertiseActive(false); err != nil { - c.logger.Printf("[WARN] core: failed to advertise standby status: %v", err) - } - }() - } - // Monitor for key rotation keyRotateDone := make(chan struct{}) keyRotateStop := make(chan struct{}) @@ -1465,7 +1455,19 @@ func (c *Core) cleanLeaderPrefix(uuid string, leaderLostCh <-chan struct{}) { // clearLeader is used to clear our leadership entry func (c *Core) clearLeader(uuid string) error { key := coreLeaderPrefix + uuid - return c.barrier.Delete(key) + err := c.barrier.Delete(key) + + // Advertise ourselves as a standby + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseActive(false); err != nil { + c.logger.Printf("[WARN] core: failed to advertise standby status: %v", err) + } + }() + } + + return err } 
// emitMetrics is used to periodically expose metrics while runnig From 85ca7b32ca283a5bced0befab23fc1af1a368480 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Sun, 24 Apr 2016 07:15:20 -0700 Subject: [PATCH 17/21] Update tests to chase sealed -> unsealed transition --- physical/consul_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/physical/consul_test.go b/physical/consul_test.go index a25bd1129f7b..333c38ca8165 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -71,7 +71,7 @@ func testConsul_testConsulBackend(t *testing.T) { t.Fatalf("bad") } - if c.sealed != false { + if c.unsealed != false { t.Fatalf("bad") } @@ -315,42 +315,42 @@ func TestConsul_AdvertiseSealed(t *testing.T) { c := testConsulBackend(t) - if c.sealed != false { + if c.unsealed == true { t.Fatalf("bad") } if err := c.AdvertiseSealed(true); err != nil { t.Fatalf("bad: %v", err) } - if c.sealed != true { + if c.unsealed == true { t.Fatalf("bad") } if err := c.AdvertiseSealed(true); err != nil { t.Fatalf("bad: %v", err) } - if c.sealed != true { + if c.unsealed == true { t.Fatalf("bad") } if err := c.AdvertiseSealed(false); err != nil { t.Fatalf("bad: %v", err) } - if c.sealed != false { + if c.unsealed == false { t.Fatalf("bad") } if err := c.AdvertiseSealed(false); err != nil { t.Fatalf("bad: %v", err) } - if c.sealed != false { + if c.unsealed == false { t.Fatalf("bad") } if err := c.AdvertiseSealed(true); err != nil { t.Fatalf("bad: %v", err) } - if c.sealed != true { + if c.unsealed == true { t.Fatalf("bad") } } From 9647f2e0673bd96a42ad37fee2afd1e52c36a0ff Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Mon, 25 Apr 2016 13:46:28 -0700 Subject: [PATCH 18/21] Collapse UpdateAdvertiseAddr() into RunServiceDiscovery() --- command/server.go | 7 +----- physical/consul.go | 52 +++++++++++++++++++++-------------------- physical/consul_test.go | 42 +++++++++++++++++++++------------ physical/physical.go | 7 +----- 4 files changed, 
56 insertions(+), 52 deletions(-) diff --git a/command/server.go b/command/server.go index ae26b534d100..bd0521f0f3fe 100644 --- a/command/server.go +++ b/command/server.go @@ -290,12 +290,7 @@ func (c *ServerCommand) Run(args []string) int { if coreConfig.HAPhysical != nil { sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) if ok { - if err := sd.UpdateAdvertiseAddr(coreConfig.AdvertiseAddr); err != nil { - c.Ui.Error(fmt.Sprintf("Error configuring service discovery: %v", err)) - return 1 - } - - if err := sd.RunServiceDiscovery(c.ShutdownCh); err != nil { + if err := sd.RunServiceDiscovery(c.ShutdownCh, coreConfig.AdvertiseAddr); err != nil { c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) return 1 } diff --git a/physical/consul.go b/physical/consul.go index 8851bac6e8ec..aa67e6e2486d 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -173,23 +173,6 @@ func newConsulBackend(conf map[string]string) (Backend, error) { return c, nil } -// UpdateAdvertiseAddr provides a pre-initialization hook for updating -// Consul's advertise address. -func (c *ConsulBackend) UpdateAdvertiseAddr(addr string) error { - if c.running { - return fmt.Errorf("service registration unable to update advertise address, backend already running") - } - - host, port, err := parseAdvertiseAddr(addr) - if err != nil { - return errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise address "%v": {{err}}`, addr), err) - } - - c.advertiseHost = host - c.advertisePort = int(port) - return nil -} - // serviceTags returns all of the relevant tags for Consul. 
func serviceTags(active bool) []string { activeTag := "standby" @@ -218,13 +201,13 @@ func (c *ConsulBackend) AdvertiseActive(active bool) error { defer atomic.CompareAndSwapInt64(&c.registrationLock, 1, 0) // Retry agent registration until successful - registration_complete: for { c.service.Tags = serviceTags(c.active) agent := c.client.Agent() err := agent.ServiceRegister(c.service) if err == nil { - break registration_complete + // Success + return nil } // wtb logger c.logger.Printf("[WARN] service registration failed: %v", err) @@ -233,11 +216,13 @@ func (c *ConsulBackend) AdvertiseActive(active bool) error { c.serviceLock.Lock() if !c.running { + // Shutting down return err } } } + // Successful concurrent update to active state return nil } @@ -259,7 +244,15 @@ func (c *ConsulBackend) AdvertiseSealed(sealed bool) error { return nil } -func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err error) { +func (c *ConsulBackend) setAdvertiseAddr(addr string) (err error) { + c.advertiseHost, c.advertisePort, err = c.parseAdvertiseAddr(addr) + if err != nil { + return err + } + return nil +} + +func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) (err error) { c.serviceLock.Lock() defer c.serviceLock.Unlock() @@ -267,8 +260,8 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err err return nil } - if c.running { - return fmt.Errorf("service registration routine already running") + if err := c.setAdvertiseAddr(advertiseAddr); err != nil { + return err } serviceID := c.serviceID() @@ -376,7 +369,7 @@ func (c *ConsulBackend) serviceID() string { return fmt.Sprintf("%s:%s:%d", c.serviceName, c.advertiseHost, c.advertisePort) } -func parseAdvertiseAddr(addr string) (host string, port int, err error) { +func (c *ConsulBackend) parseAdvertiseAddr(addr string) (host string, port int, err error) { if addr == "" { return "", -1, fmt.Errorf("advertise address must not be empty") } @@ 
-389,10 +382,19 @@ func parseAdvertiseAddr(addr string) (host string, port int, err error) { var portStr string host, portStr, err = net.SplitHostPort(url.Host) if err != nil { - return "", -3, errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + if url.Scheme == "http" { + portStr = "80" + } else if url.Scheme == "https" { + portStr = "443" + } else if url.Scheme == "unix" { + portStr = "0" + host = url.Path + } else { + return "", -3, errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + } } portNum, err := strconv.ParseInt(portStr, 10, 0) - if err != nil || portNum < 1 || portNum > 65535 { + if err != nil || portNum < 0 || portNum > 65535 { return "", -4, errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err) } diff --git a/physical/consul_test.go b/physical/consul_test.go index 333c38ca8165..364c238bbd0c 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -27,7 +27,6 @@ func testConsulBackend(t *testing.T) *ConsulBackend { } func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend { - const serviceID = "vaultTestService" be, err := newConsulBackend(*conf) if err != nil { t.Fatalf("Expected Consul to initialize: %v", err) @@ -38,8 +37,10 @@ func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend { t.Fatalf("Expected ConsulBackend") } + c.consulClientConf = api.DefaultConfig() + c.service = &api.AgentServiceRegistration{ - ID: serviceID, + ID: c.serviceID(), Name: c.serviceName, Tags: serviceTags(c.active), Port: 8200, @@ -51,7 +52,7 @@ func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend { ID: c.checkID(), Name: "Vault Sealed Status", Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", - ServiceID: serviceID, + ServiceID: c.serviceID(), AgentServiceCheck: api.AgentServiceCheck{ TTL: 
c.checkTimeout.String(), Status: api.HealthPassing, @@ -159,7 +160,10 @@ func TestConsul_newConsulBackend(t *testing.T) { if !ok { t.Fatalf("Expected ConsulBackend: %s", test.name) } - if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil { + c.disableRegistration = true + + var shutdownCh ShutdownChannel + if err := c.RunServiceDiscovery(shutdownCh, test.advertiseAddr); err != nil { t.Fatalf("bad: %v", err) } @@ -217,7 +221,7 @@ func TestConsul_serviceTags(t *testing.T) { } } -func TestConsul_UpdateAdvertiseAddr(t *testing.T) { +func TestConsul_parseAdvertiseAddr(t *testing.T) { tests := []struct { addr string host string @@ -236,6 +240,18 @@ func TestConsul_UpdateAdvertiseAddr(t *testing.T) { port: 8200, pass: true, }, + { + addr: "https://127.0.0.1:8200", + host: "127.0.0.1", + port: 8200, + pass: true, + }, + { + addr: "unix:///tmp/.vault.addr.sock", + host: "/tmp/.vault.addr.sock", + port: 0, + pass: true, + }, { addr: "127.0.0.1:8200", pass: false, @@ -247,11 +263,7 @@ func TestConsul_UpdateAdvertiseAddr(t *testing.T) { } for _, test := range tests { c := testConsulBackend(t) - if c == nil { - t.Fatalf("bad") - } - - err := c.UpdateAdvertiseAddr(test.addr) + host, port, err := c.parseAdvertiseAddr(test.addr) if test.pass { if err != nil { t.Fatalf("bad: %v", err) @@ -264,12 +276,12 @@ func TestConsul_UpdateAdvertiseAddr(t *testing.T) { } } - if c.advertiseHost != test.host { - t.Fatalf("bad: %v != %v", c.advertiseHost, test.host) + if host != test.host { + t.Fatalf("bad: %v != %v", host, test.host) } - if c.advertisePort != test.port { - t.Fatalf("bad: %v != %v", c.advertisePort, test.port) + if port != test.port { + t.Fatalf("bad: %v != %v", port, test.port) } } } @@ -394,7 +406,7 @@ func TestConsul_serviceID(t *testing.T) { "service": test.serviceName, }) - if err := c.UpdateAdvertiseAddr(test.advertiseAddr); err != nil { + if err := c.setAdvertiseAddr(test.advertiseAddr); err != nil { t.Fatalf("bad: %s %v", test.name, err) } diff --git 
a/physical/physical.go b/physical/physical.go index c2242c067c71..420a218fc294 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -59,12 +59,7 @@ type ServiceDiscovery interface { // Run executes any background service discovery tasks until the // shutdown channel is closed. - RunServiceDiscovery(ShutdownChannel) error - - // UpdateAdvertiseAddr allows for a non-Running backend to update the - // advertise address. HABackends may want to present a different - // address that wasn't available when a Backend was created. - UpdateAdvertiseAddr(addr string) error + RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) error } type Lock interface { From 341abcae3ac21d8ce3e2ce6741e64494e93ef055 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Mon, 25 Apr 2016 14:33:24 -0700 Subject: [PATCH 19/21] Change to the pre-0.6.4 Consul Check API Consul is never going to pass in more than 1K of output. This mitigates the pre-0.6.4 concern. --- physical/consul.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/physical/consul.go b/physical/consul.go index aa67e6e2486d..6835b500fd37 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -351,9 +351,9 @@ func (c *ConsulBackend) runCheck() { // Run a TTL check agent := c.client.Agent() if c.unsealed { - agent.UpdateTTL(c.checkID(), "Vault Unsealed", api.HealthPassing) + agent.PassTTL(c.checkID(), "Vault Unsealed") } else { - agent.UpdateTTL(c.checkID(), "Vault Sealed", api.HealthCritical) + agent.FailTTL(c.checkID(), "Vault Sealed") } } From 00d1e5abd739b1ec6680fec615f77ee798da71bd Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Mon, 25 Apr 2016 14:33:24 -0700 Subject: [PATCH 20/21] Change to the pre-0.6.4 Consul Check API Consul is never going to pass in more than 1K of output. This mitigates the pre-0.6.4 concern. 
--- website/source/docs/config/index.html.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/website/source/docs/config/index.html.md b/website/source/docs/config/index.html.md index a4ba86bed4ef..7b7f0b2cfb6c 100644 --- a/website/source/docs/config/index.html.md +++ b/website/source/docs/config/index.html.md @@ -204,9 +204,7 @@ For Consul, the following options are supported: information to Consul. Defaults to "5s". * `disable_registration` (optional) - If true, then Vault will not register - itself with Vault. If the Consul Agent for Vault and the Consul Servers - are older than `0.6.4` it is required to set this to "true" due to API - incompatibilities. Defaults to "false". + itself with Vault. Defaults to "false". * `service` (optional) - The name of the service to register with Consul. Defaults to "vault". From 4db16355ec24bd6132e5468b139ad6b1fbf3a3c1 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Mon, 25 Apr 2016 18:05:50 -0700 Subject: [PATCH 21/21] Rewriting history before it gets away from me --- physical/consul.go | 34 +++++++++++++--------------------- physical/consul_test.go | 16 ++++++++-------- 2 files changed, 21 insertions(+), 29 deletions(-) diff --git a/physical/consul.go b/physical/consul.go index 6835b500fd37..949db16178d2 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -58,7 +58,7 @@ type ConsulBackend struct { sealedCheck *api.AgentCheckRegistration registrationLock int64 advertiseHost string - advertisePort int + advertisePort int64 consulClientConf *api.Config serviceName string running bool @@ -244,14 +244,6 @@ func (c *ConsulBackend) AdvertiseSealed(sealed bool) error { return nil } -func (c *ConsulBackend) setAdvertiseAddr(addr string) (err error) { - c.advertiseHost, c.advertisePort, err = c.parseAdvertiseAddr(addr) - if err != nil { - return err - } - return nil -} - func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) (err error) { c.serviceLock.Lock() 
defer c.serviceLock.Unlock() @@ -270,7 +262,7 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertis ID: serviceID, Name: c.serviceName, Tags: serviceTags(c.active), - Port: c.advertisePort, + Port: int(c.advertisePort), Address: c.advertiseHost, EnableTagOverride: false, } @@ -369,36 +361,36 @@ func (c *ConsulBackend) serviceID() string { return fmt.Sprintf("%s:%s:%d", c.serviceName, c.advertiseHost, c.advertisePort) } -func (c *ConsulBackend) parseAdvertiseAddr(addr string) (host string, port int, err error) { +func (c *ConsulBackend) setAdvertiseAddr(addr string) (err error) { if addr == "" { - return "", -1, fmt.Errorf("advertise address must not be empty") + return fmt.Errorf("advertise address must not be empty") } url, err := url.Parse(addr) if err != nil { - return "", -2, errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise URL "%v": {{err}}`, addr), err) + return errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise URL "%v": {{err}}`, addr), err) } var portStr string - host, portStr, err = net.SplitHostPort(url.Host) + c.advertiseHost, portStr, err = net.SplitHostPort(url.Host) if err != nil { if url.Scheme == "http" { portStr = "80" } else if url.Scheme == "https" { portStr = "443" } else if url.Scheme == "unix" { - portStr = "0" - host = url.Path + portStr = "-1" + c.advertiseHost = url.Path } else { - return "", -3, errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + return errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) } } - portNum, err := strconv.ParseInt(portStr, 10, 0) - if err != nil || portNum < 0 || portNum > 65535 { - return "", -4, errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err) + c.advertisePort, err = strconv.ParseInt(portStr, 10, 0) + if err != nil || c.advertisePort < -1 || c.advertisePort > 65535 { + return errwrap.Wrapf(fmt.Sprintf(`failed to parse valid 
port "%v": {{err}}`, portStr), err) } - return host, int(portNum), nil + return nil } func setupTLSConfig(conf map[string]string) (*tls.Config, error) { diff --git a/physical/consul_test.go b/physical/consul_test.go index 364c238bbd0c..33e1821aee5f 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -221,11 +221,11 @@ func TestConsul_serviceTags(t *testing.T) { } } -func TestConsul_parseAdvertiseAddr(t *testing.T) { +func TestConsul_setAdvertiseAddr(t *testing.T) { tests := []struct { addr string host string - port int + port int64 pass bool }{ { @@ -249,7 +249,7 @@ func TestConsul_parseAdvertiseAddr(t *testing.T) { { addr: "unix:///tmp/.vault.addr.sock", host: "/tmp/.vault.addr.sock", - port: 0, + port: -1, pass: true, }, { @@ -263,7 +263,7 @@ func TestConsul_parseAdvertiseAddr(t *testing.T) { } for _, test := range tests { c := testConsulBackend(t) - host, port, err := c.parseAdvertiseAddr(test.addr) + err := c.setAdvertiseAddr(test.addr) if test.pass { if err != nil { t.Fatalf("bad: %v", err) @@ -276,12 +276,12 @@ func TestConsul_parseAdvertiseAddr(t *testing.T) { } } - if host != test.host { - t.Fatalf("bad: %v != %v", host, test.host) + if c.advertiseHost != test.host { + t.Fatalf("bad: %v != %v", c.advertiseHost, test.host) } - if port != test.port { - t.Fatalf("bad: %v != %v", port, test.port) + if c.advertisePort != test.port { + t.Fatalf("bad: %v != %v", c.advertisePort, test.port) } } }