Skip to content

Commit

Permalink
test:add kubeproxy test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyueyan committed May 7, 2023
1 parent d835a0e commit ad25da2
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 24 deletions.
7 changes: 7 additions & 0 deletions cmd/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "minik8s/pkg/controller"

func main() {
controller.Run()
}
2 changes: 1 addition & 1 deletion pkg/controller/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package controller

import "minik8s/utils"

func run() {
func Run() {
var sp svcPodHandler
var ss svcServiceHandler
go utils.Sync(ss)
Expand Down
20 changes: 18 additions & 2 deletions pkg/controller/svccontroller.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package controller

import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"log"
"minik8s/pkg/apiobject"
"minik8s/utils"
)
Expand All @@ -18,7 +19,7 @@ var IPMap = [1 << 8]bool{false}
var IPStart = "10.10.0."

var svcToEndpoints = map[string]*[]*apiobject.Endpoint{}
var svcList map[string]*apiobject.Service
var svcList = map[string]*apiobject.Service{}

type svcServiceHandler struct {
}
Expand Down Expand Up @@ -54,6 +55,8 @@ func (s svcServiceHandler) HandleCreate(message []byte) {
}
}
svcToEndpoints[svc.Spec.ClusterIP] = &edptList

log.Info("[svc controller] Create service. Cluster IP:", svc.Spec.ClusterIP)
}

func (s svcServiceHandler) HandleDelete(message []byte) {
Expand All @@ -66,10 +69,14 @@ func (s svcServiceHandler) HandleDelete(message []byte) {
utils.DeleteObject(utils.ENDPOINT, edpt.Data.Namespace, edpt.Data.Name)
}

log.Info("[svc controller] Delete service. Cluster IP:", svc.Spec.ClusterIP)
}

func (s svcServiceHandler) HandleUpdate(message []byte) {
svc := &apiobject.Service{}
svc.UnmarshalJSON(message)

log.Info("[svc controller] Update service. Cluster IP:", svc.Spec.ClusterIP)
}

func (s svcServiceHandler) GetType() utils.ObjType {
Expand All @@ -91,6 +98,8 @@ func (s svcPodHandler) HandleCreate(message []byte) {
}

func (s svcPodHandler) HandleDelete(message []byte) {
logInfo := "[svc controller] Delete endpoints.\n"

pod := &apiobject.Pod{}
pod.UnmarshalJSON(message)
// delete corresponding endpoints
Expand All @@ -101,13 +110,15 @@ func (s svcPodHandler) HandleDelete(message []byte) {
if edpt.Spec.DestIP == pod.Status.IP {
edpt := (*edptList)[key]
utils.DeleteObject(utils.ENDPOINT, edpt.Data.Namespace, edpt.Data.Name)
logInfo += fmt.Sprintf("srcIP:%s:%d, dstIP:%s:%d\n", edpt.Spec.SvcIP, edpt.Spec.SvcPort, edpt.Spec.DestIP, edpt.Spec.DestPort)
} else {
newEdptList = append(newEdptList, edpt)
}
}
svcToEndpoints[svc.Spec.ClusterIP] = &newEdptList
}

log.Info(logInfo)
}

func (s svcPodHandler) HandleUpdate(message []byte) {
Expand Down Expand Up @@ -144,6 +155,8 @@ func findDstPort(targetPort string, containers []apiobject.Container) int32 {
}

func createEndpoints(edptList *[]*apiobject.Endpoint, svc *apiobject.Service, pod *apiobject.Pod) {
logInfo := "[svc controller] Create endpoints.\n"

for _, port := range svc.Spec.Ports {
dstPort := findDstPort(port.TargetPort, pod.Spec.Containers)
spec := apiobject.EndpointSpec{
Expand All @@ -162,5 +175,8 @@ func createEndpoints(edptList *[]*apiobject.Endpoint, svc *apiobject.Service, po
}
utils.CreateObject(edpt, utils.ENDPOINT, svc.Data.Namespace)
*edptList = append(*edptList, edpt)
logInfo += fmt.Sprintf("srcIP:%s:%d, dstIP:%s:%d\n", svc.Spec.ClusterIP, port.Port, pod.Status.IP, dstPort)
}

log.Info(logInfo)
}
10 changes: 10 additions & 0 deletions pkg/controller/svccontroller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package controller

import (
"minik8s/utils"
"testing"
)

func TestSvcController(t *testing.T) {
utils.ApiServerIp = "localhost:8080"
}
25 changes: 14 additions & 11 deletions pkg/kubeproxy/ipvs/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ipvs
import (
"fmt"
"github.com/mqliang/libipvs"
"log"
log "github.com/sirupsen/logrus"
"net"
"os/exec"
"strconv"
Expand Down Expand Up @@ -33,25 +33,27 @@ func TestConfig() {

func AddService(ip string, port uint16) {
svc := addService(ip, port)
serviceIP := ip + ":" + string(port)
serviceIP := ip + ":" + strconv.Itoa(int(port))
Services[serviceIP] = &ServiceNode{
Service: svc,
Visited: true,
Service: svc,
Visited: true,
Endpoints: map[string]*EndpointNode{},
}
log.Info("[kubeproxy] Add service ", serviceIP)
}

func addService(ip string, port uint16) *libipvs.Service {
// Create a service struct and add it to the ipvs.
// Equal to the cmd: ipvsadm -A -t 10.10.0.1:8410 -s rr
svc := libipvs.Service{
svc := &libipvs.Service{
Address: net.ParseIP(ip),
AddressFamily: syscall.AF_INET,
Protocol: libipvs.Protocol(syscall.IPPROTO_TCP),
Port: port,
SchedName: libipvs.RoundRobin,
}

if err := handler.NewService(&svc); err != nil {
if err := handler.NewService(svc); err != nil {
fmt.Println(err.Error())
}

Expand All @@ -71,13 +73,14 @@ func addService(ip string, port uint16) *libipvs.Service {
fmt.Println(err.Error())
}

return &svc
return svc
}

func DeleteService(key string) {
node := Services[key]
deleteService(node.Service)
delete(Services, key)
log.Info("[kubeproxy] Delete service ", key)
}

func deleteService(svc *libipvs.Service) {
Expand All @@ -92,11 +95,12 @@ func AddEndpoint(key string, ip string, port uint16) {
log.Fatal("[proxy] Add Endpoint: service doesn't exist!")
}
dst := bindEndpoint(svc.Service, ip, port)
podIP := ip + ":" + string(port)
podIP := ip + ":" + strconv.Itoa(int(port))
svc.Endpoints[podIP] = &EndpointNode{
Endpoint: dst,
Visited: true,
}
log.Info("[kubeproxy] Add endpoint ", podIP, " service:", key)
}

func bindEndpoint(svc *libipvs.Service, ip string, port uint16) *libipvs.Destination {
Expand All @@ -106,8 +110,7 @@ func bindEndpoint(svc *libipvs.Service, ip string, port uint16) *libipvs.Destina
Port: port,
}

print(svc.Address.String() + ":" + strconv.Itoa(int(svc.Port)))

//print(svc.Address.String() + ":" + strconv.Itoa(int(svc.Port)))
args := []string{"-a", "-t", svc.Address.String() + ":" + strconv.Itoa(int(svc.Port)), "-r", ip + ":" + strconv.Itoa(int(port)), "-m"}
_, err := exec.Command("ipvsadm", args...).CombinedOutput()
if err != nil {
Expand All @@ -123,7 +126,7 @@ func DeleteEndpoint(svcKey string, dstKey string) {
unbindEndpoint(svc.Service, dst)
delete(svc.Endpoints, dstKey)
}

log.Info("[kubeproxy] Delete endpoint ", dstKey, " service:", svcKey)
}

func unbindEndpoint(svc *libipvs.Service, dst *libipvs.Destination) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubeproxy/ipvs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ type EndpointNode struct {
Visited bool
}

var Services map[string]*ServiceNode
var Services = make(map[string]*ServiceNode)

//var Endpoints map[string]EndpointNode
9 changes: 5 additions & 4 deletions pkg/kubeproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"minik8s/pkg/apiobject"
"minik8s/pkg/kubeproxy/ipvs"
"minik8s/utils"
"strconv"
)

func Run() {
Expand Down Expand Up @@ -39,7 +40,7 @@ func (p proxyServiceHandler) HandleDelete(message []byte) {
svc.UnmarshalJSON(message)

for _, p := range svc.Spec.Ports {
key := svc.Spec.ClusterIP + ":" + string(p.Port)
key := svc.Spec.ClusterIP + ":" + strconv.Itoa(int(p.Port))
ipvs.DeleteService(key)
}

Expand Down Expand Up @@ -68,16 +69,16 @@ func (e proxyEndpointHandler) HandleCreate(message []byte) {
edpt := &apiobject.Endpoint{}
edpt.UnmarshalJSON(message)

key := edpt.Spec.SvcIP + ":" + string(edpt.Spec.SvcPort)
key := edpt.Spec.SvcIP + ":" + strconv.Itoa(int(edpt.Spec.SvcPort))
ipvs.AddEndpoint(key, edpt.Spec.DestIP, uint16(edpt.Spec.DestPort))
}

func (e proxyEndpointHandler) HandleDelete(message []byte) {
edpt := &apiobject.Endpoint{}
edpt.UnmarshalJSON(message)

svcKey := edpt.Spec.SvcIP + ":" + string(edpt.Spec.SvcPort)
dstKey := edpt.Spec.DestIP + ":" + string(edpt.Spec.DestPort)
svcKey := edpt.Spec.SvcIP + ":" + strconv.Itoa(int(edpt.Spec.SvcPort))
dstKey := edpt.Spec.DestIP + ":" + strconv.Itoa(int(edpt.Spec.DestPort))
ipvs.DeleteEndpoint(svcKey, dstKey)
}

Expand Down
47 changes: 47 additions & 0 deletions pkg/kubeproxy/proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package kubeproxy

import (
"minik8s/pkg/kubeproxy/ipvs"
"testing"
)

func TestProxy(t *testing.T) {
ipvs.Init()
var e proxyEndpointHandler
var s proxyServiceHandler
/* test add service and add endpoint */
svcJson := "{\n \"apiVersion\": \"v1\",\n \"kind\": \"Service\",\n \"metadata\": {\n \"name\": \"service-practice\",\n \"resourcesVersion\": \"UPDATE\"\n },\n \"spec\": {\n \"selector\": {\n \"app\": \"deploy-practice-update\"\n },\n \"type\": \"ClusterIP\",\n \"ports\": [\n {\n \"name\": \"service-port1\",\n \"protocol\": \"TCP\",\n \"port\": 8080,\n \"targetPort\": \"p1\"\n },\n {\n \"name\": \"service-port2\",\n \"protocol\": \"TCP\",\n \"port\": 3000,\n \"targetPort\": \"p2\"\n }\n ],\n \"clusterIP\": \"10.10.0.2\"\n }\n}"
s.HandleUpdate([]byte(svcJson))
edptJson := "{\n \"metadata\": {\n \"name\": \"my-service\"\n },\n \"spec\": {\n \"svcIP\": \"10.10.0.2\",\n \"svcPort\": 8080,\n \"dstIP\": \"10.2.17.54\",\n \"dstPort\": 12345\n }\n}"
edptJson2 := "{\n \"metadata\": {\n \"name\": \"my-service\"\n },\n \"spec\": {\n \"svcIP\": \"10.10.0.2\",\n \"svcPort\": 8080,\n \"dstIP\": \"10.2.18.54\",\n \"dstPort\": 12345\n }\n}"
e.HandleCreate([]byte(edptJson))
e.HandleCreate([]byte(edptJson2))

if svc, ok := ipvs.Services["10.10.0.2:8080"]; !ok {
t.Error("Add Service Fail")
} else {
if _, ok := svc.Endpoints["10.2.17.54:12345"]; !ok {
t.Error("Add Endpoint Fail")
}
if _, ok := svc.Endpoints["10.2.18.54:12345"]; !ok {
t.Error("Add Endpoint Fail")
}
}

/* test delete endpoint */
e.HandleDelete([]byte(edptJson))
svc := ipvs.Services["10.10.0.2:8080"]
if _, ok := svc.Endpoints["10.2.17.54:12345"]; ok {
t.Error("Add Endpoint Fail")
}
if _, ok := svc.Endpoints["10.2.18.54:12345"]; !ok {
t.Error("Add Endpoint Fail")
}

/* test delete service */
s.HandleDelete([]byte(svcJson))
if _, ok := ipvs.Services["10.10.0.2:8080"]; ok {
t.Error("Add Endpoint Fail")
}
e.HandleDelete([]byte(edptJson2))
}
10 changes: 5 additions & 5 deletions utils/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package utils

import (
"fmt"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/golang/glog"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"minik8s/pkg/apiobject"
)
Expand Down Expand Up @@ -59,7 +59,7 @@ func CreateObject(obj apiobject.Object, ty ObjType, ns string) {
//POST /api/v1/namespaces/{namespace}/{resource}"
url := fmt.Sprintf("http://%s/api/v1/namespaces/%s/%s", ApiServerIp, ns, ty)
if info, err := SendRequest("POST", res, url); err != nil {
glog.Error("create object ", info)
log.Error("create object ", info)
}
}

Expand All @@ -68,15 +68,15 @@ func UpdateObject(obj apiobject.Object, ty ObjType, ns string, name string) {
//POST /api/v1/namespaces/{namespace}/{resource}/{name}/update"
url := fmt.Sprintf("http://%s/api/v1/namespaces/%s/%s/%s/update", ApiServerIp, ns, ty, name)
if info, err := SendRequest("POST", res, url); err != nil {
glog.Error("create object ", info)
log.Error("create object ", info)
}
}

func DeleteObject(ty ObjType, ns string, name string) {
//DELETE /api/v1/namespaces/{namespace}/{resource}"
url := fmt.Sprintf("http://%s/api/v1/namespaces/%s/%s/%s", ApiServerIp, ns, ty, name)
if info, err := SendRequest("DELETE", nil, url); err != nil {
glog.Error("delete object ", info)
log.Error("delete object ", info)
}
}

Expand All @@ -85,7 +85,7 @@ func GetObjects(ty ObjType) string {
url := fmt.Sprintf("http://%s/api/v1/%s", ApiServerIp, ty)
var str []byte
if info, err := SendRequest("GET", str, url); err != nil {
glog.Error("get object ", info)
log.Error("get object ", info)
return info
} else {
return info
Expand Down

0 comments on commit ad25da2

Please sign in to comment.