Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Service without selectors #222

Merged
merged 1 commit into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ If `--internal-default-backend` is used, the controller configures
nghttpx to act as a default backend. In this case, the default
backend service is not necessary.

## Services without selectors

nghttpx supports [Services without
selectors](https://kubernetes.io/docs/concepts/services-networking/service/#services-without-selectors).

## Logs

The access, and error log of nghttpx are written to stdout, and stderr
Expand Down
67 changes: 58 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func NewLoadBalancerController(clientset clientset.Interface, nghttpx nghttpx.Se

allNSInformers := informers.NewSharedInformerFactory(lbc.clientset, noResyncPeriod)

if !config.EnableEndpointSlice {
{
f := allNSInformers.Core().V1().Endpoints()
lbc.epLister = f.Lister()
lbc.epInformer = f.Informer()
Expand All @@ -261,8 +261,9 @@ func NewLoadBalancerController(clientset clientset.Interface, nghttpx nghttpx.Se
UpdateFunc: lbc.updateEndpointsNotification,
DeleteFunc: lbc.deleteEndpointsNotification,
})
}

} else {
if config.EnableEndpointSlice {
epSliceInformers := informers.NewSharedInformerFactoryWithOptions(lbc.clientset, noResyncPeriod,
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector =
Expand Down Expand Up @@ -1367,15 +1368,65 @@ func (lbc *LoadBalancerController) getEndpoints(svc *corev1.Service, svcPort *co
svc.Namespace, svc.Name, svcPort.Port, svcPort.TargetPort.String())

switch {
case lbc.epLister != nil:
return lbc.getEndpointsFromEndpoints(svc, svcPort, backendConfig)
case len(svc.Spec.Selector) == 0:
return lbc.getEndpointsWithoutServiceSelectors(svc, svcPort, backendConfig)
case lbc.epSliceLister != nil:
return lbc.getEndpointsFromEndpointSlice(svc, svcPort, backendConfig)
default:
panic("unreachable")
return lbc.getEndpointsFromEndpoints(svc, svcPort, backendConfig)
}
}

func (lbc *LoadBalancerController) getEndpointsWithoutServiceSelectors(svc *corev1.Service, svcPort *corev1.ServicePort, backendConfig *nghttpx.BackendConfig) ([]nghttpx.Backend, error) {
var targetPort int32

switch {
case svcPort.TargetPort.IntVal != 0:
targetPort = svcPort.TargetPort.IntVal
case svcPort.TargetPort.StrVal == "":
targetPort = svcPort.Port
default:
return nil, fmt.Errorf("Service %v/%v must have integer target port if specified: %v", svc.Namespace, svc.Name, svcPort.TargetPort)
}

ep, err := lbc.epLister.Endpoints(svc.Namespace).Get(svc.Name)
if err != nil {
klog.Errorf("unexpected error obtaining service endpoints: %v", err)
return nil, err
}

var backends []nghttpx.Backend

for i := range ep.Subsets {
ss := &ep.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]

if port.Protocol != "" && port.Protocol != corev1.ProtocolTCP {
continue
}

for i := range ss.Addresses {
epAddr := &ss.Addresses[i]

if targetPort != port.Port {
continue
}

ip := net.ParseIP(epAddr.IP)
if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
continue
}

backends = append(backends, lbc.createBackend(svc, epAddr.IP, targetPort, backendConfig))
}
}
}

klog.V(3).Infof("endpoints found: %+v", backends)
return backends, nil
}

func (lbc *LoadBalancerController) getEndpointsFromEndpoints(svc *corev1.Service, svcPort *corev1.ServicePort, backendConfig *nghttpx.BackendConfig) ([]nghttpx.Backend, error) {
ep, err := lbc.epLister.Endpoints(svc.Namespace).Get(svc.Name)
if err != nil {
Expand Down Expand Up @@ -1602,6 +1653,7 @@ func (lbc *LoadBalancerController) Run(ctx context.Context) {
go lbc.cmInformer.Run(ctrlCtx.Done())
go lbc.podInformer.Run(ctrlCtx.Done())
go lbc.ingClassInformer.Run(ctrlCtx.Done())
go lbc.epInformer.Run(ctrlCtx.Done())

hasSynced := []cache.InformerSynced{
lbc.ingInformer.HasSynced,
Expand All @@ -1610,12 +1662,9 @@ func (lbc *LoadBalancerController) Run(ctx context.Context) {
lbc.cmInformer.HasSynced,
lbc.podInformer.HasSynced,
lbc.ingClassInformer.HasSynced,
lbc.epInformer.HasSynced,
}

if lbc.epInformer != nil {
go lbc.epInformer.Run(ctrlCtx.Done())
hasSynced = append(hasSynced, lbc.epInformer.HasSynced)
}
if lbc.epSliceInformer != nil {
go lbc.epSliceInformer.Run(ctrlCtx.Done())
hasSynced = append(hasSynced, lbc.epSliceInformer.HasSynced)
Expand Down
162 changes: 156 additions & 6 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,10 @@ func (f *fixture) setupStore() {
panic(err)
}
}
} else {
for _, ep := range f.epStore {
if err := f.lbc.epInformer.GetIndexer().Add(ep); err != nil {
panic(err)
}
}
for _, ep := range f.epStore {
if err := f.lbc.epInformer.GetIndexer().Add(ep); err != nil {
panic(err)
}
}
for _, svc := range f.svcStore {
Expand Down Expand Up @@ -329,6 +328,9 @@ func newDefaultBackend() (*corev1.Service, *corev1.Endpoints, []*discoveryv1.End
Namespace: defaultBackendNamespace,
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app.kubernetes.io/name": defaultBackendName,
},
Ports: []corev1.ServicePort{
{
Port: 8181,
Expand Down Expand Up @@ -476,6 +478,55 @@ func newDefaultBackend() (*corev1.Service, *corev1.Endpoints, []*discoveryv1.End
return svc, eps, ess
}

// newDefaultBackendWithoutSelectors returns Service and Endpoints for default backend without Service selectors.
func newDefaultBackendWithoutSelectors() (*corev1.Service, *corev1.Endpoints) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: defaultBackendName,
Namespace: defaultBackendNamespace,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 8181,
TargetPort: intstr.FromInt(8080),
Protocol: corev1.ProtocolTCP,
},
},
},
}
eps := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: defaultBackendName,
Namespace: defaultBackendNamespace,
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.100.1",
},
{
IP: "192.168.100.2",
},
},
Ports: []corev1.EndpointPort{
{
Protocol: corev1.ProtocolTCP,
Port: 8081,
},
{
Protocol: corev1.ProtocolTCP,
Port: 8080,
},
},
},
},
}

return svc, eps
}

func newBackend(namespace, name string, addrs []string) (*corev1.Service, *corev1.Endpoints, *discoveryv1.EndpointSlice) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -567,6 +618,55 @@ func newBackend(namespace, name string, addrs []string) (*corev1.Service, *corev
return svc, eps, es
}

func newBackendWithoutSelectors(namespace, name string, addrs []string) (*corev1.Service, *corev1.Endpoints) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 8281,
TargetPort: intstr.FromInt(80),
Protocol: corev1.ProtocolTCP,
},
},
},
}
eps := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{
{
Protocol: corev1.ProtocolTCP,
Port: 81,
},
{
Protocol: corev1.ProtocolTCP,
Port: 80,
},
},
},
},
}

endpointAddrs := make([]corev1.EndpointAddress, len(addrs))
for i, addr := range addrs {
endpointAddrs[i] = corev1.EndpointAddress{
IP: addr,
}
}

eps.Subsets[0].Addresses = endpointAddrs

return svc, eps
}

type ingressBuilder struct {
*networkingv1.Ingress
}
Expand Down Expand Up @@ -702,6 +802,7 @@ func TestSyncDefaultBackend(t *testing.T) {
tests := []struct {
desc string
enableEndpointSlice bool
withoutSelectors bool
}{
{
desc: "With Endpoints",
Expand All @@ -710,6 +811,10 @@ func TestSyncDefaultBackend(t *testing.T) {
desc: "With EndpointSlice",
enableEndpointSlice: true,
},
{
desc: "Without selectors",
withoutSelectors: true,
},
}

for _, tt := range tests {
Expand All @@ -721,7 +826,16 @@ func TestSyncDefaultBackend(t *testing.T) {
cm.Data[nghttpx.NghttpxExtraConfigKey] = "Test"
const mrubyContent = "mruby"
cm.Data[nghttpx.NghttpxMrubyFileContentKey] = mrubyContent
svc, eps, ess := newDefaultBackend()
var (
svc *corev1.Service
eps *corev1.Endpoints
ess []*discoveryv1.EndpointSlice
)
if tt.withoutSelectors {
svc, eps = newDefaultBackendWithoutSelectors()
} else {
svc, eps, ess = newDefaultBackend()
}

f.cmStore = append(f.cmStore, cm)
f.svcStore = append(f.svcStore, svc)
Expand Down Expand Up @@ -755,6 +869,9 @@ func TestSyncDefaultBackend(t *testing.T) {
if got, want := us.Address, "192.168.100.1"; got != want {
t.Errorf("0: us.Address = %v, want %v", got, want)
}
if got, want := us.Port, "8080"; got != want {
t.Errorf("0: us.Port = %v, want %v", got, want)
}
}

if got, want := flb.ingConfig.ExtraConfig, cm.Data[nghttpx.NghttpxExtraConfigKey]; got != want {
Expand Down Expand Up @@ -1115,6 +1232,39 @@ func TestSyncEmptyTargetPort(t *testing.T) {
}
}

// TestSyncWithoutSelectors verifies that the controller deals with Service without selectors.
func TestSyncWithoutSelectors(t *testing.T) {
f := newFixture(t)

svc, eps, _ := newDefaultBackend()

bs1, be1 := newBackendWithoutSelectors(metav1.NamespaceDefault, "alpha", []string{"192.168.10.1"})
ing1 := newIngressBuilder(bs1.Namespace, "alpha-ing").
WithRule("/", bs1.Name, serviceBackendPortNumber(bs1.Spec.Ports[0].Port)).
Complete()

f.svcStore = append(f.svcStore, svc, bs1)
f.epStore = append(f.epStore, eps, be1)
f.ingStore = append(f.ingStore, ing1)

f.objects = append(f.objects, svc, eps, bs1, be1, ing1)

f.prepare()
f.run()

flb := f.lbc.nghttpx.(*fakeLoadBalancer)
ingConfig := flb.ingConfig

if got, want := len(ingConfig.Upstreams), 2; got != want {
t.Errorf("len(ingConfig.Upstreams) = %v, want %v", got, want)
}

backend := ingConfig.Upstreams[0].Backends[0]
if got, want := backend.Port, "80"; got != want {
t.Errorf("backend.Port = %v, want %v", got, want)
}
}

// TestValidateIngressClass verifies validateIngressClass.
func TestValidateIngressClass(t *testing.T) {
tests := []struct {
Expand Down