Skip to content

Commit

Permalink
Merge pull request #362 from l1b0k/feat/node-controller
Browse files (browse the repository at this point in the history)
feat: add node controller
  • Loading branch information
l1b0k authored May 31, 2022
2 parents dbfceb2 + 915851c commit 4dcb68e
Show file tree
Hide file tree
Showing 34 changed files with 774 additions and 318 deletions.
31 changes: 12 additions & 19 deletions cmd/terway-controlplane/terway-controlplane.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2021 Terway Authors.
Copyright 2021-2022 Terway Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,6 @@ import (
"flag"
"fmt"
"math/rand"
"os"
"time"

"github.com/AliyunContainerService/terway/pkg/aliyun/client"
Expand All @@ -30,18 +29,17 @@ import (
"github.com/AliyunContainerService/terway/pkg/cert"
register "github.com/AliyunContainerService/terway/pkg/controller"
_ "github.com/AliyunContainerService/terway/pkg/controller/all"
"github.com/AliyunContainerService/terway/pkg/controller/endpoint"
"github.com/AliyunContainerService/terway/pkg/controller/vswitch"
"github.com/AliyunContainerService/terway/pkg/controller/webhook"
terwayMetric "github.com/AliyunContainerService/terway/pkg/metric"
"github.com/AliyunContainerService/terway/pkg/utils"
"github.com/AliyunContainerService/terway/types/controlplane"
"k8s.io/client-go/util/flowcontrol"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -133,25 +131,20 @@ func main() {
panic(err)
}

if cfg.RegisterEndpoint {
ipStr := os.Getenv("MY_POD_IP")
if ipStr == "" {
panic("podIP is not found")
}
// If endpoint registration is enabled, the Service name should equal cfg.ControllerName.
ep := endpoint.New(cfg.ControllerName, cfg.ControllerNamespace, ipStr, int32(cfg.WebhookPort))
err = mgr.Add(ep)
if err != nil {
panic(err)
}
ctrlCtx := &register.ControllerCtx{
Config: cfg,
VSwitchPool: vSwitchCtrl,
AliyunClient: aliyunClient,
}

for name := range register.Controllers {
err = register.Controllers[name](mgr, aliyunClient, vSwitchCtrl)
if err != nil {
panic(err)
if controlplane.IsControllerEnabled(name, register.Controllers[name].Enable, cfg.Controllers) {
err = register.Controllers[name].Creator(mgr, ctrlCtx)
if err != nil {
panic(err)
}
log.Info("register controller", "controller", name)
}
log.Info("register controller", "controller", name)
}

log.Info("controller started")
Expand Down
51 changes: 27 additions & 24 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -1329,19 +1329,19 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
if err != nil {
return nil, errors.Wrapf(err, "error create aliyun client")
}
err = aliyun.UpdateFromAPI(aliyunClient.ClientSet.ECS(), ins.InstanceType)
if err != nil {
return nil, err
}
limit, ok := aliyun.GetLimit(ins.InstanceType)
if !ok {
return nil, fmt.Errorf("upable get instance limit")
}
if !limit.SupportIPv6() {
ipFamily.IPv6 = false
serviceLog.Warnf("instance %s is not support ipv6", aliyun.GetInstanceMeta().InstanceType)

if !config.DisableDevicePlugin {
limit, err := aliyun.GetLimit(aliyunClient, ins.InstanceType)
if err != nil {
return nil, fmt.Errorf("upable get instance limit, %w", err)
}
if !limit.SupportIPv6() {
ipFamily.IPv6 = false
serviceLog.Warnf("instance %s is not support ipv6", aliyun.GetInstanceMeta().InstanceType)
}
}
ecs := aliyun.NewAliyunImpl(aliyunClient, config.EnableENITrunking, ipFamily)

ecs := aliyun.NewAliyunImpl(aliyunClient, config.EnableENITrunking && !config.WaitTrunkENI, ipFamily)

netSrv.enableTrunk = config.EnableENITrunking

Expand Down Expand Up @@ -1553,18 +1553,21 @@ func validateConfig(cfg *types.Configure) error {

func getPoolConfig(cfg *types.Configure) (*types.PoolConfig, error) {
poolConfig := &types.PoolConfig{
MaxPoolSize: cfg.MaxPoolSize,
MinPoolSize: cfg.MinPoolSize,
MaxENI: cfg.MaxENI,
MinENI: cfg.MinENI,
AccessID: cfg.AccessID,
AccessSecret: cfg.AccessSecret,
EniCapRatio: cfg.EniCapRatio,
EniCapShift: cfg.EniCapShift,
SecurityGroups: cfg.GetSecurityGroups(),
VSwitchSelectionPolicy: cfg.VSwitchSelectionPolicy,
EnableENITrunking: cfg.EnableENITrunking,
ENICapPolicy: cfg.ENICapPolicy,
MaxPoolSize: cfg.MaxPoolSize,
MinPoolSize: cfg.MinPoolSize,
MaxENI: cfg.MaxENI,
MinENI: cfg.MinENI,
AccessID: cfg.AccessID,
AccessSecret: cfg.AccessSecret,
EniCapRatio: cfg.EniCapRatio,
EniCapShift: cfg.EniCapShift,
SecurityGroups: cfg.GetSecurityGroups(),
VSwitchSelectionPolicy: cfg.VSwitchSelectionPolicy,
EnableENITrunking: cfg.EnableENITrunking,
ENICapPolicy: cfg.ENICapPolicy,
DisableDevicePlugin: cfg.DisableDevicePlugin,
WaitTrunkENI: cfg.WaitTrunkENI,
DisableSecurityGroupCheck: cfg.DisableSecurityGroupCheck,
}
if len(poolConfig.SecurityGroups) > 5 {
return nil, fmt.Errorf("security groups should not be more than 5, current %d", len(poolConfig.SecurityGroups))
Expand Down
120 changes: 71 additions & 49 deletions daemon/eni-multi-ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type eniIPFactory struct {
ipResultChan chan *ENIIP
sync.RWMutex
// metrics
metricENICount prometheus.Gauge
metricENICount prometheus.Gauge
disableSecurityGroupCheck bool

ipFamily *types.IPFamily
}
Expand Down Expand Up @@ -288,7 +289,7 @@ func (f *eniIPFactory) popResult() (ip *types.ENIIP, err error) {
eni.lock.Lock()
eni.ips = append(eni.ips, result)
eni.lock.Unlock()
metric.ENIIPFactoryIPCount.WithLabelValues(f.name, eni.MAC, fmt.Sprint(eni.MaxIPs)).Inc()
metric.ENIIPFactoryIPCount.WithLabelValues(f.name, eni.MAC, fmt.Sprint(f.eniMaxIP)).Inc()
return result.ENIIP, nil
}
}
Expand Down Expand Up @@ -402,7 +403,7 @@ func (f *eniIPFactory) Dispose(res types.NetworkResource) (err error) {
return fmt.Errorf("ENI have pending ips to be allocate")
}
// block ip allocate
eni.pending = eni.MaxIPs
eni.pending = f.eniMaxIP
eni.lock.Unlock()

f.Lock()
Expand Down Expand Up @@ -460,7 +461,7 @@ func (f *eniIPFactory) Dispose(res types.NetworkResource) (err error) {
}
}
eni.lock.Unlock()
metric.ENIIPFactoryIPCount.WithLabelValues(f.name, eni.MAC, fmt.Sprint(eni.MaxIPs)).Dec()
metric.ENIIPFactoryIPCount.WithLabelValues(f.name, eni.MAC, fmt.Sprint(f.eniMaxIP)).Dec()
return nil
}

Expand Down Expand Up @@ -664,7 +665,6 @@ func (f *eniIPFactory) initialENI(eni *ENI, ipCount int) {
eni.lock.Lock()
if utils.IsWindowsOS() {
// NB(thxCode): don't assign the primary IP of the assistant eni.
eni.ENI.MaxIPs--
ipv4s, ipv6s = dropPrimaryIP(eni.ENI, ipv4s, ipv6s)
}
eniIPLog.Infof("allocate status on async eni: %+v, pending: %v, ips: %v, backlog: %v",
Expand Down Expand Up @@ -859,52 +859,72 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
ipResultChan: make(chan *ENIIP, maxIPBacklog),
ipFamily: ipFamily,
}
limit, ok := aliyun.GetLimit(aliyun.GetInstanceMeta().InstanceType)
if !ok {
return nil, fmt.Errorf("error get max eni for eniip factory")
}
maxEni := limit.Adapters
maxEni = int(float64(maxEni)*poolConfig.EniCapRatio) + poolConfig.EniCapShift - 1
var capacity, maxEni, memberENIPod, memberLimit int

ipPerENI := limit.IPv4PerAdapter
if utils.IsWindowsOS() {
// NB(thxCode): don't assign the primary IP of one assistant eni.
ipPerENI--
}
factory.eniMaxIP = ipPerENI
if !poolConfig.DisableDevicePlugin {
limit, err := aliyun.GetLimit(ecs, aliyun.GetInstanceMeta().InstanceType)
if err != nil {
return nil, fmt.Errorf("error get max eni for eniip factory, %w", err)
}
maxEni = limit.Adapters
maxEni = int(float64(maxEni)*poolConfig.EniCapRatio) + poolConfig.EniCapShift - 1

if poolConfig.MaxENI != 0 && poolConfig.MaxENI < maxEni {
maxEni = poolConfig.MaxENI
}
capacity := maxEni * ipPerENI
if capacity < 0 {
capacity = 0
}
memberENIPod := limit.MemberAdapterLimit
if memberENIPod < 0 {
memberENIPod = 0
}
ipPerENI := limit.IPv4PerAdapter
if utils.IsWindowsOS() {
// NB(thxCode): don't assign the primary IP of one assistant eni.
ipPerENI--
}
factory.eniMaxIP = ipPerENI

factory.maxENI = make(chan struct{}, maxEni)
if poolConfig.MaxENI != 0 && poolConfig.MaxENI < maxEni {
maxEni = poolConfig.MaxENI
}
capacity = maxEni * ipPerENI
if capacity < 0 {
capacity = 0
}
memberENIPod = limit.MemberAdapterLimit
if memberENIPod < 0 {
memberENIPod = 0
}

if poolConfig.MinENI != 0 {
poolConfig.MinPoolSize = poolConfig.MinENI * ipPerENI
}
factory.maxENI = make(chan struct{}, maxEni)

if poolConfig.MinPoolSize > capacity {
eniIPLog.Infof("min pool size bigger than node capacity, set min pool size to capacity")
poolConfig.MinPoolSize = capacity
}
if poolConfig.MinENI != 0 {
poolConfig.MinPoolSize = poolConfig.MinENI * ipPerENI
}

if poolConfig.MinPoolSize > capacity {
eniIPLog.Infof("min pool size bigger than node capacity, set min pool size to capacity")
poolConfig.MinPoolSize = capacity
}

if poolConfig.MaxPoolSize > capacity {
eniIPLog.Infof("max pool size bigger than node capacity, set max pool size to capacity")
poolConfig.MaxPoolSize = capacity
}

if poolConfig.MinPoolSize > poolConfig.MaxPoolSize {
eniIPLog.Warnf("min_pool_size bigger: %v than max_pool_size: %v, set max_pool_size to the min_pool_size",
poolConfig.MinPoolSize, poolConfig.MaxPoolSize)
poolConfig.MaxPoolSize = poolConfig.MinPoolSize
}

if poolConfig.MaxPoolSize > capacity {
eniIPLog.Infof("max pool size bigger than node capacity, set max pool size to capacity")
poolConfig.MaxPoolSize = capacity
memberLimit = limit.MemberAdapterLimit
} else {
capacity = 1
maxEni = 1
memberENIPod = 0
memberLimit = 0
}

if poolConfig.MinPoolSize > poolConfig.MaxPoolSize {
eniIPLog.Warnf("min_pool_size bigger: %v than max_pool_size: %v, set max_pool_size to the min_pool_size",
poolConfig.MinPoolSize, poolConfig.MaxPoolSize)
poolConfig.MaxPoolSize = poolConfig.MinPoolSize
if poolConfig.WaitTrunkENI {
logger.DefaultLogger.Infof("waitting trunk eni ready")
factory.trunkOnEni, err = k8s.WaitTrunkReady()
if err != nil {
return nil, err
}
logger.DefaultLogger.Infof("trunk eni found %s", factory.trunkOnEni)
}

// eniip factory metrics
Expand All @@ -928,11 +948,13 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
if factory.enableTrunk {
for _, eni := range enis {
if eni.Trunk {
trunkENI = eni
factory.trunkOnEni = eni.ID
}
if eni.ID == factory.trunkOnEni {
trunkENI = eni
}
}
if factory.trunkOnEni == "" && len(enis) < limit.MemberAdapterLimit {
if factory.trunkOnEni == "" && len(enis) < memberLimit {
trunkENIRes, err := factory.eniFactory.CreateWithIPCount(1, true)
if err != nil {
return errors.Wrapf(err, "error init trunk eni")
Expand Down Expand Up @@ -981,7 +1003,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
poolENI.ips = append(poolENI.ips, &ENIIP{
ENIIP: eniIP,
})
metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(poolENI.MaxIPs)).Inc()
metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(maxEni)).Inc()

if !ok {
holder.AddIdle(eniIP)
Expand All @@ -1004,7 +1026,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
poolENI.ips = append(poolENI.ips, &ENIIP{
ENIIP: eniIP,
})
metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(poolENI.MaxIPs)).Inc()
metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(maxEni)).Inc()

holder.AddInuse(eniIP, podInfoKey(res.podInfo.Namespace, res.podInfo.Name))

Expand All @@ -1031,7 +1053,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
poolENI.ips = append(poolENI.ips, &ENIIP{
ENIIP: eniIP,
})
metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(poolENI.MaxIPs)).Inc()
metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(maxEni)).Inc()

if ipFamily.IPv4 && ipFamily.IPv6 && (unUsed.IPv6 == nil || unUsed.IPv4 == nil) {
holder.AddInvalid(eniIP)
Expand Down Expand Up @@ -1062,7 +1084,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
}

//init device plugin for ENI
if poolConfig.EnableENITrunking && factory.trunkOnEni != "" {
if poolConfig.EnableENITrunking && factory.trunkOnEni != "" && !poolConfig.DisableDevicePlugin {
dp := deviceplugin.NewENIDevicePlugin(memberENIPod, deviceplugin.ENITypeMember)
err = dp.Serve()
if err != nil {
Expand Down
Loading

0 comments on commit 4dcb68e

Please sign in to comment.