Skip to content
This repository has been archived by the owner on Apr 10, 2024. It is now read-only.

Refactor namespace discovery to have option to use shared informers instead of permanent for loop #22

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsC
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46Ok=
Expand Down
189 changes: 169 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"strings"
"time"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)

var (
Expand All @@ -25,10 +31,13 @@ var (
configAllServiceAccount bool = false
configDockerconfigjson string = ""
configDockerConfigJSONPath string = ""
configSecretName string = "image-pull-secret" // default to image-pull-secret
configSecretName string = "image-pull-secret" // default to image-pull-secret
configSecretNamespace string = "imagepullsecret-patcher" // default to imagepullsecret-patcher
configExcludedNamespaces string = ""
configServiceAccounts string = defaultServiceAccountName
configUseInfromers bool = true
configLoopDuration time.Duration = 10 * time.Second
configRunningInCluster bool = true

dockerConfigJSON string
)
Expand All @@ -51,9 +60,12 @@ func main() {
flag.StringVar(&configDockerconfigjson, "dockerconfigjson", LookupEnvOrString("CONFIG_DOCKERCONFIGJSON", configDockerconfigjson), "json credential for authenicating container registry, exclusive with `dockerconfigjsonpath`")
flag.StringVar(&configDockerConfigJSONPath, "dockerconfigjsonpath", LookupEnvOrString("CONFIG_DOCKERCONFIGJSONPATH", configDockerConfigJSONPath), "path to json file containing credentials for the registry to be distributed, exclusive with `dockerconfigjson`")
flag.StringVar(&configSecretName, "secretname", LookupEnvOrString("CONFIG_SECRETNAME", configSecretName), "set name of managed secrets")
flag.StringVar(&configSecretNamespace, "secretnamespace", LookupEnvOrString("CONFIG_SECRET_NAMESPACE", configSecretNamespace), "namespace where original secret can be found")
flag.StringVar(&configExcludedNamespaces, "excluded-namespaces", LookupEnvOrString("CONFIG_EXCLUDED_NAMESPACES", configExcludedNamespaces), "comma-separated namespaces excluded from processing")
flag.StringVar(&configServiceAccounts, "serviceaccounts", LookupEnvOrString("CONFIG_SERVICEACCOUNTS", configServiceAccounts), "comma-separated list of serviceaccounts to patch")
flag.BoolVar(&configUseInfromers, "use-infromers", LookUpEnvOrBool("CONFIG_USE_INFROMERS", configUseInfromers), "if true, k8s informers to detect when new namespace is created and then it will run patching process, if false it runs in a loop for all namespaces")
flag.DurationVar(&configLoopDuration, "loop-duration", LookupEnvOrDuration("CONFIG_LOOP_DURATION", configLoopDuration), "String defining the loop duration")
flag.BoolVar(&configRunningInCluster, "running-in-cluster", LookUpEnvOrBool("CONFIG_RUNNING_IN_CLUSTER", configRunningInCluster), "if false, will use kubeconfig and current context to connect to k8s API")
flag.Parse()

// setup logrus
Expand All @@ -66,12 +78,29 @@ func main() {
if configDockerconfigjson != "" && configDockerConfigJSONPath != "" {
log.Panic(fmt.Errorf("Cannot specify both `configdockerjson` and `configdockerjsonpath`"))
}

// create k8s clientset from in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
log.Panic(err)
var config *rest.Config
var err error
if configRunningInCluster {
// create k8s config from in-cluster config
config, err = rest.InClusterConfig()
if err != nil {
log.Panic(err)
}
} else {
// create k8s config from local kubeconfig
var kubeconfig *string
var err error
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
log.Panic(err)
}
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Panic(err)
Expand All @@ -80,26 +109,145 @@ func main() {
clientset: clientset,
}

for {
log.Debug("Loop started")
loop(k8s)
if configRunOnce {
log.Info("Exiting after single loop per `CONFIG_RUNONCE`")
os.Exit(0)
// Populate secret value to set
dockerConfigJSON, err = getDockerConfigJSON()
if err != nil {
log.Panic(err)
}

if configUseInfromers {
log.Debug("Informer started")
startInformers(k8s)
} else {
for {
log.Debug("Loop started")
loop(k8s)
if configRunOnce {
log.Info("Exiting after single loop per `CONFIG_RUNONCE`")
os.Exit(0)
}
time.Sleep(configLoopDuration)
}
time.Sleep(configLoopDuration)
}
}

func startInformers(k8s *k8sClient) {
factory := informers.NewSharedInformerFactory(k8s.clientset, 0)
namespaceInformer := factory.Core().V1().Namespaces().Informer()
secretInformer := factory.Core().V1().Secrets().Informer()
serviceAccountInformer := factory.Core().V1().ServiceAccounts().Informer()
stopper := make(chan struct{})
defer close(stopper)

secretInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
secret := obj.(*v1.Secret)
if secret.Name == configSecretName && secret.Namespace == configSecretNamespace {
log.Debugf("Original secret [%s] in namespace [%s]", secret.Name, secret.Namespace)
}
},
DeleteFunc: func(obj interface{}) {
secret := obj.(*v1.Secret)
if secret.Name == configSecretName && secret.Namespace != configSecretNamespace {
log.Debugf("Deleted secret [%s] in namespace [%s]", secret.Name, secret.Namespace)
var err error
namespace := secret.Namespace
namespaceObj, err := k8s.clientset.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{})
if err != nil {
log.Panic(err)
}
if namespaceIsExcluded(*namespaceObj) {
log.Infof("[%s] Namespace skipped", namespaceObj.Name)
} else {
if namespaceObj.Status.Phase != "Terminating" {
log.Debugf("[%s] Start processing secret", namespace)
// for each namespace, make sure the dockerconfig secret exists
err = processSecret(k8s, namespace)
if err != nil {
// if has error in processing secret, should skip processing service account
log.Error(err)
} else {
log.Debugf("[%s] Start processing service account", namespace)
// get default service account, and patch image pull secret if not exist
err = processServiceAccount(k8s, namespace)
if err != nil {
log.Error(err)
}
}
} else {
log.Debugf("[%s] namespace is in phase %s", namespace, namespaceObj.Status.Phase)
}
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
secret := oldObj.(*v1.Secret)
if secret.Name == configSecretName && secret.Namespace == configSecretNamespace {
log.Debugf("Updated secret [%s] in namespace [%s]", secret.Name, secret.Namespace)
log.Debug("Running update loop")
loop(k8s)
}
},
})

namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
var err error
ns := obj.(*corev1.Namespace)
namespace := ns.Name
log.Debugf("[%s] Namespace discovered", namespace)
if namespaceIsExcluded(*ns) {
log.Infof("[%s] Namespace skipped", namespace)
} else {
log.Debugf("[%s] Start processing secret", namespace)
// for each namespace, make sure the dockerconfig secret exists
err = processSecret(k8s, namespace)
if err != nil {
// if has error in processing secret, should skip processing service account
log.Error(err)
}
}
},
DeleteFunc: func(obj interface{}) {
ns := obj.(*corev1.Namespace)
namespace := ns.Name
log.Debugf("[%s] Namespace deleted", namespace)
},
})
serviceAccountInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// var err error
sa := obj.(*corev1.ServiceAccount)
serviceAccount := sa.Name
namespace := sa.Namespace
namespaceObj, err := k8s.clientset.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{})
if err != nil {
log.Panic(err)
}
log.Debugf("[%s] ServiceAccount discovered in namespace %s", serviceAccount, sa.Namespace)
if namespaceIsExcluded(*namespaceObj) {
log.Infof("[%s] Namespace skipped", namespace)
} else {
log.Debugf("[%s] Start processing service account", namespace)
// get default service account, and patch image pull secret if not exist
err = processServiceAccount(k8s, namespace)
if err != nil {
log.Error(err)
}
}
},
})
log.Info("Namespace informer started")
go namespaceInformer.Run(stopper)
log.Info("ServiceAccount informer started")
go serviceAccountInformer.Run(stopper)
log.Info("Secret informer started")
secretInformer.Run(stopper)
}

func loop(k8s *k8sClient) {
var err error

// Populate secret value to set
dockerConfigJSON, err = getDockerConfigJSON()
if err != nil {
log.Panic(err)
}

// get all namespaces
namespaces, err := k8s.clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
Expand All @@ -113,7 +261,7 @@ func loop(k8s *k8sClient) {
log.Infof("[%s] Namespace skipped", namespace)
continue
}
log.Debugf("[%s] Start processing", namespace)
log.Debugf("[%s] Start processing secret", namespace)
// for each namespace, make sure the dockerconfig secret exists
err = processSecret(k8s, namespace)
if err != nil {
Expand All @@ -122,6 +270,7 @@ func loop(k8s *k8sClient) {
continue
}
// get default service account, and patch image pull secret if not exist
log.Debugf("[%s] Start processing service account", namespace)
err = processServiceAccount(k8s, namespace)
if err != nil {
log.Error(err)
Expand Down