Skip to content

Commit

Permalink
Support for a kubernetes service name as a target (instead of pod sel…
Browse files Browse the repository at this point in the history
…ector)
  • Loading branch information
grs committed Jul 20, 2020
1 parent d66e9b7 commit 16283db
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 42 deletions.
10 changes: 6 additions & 4 deletions api/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
TransportViewRoleName string = "skupper-view"
TransportEnvConfig string = "QDROUTERD_CONF"
TransportSaslConfig string = "skupper-sasl-config"
DefaultBridgeServerImage string = "quay.io/skupper/bridge-server:0.3.0"
DefaultBridgeServerImage string = "quay.io/gordons/bridge-server:override"
BridgeServerContainerName string = "bridge-server"
)

Expand All @@ -69,8 +69,8 @@ var TransportPrometheusAnnotations = map[string]string{
// Controller constants
const (
ControllerDeploymentName string = "skupper-service-controller"
ControllerComponentName string = "proxy-controller"
DefaultControllerImage string = "quay.io/skupper/service-controller"
ControllerComponentName string = "service-controller"
DefaultControllerImage string = "quay.io/gordons/service-controller:kndemo"
ControllerContainerName string = "service-controller"
DefaultProxyImage string = "quay.io/skupper/proxy"
ControllerServiceAccountName string = "skupper-proxy-controller"
Expand Down Expand Up @@ -98,6 +98,7 @@ const (
AddressQualifier string = BaseQualifier + "/address"
PortQualifier string = BaseQualifier + "/port"
ProxyQualifier string = BaseQualifier + "/proxy"
TargetServiceQualifier string = BaseQualifier + "/target"
ControlledQualifier string = InternalQualifier + "/controlled"
ServiceQualifier string = InternalQualifier + "/service"
OriginQualifier string = InternalQualifier + "/origin"
Expand Down Expand Up @@ -324,8 +325,9 @@ type ServiceInterface struct {

type ServiceInterfaceTarget struct {
Name string `json:"name,omitempty"`
Selector string `json:"selector"`
Selector string `json:"selector,omitempty"`
TargetPort int `json:"targetPort,omitempty"`
Service string `json:"service,omitempty"`
}

type Headless struct {
Expand Down
8 changes: 7 additions & 1 deletion client/van_serviceinterface_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func getServiceInterfaceTarget(targetType string, targetName string, deducePort
}
} else if targetType == "pods" {
return nil, fmt.Errorf("VAN service interfaces for pods not yet implemented")
} else if targetType == "service" {
target := types.ServiceInterfaceTarget{
Name: targetName,
Service: targetName,
}
return &target, nil
} else {
return nil, fmt.Errorf("VAN service interface unsupported target type")
}
Expand Down Expand Up @@ -312,7 +318,7 @@ func removeServiceInterfaceTarget(serviceName string, targetName string, deleteI
}

func (cli *VanClient) VanServiceInterfaceUnbind(ctx context.Context, targetType string, targetName string, address string, deleteIfNoTargets bool) error {
if targetType == "deployment" || targetType == "statefulset" {
if targetType == "deployment" || targetType == "statefulset" || targetType == "service" {
if address == "" {
err := removeServiceInterfaceTarget(targetName, targetName, deleteIfNoTargets, cli)
return err
Expand Down
91 changes: 72 additions & 19 deletions cmd/service-controller/bridges.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type HttpBridge struct {
Http2 bool
Aggregation string
EventChannel bool
HostOverride string
}

func (b *HttpBridge) toMap() map[string]interface{} {
Expand All @@ -69,6 +70,7 @@ func (b *HttpBridge) toMap() map[string]interface{} {
"http2": b.Http2,
"aggregation": b.Aggregation,
"eventChannel": b.EventChannel,
"hostOverride": b.HostOverride,
}
}

Expand All @@ -92,6 +94,7 @@ type BridgeConfiguration struct {

type EgressBindings struct {
selector string
service string
egressPort int
informer cache.SharedIndexInformer
stopper chan struct{}
Expand Down Expand Up @@ -150,6 +153,15 @@ func hasTargetForSelector(si types.ServiceInterface, selector string) bool {
return false
}

func hasTargetForService(si types.ServiceInterface, service string) bool {
for _, t := range si.Targets {
if t.Service == service {
return true
}
}
return false
}

func (c *Controller) updateServiceBindings(required types.ServiceInterface, portAllocations map[string]int) error {
bindings := c.bindings[required.Address]
if bindings == nil {
Expand All @@ -169,7 +181,11 @@ func (c *Controller) updateServiceBindings(required types.ServiceInterface, port
}
sb := newServiceBindings(required.Origin, required.Protocol, required.Address, required.Port, required.Headless, port, required.Aggregate, required.EventChannel)
for _, t := range required.Targets {
sb.addTarget(t.Selector, getTargetPort(required, t), c)
if t.Selector != "" {
sb.addSelectorTarget(t.Selector, getTargetPort(required, t), c)
} else if t.Service != "" {
sb.addServiceTarget(t.Service, getTargetPort(required, t), c)
}
}
c.bindings[required.Address] = sb
} else {
Expand All @@ -195,16 +211,31 @@ func (c *Controller) updateServiceBindings(required types.ServiceInterface, port
}
for _, t := range required.Targets {
targetPort := getTargetPort(required, t)
target := bindings.targets[t.Selector]
if target == nil {
bindings.addTarget(t.Selector, targetPort, c)
} else if target.egressPort != targetPort {
target.egressPort = targetPort
if t.Selector != "" {
target := bindings.targets[t.Selector]
if target == nil {
bindings.addSelectorTarget(t.Selector, targetPort, c)
} else if target.egressPort != targetPort {
target.egressPort = targetPort
}
} else if t.Service != "" {
target := bindings.targets[t.Service]
if target == nil {
bindings.addServiceTarget(t.Service, targetPort, c)
} else if target.egressPort != targetPort {
target.egressPort = targetPort
}
}
}
for k, _ := range bindings.targets {
if !hasTargetForSelector(required, k) {
bindings.removeTarget(k)
for k, v := range bindings.targets {
if v.selector != "" {
if !hasTargetForSelector(required, k) {
bindings.removeSelectorTarget(k)
}
} else if v.service != "" {
if !hasTargetForService(required, k) {
bindings.removeServiceTarget(k)
}
}
}
}
Expand All @@ -225,7 +256,7 @@ func newServiceBindings(origin string, protocol string, address string, publicPo
}
}

func (sb *ServiceBindings) addTarget(selector string, port int, controller *Controller) error {
func (sb *ServiceBindings) addSelectorTarget(selector string, port int, controller *Controller) error {
sb.targets[selector] = &EgressBindings{
selector: selector,
egressPort: port,
Expand All @@ -243,11 +274,25 @@ func (sb *ServiceBindings) addTarget(selector string, port int, controller *Cont
return sb.targets[selector].start()
}

func (sb *ServiceBindings) removeTarget(selector string) {
func (sb *ServiceBindings) removeSelectorTarget(selector string) {
sb.targets[selector].stop()
delete(sb.targets, selector)
}

func (sb *ServiceBindings) addServiceTarget(service string, port int, controller *Controller) error {
sb.targets[service] = &EgressBindings{
service: service,
egressPort: port,
stopper: make(chan struct{}),
}
return nil
}

func (sb *ServiceBindings) removeServiceTarget(service string) {
delete(sb.targets, service)
}


func (sb *ServiceBindings) stop() {
for _, v := range sb.targets {
if v != nil {
Expand Down Expand Up @@ -278,11 +323,15 @@ func (eb *EgressBindings) stop() {
}

func (eb *EgressBindings) updateBridgeConfiguration(protocol string, address string, siteId string, bridges *BridgeConfiguration) {
pods := eb.informer.GetStore().List()
for _, p := range pods {
pod := p.(*corev1.Pod)
log.Printf("Adding pod for %s: %s", address, pod.ObjectMeta.Name)
addEgressBridge(protocol, pod.Status.PodIP, eb.egressPort, address, siteId, bridges)
if eb.selector != "" {
pods := eb.informer.GetStore().List()
for _, p := range pods {
pod := p.(*corev1.Pod)
log.Printf("Adding pod for %s: %s", address, pod.ObjectMeta.Name)
addEgressBridge(protocol, pod.Status.PodIP, eb.egressPort, address, siteId, "", bridges)
}
} else if eb.service != "" {
addEgressBridge(protocol, eb.service, eb.egressPort, address, siteId, eb.service, bridges)
}
}

Expand Down Expand Up @@ -416,18 +465,22 @@ func (m NestedHttpBridgeMap) add(b HttpBridge) {
}
}

func addEgressBridge(protocol string, host string, port int, address string, siteId string, bridges *BridgeConfiguration) (bool, error) {
func addEgressBridge(protocol string, host string, port int, address string, siteId string, hostOverride string, bridges *BridgeConfiguration) (bool, error) {
switch protocol {
case ProtocolHTTP:
bridges.HttpConnectors.add(HttpBridge{
b := HttpBridge{
Bridge: Bridge{
Name: getBridgeName(address, host),
Host: host,
Port: port,
Address: address,
SiteId: siteId,
},
})
}
if hostOverride != "" {
b.HostOverride = hostOverride
}
bridges.HttpConnectors.add(b)
case ProtocolHTTP2:
bridges.Http2Connectors.add(HttpBridge{
Bridge: Bridge{
Expand Down
34 changes: 34 additions & 0 deletions cmd/service-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsv1informer "k8s.io/client-go/informers/apps/v1"
Expand Down Expand Up @@ -267,10 +268,43 @@ func (c *Controller) createHeadlessServiceFor(desired *ServiceBindings) error {
return err
}

func equivalentSelectors(a map[string]string, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for k, v := range a {
if v2, ok := b[k]; !ok || v != v2 {
return false
}
}
for k, v := range b {
if v2, ok := a[k]; !ok || v != v2 {
return false
}
}
return true
}

func (c *Controller) checkServiceFor(desired *ServiceBindings, actual *corev1.Service) error {
//selector, port, targetPort
// TODO: check services changes
log.Printf("We need to check service changes for %s", actual.ObjectMeta.Name)
update := false
if actual.Spec.Ports[0].Port != int32(desired.publicPort) {
update = true
actual.Spec.Ports[0].Port = int32(desired.publicPort)
}
if actual.Spec.Ports[0].TargetPort.IntValue() != desired.ingressPort {
update = true
actual.Spec.Ports[0].TargetPort = intstr.FromInt(desired.ingressPort)
}
if !equivalentSelectors(actual.Spec.Selector, kube.GetLabelsForRouter()) {
actual.Spec.Selector = kube.GetLabelsForRouter()
}
if update {
_, err := c.vanClient.KubeClient.CoreV1().Services(c.vanClient.Namespace).Update(actual)
return err
}
return nil
}

Expand Down
30 changes: 21 additions & 9 deletions cmd/service-controller/definition_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,27 @@ func (m *DefinitionMonitor) getServiceDefinitionFromAnnotatedService(service *co
} else {
svc.Address = service.ObjectMeta.Name
}
target := types.ServiceInterfaceTarget{
Name: service.ObjectMeta.Name,
Selector: utils.StringifySelector(service.Spec.Selector),
}
if targetPort := deduceTargetPortFromService(service); targetPort != 0 {
target.TargetPort = targetPort
}
svc.Targets = []types.ServiceInterfaceTarget{
target,
if target, ok := service.ObjectMeta.Annotations[types.TargetServiceQualifier]; ok {
svc.Targets = []types.ServiceInterfaceTarget{
types.ServiceInterfaceTarget{
Name: target,
Service: target,
},
}
} else if service.Spec.Selector != nil {
target := types.ServiceInterfaceTarget{
Name: service.ObjectMeta.Name,
Selector: utils.StringifySelector(service.Spec.Selector),
}
if targetPort := deduceTargetPortFromService(service); targetPort != 0 {
target.TargetPort = targetPort
}
svc.Targets = []types.ServiceInterfaceTarget{
target,
}
} else {
log.Printf("Ignoring annotated service %s; no selector defined", service.ObjectMeta.Name)
return svc, false
}
svc.Origin = "annotation"
return svc, true
Expand Down
29 changes: 22 additions & 7 deletions cmd/skupper/skupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ type ExposeOptions struct {
func expose(cli *client.VanClient, ctx context.Context, targetType string, targetName string, options ExposeOptions) error {
serviceName := options.Address
if serviceName == "" {
serviceName = targetName
if targetType == "service" {
return fmt.Errorf("The --address option is required for target type 'service'")
} else {
serviceName = targetName
}
}
service, err := cli.VanServiceInterfaceInspect(ctx, serviceName)
if service == nil {
Expand Down Expand Up @@ -91,8 +95,8 @@ func exposeTarget() func(*cobra.Command, []string) error {
parts := strings.Split(args[0], "/")
targetType = parts[0]
}
if targetType != "deployment" && targetType != "statefulset" && targetType != "pods" {
return fmt.Errorf("expose target type must be one of 'deployment', 'statefulset' or 'pods'")
if targetType != "deployment" && targetType != "statefulset" && targetType != "pods" && targetType != "service" {
return fmt.Errorf("expose target type must be one of 'deployment', 'statefulset', 'service' or 'pods'")
}
return nil
}
Expand Down Expand Up @@ -440,7 +444,7 @@ func main() {

exposeOpts := ExposeOptions{}
var cmdExpose = &cobra.Command{
Use: "expose [deployment <name>|pods <selector>|statefulset <statefulsetname>]",
Use: "expose [deployment <name>|pods <selector>|statefulset <statefulsetname>|service <name>]",
Short: "Expose a set of pods through a Skupper address",
Args: exposeTarget(),
Run: func(cmd *cobra.Command, args []string) {
Expand All @@ -461,7 +465,12 @@ func main() {
if err == nil {
address := exposeOpts.Address
if address == "" {
address = targetType
if args[0] == "service" {
fmt.Printf("--address option is required for target type 'service'")
os.Exit(1)
} else {
address = targetType
}
}
fmt.Printf("%s %s exposed as %s\n", targetType, targetName, address)
} else if errors.IsNotFound(err) {
Expand All @@ -481,7 +490,7 @@ func main() {

var unexposeAddress string
var cmdUnexpose = &cobra.Command{
Use: "unexpose [deployment <name>|pods <selector>|statefulset <statefulsetname>]",
Use: "unexpose [deployment <name>|pods <selector>|statefulset <statefulsetname>|service <name>]",
Short: "Unexpose a set of pods previously exposed through a Skupper address",
Args: exposeTarget(),
Run: func(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -531,7 +540,13 @@ func main() {
if t.Name != "" {
name = fmt.Sprintf("name=%s", t.Name)
}
fmt.Printf(" => %s %s", t.Selector, name)
if t.Selector != "" {
fmt.Printf(" => %s %s", t.Selector, name)
} else if t.Service != "" {
fmt.Printf(" => %s %s", t.Service, name)
} else {
fmt.Printf(" => %s (no selector)", name)
}
fmt.Println()
}
}
Expand Down
Loading

0 comments on commit 16283db

Please sign in to comment.