diff --git a/command/config.go b/command/config.go index ffe7903b9d63..ee866faade0b 100644 --- a/command/config.go +++ b/command/config.go @@ -53,9 +53,10 @@ func LoadConfig(path string) (*DefaultConfig, error) { path = v } + // NOTE: requires HOME env var to be set path, err := homedir.Expand(path) if err != nil { - return nil, fmt.Errorf("Error expanding config path: %s", err) + return nil, fmt.Errorf("Error expanding config path %s: %s", path, err) } contents, err := ioutil.ReadFile(path) diff --git a/command/server.go b/command/server.go index 602981a7fc24..bd0521f0f3fe 100644 --- a/command/server.go +++ b/command/server.go @@ -205,7 +205,7 @@ func (c *ServerCommand) Run(args []string) int { coreConfig.AdvertiseAddr = envAA } - // Attempt to detect the advertise address possible + // Attempt to detect the advertise address, if possible var detect physical.AdvertiseDetect if coreConfig.HAPhysical != nil { detect, ok = coreConfig.HAPhysical.(physical.AdvertiseDetect) @@ -286,6 +286,17 @@ func (c *ServerCommand) Run(args []string) int { } } + // If the backend supports service discovery, run service discovery + if coreConfig.HAPhysical != nil { + sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) + if ok { + if err := sd.RunServiceDiscovery(c.ShutdownCh, coreConfig.AdvertiseAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) + return 1 + } + } + } + // Initialize the listeners lns := make([]net.Listener, 0, len(config.Listeners)) for i, lnConfig := range config.Listeners { diff --git a/command/server/config.go b/command/server/config.go index 450f640f8be4..1945040f4585 100644 --- a/command/server/config.go +++ b/command/server/config.go @@ -365,7 +365,7 @@ func parseBackends(result *Config, list *ast.ObjectList) error { return multierror.Prefix(err, fmt.Sprintf("backend.%s:", key)) } - // Pull out the advertise address since it's commong to all backends + // Pull out the advertise address since it's common to all backends var advertiseAddr string if v, ok := m["advertise_addr"]; ok { advertiseAddr = v @@ -398,7 +398,7 @@ func parseHABackends(result *Config, list *ast.ObjectList) error { return multierror.Prefix(err, fmt.Sprintf("ha_backend.%s:", key)) } - // Pull out the advertise address since it's commong to all backends + // Pull out the advertise address since it's common to all backends var advertiseAddr string if v, ok := m["advertise_addr"]; ok { advertiseAddr = v diff --git a/command/server_test.go b/command/server_test.go index 5c7a58dc9eb9..5834750882ab 100644 --- a/command/server_test.go +++ b/command/server_test.go @@ -32,13 +32,15 @@ listener "tcp" { consulhcl = ` backend "consul" { prefix = "foo/" - advertise_addr = "http://127.0.0.1:8200" + advertise_addr = "http://127.0.0.1:8200" + disable_registration = "true" } ` haconsulhcl = ` ha_backend "consul" { prefix = "bar/" - advertise_addr = "http://127.0.0.1:8200" + advertise_addr = "http://127.0.0.1:8200" + disable_registration = "true" } ` diff --git a/physical/consul.go b/physical/consul.go index 00a59af3b598..949db16178d2 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -3,8 +3,12 @@ package physical import ( "fmt" "io/ioutil" + "net" + "net/url" "strconv" "strings" + "sync" + "sync/atomic" "time" "crypto/tls" @@ -12,18 +16,57 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/errwrap" "github.com/hashicorp/go-cleanhttp" ) +const ( + // checkJitterFactor specifies the jitter factor used to stagger checks + checkJitterFactor = 16 + + // checkMinBuffer specifies provides a guarantee that a check will not + // be executed too close to the TTL check timeout + checkMinBuffer = 100 * time.Millisecond + + // defaultCheckTimeout changes the timeout of TTL checks + defaultCheckTimeout = 5 * time.Second + + // defaultCheckInterval specifies the default interval used to send + // checks + defaultCheckInterval = 4 * time.Second + + // defaultServiceName is the default Consul service name used when + // advertising a Vault instance. + defaultServiceName = "vault" + + // registrationRetryInterval specifies the retry duration to use when + // a registration to the Consul agent fails. + registrationRetryInterval = 1 * time.Second +) + // ConsulBackend is a physical backend that stores data at specific // prefix within Consul. It is used for most production situations as // it allows Vault to run on multiple machines in a highly-available manner. type ConsulBackend struct { - path string - client *api.Client - kv *api.KV - permitPool *PermitPool + path string + client *api.Client + kv *api.KV + permitPool *PermitPool + serviceLock sync.RWMutex + service *api.AgentServiceRegistration + sealedCheck *api.AgentCheckRegistration + registrationLock int64 + advertiseHost string + advertisePort int64 + consulClientConf *api.Config + serviceName string + running bool + active bool + unsealed bool + disableRegistration bool + checkTimeout time.Duration + checkTimer *time.Timer } // newConsulBackend constructs a Consul backend using the given API client @@ -43,6 +86,39 @@ func newConsulBackend(conf map[string]string) (Backend, error) { path = strings.TrimPrefix(path, "/") } + // Allow admins to disable consul integration + disableReg, ok := conf["disable_registration"] + var disableRegistration bool + if ok && disableReg != "" { + b, err := strconv.ParseBool(disableReg) + if err != nil { + return nil, errwrap.Wrapf("failed parsing disable_registration parameter: {{err}}", err) + } + disableRegistration = b + } + + // Get the service name to advertise in Consul + service, ok := conf["service"] + if !ok { + service = defaultServiceName + } + + checkTimeout := defaultCheckTimeout + checkTimeoutStr, ok := conf["check_timeout"] + if ok { + d, err := time.ParseDuration(checkTimeoutStr) + if err != nil { + return nil, err + } + + min, _ := lib.DurationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor) + if min < checkMinBuffer { + return nil, fmt.Errorf("Consul check_timeout must be greater than %v", min) + } + + checkTimeout = d + } + // Configure the client consulConf := api.DefaultConfig() @@ -84,14 +160,239 @@ func newConsulBackend(conf map[string]string) (Backend, error) { // Setup the backend c := &ConsulBackend{ - path: path, - client: client, - kv: client.KV(), - permitPool: NewPermitPool(maxParInt), + path: path, + client: client, + kv: client.KV(), + permitPool: NewPermitPool(maxParInt), + consulClientConf: consulConf, + serviceName: service, + checkTimeout: checkTimeout, + checkTimer: time.NewTimer(checkTimeout), + disableRegistration: disableRegistration, } return c, nil } +// serviceTags returns all of the relevant tags for Consul. +func serviceTags(active bool) []string { + activeTag := "standby" + if active { + activeTag = "active" + } + return []string{activeTag} +} + +func (c *ConsulBackend) AdvertiseActive(active bool) error { + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + + // Vault is still bootstrapping + if c.service == nil { + return nil + } + + // Save a cached copy of the active state: no way to query Core + c.active = active + + // Ensure serial registration to the Consul agent. Allow for + // concurrent calls to update active status while a single task + // attempts, until successful, to update the Consul Agent. + if !c.disableRegistration && atomic.CompareAndSwapInt64(&c.registrationLock, 0, 1) { + defer atomic.CompareAndSwapInt64(&c.registrationLock, 1, 0) + + // Retry agent registration until successful + for { + c.service.Tags = serviceTags(c.active) + agent := c.client.Agent() + err := agent.ServiceRegister(c.service) + if err == nil { + // Success + return nil + } + + // wtb logger c.logger.Printf("[WARN] service registration failed: %v", err) + c.serviceLock.Unlock() + time.Sleep(registrationRetryInterval) + c.serviceLock.Lock() + + if !c.running { + // Shutting down + return err + } + } + } + + // Successful concurrent update to active state + return nil +} + +func (c *ConsulBackend) AdvertiseSealed(sealed bool) error { + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + c.unsealed = !sealed + + // Vault is still bootstrapping + if c.service == nil { + return nil + } + + if !c.disableRegistration { + // Push a TTL check immediately to update the state + c.runCheck() + } + + return nil +} + +func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) (err error) { + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + + if c.disableRegistration { + return nil + } + + if err := c.setAdvertiseAddr(advertiseAddr); err != nil { + return err + } + + serviceID := c.serviceID() + + c.service = &api.AgentServiceRegistration{ + ID: serviceID, + Name: c.serviceName, + Tags: serviceTags(c.active), + Port: int(c.advertisePort), + Address: c.advertiseHost, + EnableTagOverride: false, + } + + checkStatus := api.HealthCritical + if c.unsealed { + checkStatus = api.HealthPassing + } + + c.sealedCheck = &api.AgentCheckRegistration{ + ID: c.checkID(), + Name: "Vault Sealed Status", + Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", + ServiceID: serviceID, + AgentServiceCheck: api.AgentServiceCheck{ + TTL: c.checkTimeout.String(), + Status: checkStatus, + }, + } + + agent := c.client.Agent() + if err := agent.ServiceRegister(c.service); err != nil { + return errwrap.Wrapf("service registration failed: {{err}}", err) + } + + if err := agent.CheckRegister(c.sealedCheck); err != nil { + return errwrap.Wrapf("service registration check registration failed: {{err}}", err) + } + + go c.checkRunner(shutdownCh) + c.running = true + + // Deregister upon shutdown + go func() { + shutdown: + for { + select { + case <-shutdownCh: + // wtb logger: log.Printf("[DEBUG]: Shutting down consul backend") + break shutdown + } + } + + if err := agent.ServiceDeregister(serviceID); err != nil { + // wtb logger: log.Printf("[WARNING]: service deregistration failed: {{err}}", err) + } + c.running = false + }() + + return nil +} + +// checkRunner periodically runs TTL checks +func (c *ConsulBackend) checkRunner(shutdownCh ShutdownChannel) { + defer c.checkTimer.Stop() + + for { + select { + case <-c.checkTimer.C: + go func() { + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + c.runCheck() + }() + case <-shutdownCh: + return + } + } +} + +// runCheck immediately pushes a TTL check. Assumes c.serviceLock is held +// exclusively. +func (c *ConsulBackend) runCheck() { + // Reset timer before calling run check in order to not slide the + // window of the next check. + c.checkTimer.Reset(lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor)) + + // Run a TTL check + agent := c.client.Agent() + if c.unsealed { + agent.PassTTL(c.checkID(), "Vault Unsealed") + } else { + agent.FailTTL(c.checkID(), "Vault Sealed") + } +} + +// checkID returns the ID used for a Consul Check. Assume at least a read +// lock is held. +func (c *ConsulBackend) checkID() string { + return "vault-sealed-check" +} + +// serviceID returns the Vault ServiceID for use in Consul. Assume at least +// a read lock is held. +func (c *ConsulBackend) serviceID() string { + return fmt.Sprintf("%s:%s:%d", c.serviceName, c.advertiseHost, c.advertisePort) +} + +func (c *ConsulBackend) setAdvertiseAddr(addr string) (err error) { + if addr == "" { + return fmt.Errorf("advertise address must not be empty") + } + + url, err := url.Parse(addr) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf(`failed to parse advertise URL "%v": {{err}}`, addr), err) + } + + var portStr string + c.advertiseHost, portStr, err = net.SplitHostPort(url.Host) + if err != nil { + if url.Scheme == "http" { + portStr = "80" + } else if url.Scheme == "https" { + portStr = "443" + } else if url.Scheme == "unix" { + portStr = "-1" + c.advertiseHost = url.Path + } else { + return errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err) + } + } + c.advertisePort, err = strconv.ParseInt(portStr, 10, 0) + if err != nil || c.advertisePort < -1 || c.advertisePort > 65535 { + return errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err) + } + + return nil +} + func setupTLSConfig(conf map[string]string) (*tls.Config, error) { serverName := strings.Split(conf["address"], ":") @@ -234,7 +535,10 @@ func (c *ConsulBackend) DetectHostAddr() (string, error) { if err != nil { return "", err } - addr := self["Member"]["Addr"].(string) + addr, ok := self["Member"]["Addr"].(string) + if !ok { + return "", fmt.Errorf("Unable to convert an address to string") + } return addr, nil } diff --git a/physical/consul_test.go b/physical/consul_test.go index b4d5d6bb9d00..33e1821aee5f 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -3,16 +3,424 @@ package physical import ( "fmt" "os" + "reflect" "testing" "time" "github.com/hashicorp/consul/api" ) +type consulConf map[string]string + +var ( + addrCount int = 0 +) + +func testHostIP() string { + a := addrCount + addrCount++ + return fmt.Sprintf("127.0.0.%d", a) +} + +func testConsulBackend(t *testing.T) *ConsulBackend { + return testConsulBackendConfig(t, &consulConf{}) +} + +func testConsulBackendConfig(t *testing.T, conf *consulConf) *ConsulBackend { + be, err := newConsulBackend(*conf) + if err != nil { + t.Fatalf("Expected Consul to initialize: %v", err) + } + + c, ok := be.(*ConsulBackend) + if !ok { + t.Fatalf("Expected ConsulBackend") + } + + c.consulClientConf = api.DefaultConfig() + + c.service = &api.AgentServiceRegistration{ + ID: c.serviceID(), + Name: c.serviceName, + Tags: serviceTags(c.active), + Port: 8200, + Address: testHostIP(), + EnableTagOverride: false, + } + + c.sealedCheck = &api.AgentCheckRegistration{ + ID: c.checkID(), + Name: "Vault Sealed Status", + Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", + ServiceID: c.serviceID(), + AgentServiceCheck: api.AgentServiceCheck{ + TTL: c.checkTimeout.String(), + Status: api.HealthPassing, + }, + } + + return c +} + +func testConsul_testConsulBackend(t *testing.T) { + c := testConsulBackend(t) + if c == nil { + t.Fatalf("bad") + } + + if c.active != false { + t.Fatalf("bad") + } + + if c.unsealed != false { + t.Fatalf("bad") + } + + if c.service == nil { + t.Fatalf("bad") + } + + if c.sealedCheck == nil { + t.Fatalf("bad") + } +} + +func TestConsul_newConsulBackend(t *testing.T) { + tests := []struct { + name string + consulConfig map[string]string + fail bool + advertiseAddr string + checkTimeout time.Duration + path string + service string + address string + scheme string + token string + max_parallel int + disableReg bool + }{ + { + name: "Valid default config", + consulConfig: map[string]string{}, + checkTimeout: 5 * time.Second, + advertiseAddr: "http://127.0.0.1:8200", + path: "vault/", + service: "vault", + address: "127.0.0.1:8500", + scheme: "http", + token: "", + max_parallel: 4, + disableReg: false, + }, + { + name: "Valid modified config", + consulConfig: map[string]string{ + "path": "seaTech/", + "service": "astronomy", + "advertiseAddr": "http://127.0.0.2:8200", + "check_timeout": "6s", + "address": "127.0.0.2", + "scheme": "https", + "token": "deadbeef-cafeefac-deadc0de-feedface", + "max_parallel": "4", + "disable_registration": "false", + }, + checkTimeout: 6 * time.Second, + path: "seaTech/", + service: "astronomy", + advertiseAddr: "http://127.0.0.2:8200", + address: "127.0.0.2", + scheme: "https", + token: "deadbeef-cafeefac-deadc0de-feedface", + max_parallel: 4, + }, + { + name: "check timeout too short", + fail: true, + consulConfig: map[string]string{ + "check_timeout": "99ms", + }, + }, + } + + for _, test := range tests { + be, err := newConsulBackend(test.consulConfig) + if test.fail { + if err == nil { + t.Fatalf(`Expected config "%s" to fail`, test.name) + } else { + continue + } + } else if !test.fail && err != nil { + t.Fatalf("Expected config %s to not fail: %v", test.name, err) + } + + c, ok := be.(*ConsulBackend) + if !ok { + t.Fatalf("Expected ConsulBackend: %s", test.name) + } + c.disableRegistration = true + + var shutdownCh ShutdownChannel + if err := c.RunServiceDiscovery(shutdownCh, test.advertiseAddr); err != nil { + t.Fatalf("bad: %v", err) + } + + if test.checkTimeout != c.checkTimeout { + t.Errorf("bad: %v != %v", test.checkTimeout, c.checkTimeout) + } + + if test.path != c.path { + t.Errorf("bad: %s %v != %v", test.name, test.path, c.path) + } + + if test.service != c.serviceName { + t.Errorf("bad: %v != %v", test.service, c.serviceName) + } + + if test.address != c.consulClientConf.Address { + t.Errorf("bad: %s %v != %v", test.name, test.address, c.consulClientConf.Address) + } + + if test.scheme != c.consulClientConf.Scheme { + t.Errorf("bad: %v != %v", test.scheme, c.consulClientConf.Scheme) + } + + if test.token != c.consulClientConf.Token { + t.Errorf("bad: %v != %v", test.token, c.consulClientConf.Token) + } + + // FIXME(sean@): Unable to test max_parallel + // if test.max_parallel != cap(c.permitPool) { + // t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool)) + // } + } +} + +func TestConsul_serviceTags(t *testing.T) { + tests := []struct { + active bool + tags []string + }{ + { + active: true, + tags: []string{"active"}, + }, + { + active: false, + tags: []string{"standby"}, + }, + } + + for _, test := range tests { + tags := serviceTags(test.active) + if !reflect.DeepEqual(tags[:], test.tags[:]) { + t.Errorf("Bad %v: %v %v", test.active, tags, test.tags) + } + } +} + +func TestConsul_setAdvertiseAddr(t *testing.T) { + tests := []struct { + addr string + host string + port int64 + pass bool + }{ + { + addr: "http://127.0.0.1:8200/", + host: "127.0.0.1", + port: 8200, + pass: true, + }, + { + addr: "http://127.0.0.1:8200", + host: "127.0.0.1", + port: 8200, + pass: true, + }, + { + addr: "https://127.0.0.1:8200", + host: "127.0.0.1", + port: 8200, + pass: true, + }, + { + addr: "unix:///tmp/.vault.addr.sock", + host: "/tmp/.vault.addr.sock", + port: -1, + pass: true, + }, + { + addr: "127.0.0.1:8200", + pass: false, + }, + { + addr: "127.0.0.1", + pass: false, + }, + } + for _, test := range tests { + c := testConsulBackend(t) + err := c.setAdvertiseAddr(test.addr) + if test.pass { + if err != nil { + t.Fatalf("bad: %v", err) + } + } else { + if err == nil { + t.Fatalf("bad, expected fail") + } else { + continue + } + } + + if c.advertiseHost != test.host { + t.Fatalf("bad: %v != %v", c.advertiseHost, test.host) + } + + if c.advertisePort != test.port { + t.Fatalf("bad: %v != %v", c.advertisePort, test.port) + } + } +} + +func TestConsul_AdvertiseActive(t *testing.T) { + addr := os.Getenv("CONSUL_HTTP_ADDR") + if addr == "" { + t.Skipf("No consul process running, skipping test") + } + + c := testConsulBackend(t) + + if c.active != false { + t.Fatalf("bad") + } + + if err := c.AdvertiseActive(true); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := c.AdvertiseActive(true); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := c.AdvertiseActive(false); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := c.AdvertiseActive(false); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := c.AdvertiseActive(true); err != nil { + t.Fatalf("bad: %v", err) + } +} + +func TestConsul_AdvertiseSealed(t *testing.T) { + addr := os.Getenv("CONSUL_HTTP_ADDR") + if addr == "" { + t.Skipf("No consul process running, skipping test") + } + + c := testConsulBackend(t) + + if c.unsealed == true { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(true); err != nil { + t.Fatalf("bad: %v", err) + } + if c.unsealed == true { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(true); err != nil { + t.Fatalf("bad: %v", err) + } + if c.unsealed == true { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(false); err != nil { + t.Fatalf("bad: %v", err) + } + if c.unsealed == false { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(false); err != nil { + t.Fatalf("bad: %v", err) + } + if c.unsealed == false { + t.Fatalf("bad") + } + + if err := c.AdvertiseSealed(true); err != nil { + t.Fatalf("bad: %v", err) + } + if c.unsealed == true { + t.Fatalf("bad") + } +} + +func TestConsul_checkID(t *testing.T) { + c := testConsulBackend(t) + if c.checkID() != "vault-sealed-check" { + t.Errorf("bad") + } +} + +func TestConsul_serviceID(t *testing.T) { + passingTests := []struct { + name string + advertiseAddr string + serviceName string + expected string + }{ + { + name: "valid host w/o slash", + advertiseAddr: "http://127.0.0.1:8200", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + }, + { + name: "valid host w/ slash", + advertiseAddr: "http://127.0.0.1:8200/", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + }, + { + name: "valid https host w/ slash", + advertiseAddr: "https://127.0.0.1:8200/", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + }, + } + + for _, test := range passingTests { + c := testConsulBackendConfig(t, &consulConf{ + "service": test.serviceName, + }) + + if err := c.setAdvertiseAddr(test.advertiseAddr); err != nil { + t.Fatalf("bad: %s %v", test.name, err) + } + + serviceID := c.serviceID() + if serviceID != test.expected { + t.Fatalf("bad: %v != %v", serviceID, test.expected) + } + } +} + func TestConsulBackend(t *testing.T) { - addr := os.Getenv("CONSUL_ADDR") + addr := os.Getenv("CONSUL_HTTP_ADDR") if addr == "" { - t.SkipNow() + t.Skipf("No consul process running, skipping test") } conf := api.DefaultConfig() @@ -41,9 +449,9 @@ func TestConsulBackend(t *testing.T) { } func TestConsulHABackend(t *testing.T) { - addr := os.Getenv("CONSUL_ADDR") + addr := os.Getenv("CONSUL_HTTP_ADDR") if addr == "" { - t.SkipNow() + t.Skipf("No consul process running, skipping test") } conf := api.DefaultConfig() diff --git a/physical/physical.go b/physical/physical.go index 8f28f00a7cfc..420a218fc294 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -4,6 +4,9 @@ import "fmt" const DefaultParallelOperations = 128 +// ShutdownSignal +type ShutdownChannel chan struct{} + // Backend is the interface required for a physical // backend. A physical backend is used to durably store // data outside of Vault. As such, it is completely untrusted, @@ -42,6 +45,23 @@ type AdvertiseDetect interface { DetectHostAddr() (string, error) } +// ServiceDiscovery is an optional interface that an HABackend can implement. +// If they do, the state of a backend is advertised to the service discovery +// network. +type ServiceDiscovery interface { + // AdvertiseActive is used to reflect whether or not a backend is in + // an active or standby state. + AdvertiseActive(bool) error + + // AdvertiseSealed is used to reflect whether or not a backend is in + // a sealed state or not. + AdvertiseSealed(bool) error + + // Run executes any background service discovery tasks until the + // shutdown channel is closed. + RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string) error +} + type Lock interface { // Lock is used to acquire the given lock // The stopCh is optional and if closed should interrupt the lock @@ -66,9 +86,9 @@ type Entry struct { type Factory func(map[string]string) (Backend, error) // NewBackend returns a new backend with the given type and configuration. -// The backend is looked up in the BuiltinBackends variable. +// The backend is looked up in the builtinBackends variable. func NewBackend(t string, conf map[string]string) (Backend, error) { - f, ok := BuiltinBackends[t] + f, ok := builtinBackends[t] if !ok { return nil, fmt.Errorf("unknown physical backend type: %s", t) } @@ -77,7 +97,7 @@ func NewBackend(t string, conf map[string]string) (Backend, error) { // BuiltinBackends is the list of built-in physical backends that can // be used with NewBackend. -var BuiltinBackends = map[string]Factory{ +var builtinBackends = map[string]Factory{ "inmem": func(map[string]string) (Backend, error) { return NewInmem(), nil }, diff --git a/physical/postgresql.go b/physical/postgresql.go index 94d77294399b..e753e007d651 100644 --- a/physical/postgresql.go +++ b/physical/postgresql.go @@ -69,8 +69,8 @@ func newPostgreSQLBackend(conf map[string]string) (Backend, error) { "put": put_statement, "get": "SELECT value FROM " + quoted_table + " WHERE path = $1 AND key = $2", "delete": "DELETE FROM " + quoted_table + " WHERE path = $1 AND key = $2", - "list": "SELECT key FROM " + quoted_table + " WHERE path = $1" + - "UNION SELECT substr(path, length($1)+1) FROM " + quoted_table + "WHERE parent_path = $1", + "list": "SELECT key FROM " + quoted_table + " WHERE path = $1" + + "UNION SELECT substr(path, length($1)+1) FROM " + quoted_table + "WHERE parent_path = $1", } for name, query := range statements { if err := m.prepare(name, query); err != nil { @@ -97,18 +97,18 @@ func (m *PostgreSQLBackend) splitKey(fullPath string) (string, string, string) { var path string pieces := strings.Split(fullPath, "/") - depth := len(pieces) - key := pieces[depth-1] - + depth := len(pieces) + key := pieces[depth-1] + if depth == 1 { parentPath = "" - path = "/" + path = "/" } else if depth == 2 { parentPath = "/" - path = "/" + pieces[0] + "/" + path = "/" + pieces[0] + "/" } else { parentPath = "/" + strings.Join(pieces[:depth-2], "/") + "/" - path = "/" + strings.Join(pieces[:depth-1], "/") + "/" + path = "/" + strings.Join(pieces[:depth-1], "/") + "/" } return parentPath, path, key diff --git a/vault/core.go b/vault/core.go index 873ad4f7581d..4c209e06651f 100644 --- a/vault/core.go +++ b/vault/core.go @@ -942,6 +942,16 @@ func (c *Core) Unseal(key []byte) (bool, error) { // Success! c.sealed = false + if c.ha != nil { + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseSealed(false); err != nil { + c.logger.Printf("[WARN] core: failed to advertise unsealed status: %v", err) + } + }() + } + } return true, nil } @@ -1087,6 +1097,17 @@ func (c *Core) sealInternal() error { } c.logger.Printf("[INFO] core: vault is sealed") + if c.ha != nil { + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseSealed(true); err != nil { + c.logger.Printf("[WARN] core: failed to advertise sealed status: %v", err) + } + }() + } + } + return nil } @@ -1396,7 +1417,20 @@ func (c *Core) advertiseLeader(uuid string, leaderLostCh <-chan struct{}) error Key: coreLeaderPrefix + uuid, Value: []byte(c.advertiseAddr), } - return c.barrier.Put(ent) + err := c.barrier.Put(ent) + if err != nil { + return err + } + + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseActive(true); err != nil { + c.logger.Printf("[WARN] core: failed to advertise active status: %v", err) + } + }() + } + return nil } func (c *Core) cleanLeaderPrefix(uuid string, leaderLostCh <-chan struct{}) { @@ -1421,7 +1455,19 @@ func (c *Core) cleanLeaderPrefix(uuid string, leaderLostCh <-chan struct{}) { // clearLeader is used to clear our leadership entry func (c *Core) clearLeader(uuid string) error { key := coreLeaderPrefix + uuid - return c.barrier.Delete(key) + err := c.barrier.Delete(key) + + // Advertise ourselves as a standby + sd, ok := c.ha.(physical.ServiceDiscovery) + if ok { + go func() { + if err := sd.AdvertiseActive(false); err != nil { + c.logger.Printf("[WARN] core: failed to advertise standby status: %v", err) + } + }() + } + + return err } // emitMetrics is used to periodically expose metrics while runnig diff --git a/vendor/github.com/hashicorp/consul/lib/cluster.go b/vendor/github.com/hashicorp/consul/lib/cluster.go new file mode 100644 index 000000000000..a95232c5737b --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/cluster.go @@ -0,0 +1,56 @@ +package lib + +import ( + "math/rand" + "time" +) + +// DurationMinusBuffer returns a duration, minus a buffer and jitter +// subtracted from the duration. This function is used primarily for +// servicing Consul TTL Checks in advance of the TTL. +func DurationMinusBuffer(intv time.Duration, buffer time.Duration, jitter int64) time.Duration { + d := intv - buffer + if jitter == 0 { + d -= RandomStagger(d) + } else { + d -= RandomStagger(time.Duration(int64(d) / jitter)) + } + return d +} + +// DurationMinusBufferDomain returns the domain of valid durations from a +// call to DurationMinusBuffer. This function is used to check user +// specified input values to DurationMinusBuffer. +func DurationMinusBufferDomain(intv time.Duration, buffer time.Duration, jitter int64) (min time.Duration, max time.Duration) { + max = intv - buffer + if jitter == 0 { + min = max + } else { + min = max - time.Duration(int64(max)/jitter) + } + return min, max +} + +// Returns a random stagger interval between 0 and the duration +func RandomStagger(intv time.Duration) time.Duration { + if intv == 0 { + return 0 + } + return time.Duration(uint64(rand.Int63()) % uint64(intv)) +} + +// RateScaledInterval is used to choose an interval to perform an action in +// order to target an aggregate number of actions per second across the whole +// cluster. +func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration { + const minRate = 1 / 86400 // 1/(1 * time.Day) + if rate <= minRate { + return min + } + interval := time.Duration(float64(time.Second) * float64(n) / rate) + if interval < min { + return min + } + + return interval +} diff --git a/vendor/github.com/hashicorp/consul/lib/math.go b/vendor/github.com/hashicorp/consul/lib/math.go new file mode 100644 index 000000000000..1d0b6dc0f6b5 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/math.go @@ -0,0 +1,22 @@ +package lib + +func AbsInt(a int) int { + if a > 0 { + return a + } + return a * -1 +} + +func MaxInt(a, b int) int { + if a > b { + return a + } + return b +} + +func MinInt(a, b int) int { + if a > b { + return b + } + return a +} diff --git a/vendor/github.com/hashicorp/consul/lib/rand.go b/vendor/github.com/hashicorp/consul/lib/rand.go new file mode 100644 index 000000000000..48307e63f324 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/rand.go @@ -0,0 +1,18 @@ +package lib + +import ( + "math/rand" + "sync" + "time" +) + +var ( + once sync.Once +) + +// SeedMathRand provides weak, but guaranteed seeding, which is better than +// running with Go's default seed of 1. A call to SeedMathRand() is expected +// to be called via init(), but never a second time. +func SeedMathRand() { + once.Do(func() { rand.Seed(time.Now().UTC().UnixNano()) }) +} diff --git a/vendor/github.com/hashicorp/consul/lib/string.go b/vendor/github.com/hashicorp/consul/lib/string.go new file mode 100644 index 000000000000..0780abb632c2 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/string.go @@ -0,0 +1,11 @@ +package lib + +// StrContains checks if a list contains a string +func StrContains(l []string, s string) bool { + for _, v := range l { + if v == s { + return true + } + } + return false +} diff --git a/website/source/docs/config/index.html.md b/website/source/docs/config/index.html.md index 52226a9f218b..7b7f0b2cfb6c 100644 --- a/website/source/docs/config/index.html.md +++ b/website/source/docs/config/index.html.md @@ -185,7 +185,7 @@ All backends support the following options: nodes to when A is the active node and B and C are standby nodes. This may be the same address across nodes if using a load balancer or service discovery. Most HA backends will attempt to determine the advertise address - if not provided. This can also be set via the `VAULT_ADVERTISE_ADDR` + if not provided. This can also be overridden via the `VAULT_ADVERTISE_ADDR` environment variable. #### Backend Reference: Consul @@ -200,6 +200,15 @@ For Consul, the following options are supported: * `scheme` (optional) - "http" or "https" for talking to Consul. + * `check_timeout` (optional) - The check interval used to send health check + information to Consul. Defaults to "5s". + + * `disable_registration` (optional) - If true, then Vault will not register + itself with Vault. Defaults to "false". + + * `service` (optional) - The name of the service to register with Consul. + Defaults to "vault". + * `token` (optional) - An access token to use to write data to Consul. * `max_parallel` (optional) - The maximum number of connections to Consul; @@ -227,6 +236,86 @@ settings](https://www.consul.io/docs/agent/encryption.html): [key_file](https://www.consul.io/docs/agent/options.html#key_file) setting in Consul. +``` +// Sample Consul Backend configuration with local Consul Agent +backend "consul" { + // address MUST match Consul's `addresses.http` config value (or + // `addresses.https` depending on the scheme provided below). + address = "127.0.0.1:8500" + #address = "unix:///tmp/.consul.http.sock" + + // scheme defaults to "http" (suitable for loopback and UNIX sockets), but + // should be "https" when Consul exists on a remote node (a non-standard + // deployment). All decryption happen within Vault so this value does not + // change Vault's Threat Model. + scheme = "http" + + // token is a Consul ACL Token that has write privileges to the path + // specified below. Use of a Consul ACL Token is a best pracitce. + token = "[redacted]" // Vault's Consul ACL Token + + // path must be writable by the Consul ACL Token + path = "vault/" +} +``` + +Once properly configured, an unsealed Vault installation should be available +on the network at `active.vault.service.consul`. Unsealed Vault instances in +the standby state are available at `standby.vault.service.consul`. All +unsealed Vault instances are available as healthy in the +`vault.service.consul` pool. Sealed Vault instances will mark themselves as +critical to avoid showing up by default in Consul's service discovery. + +``` +% dig active.vault.service.consul srv +; <<>> DiG 9.8.3-P1 <<>> active.vault.service.consul srv +; (1 server found) +;; global options: +cmd +;; Got answer: +;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 11331 +;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1 +;; WARNING: recursion requested but not available + +;; QUESTION SECTION: +;active.vault.service.consul. IN SRV + +;; ANSWER SECTION: +active.vault.service.consul. 0 IN SRV 1 1 8200 vault1.node.dc1.consul. + +;; ADDITIONAL SECTION: +vault1.node.dc1.consul. 0 IN A 172.17.33.46 + +;; Query time: 0 msec +;; SERVER: 127.0.0.1#53(127.0.0.1) +;; WHEN: Sat Apr 23 17:33:14 2016 +;; MSG SIZE rcvd: 172 +% dig +short standby.vault.service.consul srv +1 1 8200 vault3.node.dc1.consul. +1 1 8200 vault2.node.dc1.consul. +% dig +short vault.service.consul srv +1 1 8200 vault3.node.dc1.consul. +1 1 8200 vault1.node.dc1.consul. +1 1 8200 vault2.node.dc1.consul. +% dig +short vault.service.consul a +172.17.33.46 +172.17.34.32 +172.17.35.29 +vault1% vault seal +% dig +short vault.service.consul srv +1 1 8200 vault3.node.dc1.consul. +1 1 8200 vault2.node.dc1.consul. +vault1% vault unseal +Key (will be hidden): +Sealed: false +Key Shares: 5 +Key Threshold: 3 +Unseal Progress: 0 +% dig +short vault.service.consul srv +1 1 8200 vault1.node.dc1.consul. +1 1 8200 vault3.node.dc1.consul. +1 1 8200 vault2.node.dc1.consul. +``` + #### Backend Reference: etcd (Community-Supported) For etcd, the following options are supported: