Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

enhance the external access support #64

Merged
merged 3 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion docs/external-access.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,24 @@ inter.broker.listener.name=INTERNAL
[ ... lines removed for clarity ...]
```

#####
# External-dns and third-party annotations

KUDO Kafka supports adding annotations to the services created for external access.

For example for external-dns, user can use the parameter `EXTERNAL_SERVICE_ANNOTATIONS` with value:

```
- external-dns.alpha.kubernetes.io/hostname: my-broker-0.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-1.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-2.example.org
```

This will add the annotation to first three services used for external access. If the KUDO Kafka cluster is of 5 brokers, the parameter value will look like:

```
- external-dns.alpha.kubernetes.io/hostname: my-broker-0.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-1.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-2.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-3.example.org
- external-dns.alpha.kubernetes.io/hostname: my-broker-4.example.org
```
5 changes: 5 additions & 0 deletions docs/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## latest

- Support node internal IP for external access
- Support annotations for external access services

## v1.3.2

- Apache Kafka upgraded to 2.5.1

## v1.3.1
Expand Down
18 changes: 16 additions & 2 deletions images/kafka/kafka-utils/pkgs/mocks/environment_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions images/kafka/kafka-utils/pkgs/service/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Environment interface {
GetNamespace() string
GetExternalIngressPort() string
GetNodeName() string
GetNodePortIPType() string
}

type EnvironmentImpl struct{}
Expand All @@ -28,3 +29,7 @@ func (c *EnvironmentImpl) GetExternalIngressPort() string {
func (c *EnvironmentImpl) GetNodeName() string {
return os.Getenv("NODE_NAME")
}

func (c *EnvironmentImpl) GetNodePortIPType() string {
return os.Getenv("EXTERNAL_NODEPORT_IP_TYPE")
}
55 changes: 44 additions & 11 deletions images/kafka/kafka-utils/pkgs/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,33 @@ func (c *KafkaService) WriteIngressToPath(path string) error {
}
case v1.ServiceTypeNodePort:
log.Infoln("detected ", v1.ServiceTypeNodePort)
externalIP, err := c.getNodeExternalIP()
if err != nil {
return err
}
log.Infoln("detected ExternalIP: ", externalIP)
ingressStatus = []v1.LoadBalancerIngress{
{
IP: externalIP,
Hostname: "",
},
switch c.Env.GetNodePortIPType() {
case "EXTERNAL":
externalIP, err := c.getNodeExternalIP()
if err != nil {
return err
}
log.Infoln("detected ExternalIP: ", externalIP)
ingressStatus = []v1.LoadBalancerIngress{
{
IP: externalIP,
Hostname: "",
},
}
case "INTERNAL":
externalIP, err := c.getNodeInternalIP()
if err != nil {
return err
}
log.Infoln("detected InternalIP: ", externalIP)
ingressStatus = []v1.LoadBalancerIngress{
{
IP: externalIP,
Hostname: "",
},
}
default:
return fmt.Errorf("NodePortIPType '%s' not supported", c.Env.GetNodePortIPType())
}
for _, port := range kafkaService.Spec.Ports {
c.Port = port.NodePort
Expand Down Expand Up @@ -189,6 +206,7 @@ func (c *KafkaService) writeListenerSecurityProtocolMap(ingresses []v1.LoadBalan
log.Infof("created the %s file", path)
return nil
}

func (c *KafkaService) getNodeExternalIP() (string, error) {
node, err := c.Client.CoreV1().Nodes().Get(c.Env.GetNodeName(), metav1.GetOptions{})
if err != nil {
Expand All @@ -200,8 +218,23 @@ func (c *KafkaService) getNodeExternalIP() (string, error) {
return address.Address, nil
}
}
return "", fmt.Errorf("no node found with name '%s'", c.Env.GetNodeName())
return "", fmt.Errorf("no external IP found for node '%s'", c.Env.GetNodeName())
}

func (c *KafkaService) getNodeInternalIP() (string, error) {
node, err := c.Client.CoreV1().Nodes().Get(c.Env.GetNodeName(), metav1.GetOptions{})
if err != nil {
log.Errorf("error fetching nodes internal IP :%s", err)
return "", err
}
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP {
return address.Address, nil
}
}
return "", fmt.Errorf("no internal IP found for node '%s'", c.Env.GetNodeName())
}

func (c *KafkaService) writeListenerDNS(ingresses []v1.LoadBalancerIngress, path string) error {
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
Expand Down
Loading