Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
enhance the external access support (#64)
Browse files Browse the repository at this point in the history
* add support for adding annotations to the external services
* add support for node internal IP using the parameter EXTERNAL_NODEPORT_IP_TYPE
  • Loading branch information
zmalik authored Oct 27, 2020
1 parent c159a9b commit 5e7e450
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 135 deletions.
22 changes: 21 additions & 1 deletion docs/external-access.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,24 @@ inter.broker.listener.name=INTERNAL
[ ... lines removed for clarity ...]
```

#####
# External-dns and third-party annotations

KUDO Kafka supports adding annotations to the services created for external access.

For example for external-dns, user can use the parameter `EXTERNAL_SERVICE_ANNOTATIONS` with value:

```
- external-dns.alpha.kubernetes.io/hostname: my-broker-0.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-1.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-2.example.org
```

This will add the annotation to first three services used for external access. If the KUDO Kafka cluster is of 5 brokers, the parameter value will look like:

```
- external-dns.alpha.kubernetes.io/hostname: my-broker-0.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-1.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-2.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-3.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-4.example.org
```
5 changes: 5 additions & 0 deletions docs/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## latest

- Support node internal IP for external access
- Support annotations for external access services

## v1.3.2

- Apache Kafka upgraded to 2.5.1

## v1.3.1
Expand Down
18 changes: 16 additions & 2 deletions images/kafka/kafka-utils/pkgs/mocks/environment_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions images/kafka/kafka-utils/pkgs/service/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Environment interface {
GetNamespace() string
GetExternalIngressPort() string
GetNodeName() string
GetNodePortIPType() string
}

type EnvironmentImpl struct{}
Expand All @@ -28,3 +29,7 @@ func (c *EnvironmentImpl) GetExternalIngressPort() string {
func (c *EnvironmentImpl) GetNodeName() string {
return os.Getenv("NODE_NAME")
}

func (c *EnvironmentImpl) GetNodePortIPType() string {
return os.Getenv("EXTERNAL_NODEPORT_IP_TYPE")
}
55 changes: 44 additions & 11 deletions images/kafka/kafka-utils/pkgs/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,33 @@ func (c *KafkaService) WriteIngressToPath(path string) error {
}
case v1.ServiceTypeNodePort:
log.Infoln("detected ", v1.ServiceTypeNodePort)
externalIP, err := c.getNodeExternalIP()
if err != nil {
return err
}
log.Infoln("detected ExternalIP: ", externalIP)
ingressStatus = []v1.LoadBalancerIngress{
{
IP: externalIP,
Hostname: "",
},
switch c.Env.GetNodePortIPType() {
case "EXTERNAL":
externalIP, err := c.getNodeExternalIP()
if err != nil {
return err
}
log.Infoln("detected ExternalIP: ", externalIP)
ingressStatus = []v1.LoadBalancerIngress{
{
IP: externalIP,
Hostname: "",
},
}
case "INTERNAL":
externalIP, err := c.getNodeInternalIP()
if err != nil {
return err
}
log.Infoln("detected InternalIP: ", externalIP)
ingressStatus = []v1.LoadBalancerIngress{
{
IP: externalIP,
Hostname: "",
},
}
default:
return fmt.Errorf("NodePortIPType '%s' not supported", c.Env.GetNodePortIPType())
}
for _, port := range kafkaService.Spec.Ports {
c.Port = port.NodePort
Expand Down Expand Up @@ -189,6 +206,7 @@ func (c *KafkaService) writeListenerSecurityProtocolMap(ingresses []v1.LoadBalan
log.Infof("created the %s file", path)
return nil
}

func (c *KafkaService) getNodeExternalIP() (string, error) {
node, err := c.Client.CoreV1().Nodes().Get(c.Env.GetNodeName(), metav1.GetOptions{})
if err != nil {
Expand All @@ -200,8 +218,23 @@ func (c *KafkaService) getNodeExternalIP() (string, error) {
return address.Address, nil
}
}
return "", fmt.Errorf("no node found with name '%s'", c.Env.GetNodeName())
return "", fmt.Errorf("no external IP found for node '%s'", c.Env.GetNodeName())
}

func (c *KafkaService) getNodeInternalIP() (string, error) {
node, err := c.Client.CoreV1().Nodes().Get(c.Env.GetNodeName(), metav1.GetOptions{})
if err != nil {
log.Errorf("error fetching nodes internal IP :%s", err)
return "", err
}
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP {
return address.Address, nil
}
}
return "", fmt.Errorf("no internal IP found for node '%s'", c.Env.GetNodeName())
}

func (c *KafkaService) writeListenerDNS(ingresses []v1.LoadBalancerIngress, path string) error {
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
Expand Down
Loading

0 comments on commit 5e7e450

Please sign in to comment.