Skip to content

Commit

Permalink
Merge pull request #869 from hidekazuna/refactor_lb
Browse files Browse the repository at this point in the history
Add some events related to load balancer
  • Loading branch information
k8s-ci-robot authored May 26, 2021
2 parents 4c2b4db + 7001d48 commit 7326419
Showing 1 changed file with 146 additions and 87 deletions.
233 changes: 146 additions & 87 deletions pkg/cloud/services/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,10 @@ func (s *Service) ReconcileLoadBalancer(openStackCluster *infrav1.OpenStackClust
loadBalancerName := getLoadBalancerName(clusterName)
s.logger.Info("Reconciling load balancer", "name", loadBalancerName)

// lb
lb, err := s.checkIfLbExists(loadBalancerName)
lb, err := s.getOrCreateLoadBalancer(openStackCluster, loadBalancerName, openStackCluster.Status.Network.Subnet.ID, clusterName)
if err != nil {
return err
}
if lb == nil {
s.logger.Info(fmt.Sprintf("Creating load balancer in subnet: %q", openStackCluster.Status.Network.Subnet.ID), "name", loadBalancerName)
lbCreateOpts := loadbalancers.CreateOpts{
Name: loadBalancerName,
VipSubnetID: openStackCluster.Status.Network.Subnet.ID,
Description: names.GetDescription(clusterName),
}

mc := metrics.NewMetricPrometheusContext("loadbalancer", "create")
lb, err = loadbalancers.Create(s.loadbalancerClient, lbCreateOpts).Extract()

if mc.ObserveRequest(err) != nil {
record.Warnf(openStackCluster, "FailedCreateLoadBalancer", "Failed to create load balancer %s: %v", loadBalancerName, err)
return err
}
record.Eventf(openStackCluster, "SuccessfulCreateLoadBalancer", "Created load balancer %s with id %s", loadBalancerName, lb.ID)
}
if err := s.waitForLoadBalancerActive(lb.ID); err != nil {
return err
}

floatingIPAddress := openStackCluster.Spec.ControlPlaneEndpoint.Host
if openStackCluster.Spec.APIServerFloatingIP != "" {
Expand All @@ -82,83 +61,21 @@ func (s *Service) ReconcileLoadBalancer(openStackCluster *infrav1.OpenStackClust
return err
}

// lb listener
portList := []int{int(openStackCluster.Spec.ControlPlaneEndpoint.Port)}
portList = append(portList, openStackCluster.Spec.APIServerLoadBalancerAdditionalPorts...)
for _, port := range portList {
lbPortObjectsName := fmt.Sprintf("%s-%d", loadBalancerName, port)

listener, err := s.checkIfListenerExists(lbPortObjectsName)
listener, err := s.getOrCreateListener(openStackCluster, lbPortObjectsName, lb.ID, port)
if err != nil {
return err
}
if listener == nil {
s.logger.Info("Creating load balancer listener", "name", lbPortObjectsName, "lb-id", lb.ID)
listenerCreateOpts := listeners.CreateOpts{
Name: lbPortObjectsName,
Protocol: "TCP",
ProtocolPort: port,
LoadbalancerID: lb.ID,
}
mc := metrics.NewMetricPrometheusContext("loadbalancer_listener", "create")
listener, err = listeners.Create(s.loadbalancerClient, listenerCreateOpts).Extract()
if mc.ObserveRequest(err) != nil {
return fmt.Errorf("error creating listener: %s", err)
}
}
if err := s.waitForLoadBalancerActive(lb.ID); err != nil {
return err
}

if err := s.waitForListener(listener.ID, "ACTIVE"); err != nil {
return err
}

// lb pool
pool, err := s.checkIfPoolExists(lbPortObjectsName)
pool, err := s.getOrCreatePool(openStackCluster, lbPortObjectsName, listener.ID, lb.ID)
if err != nil {
return err
}
if pool == nil {
s.logger.Info(fmt.Sprintf("Creating load balancer pool for listener %q", listener.ID), "name", lbPortObjectsName, "lb-id", lb.ID)
poolCreateOpts := pools.CreateOpts{
Name: lbPortObjectsName,
Protocol: "TCP",
LBMethod: pools.LBMethodRoundRobin,
ListenerID: listener.ID,
}
mc := metrics.NewMetricPrometheusContext("loadbalancer_pool", "create")
pool, err = pools.Create(s.loadbalancerClient, poolCreateOpts).Extract()
if mc.ObserveRequest(err) != nil {
return fmt.Errorf("error creating pool: %s", err)
}
}
if err := s.waitForLoadBalancerActive(lb.ID); err != nil {
return err
}

// lb monitor
monitor, err := s.checkIfMonitorExists(lbPortObjectsName)
if err != nil {
return err
}
if monitor == nil {
s.logger.Info(fmt.Sprintf("Creating load balancer monitor for pool %q", pool.ID), "name", lbPortObjectsName, "lb-id", lb.ID)
monitorCreateOpts := monitors.CreateOpts{
Name: lbPortObjectsName,
PoolID: pool.ID,
Type: "TCP",
Delay: 30,
Timeout: 5,
MaxRetries: 3,
}
mc := metrics.NewMetricPrometheusContext("loadbalancer_healthmonitor", "create")
_, err = monitors.Create(s.loadbalancerClient, monitorCreateOpts).Extract()
if mc.ObserveRequest(err) != nil {
return fmt.Errorf("error creating monitor: %s", err)
}
}
if err = s.waitForLoadBalancerActive(lb.ID); err != nil {
if err := s.getOrCreateMonitor(openStackCluster, lbPortObjectsName, pool.ID, lb.ID); err != nil {
return err
}
}
Expand All @@ -172,6 +89,148 @@ func (s *Service) ReconcileLoadBalancer(openStackCluster *infrav1.OpenStackClust
return nil
}

func (s *Service) getOrCreateLoadBalancer(openStackCluster *infrav1.OpenStackCluster, loadBalancerName, subnetID, clusterName string) (*loadbalancers.LoadBalancer, error) {
lb, err := s.checkIfLbExists(loadBalancerName)
if err != nil {
return nil, err
}

if lb != nil {
return lb, nil
}

s.logger.Info(fmt.Sprintf("Creating load balancer in subnet: %q", subnetID), "name", loadBalancerName)

lbCreateOpts := loadbalancers.CreateOpts{
Name: loadBalancerName,
VipSubnetID: subnetID,
Description: names.GetDescription(clusterName),
}
mc := metrics.NewMetricPrometheusContext("loadbalancer", "create")
lb, err = loadbalancers.Create(s.loadbalancerClient, lbCreateOpts).Extract()
if mc.ObserveRequest(err) != nil {
record.Warnf(openStackCluster, "FailedCreateLoadBalancer", "Failed to create load balancer %s: %v", loadBalancerName, err)
return nil, err
}

if err := s.waitForLoadBalancerActive(lb.ID); err != nil {
record.Warnf(openStackCluster, "FailedCreateLoadBalancer", "Failed to create load balancer %s with id %s: wait for load balancer active: %v", loadBalancerName, lb.ID, err)
return nil, err
}

record.Eventf(openStackCluster, "SuccessfulCreateLoadBalancer", "Created load balancer %s with id %s", loadBalancerName, lb.ID)
return lb, nil
}

func (s *Service) getOrCreateListener(openStackCluster *infrav1.OpenStackCluster, listenerName, lbID string, port int) (*listeners.Listener, error) {
listener, err := s.checkIfListenerExists(listenerName)
if err != nil {
return nil, err
}

if listener != nil {
return listener, nil
}

s.logger.Info("Creating load balancer listener", "name", listenerName, "lb-id", lbID)

listenerCreateOpts := listeners.CreateOpts{
Name: listenerName,
Protocol: "TCP",
ProtocolPort: port,
LoadbalancerID: lbID,
}
mc := metrics.NewMetricPrometheusContext("loadbalancer_listener", "create")
listener, err = listeners.Create(s.loadbalancerClient, listenerCreateOpts).Extract()
if mc.ObserveRequest(err) != nil {
record.Warnf(openStackCluster, "FailedCreateListener", "Failed to create listener %s: %v", listenerName, err)
return nil, err
}

if err := s.waitForLoadBalancerActive(lbID); err != nil {
record.Warnf(openStackCluster, "FailedCreateListener", "Failed to create listener %s with id %s: wait for load balancer active %s: %v", listenerName, listener.ID, lbID, err)
return nil, err
}

if err := s.waitForListener(listener.ID, "ACTIVE"); err != nil {
record.Warnf(openStackCluster, "FailedCreateListener", "Failed to create listener %s with id %s: wait for listener active: %v", listenerName, listener.ID, err)
return nil, err
}

record.Eventf(openStackCluster, "SuccessfulCreateListener", "Created listener %s with id %s", listenerName, listener.ID)
return listener, nil
}

func (s *Service) getOrCreatePool(openStackCluster *infrav1.OpenStackCluster, poolName, listenerID, lbID string) (*pools.Pool, error) {
pool, err := s.checkIfPoolExists(poolName)
if err != nil {
return nil, err
}

if pool != nil {
return pool, nil
}

s.logger.Info(fmt.Sprintf("Creating load balancer pool for listener %q", listenerID), "name", poolName, "lb-id", lbID)

poolCreateOpts := pools.CreateOpts{
Name: poolName,
Protocol: "TCP",
LBMethod: pools.LBMethodRoundRobin,
ListenerID: listenerID,
}
mc := metrics.NewMetricPrometheusContext("loadbalancer_pool", "create")
pool, err = pools.Create(s.loadbalancerClient, poolCreateOpts).Extract()
if mc.ObserveRequest(err) != nil {
record.Warnf(openStackCluster, "FailedCreatePool", "Failed to create pool %s: %v", poolName, err)
return nil, err
}

if err := s.waitForLoadBalancerActive(lbID); err != nil {
record.Warnf(openStackCluster, "FailedCreatePool", "Failed to create pool %s with id %s: wait for load balancer active %s: %v", poolName, pool.ID, lbID, err)
return nil, err
}

record.Eventf(openStackCluster, "SuccessfulCreatePool", "Created pool %s with id %s", poolName, pool.ID)
return pool, nil
}

func (s *Service) getOrCreateMonitor(openStackCluster *infrav1.OpenStackCluster, monitorName, poolID, lbID string) error {
monitor, err := s.checkIfMonitorExists(monitorName)
if err != nil {
return err
}

if monitor != nil {
return nil
}

s.logger.Info(fmt.Sprintf("Creating load balancer monitor for pool %q", poolID), "name", monitorName, "lb-id", lbID)

monitorCreateOpts := monitors.CreateOpts{
Name: monitorName,
PoolID: poolID,
Type: "TCP",
Delay: 30,
Timeout: 5,
MaxRetries: 3,
}
mc := metrics.NewMetricPrometheusContext("loadbalancer_healthmonitor", "create")
monitor, err = monitors.Create(s.loadbalancerClient, monitorCreateOpts).Extract()
if mc.ObserveRequest(err) != nil {
record.Warnf(openStackCluster, "FailedCreateMonitor", "Failed to create monitor %s: %v", monitorName, err)
return err
}

if err = s.waitForLoadBalancerActive(lbID); err != nil {
record.Warnf(openStackCluster, "FailedCreateMonitor", "Failed to create monitor %s with id %s: wait for load balancer active %s: %v", monitorName, monitor.ID, lbID, err)
return err
}

record.Eventf(openStackCluster, "SuccessfulCreateMonitor", "Created monitor %s with id %s", monitorName, monitor.ID)
return nil
}

func (s *Service) ReconcileLoadBalancerMember(openStackCluster *infrav1.OpenStackCluster, machine *clusterv1.Machine, openStackMachine *infrav1.OpenStackMachine, clusterName, ip string) error {
if !util.IsControlPlaneMachine(machine) {
return nil
Expand Down

0 comments on commit 7326419

Please sign in to comment.