Skip to content

Commit

Permalink
Merge pull request #4 from heschlie/k8s-backend
Browse files Browse the repository at this point in the history
Refactor of confd and add k8s backend

As we are planning to migrate off of confd in the long run I refactored
the code to be a bit more like an actual go repo and switched to Glide
for dependency management.

I have also added a very naive k8s backend that accounts for everything
we currently store in k8s, we will need to add more when we populate the
rest.

- Moved the contents of `src/github.com/kelseyhightower/confd/*` into
  the top level directory
- Migrated to Glide for dependencies
- Added k8s backend to pull needed "keys" from k8s
- Created `watch()` implementation
  - Grabs a watch from the k8s backend where it can
  - sleeps for 10 seconds where we aren't yet storing values
  - sleeps for 1 minute and logs a debug message when we aren't
    expecting that prefix
  • Loading branch information
heschlie authored Mar 17, 2017
2 parents 5dcbb53 + 9822d93 commit 341aa58
Show file tree
Hide file tree
Showing 1,178 changed files with 689 additions and 480,769 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
bin/
pkg/
.idea/
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/kelseyhightower/confd/backends/redis"
"github.com/kelseyhightower/confd/backends/stackengine"
"github.com/kelseyhightower/confd/backends/zookeeper"
"github.com/kelseyhightower/confd/backends/k8s"
"github.com/kelseyhightower/confd/log"
)

Expand Down Expand Up @@ -49,6 +50,9 @@ func New(config Config) (StoreClient, error) {
return dynamodb.NewDynamoDBClient(table)
case "stackengine":
return stackengine.NewStackEngineClient(backendNodes, config.Scheme, config.ClientCert, config.ClientKey, config.ClientCaKeys, config.AuthToken)
case "k8s":
log.Info("Backend set to k8s")
return k8s.NewK8sClient(config.Kubeconfig)
}
return nil, errors.New("Invalid backend")
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ type Config struct {
NoDiscover bool
Scheme string
Table string
Kubeconfig string
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
300 changes: 300 additions & 0 deletions backends/k8s/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
package k8s

import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"fmt"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/pkg/runtime/schema"
"k8s.io/client-go/pkg/runtime/serializer"
"k8s.io/client-go/pkg/runtime"
"github.com/kelseyhightower/confd/log"
"github.com/projectcalico/libcalico-go/lib/backend/k8s/thirdparty"
kapiv1 "k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
clientapi "k8s.io/client-go/pkg/api"
kerrors "k8s.io/client-go/pkg/api/errors"
"regexp"
"github.com/projectcalico/libcalico-go/lib/backend/k8s/resources"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"strconv"
"time"
)

const (
ipPool = "/calico/v1/ipam/v4/pool"
global = "/calico/bgp/v1/global"
globalASN = "/calico/bgp/v1/global/as_num"
globalNodeMesh = "/calico/bgp/v1/global/node_mesh"
allNodes = "/calico/bgp/v1/host"
globalLogging = "/calico/bgp/v1/global/loglevel"
)

var (
singleNode = regexp.MustCompile("^/calico/bgp/v1/host/([a-zA-Z0-9._-]*)$")
)

type Client struct {
clientSet *kubernetes.Clientset
tprClient *rest.RESTClient
}

func NewK8sClient(kubeconfig string) (*Client, error) {

log.Debug("Building k8s client")

// Set an explicit path to the kubeconfig if one
// was provided.
loadingRules := clientcmd.ClientConfigLoadingRules{}
if kubeconfig != "" {
log.Debug(fmt.Sprintf("Using kubeconfig: \n%s", kubeconfig))
loadingRules.ExplicitPath = kubeconfig
}

// A kubeconfig file was provided. Use it to load a config, passing through
// any overrides.
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&loadingRules, &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
return nil, err
}

// Create the clientset
cs, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
log.Debug(fmt.Sprintf("Created k8s clientSet: %+v", cs))

tprClient, err := buildTPRClient(config)
if err != nil {
return nil, err
}
kubeClient := &Client{
clientSet: cs,
tprClient: tprClient,
}

return kubeClient, nil
}

// GetValues takes the etcd like keys and route it to the appropriate k8s API endpoint.
func (c *Client) GetValues(keys []string) (map[string]string, error) {
var kvps = make(map[string]string)
for _, key := range keys {
log.Debug(fmt.Sprintf("Getting key %s", key))
if m := singleNode.FindStringSubmatch(key); m != nil {
host := m[len(m)-1]
kNode, err := c.clientSet.Nodes().Get(host, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = populateNodeDetails(kNode, kvps)
if err != nil {
return nil, err
}
}
switch key {
case global:
// Default to "info" until this makes it into k8s.
kvps[globalLogging] = "info"
// Default to 64512
kvps[globalASN] = "64512"
// Default to true until peering info is available in k8s.
kvps[globalNodeMesh] = `{"enabled": true}`
case globalNodeMesh:
// This is needed as there are calls to 'global' and directly to 'global/node_mesh'
// Default to true until peering configuration is available in k8s.
kvps[globalNodeMesh] = `{"enabled": true}`
case ipPool:
tprs := thirdparty.IpPoolList{}
err := c.tprClient.Get().
Resource("ippools").
Namespace("kube-system").
Do().Into(&tprs)

// Ignore not found errors, as this simply means ippools does
// not exist.
if err != nil {
if !kerrors.IsNotFound(err) {
return nil, err
}
}

for _, tpr := range tprs.Items {
kvp := resources.ThirdPartyToIPPool(&tpr)
cidr := kvp.Key.(model.IPPoolKey).CIDR

if cidr.Version() == 4 {
kvps[ipPool + "/" + tpr.Metadata.Name] = tpr.Spec.Value
}
}
case allNodes:
nodes, err := c.clientSet.Nodes().List(kapiv1.ListOptions{})
if err != nil {
return nil, err
}

for _, kNode := range nodes.Items {
err := populateNodeDetails(&kNode, kvps)
if err != nil {
return nil, err
}
}
}
}
log.Debug(fmt.Sprintf("%v", kvps))
return kvps, nil
}

func (c *Client) WatchPrefix(prefix string, waitIndex uint64, stopChan chan bool, keys []string) (uint64, error) {

if waitIndex == 0 {
switch prefix {
case global:
// We only have defaults for this, and they won't change in the
// API at this time, so we can safely assume we won't be refreshing.
time.Sleep(10 * time.Second)
return waitIndex, nil
case globalNodeMesh:
// This is currently not changeable in k8s.
time.Sleep(10 * time.Second)
return waitIndex, nil
case allNodes:
// Get all nodes. The k8s client does not expose a way to watch a single Node.
nodes, err := c.clientSet.Nodes().List(kapiv1.ListOptions{})
if err != nil {
return 0, err
}
ver := nodes.ListMeta.ResourceVersion

return convertResourceVersionToUint(ver, prefix)
case ipPool:
tprs := thirdparty.IpPoolList{}
err := c.tprClient.Get().
Name("ippool").
Namespace("kube-system").
Do().Into(&tprs)
if err != nil {
if !kerrors.IsNotFound(err) {
return 0, err
}
}

ver := tprs.Metadata.ResourceVersion
return convertResourceVersionToUint(ver, prefix)
default:
// We aren't tracking this key, default to 10 second refresh.
time.Sleep(60 * time.Second)
log.Debug(fmt.Sprintf("Receieved unknown key: %v", prefix))
return waitIndex + 1, nil
}
}

switch prefix {
case global:
// These are currently not changeable in k8s.
time.Sleep(10 * time.Second)
return waitIndex, nil
case globalNodeMesh:
// This is currently not changeable in k8s.
time.Sleep(10 * time.Second)
return waitIndex, nil
case allNodes:
w, err := c.clientSet.Nodes().Watch(kapiv1.ListOptions{})
if err != nil {
return waitIndex, err
}
event := <- w.ResultChan()
ver := event.Object.(*kapiv1.NodeList).ListMeta.ResourceVersion
w.Stop()
log.Debug(fmt.Sprintf("%d : %s", waitIndex, ver))

return convertResourceVersionToUint(ver, prefix)
case ipPool:
w, err := c.tprClient.Get().
Name("ippool").
Namespace("kube-system").
Watch()
if err != nil {
return waitIndex, err
}
event := <- w.ResultChan()
ver := event.Object.(*thirdparty.IpPoolList).Metadata.ResourceVersion
w.Stop()

return convertResourceVersionToUint(ver, prefix)
default:
// We aren't tracking this key, default to 10 second refresh.
time.Sleep(60 * time.Second)
log.Debug(fmt.Sprintf("Receieved unknown key: %v", prefix))
return waitIndex + 1, nil
}
return waitIndex, nil
}

// buildTPRClient builds a RESTClient configured to interact with Calico ThirdPartyResources.
func buildTPRClient(baseConfig *rest.Config) (*rest.RESTClient, error) {
// Generate config using the base config.
cfg := baseConfig
cfg.GroupVersion = &schema.GroupVersion{
Group: "projectcalico.org",
Version: "v1",
}
cfg.APIPath = "/apis"
cfg.ContentType = runtime.ContentTypeJSON
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: clientapi.Codecs}

cli, err := rest.RESTClientFor(cfg)
if err != nil {
return nil, err
}

// We also need to register resources.
schemeBuilder := runtime.NewSchemeBuilder(
func(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(
*cfg.GroupVersion,
&thirdparty.GlobalConfig{},
&thirdparty.GlobalConfigList{},
&thirdparty.IpPool{},
&thirdparty.IpPoolList{},
&kapiv1.ListOptions{},
&kapiv1.DeleteOptions{},
)
return nil
})
schemeBuilder.AddToScheme(clientapi.Scheme)

return cli, nil
}

// populateNodeDetails populates the given kvps map with values we track from the k8s Node object.
func populateNodeDetails(kNode *kapiv1.Node, kvps map[string]string) (error) {
cNode, err := resources.K8sNodeToCalico(kNode)
if err != nil {
log.Error("Failed to parse k8s Node into Calico Node")
return err
}
node := cNode.Value.(*model.Node)
nodeKey := allNodes + "/" + kNode.Name

if node.FelixIPv4 != nil {
kvps[nodeKey + "/ip_addr_v4"] = node.FelixIPv4.String()
}
if node.BGPIPv4Net != nil {
kvps[nodeKey + "/network_v4"] = node.BGPIPv4Net.String()
}

return nil
}

// convertResourceVersionToUint converts the k8s string resource version to a uint64 expected by confd.
func convertResourceVersionToUint(rv string, prefix string) (uint64, error) {
i, err := strconv.ParseUint(rv, 10, 64)
if err != nil {
log.Error(fmt.Sprintf("Could not convert '%s' resource version %s to uint64", prefix, rv))
return 0, err
}
return i, nil
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env bash

echo "Building confd..."
gb build all
go build -o bin/confd
File renamed without changes.
6 changes: 6 additions & 0 deletions ...ithub.com/kelseyhightower/confd/config.go → config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
templateConfig template.Config
backendsConfig backends.Config
watch bool
kubeconfig string
)

// A Config structure is used to configure confd.
Expand All @@ -62,6 +63,7 @@ type Config struct {
Table string `toml:"table"`
LogLevel string `toml:"log-level"`
Watch bool `toml:"watch"`
KubeConfig string `toml:"kubeconfig"`
}

func init() {
Expand All @@ -85,6 +87,7 @@ func init() {
flag.StringVar(&srvDomain, "srv-domain", "", "the name of the resource record")
flag.StringVar(&table, "table", "", "the name of the DynamoDB table (only used with -backend=dynamodb)")
flag.BoolVar(&watch, "watch", false, "enable watch support")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Kubernetes kubeconfig file path")
}

// initConfig initializes the confd configuration by first setting defaults,
Expand Down Expand Up @@ -187,6 +190,7 @@ func initConfig() error {
NoDiscover: config.NoDiscover,
Scheme: config.Scheme,
Table: config.Table,
Kubeconfig: config.KubeConfig,
}
// Template configuration.
templateConfig = template.Config{
Expand Down Expand Up @@ -272,5 +276,7 @@ func setConfigFromFlag(f *flag.Flag) {
config.LogLevel = logLevel
case "watch":
config.Watch = watch
case "kubeconfig":
config.KubeConfig = kubeconfig
}
}
File renamed without changes.
Loading

0 comments on commit 341aa58

Please sign in to comment.