Skip to content

Commit

Permalink
Merge pull request #5 from topfreegames/fix/discovery-service-creation
Browse files Browse the repository at this point in the history
Fix discovery service creation
  • Loading branch information
csmartins authored May 2, 2023
2 parents 2755f3f + ccacfae commit b154793
Show file tree
Hide file tree
Showing 5 changed files with 942 additions and 44 deletions.
14 changes: 11 additions & 3 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (c *Cluster) setup(ctx context.Context) error {
}

if shouldCreateCluster {
if c.cluster.Spec.ClusteringMode == "discovery" {
if err := c.setupServices(ctx); err != nil {
c.logger.Errorf("fail to setup etcd services: %v", err)
}
}
return c.create(ctx)
}
return nil
Expand Down Expand Up @@ -378,21 +383,21 @@ func (c *Cluster) Update(cl *api.EtcdCluster) {
func (c *Cluster) setupServices(ctx context.Context) error {
if c.cluster.Spec.Services != nil {
for _, service := range c.cluster.Spec.Services {
err := k8sutil.CreateClientService(ctx, c.config.KubeCli, service.Name, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner(), c.isSecureClient(), service)
err := k8sutil.CreateClientService(ctx, c.config.KubeCli, service.Name, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner(), c.isSecureClient(), service, c.cluster.Spec.ClusteringMode, k8sutil.CreateSvc)
if err != nil {
return err
}
c.status.ServiceName = append(c.status.ServiceName, service.Name)
}
} else {
err := k8sutil.CreateClientService(ctx, c.config.KubeCli, k8sutil.ClientServiceName(c.cluster.Name), c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner(), c.isSecureClient(), nil)
err := k8sutil.CreateClientService(ctx, c.config.KubeCli, k8sutil.ClientServiceName(c.cluster.Name), c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner(), c.isSecureClient(), nil, c.cluster.Spec.ClusteringMode, k8sutil.CreateSvc)
if err != nil {
return err
}
c.status.ServiceName = append(c.status.ServiceName, k8sutil.ClientServiceName(c.cluster.Name))
}

return k8sutil.CreatePeerService(ctx, c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner(), c.isSecureClient())
return k8sutil.CreatePeerService(ctx, c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner(), c.isSecureClient(), c.cluster.Spec.ClusteringMode, k8sutil.CreateSvc)
}

func (c *Cluster) isPodPVEnabled() bool {
Expand All @@ -409,6 +414,9 @@ func (c *Cluster) createPod(ctx context.Context, members etcdutil.MemberSet, m *
}

pod, err := k8sutil.NewEtcdPod(ctx, c.config.KubeCli, m, members.PeerURLPairs(), c.cluster.Name, c.cluster.Namespace, state, token, c.cluster.Spec, c.cluster.AsOwner())
if err != nil {
return err
}
if c.isPodPVEnabled() {
pvc := k8sutil.NewEtcdPodPVC(m, *c.cluster.Spec.Pod.PersistentVolumeClaimSpec, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
_, err := c.config.KubeCli.CoreV1().PersistentVolumeClaims(c.cluster.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *Cluster) reconcile(ctx context.Context, pods []*v1.Pod, services map[st

sp := c.cluster.Spec
if c.cluster.Spec.Services != nil {
c.reconcileServices(ctx, services)
c.reconcileServices(ctx, services, k8sutil.CreateSvc)
c.status.SetClusterServiceName(services)
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func (c *Cluster) diffServices(services map[string]*v1.Service) ([]*api.ServiceP
return newServices, unknownServices, nil
}

func (c *Cluster) reconcileServices(ctx context.Context, services map[string]*v1.Service) error {
func (c *Cluster) reconcileServices(ctx context.Context, services map[string]*v1.Service, createSvc k8sutil.CreateService) error {

newServices, unknownServices, err := c.diffServices(services)
if err != nil {
Expand All @@ -129,7 +129,7 @@ func (c *Cluster) reconcileServices(ctx context.Context, services map[string]*v1

if len(newServices) > 0 {
for _, service := range newServices {
err := k8sutil.CreateClientService(ctx, c.config.KubeCli, service.Name, c.cluster.GetName(), c.cluster.GetNamespace(), c.cluster.AsOwner(), c.isSecureClient(), service)
err := k8sutil.CreateClientService(ctx, c.config.KubeCli, service.Name, c.cluster.GetName(), c.cluster.GetNamespace(), c.cluster.AsOwner(), c.isSecureClient(), service, c.cluster.Spec.ClusteringMode, createSvc)
if err != nil {
return err
}
Expand Down
22 changes: 21 additions & 1 deletion pkg/cluster/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"testing"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)

Expand Down Expand Up @@ -129,11 +131,29 @@ func TestReconcileServices(t *testing.T) {
},
}

createSvc := func(ctx context.Context, kubecli kubernetes.Interface, svcName string, clstrName string, ns string, ports []v1.ServicePort, owner metav1.OwnerReference, publishNotReadyAddresses bool, service *api.ServicePolicy, annotations map[string]string) error {
labels := k8sutil.LabelsForCluster(clstrName)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: svcName,
Labels: labels,
Annotations: annotations,
},
Spec: v1.ServiceSpec{
Ports: ports,
Selector: labels,
PublishNotReadyAddresses: publishNotReadyAddresses,
},
}
kubecli.(*fake.Clientset).CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
return nil
}

for _, tt := range tests {
c.config.KubeCli.(*fake.Clientset).ClearActions()

c.cluster.Spec.Services = tt.desiredServices
err := c.reconcileServices(context.Background(), tt.currentServices)
err := c.reconcileServices(context.Background(), tt.currentServices, createSvc)
if err != nil {
t.Fatalf("%s: %s", tt.name, err)
}
Expand Down
143 changes: 114 additions & 29 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,40 @@ const (

var ErrDiscoveryTokenNotProvided = errors.New("cluster token not provided, you must provide a token when clustering mode is discovery")

var CreateSvc CreateService = func(ctx context.Context, kubecli kubernetes.Interface, svcName string, clusterName string, ns string, ports []v1.ServicePort, owner metav1.OwnerReference, publishNotReadyAddresses bool, service *api.ServicePolicy, annotations map[string]string) error {
svc := newEtcdServiceManifest(svcName, clusterName, ports, publishNotReadyAddresses, annotations)

applyServicePolicy(svc, service)
addOwnerRefToObject(svc.GetObjectMeta(), owner)
_, err := kubecli.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
done := make(chan string)
go getServiceStatus(ctx, kubecli, svcName, ns, done)
status := <- done
if status != "created" {
return fmt.Errorf("failed to finish the service creation: %v", status)
}
return nil
}

func getServiceStatus(ctx context.Context, kubecli kubernetes.Interface, svcName string, ns string, status chan string) {
for i := 0; i < 20; i++ {
service, err := kubecli.CoreV1().Services(ns).Get(ctx, svcName, metav1.GetOptions{})
if err != nil {
status <- err.Error()
}
if service.Spec.Type == v1.ServiceTypeLoadBalancer && len(service.Status.LoadBalancer.Ingress) == 0 {
time.Sleep(30 * time.Second)
} else {
status <- "created"
return
}
}
status <- "timeout creating service"
}

func GetEtcdVersion(pod *v1.Pod) string {
return pod.Annotations[etcdVersionAnnotationKey]
}
Expand Down Expand Up @@ -157,7 +191,35 @@ func PodWithNodeSelector(p *v1.Pod, ns map[string]string) *v1.Pod {
return p
}

func CreateClientService(ctx context.Context, kubecli kubernetes.Interface, serviceName, clusterName, ns string, owner metav1.OwnerReference, tls bool, service *api.ServicePolicy) error {
func setupClientServiceObject(clusteringMode string) (*api.ServicePolicy, map[string]string) {
annotations := map[string]string{}
service := api.ServicePolicy{}
if clusteringMode == "discovery" {
service.Type = v1.ServiceTypeLoadBalancer
annotations["service.beta.kubernetes.io/aws-load-balancer-nlb-target-type"] = "instance"
annotations["service.beta.kubernetes.io/aws-load-balancer-type"] = "external"
annotations["service.beta.kubernetes.io/aws-load-balancer-scheme"] = "internet-facing"
return &service, annotations
} else {
return nil, annotations
}
}

func setupPeerServiceObject(clusteringMode string) (api.ServicePolicy, map[string]string) {
service := api.ServicePolicy{}
annotations := map[string]string{}
if clusteringMode == "discovery"{
service.Type = v1.ServiceTypeLoadBalancer
annotations["service.beta.kubernetes.io/aws-load-balancer-nlb-target-type"] = "instance"
annotations["service.beta.kubernetes.io/aws-load-balancer-type"] = "external"
} else {
service.Type = v1.ServiceTypeClusterIP
service.ClusterIP = v1.ClusterIPNone
}
return service, annotations
}

func CreateClientService(ctx context.Context, kubecli kubernetes.Interface, serviceName, clusterName, ns string, owner metav1.OwnerReference, tls bool, service *api.ServicePolicy, clusteringMode string, createSvc CreateService) error {

if len(serviceName) == 0 {
return fmt.Errorf("fail to create service: name isn't defined")
Expand All @@ -176,7 +238,7 @@ func CreateClientService(ctx context.Context, kubecli kubernetes.Interface, serv
TargetPort: intstr.FromInt(EtcdClientPort),
Protocol: v1.ProtocolTCP,
}}

var err error = nil
if service != nil {
var clientPorts []v1.ServicePort
Expand All @@ -185,15 +247,17 @@ func CreateClientService(ctx context.Context, kubecli kubernetes.Interface, serv
} else {
clientPorts = defaultPort
}
err = createService(ctx, kubecli, serviceName, clusterName, ns, clientPorts, owner, false, service)
err = createSvc(ctx, kubecli, serviceName, clusterName, ns, clientPorts, owner, false, service, service.Annotations)
} else {
err = createService(ctx, kubecli, serviceName, clusterName, ns, defaultPort, owner, false, nil)
service, annotations := setupClientServiceObject(clusteringMode)

err = createSvc(ctx, kubecli, serviceName, clusterName, ns, defaultPort, owner, false, service, annotations)
}

return err
}

func CreatePeerService(ctx context.Context, kubecli kubernetes.Interface, clusterName, ns string, owner metav1.OwnerReference, tls bool) error {
func CreatePeerService(ctx context.Context, kubecli kubernetes.Interface, clusterName, ns string, owner metav1.OwnerReference, tls bool, clusteringMode string, createSvc CreateService) error {

var EtcdClientPortName string
if tls {
Expand All @@ -213,26 +277,16 @@ func CreatePeerService(ctx context.Context, kubecli kubernetes.Interface, cluste
TargetPort: intstr.FromInt(2380),
Protocol: v1.ProtocolTCP,
}}

service := &api.ServicePolicy{
Type: v1.ServiceTypeClusterIP,
ClusterIP: v1.ClusterIPNone,
}

return createService(ctx, kubecli, clusterName, clusterName, ns, ports, owner, true, service)

service, annotations := setupPeerServiceObject(clusteringMode)
publishNotReadyAddresses := true

return createSvc(ctx, kubecli, clusterName, clusterName, ns, ports, owner, publishNotReadyAddresses, &service, annotations)
}

func createService(ctx context.Context, kubecli kubernetes.Interface, svcName, clusterName, ns string, ports []v1.ServicePort, owner metav1.OwnerReference, publishNotReadyAddresses bool, service *api.ServicePolicy) error {
svc := newEtcdServiceManifest(svcName, clusterName, ports, publishNotReadyAddresses)

applyServicePolicy(svc, service)
addOwnerRefToObject(svc.GetObjectMeta(), owner)
_, err := kubecli.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}
type (
CreateService func(ctx context.Context, kubecli kubernetes.Interface, svcName string, clusterName string, ns string, ports []v1.ServicePort, owner metav1.OwnerReference, publishNotReadyAddresses bool, service *api.ServicePolicy, annotations map[string]string) error
)

// CreateAndWaitPod creates a pod and waits until it is running
func CreateAndWaitPod(ctx context.Context, kubecli kubernetes.Interface, ns string, pod *v1.Pod, timeout time.Duration) (*v1.Pod, error) {
Expand Down Expand Up @@ -268,13 +322,13 @@ func CreateAndWaitPod(ctx context.Context, kubecli kubernetes.Interface, ns stri
return retPod, nil
}

func newEtcdServiceManifest(svcName, clusterName string, ports []v1.ServicePort, publishNotReadyAddresses bool) *v1.Service {
func newEtcdServiceManifest(svcName, clusterName string, ports []v1.ServicePort, publishNotReadyAddresses bool, annotations map[string]string) *v1.Service {
labels := LabelsForCluster(clusterName)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: svcName,
Labels: labels,
Annotations: map[string]string{},
Annotations: annotations,
},
Spec: v1.ServiceSpec{
Ports: ports,
Expand Down Expand Up @@ -361,12 +415,25 @@ func ClientServiceName(clusterName string) string {
return clusterName + "-client"
}

func setupEtcdCommand(dataDir string, m *etcdutil.Member, initialCluster string, clusterState string, clusterToken string, clusteringMode string) (string, error) {
func setupPeerServiceURL(endpoint string) string {
return fmt.Sprintf("http://%s:2380", endpoint)
}

func setupClientServiceURL(endpoint string) string {
return fmt.Sprintf("http://%s:2379", endpoint)
}

func setupEtcdCommand(dataDir string, m *etcdutil.Member, initialCluster string, clusterState string, clusterToken string, clusteringMode string, service v1.Service) (string, error) {
if clusteringMode == "discovery" {
fmt.Printf("Services url list: %v", service.Status.LoadBalancer.Ingress)
serviceUrl := service.Status.LoadBalancer.Ingress[0].Hostname
if serviceUrl == "" {
return "", fmt.Errorf("failed to get service url: %v", service)
}
command := fmt.Sprintf("/usr/local/bin/etcd --data-dir=%s --name=%s --initial-advertise-peer-urls=%s "+
"--listen-peer-urls=%s --listen-client-urls=%s --advertise-client-urls=%s "+
"--discovery=%s/%s",
dataDir, m.Name, m.PeerURL(), m.ListenPeerURL(), m.ListenClientURL(), m.ClientURL(), discoveryEndpoint, clusterToken)
dataDir, m.Name, setupPeerServiceURL(serviceUrl), m.ListenPeerURL(), m.ListenClientURL(), setupClientServiceURL(serviceUrl), discoveryEndpoint, clusterToken)
return command, nil
} else {
command := fmt.Sprintf("/usr/local/bin/etcd --data-dir=%s --name=%s --initial-advertise-peer-urls=%s "+
Expand All @@ -381,7 +448,22 @@ func setupEtcdCommand(dataDir string, m *etcdutil.Member, initialCluster string,
}

func newEtcdPod(ctx context.Context, kubecli kubernetes.Interface, m *etcdutil.Member, initialCluster []string, clusterName, clusterNamespace, state, token string, cs api.ClusterSpec) (*v1.Pod, error) {
command, err := setupEtcdCommand(dataDir, m, strings.Join(initialCluster, ","), state, token, cs.ClusteringMode)
var service v1.Service
if cs.ClusteringMode == "discovery" {
services, err := kubecli.CoreV1().Services(clusterNamespace).List(ctx, metav1.ListOptions{})
fmt.Printf("Services list: %v", services)
if err != nil {
fmt.Printf("%s",err.Error())
return nil, err
}
for _, svc := range services.Items {
if svc.ObjectMeta.Name == clusterName {
service = svc
}
}

}
command, err := setupEtcdCommand(dataDir, m, strings.Join(initialCluster, ","), state, token, cs.ClusteringMode, service)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -520,9 +602,12 @@ func podSecurityContext(podPolicy *api.PodPolicy) *v1.PodSecurityContext {

func NewEtcdPod(ctx context.Context, kubecli kubernetes.Interface, m *etcdutil.Member, initialCluster []string, clusterName, clusterNamespace, state, token string, cs api.ClusterSpec, owner metav1.OwnerReference) (*v1.Pod, error) {
pod, err := newEtcdPod(ctx, kubecli, m, initialCluster, clusterName, clusterNamespace, state, token, cs)
if err != nil {
return nil, err
}
applyPodPolicy(clusterName, pod, cs.Pod)
addOwnerRefToObject(pod.GetObjectMeta(), owner)
return pod, err
return pod, nil
}

func MustNewKubeClient() kubernetes.Interface {
Expand Down
Loading

0 comments on commit b154793

Please sign in to comment.