Skip to content

Commit

Permalink
Merge pull request #222 from zlabjp/service-without-selectors
Browse files Browse the repository at this point in the history
Support Service without selectors
  • Loading branch information
tatsuhiro-t authored Sep 15, 2022
2 parents c61a252 + 95f836f commit 71a355f
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 15 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ If `--internal-default-backend` is used, the controller configures
nghttpx to act as a default backend. In this case, the default
backend service is not necessary.

## Services without selectors

nghttpx supports [Services without
selectors](https://kubernetes.io/docs/concepts/services-networking/service/#services-without-selectors).

## Logs

The access, and error log of nghttpx are written to stdout, and stderr
Expand Down
67 changes: 58 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func NewLoadBalancerController(clientset clientset.Interface, nghttpx nghttpx.Se

allNSInformers := informers.NewSharedInformerFactory(lbc.clientset, noResyncPeriod)

if !config.EnableEndpointSlice {
{
f := allNSInformers.Core().V1().Endpoints()
lbc.epLister = f.Lister()
lbc.epInformer = f.Informer()
Expand All @@ -261,8 +261,9 @@ func NewLoadBalancerController(clientset clientset.Interface, nghttpx nghttpx.Se
UpdateFunc: lbc.updateEndpointsNotification,
DeleteFunc: lbc.deleteEndpointsNotification,
})
}

} else {
if config.EnableEndpointSlice {
epSliceInformers := informers.NewSharedInformerFactoryWithOptions(lbc.clientset, noResyncPeriod,
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector =
Expand Down Expand Up @@ -1367,15 +1368,65 @@ func (lbc *LoadBalancerController) getEndpoints(svc *corev1.Service, svcPort *co
svc.Namespace, svc.Name, svcPort.Port, svcPort.TargetPort.String())

switch {
case lbc.epLister != nil:
return lbc.getEndpointsFromEndpoints(svc, svcPort, backendConfig)
case len(svc.Spec.Selector) == 0:
return lbc.getEndpointsWithoutServiceSelectors(svc, svcPort, backendConfig)
case lbc.epSliceLister != nil:
return lbc.getEndpointsFromEndpointSlice(svc, svcPort, backendConfig)
default:
panic("unreachable")
return lbc.getEndpointsFromEndpoints(svc, svcPort, backendConfig)
}
}

func (lbc *LoadBalancerController) getEndpointsWithoutServiceSelectors(svc *corev1.Service, svcPort *corev1.ServicePort, backendConfig *nghttpx.BackendConfig) ([]nghttpx.Backend, error) {
var targetPort int32

switch {
case svcPort.TargetPort.IntVal != 0:
targetPort = svcPort.TargetPort.IntVal
case svcPort.TargetPort.StrVal == "":
targetPort = svcPort.Port
default:
return nil, fmt.Errorf("Service %v/%v must have integer target port if specified: %v", svc.Namespace, svc.Name, svcPort.TargetPort)
}

ep, err := lbc.epLister.Endpoints(svc.Namespace).Get(svc.Name)
if err != nil {
klog.Errorf("unexpected error obtaining service endpoints: %v", err)
return nil, err
}

var backends []nghttpx.Backend

for i := range ep.Subsets {
ss := &ep.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]

if port.Protocol != "" && port.Protocol != corev1.ProtocolTCP {
continue
}

for i := range ss.Addresses {
epAddr := &ss.Addresses[i]

if targetPort != port.Port {
continue
}

ip := net.ParseIP(epAddr.IP)
if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
continue
}

backends = append(backends, lbc.createBackend(svc, epAddr.IP, targetPort, backendConfig))
}
}
}

klog.V(3).Infof("endpoints found: %+v", backends)
return backends, nil
}

func (lbc *LoadBalancerController) getEndpointsFromEndpoints(svc *corev1.Service, svcPort *corev1.ServicePort, backendConfig *nghttpx.BackendConfig) ([]nghttpx.Backend, error) {
ep, err := lbc.epLister.Endpoints(svc.Namespace).Get(svc.Name)
if err != nil {
Expand Down Expand Up @@ -1602,6 +1653,7 @@ func (lbc *LoadBalancerController) Run(ctx context.Context) {
go lbc.cmInformer.Run(ctrlCtx.Done())
go lbc.podInformer.Run(ctrlCtx.Done())
go lbc.ingClassInformer.Run(ctrlCtx.Done())
go lbc.epInformer.Run(ctrlCtx.Done())

hasSynced := []cache.InformerSynced{
lbc.ingInformer.HasSynced,
Expand All @@ -1610,12 +1662,9 @@ func (lbc *LoadBalancerController) Run(ctx context.Context) {
lbc.cmInformer.HasSynced,
lbc.podInformer.HasSynced,
lbc.ingClassInformer.HasSynced,
lbc.epInformer.HasSynced,
}

if lbc.epInformer != nil {
go lbc.epInformer.Run(ctrlCtx.Done())
hasSynced = append(hasSynced, lbc.epInformer.HasSynced)
}
if lbc.epSliceInformer != nil {
go lbc.epSliceInformer.Run(ctrlCtx.Done())
hasSynced = append(hasSynced, lbc.epSliceInformer.HasSynced)
Expand Down
162 changes: 156 additions & 6 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,10 @@ func (f *fixture) setupStore() {
panic(err)
}
}
} else {
for _, ep := range f.epStore {
if err := f.lbc.epInformer.GetIndexer().Add(ep); err != nil {
panic(err)
}
}
for _, ep := range f.epStore {
if err := f.lbc.epInformer.GetIndexer().Add(ep); err != nil {
panic(err)
}
}
for _, svc := range f.svcStore {
Expand Down Expand Up @@ -329,6 +328,9 @@ func newDefaultBackend() (*corev1.Service, *corev1.Endpoints, []*discoveryv1.End
Namespace: defaultBackendNamespace,
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app.kubernetes.io/name": defaultBackendName,
},
Ports: []corev1.ServicePort{
{
Port: 8181,
Expand Down Expand Up @@ -476,6 +478,55 @@ func newDefaultBackend() (*corev1.Service, *corev1.Endpoints, []*discoveryv1.End
return svc, eps, ess
}

// newDefaultBackendWithoutSelectors returns Service and Endpoints for default backend without Service selectors.
func newDefaultBackendWithoutSelectors() (*corev1.Service, *corev1.Endpoints) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: defaultBackendName,
Namespace: defaultBackendNamespace,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 8181,
TargetPort: intstr.FromInt(8080),
Protocol: corev1.ProtocolTCP,
},
},
},
}
eps := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: defaultBackendName,
Namespace: defaultBackendNamespace,
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.100.1",
},
{
IP: "192.168.100.2",
},
},
Ports: []corev1.EndpointPort{
{
Protocol: corev1.ProtocolTCP,
Port: 8081,
},
{
Protocol: corev1.ProtocolTCP,
Port: 8080,
},
},
},
},
}

return svc, eps
}

func newBackend(namespace, name string, addrs []string) (*corev1.Service, *corev1.Endpoints, *discoveryv1.EndpointSlice) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -567,6 +618,55 @@ func newBackend(namespace, name string, addrs []string) (*corev1.Service, *corev
return svc, eps, es
}

func newBackendWithoutSelectors(namespace, name string, addrs []string) (*corev1.Service, *corev1.Endpoints) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 8281,
TargetPort: intstr.FromInt(80),
Protocol: corev1.ProtocolTCP,
},
},
},
}
eps := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{
{
Protocol: corev1.ProtocolTCP,
Port: 81,
},
{
Protocol: corev1.ProtocolTCP,
Port: 80,
},
},
},
},
}

endpointAddrs := make([]corev1.EndpointAddress, len(addrs))
for i, addr := range addrs {
endpointAddrs[i] = corev1.EndpointAddress{
IP: addr,
}
}

eps.Subsets[0].Addresses = endpointAddrs

return svc, eps
}

type ingressBuilder struct {
*networkingv1.Ingress
}
Expand Down Expand Up @@ -702,6 +802,7 @@ func TestSyncDefaultBackend(t *testing.T) {
tests := []struct {
desc string
enableEndpointSlice bool
withoutSelectors bool
}{
{
desc: "With Endpoints",
Expand All @@ -710,6 +811,10 @@ func TestSyncDefaultBackend(t *testing.T) {
desc: "With EndpointSlice",
enableEndpointSlice: true,
},
{
desc: "Without selectors",
withoutSelectors: true,
},
}

for _, tt := range tests {
Expand All @@ -721,7 +826,16 @@ func TestSyncDefaultBackend(t *testing.T) {
cm.Data[nghttpx.NghttpxExtraConfigKey] = "Test"
const mrubyContent = "mruby"
cm.Data[nghttpx.NghttpxMrubyFileContentKey] = mrubyContent
svc, eps, ess := newDefaultBackend()
var (
svc *corev1.Service
eps *corev1.Endpoints
ess []*discoveryv1.EndpointSlice
)
if tt.withoutSelectors {
svc, eps = newDefaultBackendWithoutSelectors()
} else {
svc, eps, ess = newDefaultBackend()
}

f.cmStore = append(f.cmStore, cm)
f.svcStore = append(f.svcStore, svc)
Expand Down Expand Up @@ -755,6 +869,9 @@ func TestSyncDefaultBackend(t *testing.T) {
if got, want := us.Address, "192.168.100.1"; got != want {
t.Errorf("0: us.Address = %v, want %v", got, want)
}
if got, want := us.Port, "8080"; got != want {
t.Errorf("0: us.Port = %v, want %v", got, want)
}
}

if got, want := flb.ingConfig.ExtraConfig, cm.Data[nghttpx.NghttpxExtraConfigKey]; got != want {
Expand Down Expand Up @@ -1115,6 +1232,39 @@ func TestSyncEmptyTargetPort(t *testing.T) {
}
}

// TestSyncWithoutSelectors verifies that the controller deals with Service without selectors.
func TestSyncWithoutSelectors(t *testing.T) {
f := newFixture(t)

svc, eps, _ := newDefaultBackend()

bs1, be1 := newBackendWithoutSelectors(metav1.NamespaceDefault, "alpha", []string{"192.168.10.1"})
ing1 := newIngressBuilder(bs1.Namespace, "alpha-ing").
WithRule("/", bs1.Name, serviceBackendPortNumber(bs1.Spec.Ports[0].Port)).
Complete()

f.svcStore = append(f.svcStore, svc, bs1)
f.epStore = append(f.epStore, eps, be1)
f.ingStore = append(f.ingStore, ing1)

f.objects = append(f.objects, svc, eps, bs1, be1, ing1)

f.prepare()
f.run()

flb := f.lbc.nghttpx.(*fakeLoadBalancer)
ingConfig := flb.ingConfig

if got, want := len(ingConfig.Upstreams), 2; got != want {
t.Errorf("len(ingConfig.Upstreams) = %v, want %v", got, want)
}

backend := ingConfig.Upstreams[0].Backends[0]
if got, want := backend.Port, "80"; got != want {
t.Errorf("backend.Port = %v, want %v", got, want)
}
}

// TestValidateIngressClass verifies validateIngressClass.
func TestValidateIngressClass(t *testing.T) {
tests := []struct {
Expand Down

0 comments on commit 71a355f

Please sign in to comment.