Skip to content

Commit

Permalink
Merge pull request #119038 from serathius/automated-cherry-pick-of-#1…
Browse files Browse the repository at this point in the history
…18460-origin-release-1.26

Automated cherry pick of #118460: Make etcd component status consistent with health probes

Kubernetes-commit: 728dfdf2985fe228a3bc88071ee9c039d0daaeef
  • Loading branch information
k8s-publishing-bot committed Jul 11, 2023
2 parents e4f93e2 + ad1485b commit e711f1b
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 15 deletions.
8 changes: 8 additions & 0 deletions pkg/server/options/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,10 @@ func (s *SimpleStorageFactory) ResourcePrefix(resource schema.GroupResource) str
return resource.Group + "/" + resource.Resource
}

// Configs returns the storage configurations computed by
// serverstorage.Configs from the factory's single StorageConfig.
func (s *SimpleStorageFactory) Configs() []storagebackend.Config {
	all := serverstorage.Configs(s.StorageConfig)
	return all
}

func (s *SimpleStorageFactory) Backends() []serverstorage.Backend {
// nothing should ever call this method but we still provide a functional implementation
return serverstorage.Backends(s.StorageConfig)
Expand Down Expand Up @@ -471,6 +475,10 @@ func (t *transformerStorageFactory) ResourcePrefix(resource schema.GroupResource
return t.delegate.ResourcePrefix(resource)
}

// Configs forwards to the wrapped delegate factory and returns its storage
// configurations unchanged.
func (t *transformerStorageFactory) Configs() []storagebackend.Config {
	delegated := t.delegate.Configs()
	return delegated
}

// Backends forwards to the wrapped delegate factory and returns its backends
// unchanged.
func (t *transformerStorageFactory) Backends() []serverstorage.Backend {
	delegated := t.delegate.Backends()
	return delegated
}
49 changes: 45 additions & 4 deletions pkg/server/storage/storage_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (
"io/ioutil"
"strings"

"k8s.io/klog/v2"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
)

// Backend describes the storage servers, the information here should be enough
Expand All @@ -52,8 +51,12 @@ type StorageFactory interface {
// centralized control over the shape of etcd directories
ResourcePrefix(groupResource schema.GroupResource) string

// Configs gets configurations for all registered storage destinations.
Configs() []storagebackend.Config

// Backends gets all backends for all registered storage destinations.
// Used for getting all instances for health validations.
//
// Deprecated: Use Configs instead.
Backends() []Backend
}

Expand Down Expand Up @@ -276,14 +279,52 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
return storageConfig.ForResource(groupResource), nil
}

// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
// Configs implements StorageFactory. It passes the factory's default storage
// config and its group resource overrides to configs.
func (s *DefaultStorageFactory) Configs() []storagebackend.Config {
	defaults, overrides := s.StorageConfig, s.Overrides
	return configs(defaults, overrides)
}

// Configs gets configurations for all registered storage destinations of
// storageConfig. No group resource overrides are taken into account.
func Configs(storageConfig storagebackend.Config) []storagebackend.Config {
	withoutOverrides := configs(storageConfig, nil)
	return withoutOverrides
}

// configs returns all storage configurations, including those for group
// resource overrides. Each returned config targets exactly one etcd server.
//
// Every entry of storageConfig.Transport.ServerList yields a config first, in
// list order. After that, each override location that has not been seen yet
// yields a config with the override applied on top of storageConfig.
// grOverrides is a map and Go does not specify map iteration order, so the
// relative order of override-only servers may differ between calls when
// several overrides exist.
func configs(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []storagebackend.Config {
	seen := sets.NewString()
	result := []storagebackend.Config{}

	// Default servers are emitted unconditionally, one config per list entry.
	for _, server := range storageConfig.Transport.ServerList {
		perServer := storageConfig // copy
		perServer.Transport.ServerList = []string{server}
		result = append(result, perServer)
		seen.Insert(server)
	}

	for _, override := range grOverrides {
		for _, server := range override.etcdLocation {
			if !seen.Has(server) {
				perServer := storageConfig // copy
				override.Apply(&perServer, &StorageCodecConfig{})
				// Narrow the server list only after Apply so the config
				// ends up pointing at this single location.
				perServer.Transport.ServerList = []string{server}
				result = append(result, perServer)
				seen.Insert(server)
			}
		}
	}
	return result
}

// Backends implements StorageFactory. It passes the factory's default storage
// config and its group resource overrides to backends.
func (s *DefaultStorageFactory) Backends() []Backend {
	defaults, overrides := s.StorageConfig, s.Overrides
	return backends(defaults, overrides)
}

// Backends returns all backends for all registered storage destinations of
// storageConfig, without any group resource overrides.
// Used for getting all instances for health validations.
//
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
func Backends(storageConfig storagebackend.Config) []Backend {
	withoutOverrides := backends(storageConfig, nil)
	return withoutOverrides
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/server/storage/storage_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,59 @@ func TestUpdateEtcdOverrides(t *testing.T) {

}
}

// TestConfigs checks that DefaultStorageFactory.Configs returns one config per
// distinct etcd server: the default servers first, then servers that appear
// only in a per-resource override.
func TestConfigs(t *testing.T) {
	exampleinstall.Install(scheme)
	defaultEtcdLocations := []string{"http://127.0.0.1", "http://127.0.0.2"}

	// expect builds the wanted single-server configs for the given servers.
	expect := func(servers ...string) []storagebackend.Config {
		out := make([]storagebackend.Config, 0, len(servers))
		for _, server := range servers {
			out = append(out, storagebackend.Config{
				Transport: storagebackend.TransportConfig{ServerList: []string{server}},
				Prefix:    "/registry",
				Paging:    true,
			})
		}
		return out
	}

	cases := []struct {
		resource    schema.GroupResource
		servers     []string
		wantConfigs []storagebackend.Config
	}{
		{
			// No override: only the default locations are reported.
			wantConfigs: expect("http://127.0.0.1", "http://127.0.0.2"),
		},
		{
			// Override with a new location: it is appended after the defaults.
			resource:    schema.GroupResource{Group: example.GroupName, Resource: "resource"},
			servers:     []string{"http://127.0.0.1:10000"},
			wantConfigs: expect("http://127.0.0.1", "http://127.0.0.2", "http://127.0.0.1:10000"),
		},
		{
			// Override repeating a default location: the duplicate is not reported twice.
			resource:    schema.GroupResource{Group: example.GroupName, Resource: "resource"},
			servers:     []string{"http://127.0.0.1:10000", "https://127.0.0.1", "http://127.0.0.2"},
			wantConfigs: expect("http://127.0.0.1", "http://127.0.0.2", "http://127.0.0.1:10000", "https://127.0.0.1"),
		},
	}

	for idx, tc := range cases {
		baseConfig := storagebackend.Config{
			Prefix:    "/registry",
			Transport: storagebackend.TransportConfig{ServerList: defaultEtcdLocations},
		}
		factory := NewDefaultStorageFactory(baseConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil)
		if len(tc.servers) > 0 {
			factory.SetEtcdLocation(tc.resource, tc.servers)
		}

		if got := factory.Configs(); !reflect.DeepEqual(tc.wantConfigs, got) {
			t.Errorf("%d: expected %v, got %v", idx, tc.wantConfigs, got)
		}
	}
}
1 change: 1 addition & 0 deletions pkg/storage/etcd3/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type etcdHealth struct {
}

// EtcdHealthCheck decodes data returned from etcd /healthz handler.
//
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
Expand Down
61 changes: 50 additions & 11 deletions pkg/storage/storagebackend/factory/etcd3.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,18 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
// retry in a loop in the background until we successfully create the client, storing the client or error encountered

lock := sync.RWMutex{}
var client *clientv3.Client
var prober *etcd3Prober
clientErr := fmt.Errorf("etcd client connection not yet established")

go wait.PollUntil(time.Second, func() (bool, error) {
newClient, err := newETCD3Client(c.Transport)
newProber, err := newETCD3Prober(c)
lock.Lock()
defer lock.Unlock()
// Ensure that server is already not shutting down.
select {
case <-stopCh:
if err == nil {
newClient.Close()
newProber.Close()
}
return true, nil
default:
Expand All @@ -173,7 +173,7 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
clientErr = err
return false, nil
}
client = newClient
prober = newProber
clientErr = nil
return true, nil
}, stopCh)
Expand All @@ -185,8 +185,8 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan

lock.Lock()
defer lock.Unlock()
if client != nil {
client.Close()
if prober != nil {
prober.Close()
clientErr = fmt.Errorf("server is shutting down")
}
}()
Expand Down Expand Up @@ -214,17 +214,56 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
now := time.Now()
_, err := client.Get(ctx, path.Join("/", c.Prefix, "health"))
if err != nil {
err = fmt.Errorf("error getting data from etcd: %w", err)
}
err := prober.Probe(ctx)
lastError.Store(err, now)
return err
}, nil
}

// newETCD3Prober builds an etcd3Prober for c. It creates an etcd client from
// c.Transport and keeps c.Prefix for later probes. If the client cannot be
// created, that error is returned unchanged together with a nil prober.
func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) {
	etcdClient, err := newETCD3Client(c.Transport)
	if err != nil {
		return nil, err
	}
	prober := &etcd3Prober{prefix: c.Prefix, client: etcdClient}
	return prober, nil
}

// etcd3Prober checks an etcd3 backend by reading a key under prefix through
// client. mux serializes Close against in-flight Probe calls.
type etcd3Prober struct {
// prefix is the key prefix under which the "health" key is read by Probe.
prefix string

// mux is write-locked by Close and read-locked by Probe; it guards closed.
mux sync.RWMutex
// client is the etcd client used by Probe; the first Close call closes it.
client *clientv3.Client
// closed is set by the first Close call; once set, Probe and Close return an error.
closed bool
}

// Close closes the underlying etcd client on the first call and returns that
// client's Close result. Every later call leaves the client untouched and
// returns a "prober was closed" error.
func (p *etcd3Prober) Close() error {
	p.mux.Lock()
	defer p.mux.Unlock()

	if p.closed {
		return fmt.Errorf("prober was closed")
	}
	p.closed = true
	return p.client.Close()
}

// Probe reads the "health" key under the prober's prefix using ctx. It returns
// nil when the read succeeds, a wrapped error when the read fails, and a
// "prober was closed" error if Close has already been called.
func (p *etcd3Prober) Probe(ctx context.Context) error {
	// The read lock is held for the whole request, so Close waits for
	// in-flight probes before closing the client.
	p.mux.RLock()
	defer p.mux.RUnlock()

	if p.closed {
		return fmt.Errorf("prober was closed")
	}

	// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
	healthKey := path.Join("/", p.prefix, "health")
	if _, err := p.client.Get(ctx, healthKey); err != nil {
		return fmt.Errorf("error getting data from etcd: %w", err)
	}
	return nil
}

var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/storagebackend/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package factory

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -61,3 +62,20 @@ func CreateReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() e
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

// CreateProber returns a Prober for the storage backend described by c.
//
// An unset storage type is handled the same way as etcd3. The etcd2 type and
// any unrecognized type produce an error. Whenever the returned error is
// non-nil, the returned Prober is a true nil interface value.
func CreateProber(c storagebackend.Config) (Prober, error) {
	switch c.Type {
	case storagebackend.StorageTypeETCD2:
		return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		// Do not return newETCD3Prober(c) directly: on failure its nil
		// *etcd3Prober would be wrapped into a non-nil Prober interface, and
		// a caller checking "prober != nil" before Close would then panic.
		prober, err := newETCD3Prober(c)
		if err != nil {
			return nil, err
		}
		return prober, nil
	default:
		return nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}

// Prober is an interface that defines the Probe function for doing etcd readiness/liveness checks.
type Prober interface {
// Probe runs a single check using ctx and returns nil when the check succeeds.
Probe(ctx context.Context) error
// Close shuts the prober down. What is released depends on the concrete
// implementation.
Close() error
}

0 comments on commit e711f1b

Please sign in to comment.