Skip to content

Commit

Permalink
Merge pull request kosmos-io#319 from OrangeBao/feature_reachable
Browse files Browse the repository at this point in the history
feat: auto detect interface name
  • Loading branch information
kosmos-robot authored Dec 13, 2023
2 parents 9eecda9 + ba99ed3 commit 2e22100
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 25 deletions.
1 change: 1 addition & 0 deletions deploy/clusterlink-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ spec:
command:
- clusterlink-agent
- -kubeconfig=/etc/clusterlink/kubeconfig
- --v=4
env:
- name: CLUSTER_NAME
value: ""
Expand Down
2 changes: 2 additions & 0 deletions deploy/crds/kosmos.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ spec:
properties:
clusterLinkOptions:
properties:
autodetectionMethod:
type: string
bridgeCIDRs:
default:
ip: 220.0.0.0/8
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/kosmos/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type ClusterLinkOptions struct {

// +optional
GlobalCIDRsMap map[string]string `json:"globalCIDRsMap,omitempty"`

// +optional
AutodetectionMethod string `json:"autodetectionMethod,omitempty"`
}

type ClusterTreeOptions struct {
Expand Down
100 changes: 88 additions & 12 deletions pkg/clusterlink/agent-manager/auto_detect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,40 @@ package agent
import (
"context"
"fmt"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/clusterlink/agent-manager/autodetection"
"github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/node"
"github.com/kosmos.io/kosmos/pkg/clusterlink/network"
"github.com/kosmos.io/kosmos/pkg/utils"
interfacepolicy "github.com/kosmos.io/kosmos/pkg/utils/interface-policy"
"github.com/kosmos.io/kosmos/pkg/utils/lifted/autodetection"
)

const (
AutoDetectControllerName = "cluster-node-controller"
AutoDetectRequeueTime = 10 * time.Second
)

const (
AUTODETECTION_METHOD_CAN_REACH = "can-reach="
)

type AutoDetectReconciler struct {
client.Client
ClusterName string
Expand All @@ -49,10 +59,6 @@ func (r *AutoDetectReconciler) SetupWithManager(mgr manager.Manager) error {
return false
}

if eventObj.Spec.InterfaceName == network.AutoSelectInterfaceFlag || len(eventObj.Spec.InterfaceName) == 0 {
return false
}

return true
}

Expand All @@ -73,9 +79,65 @@ func (r *AutoDetectReconciler) SetupWithManager(mgr manager.Manager) error {
return skipEvent(genericEvent.Object)
},
})).
Watches(&source.Kind{Type: &kosmosv1alpha1.Cluster{}}, handler.EnqueueRequestsFromMapFunc(r.newClusterMapFunc())).
Complete(r)
}

func (r *AutoDetectReconciler) newClusterMapFunc() handler.MapFunc {
return func(a client.Object) []reconcile.Request {
var requests []reconcile.Request
cluster := a.(*kosmosv1alpha1.Cluster)
klog.V(4).Infof("auto detect cluster change: %s, currentNode cluster name: %s", cluster.Name, r.ClusterName)
if cluster.Name == r.ClusterName {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
Name: node.ClusterNodeName(r.ClusterName, r.NodeName),
}})
}
return requests
}
}

func (r *AutoDetectReconciler) detectInterfaceName(ctx context.Context) (string, error) {
var Cluster kosmosv1alpha1.Cluster

if err := r.Get(ctx, types.NamespacedName{
Name: r.ClusterName,
Namespace: "",
}, &Cluster); err != nil {
return "", err
}

if Cluster.Spec.ClusterLinkOptions != nil {
defaultNICName := interfacepolicy.GetInterfaceName(Cluster.Spec.ClusterLinkOptions.NICNodeNames, r.NodeName, Cluster.Spec.ClusterLinkOptions.DefaultNICName)

if defaultNICName != network.AutoSelectInterfaceFlag {
return defaultNICName, nil
}

method := Cluster.Spec.ClusterLinkOptions.AutodetectionMethod
// TODO: set default reachable ip when defaultNICName == * and meth == ""
if method == "" {
method = fmt.Sprintf("%s%s", AUTODETECTION_METHOD_CAN_REACH, "8.8.8.8")
}
if strings.HasPrefix(method, AUTODETECTION_METHOD_CAN_REACH) {
// Autodetect the IP by connecting a UDP socket to a supplied address.
destStr := strings.TrimPrefix(method, AUTODETECTION_METHOD_CAN_REACH)

version := 4
if utils.IsIPv6(destStr) {
version = 6
}

if i, _, err := autodetection.ReachDestination(destStr, version); err != nil {
return "", err
} else {
return i.Name, nil
}
}
}
return "", fmt.Errorf("can not detect nic")
}

func detectIP(interfaceName string) (string, string) {
detectFunc := func(version int) (string, error) {
_, n, err := autodetection.FilteredEnumeration([]string{interfaceName}, nil, nil, version)
Expand All @@ -101,7 +163,9 @@ func detectIP(interfaceName string) (string, string) {
}

func shouldUpdate(old, new kosmosv1alpha1.ClusterNode) bool {
return old.Spec.IP != new.Spec.IP || old.Spec.IP6 != new.Spec.IP6
return old.Spec.IP != new.Spec.IP ||
old.Spec.IP6 != new.Spec.IP6 ||
old.Spec.InterfaceName != new.Spec.InterfaceName
}

func (r *AutoDetectReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand All @@ -112,6 +176,7 @@ func (r *AutoDetectReconciler) Reconcile(ctx context.Context, request reconcile.
var clusterNode kosmosv1alpha1.ClusterNode
if err := r.Get(ctx, request.NamespacedName, &clusterNode); err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("auto_detect_controller cluster node not found %s", request.NamespacedName)
return reconcile.Result{}, nil
}
klog.Errorf("get clusternode %s error: %v", request.NamespacedName, err)
Expand All @@ -123,17 +188,28 @@ func (r *AutoDetectReconciler) Reconcile(ctx context.Context, request reconcile.
return reconcile.Result{}, nil
}

// only do autodetect when clusterNode.Spec.InterfaceName != * and not nil
if clusterNode.Spec.InterfaceName == network.AutoSelectInterfaceFlag || len(clusterNode.Spec.InterfaceName) == 0 {
return reconcile.Result{}, nil
// update clusterNode
newClusterNode := clusterNode.DeepCopy()

currentInterfaceName, err := r.detectInterfaceName(ctx)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("cluster is not found, %s", request.NamespacedName)
return reconcile.Result{}, nil
}
klog.Errorf("get cluster %s error: %v", request.NamespacedName, err)
return reconcile.Result{RequeueAfter: AutoDetectRequeueTime}, nil
}

// update interface
newClusterNode.Spec.InterfaceName = currentInterfaceName

klog.V(4).Infof("auto detect interface name: %s", currentInterfaceName)

// detect IP by Name
ipv4, ipv6 := detectIP(clusterNode.Spec.InterfaceName)
ipv4, ipv6 := detectIP(currentInterfaceName)
klog.V(4).Infof("auto detect ipv4: %s, ipv6: %s", ipv4, ipv6)

// update clusterNode
newClusterNode := clusterNode.DeepCopy()
if ipv4 != "" {
newClusterNode.Spec.IP = ipv4
}
Expand Down
15 changes: 2 additions & 13 deletions pkg/clusterlink/controllers/node/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/clusterlink/network"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/utils"
interfacepolicy "github.com/kosmos.io/kosmos/pkg/utils/interface-policy"
)

const (
Expand Down Expand Up @@ -109,20 +107,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
},
},
}
cluster, err := r.ClusterLinkClient.KosmosV1alpha1().Clusters().Get(ctx, r.ClusterName, metav1.GetOptions{})
if err != nil {
klog.Errorf("get cluster %s err: %v", r.ClusterName, err)
return reconcile.Result{Requeue: true}, nil
}

err = CreateOrUpdateClusterNode(r.ClusterLinkClient, clusterNode, func(n *clusterlinkv1alpha1.ClusterNode) error {
err := CreateOrUpdateClusterNode(r.ClusterLinkClient, clusterNode, func(n *clusterlinkv1alpha1.ClusterNode) error {
n.Spec.NodeName = node.Name
n.Spec.ClusterName = r.ClusterName
n.Spec.InterfaceName = interfacepolicy.GetInterfaceName(cluster.Spec.ClusterLinkOptions.NICNodeNames, node.Name, cluster.Spec.ClusterLinkOptions.DefaultNICName)
if n.Spec.InterfaceName == network.AutoSelectInterfaceFlag {
n.Spec.IP = internalIP
n.Spec.IP6 = internalIP6
}
// n.Spec.InterfaceName while set by clusterlink-agent
return nil
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/operator/clusterlink/agent/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ spec:
command:
- clusterlink-agent
- --kubeconfig=/etc/clusterlink/kubeconfig
- --v=4
env:
- name: CLUSTER_NAME
value: "{{ .ClusterName }}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// This code is directly lifted from the calico

// For reference:
// https://github.com/projectcalico/calico/blob/master/node/pkg/lifecycle/startup/autodetection/filtered.go

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// This code is directly lifted from the calico

// For reference:
// https://github.com/projectcalico/calico/blob/master/node/pkg/lifecycle/startup/autodetection/interfaces.go

Expand Down
71 changes: 71 additions & 0 deletions pkg/utils/lifted/autodetection/reachaddr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// This code is directly lifted from the calico

// For reference:
// https://github.com/projectcalico/calico/blob/master/node/pkg/lifecycle/startup/autodetection/reachaddr.go

// Copyright (c) 2017 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package autodetection

import (
"fmt"
gonet "net"

"github.com/projectcalico/calico/libcalico-go/lib/net"
log "github.com/sirupsen/logrus"
)

// ReachDestination auto-detects the interface Network by setting up a UDP
// connection to a "reach" destination.
func ReachDestination(dest string, version int) (*Interface, *net.IPNet, error) {
log.Debugf("Auto-detecting IPv%d CIDR by reaching destination %s", version, dest)

// Open a UDP connection to determine which external IP address is
// used to access the supplied destination.
protocol := fmt.Sprintf("udp%d", version)
address := fmt.Sprintf("[%s]:80", dest)
conn, err := gonet.Dial(protocol, address)
if err != nil {
return nil, nil, err
}
defer conn.Close() // nolint: errcheck

// Get the local address as a golang IP and use that to find the matching
// interface CIDR.
addr := conn.LocalAddr()
if addr == nil {
return nil, nil, fmt.Errorf("no address detected by connecting to %s", dest)
}
udpAddr := addr.(*gonet.UDPAddr)
log.WithFields(log.Fields{"IP": udpAddr.IP, "Destination": dest}).Info("Auto-detected address by connecting to remote")

// Get a full list of interface and IPs and find the CIDR matching the
// found IP.
ifaces, err := GetInterfaces(gonet.Interfaces, nil, nil, version)
if err != nil {
return nil, nil, err
}
for _, iface := range ifaces {
log.WithField("Name", iface.Name).Debug("Checking interface CIDRs")
for _, cidr := range iface.Cidrs {
log.WithField("CIDR", cidr.String()).Debug("Checking CIDR")
if cidr.IP.Equal(udpAddr.IP) {
log.WithField("CIDR", cidr.String()).Debug("Found matching interface CIDR")
return &iface, &cidr, nil
}
}
}

return nil, nil, fmt.Errorf("autodetected IPv%d address does not match any addresses found on local interfaces: %s", version, udpAddr.IP.String())
}

0 comments on commit 2e22100

Please sign in to comment.