Skip to content

Commit

Permalink
feat:devbox controller support websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
bearslyricattack committed Dec 2, 2024
1 parent d55540e commit bbaa0fb
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 46 deletions.
8 changes: 6 additions & 2 deletions controllers/devbox/api/v1alpha1/devbox_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ const (
type NetworkType string

const (
NetworkTypeNodePort NetworkType = "NodePort"
NetworkTypeTailnet NetworkType = "Tailnet"
NetworkTypeNodePort NetworkType = "NodePort"
NetworkTypeTailnet NetworkType = "Tailnet"
NetworkTypeWebSocket NetworkType = "WebSocket"
)

type ResourceList map[ResourceName]resource.Quantity
Expand Down Expand Up @@ -123,6 +124,9 @@ type NetworkStatus struct {
// +kubebuilder:validation:Optional
NodePort int32 `json:"nodePort"`

// +kubebuilder:validation:Optional
WebSocket string `json:"webSocket"`

// todo TailNet
// +kubebuilder:validation:Optional
TailNet string `json:"tailnet"`
Expand Down
277 changes: 237 additions & 40 deletions controllers/devbox/internal/controller/devbox_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
networkingv1 "k8s.io/api/networking/v1"
"time"

devboxv1alpha1 "github.com/labring/sealos/controllers/devbox/api/v1alpha1"
Expand Down Expand Up @@ -49,8 +50,8 @@ type DevboxReconciler struct {
RequestMemoryRate float64
RequestEphemeralStorage string
LimitEphemeralStorage string

DebugMode bool
WebSocketImage string
DebugMode bool

client.Client
Scheme *runtime.Scheme
Expand Down Expand Up @@ -125,20 +126,13 @@ func (r *DevboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
logger.Info("sync secret success")
r.Recorder.Eventf(devbox, corev1.EventTypeNormal, "Sync secret success", "Sync secret success")

// create service if network type is NodePort
if devbox.Spec.NetworkSpec.Type == devboxv1alpha1.NetworkTypeNodePort {
logger.Info("syncing service")
if err := r.Get(ctx, req.NamespacedName, devbox); err != nil {
return ctrl.Result{}, err
}
if err := r.syncService(ctx, devbox, recLabels); err != nil {
logger.Error(err, "sync service failed")
r.Recorder.Eventf(devbox, corev1.EventTypeWarning, "Sync service failed", "%v", err)
return ctrl.Result{}, err
}
logger.Info("sync service success")
r.Recorder.Eventf(devbox, corev1.EventTypeNormal, "Sync service success", "Sync service success")
logger.Info("syncing network")
if err := r.syncNetwork(ctx, devbox, recLabels); err != nil {
logger.Error(err, "sync network failed")
r.Recorder.Eventf(devbox, corev1.EventTypeWarning, "Sync network failed", "%v", err)
return ctrl.Result{}, err
}
logger.Info("sync network success")

// create or update pod
logger.Info("syncing pod")
Expand Down Expand Up @@ -342,31 +336,8 @@ func (r *DevboxReconciler) syncPod(ctx context.Context, devbox *devboxv1alpha1.D
return nil
}

func (r *DevboxReconciler) syncService(ctx context.Context, devbox *devboxv1alpha1.Devbox, recLabels map[string]string) error {
runtimecr, err := r.getRuntime(ctx, devbox)
if err != nil {
return err
}
var servicePorts []corev1.ServicePort
for _, port := range runtimecr.Spec.Config.Ports {
servicePorts = append(servicePorts, corev1.ServicePort{
Name: port.Name,
Port: port.ContainerPort,
TargetPort: intstr.FromInt32(port.ContainerPort),
Protocol: port.Protocol,
})
}
if len(servicePorts) == 0 {
//use the default value
servicePorts = []corev1.ServicePort{
{
Name: "devbox-ssh-port",
Port: 22,
TargetPort: intstr.FromInt32(22),
Protocol: corev1.ProtocolTCP,
},
}
}
func (r *DevboxReconciler) syncNodePortNetwork(ctx context.Context, devbox *devboxv1alpha1.Devbox, recLabels map[string]string, servicePorts []corev1.ServicePort) error {
var err error
expectServiceSpec := corev1.ServiceSpec{
Selector: recLabels,
Type: corev1.ServiceTypeNodePort,
Expand Down Expand Up @@ -439,6 +410,232 @@ func (r *DevboxReconciler) getRuntime(ctx context.Context, devbox *devboxv1alpha
return runtimecr, nil
}

func (r *DevboxReconciler) syncNetwork(ctx context.Context, devbox *devboxv1alpha1.Devbox, recLabels map[string]string) error {
runtimecr, err := r.getRuntime(ctx, devbox)
if err != nil {
return err
}
var servicePorts []corev1.ServicePort
for _, port := range runtimecr.Spec.Config.Ports {
servicePorts = append(servicePorts, corev1.ServicePort{
Name: port.Name,
Port: port.ContainerPort,
TargetPort: intstr.FromInt32(port.ContainerPort),
Protocol: port.Protocol,
})
}
if len(servicePorts) == 0 {
//use the default value
servicePorts = []corev1.ServicePort{
{
Name: "devbox-ssh-port",
Port: 22,
TargetPort: intstr.FromInt32(22),
Protocol: corev1.ProtocolTCP,
},
}
}
switch devbox.Spec.NetworkSpec.Type {
case devboxv1alpha1.NetworkTypeNodePort:
return r.syncNodePortNetwork(ctx, devbox, recLabels, servicePorts)
case devboxv1alpha1.NetworkTypeWebSocket:
return r.syncWebSocketNetwork(ctx, devbox, recLabels, servicePorts)
}
return nil
}

func (r *DevboxReconciler) syncWebSocketNetwork(ctx context.Context, devbox *devboxv1alpha1.Devbox, recLabels map[string]string, servicePorts []corev1.ServicePort) error {

devbox.Status.Network.Type = devboxv1alpha1.NetworkTypeWebSocket
if devbox.Status.Network.WebSocket == "" {
// generate a random string as the subdomain.
// TODO: what if the subdomain is already used by other devboxes?...
}
if err := r.Status().Update(ctx, devbox); err != nil {
return err
}

if err := r.syncPodSvc(ctx, devbox, recLabels, servicePorts); err != nil {
return err
}
if err := r.syncProxyPod(ctx, devbox, recLabels, servicePorts); err != nil {
return err
}
if err := r.syncProxySvc(ctx, devbox, recLabels, servicePorts); err != nil {
return err
}
if err := r.syncProxyIngress(ctx, devbox); err != nil {
return err
}

return nil
}

func (r *DevboxReconciler) syncProxyIngress(ctx context.Context, devbox *devboxv1alpha1.Devbox) error {
wsIngress := &networkingv1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: devbox.Name + "-proxy-ingress",
Namespace: devbox.Namespace,
},
Spec: networkingv1.IngressSpec{
Rules: []networkingv1.IngressRule{
{
Host: devbox.Name + ".sealoshzh.site",
IngressRuleValue: networkingv1.IngressRuleValue{
HTTP: &networkingv1.HTTPIngressRuleValue{
Paths: []networkingv1.HTTPIngressPath{
{
Path: "/",
PathType: new(networkingv1.PathType),
Backend: networkingv1.IngressBackend{
Service: &networkingv1.IngressServiceBackend{
Name: devbox.Name + "-proxy-svc",
Port: networkingv1.ServiceBackendPort{
Number: 22,
},
},
},
},
},
},
},
},
},
},
}
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, wsIngress, func() error {
return controllerutil.SetControllerReference(devbox, wsIngress, r.Scheme)
}); err != nil {
return err
}
return nil
}

func (r *DevboxReconciler) syncProxySvc(ctx context.Context, devbox *devboxv1alpha1.Devbox, recLabels map[string]string, servicePorts []corev1.ServicePort) error {
runtimecr, err := r.getRuntime(ctx, devbox)
if err != nil {
return err
}

expectServiceSpec := corev1.ServiceSpec{
Selector: helper.GenerateProxyPodLabels(devbox, runtimecr),
Type: corev1.ServiceTypeClusterIP,
Ports: servicePorts,
}
proxySvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: devbox.Name + "-proxy-svc",
Namespace: devbox.Namespace,
Labels: helper.GenerateProxyPodLabels(devbox, runtimecr),
},
}

if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, proxySvc, func() error {
// only update some specific fields
proxySvc.Spec.Selector = expectServiceSpec.Selector
proxySvc.Spec.Type = expectServiceSpec.Type
if len(proxySvc.Spec.Ports) == 0 {
proxySvc.Spec.Ports = expectServiceSpec.Ports
} else {
proxySvc.Spec.Ports[0].Name = expectServiceSpec.Ports[0].Name
proxySvc.Spec.Ports[0].Port = expectServiceSpec.Ports[0].Port
proxySvc.Spec.Ports[0].TargetPort = expectServiceSpec.Ports[0].TargetPort
proxySvc.Spec.Ports[0].Protocol = expectServiceSpec.Ports[0].Protocol
}
return controllerutil.SetControllerReference(devbox, proxySvc, r.Scheme)
}); err != nil {
return err
}
return nil
}

func (r *DevboxReconciler) syncProxyPod(ctx context.Context, devbox *devboxv1alpha1.Devbox, recLabels map[string]string, servicePorts []corev1.ServicePort) error {
runtimecr, err := r.getRuntime(ctx, devbox)
if err != nil {
return err
}

sshPort := "22"
for _, port := range servicePorts {
if port.Name == "devbox-ssh-port" {
sshPort = port.TargetPort.String()
break
}
}

wsPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: devbox.Name + "-proxy-pod",
Namespace: devbox.Namespace,
Labels: helper.GenerateProxyPodLabels(devbox, runtimecr),
Annotations: helper.GeneratePodAnnotations(devbox, runtimecr),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ws-proxy",
Image: r.WebSocketImage,
Args: []string{
"server",
"--port=",
fmt.Sprintf("--port=%s", sshPort),
fmt.Sprintf("--proxy=%s", devbox.Name+"-pod-svc:22"),
"-v=true",
},
Resources: helper.GenerateProxyPodResourceRequirements(),
},
},
},
}
// if devbox is running, create the pod
if devbox.Spec.State == devboxv1alpha1.DevboxStateRunning {
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, wsPod, func() error {
return controllerutil.SetControllerReference(devbox, wsPod, r.Scheme)
}); err != nil {
return err
}
}
return nil
}

func (r *DevboxReconciler) syncPodSvc(ctx context.Context, devbox *devboxv1alpha1.Devbox, recLabels map[string]string, servicePorts []corev1.ServicePort) error {
runtimecr, err := r.getRuntime(ctx, devbox)
if err != nil {
return err
}

expectServiceSpec := corev1.ServiceSpec{
Selector: recLabels,
Type: corev1.ServiceTypeClusterIP,
Ports: servicePorts,
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: devbox.Name + "-pod-svc",
Namespace: devbox.Namespace,
Labels: helper.GenerateProxyPodLabels(devbox, runtimecr),
},
}
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, service, func() error {
// only update some specific fields
service.Spec.Selector = expectServiceSpec.Selector
service.Spec.Type = expectServiceSpec.Type
if len(service.Spec.Ports) == 0 {
service.Spec.Ports = expectServiceSpec.Ports
} else {
service.Spec.Ports[0].Name = expectServiceSpec.Ports[0].Name
service.Spec.Ports[0].Port = expectServiceSpec.Ports[0].Port
service.Spec.Ports[0].TargetPort = expectServiceSpec.Ports[0].TargetPort
service.Spec.Ports[0].Protocol = expectServiceSpec.Ports[0].Protocol
}
return controllerutil.SetControllerReference(devbox, service, r.Scheme)
}); err != nil {
return err
}
return nil

}

// create a new pod, add predicated status to nextCommitHistory
func (r *DevboxReconciler) createPod(ctx context.Context, devbox *devboxv1alpha1.Devbox, expectPod *corev1.Pod, nextCommitHistory *devboxv1alpha1.CommitHistory) error {
nextCommitHistory.Status = devboxv1alpha1.CommitStatusPending
Expand Down
Loading

0 comments on commit bbaa0fb

Please sign in to comment.