Skip to content

Commit

Permalink
feat: support ipv6
Browse files Browse the repository at this point in the history
  • Loading branch information
tydra-wang committed May 20, 2022
1 parent 8ee8f4f commit 20e67d0
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 33 deletions.
2 changes: 0 additions & 2 deletions cmd/yurt-controller-manager/controller-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ import (
"time"

"k8s.io/component-base/logs"

// load all the prometheus client-go plugin
_ "k8s.io/component-base/metrics/prometheus/clientgo"

// for version metric registration
_ "k8s.io/component-base/metrics/prometheus/version"

Expand Down
7 changes: 6 additions & 1 deletion cmd/yurt-tunnel-agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/spf13/pflag"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
"sigs.k8s.io/apiserver-network-proxy/pkg/agent"

"github.com/openyurtio/openyurt/cmd/yurt-tunnel-agent/app/config"
Expand Down Expand Up @@ -134,7 +135,11 @@ func (o *AgentOptions) Config() (*config.Config, error) {
}

if len(c.AgentIdentifiers) == 0 {
c.AgentIdentifiers = fmt.Sprintf("ipv4=%s&host=%s", o.NodeIP, o.NodeName)
ipFamily := "ipv4"
if utilnet.IsIPv6String(o.NodeIP) {
ipFamily = "ipv6"
}
c.AgentIdentifiers = fmt.Sprintf("%s=%s&host=%s", ipFamily, o.NodeIP, o.NodeName)
}
klog.Infof("%s is set for agent identifies", c.AgentIdentifiers)

Expand Down
2 changes: 2 additions & 0 deletions cmd/yurt-tunnel-server/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/util/iptables"
"github.com/openyurtio/openyurt/pkg/yurttunnel/constants"
)

Expand All @@ -34,6 +35,7 @@ type Config struct {
EnableIptables bool
EnableDNSController bool
IptablesSyncPeriod int
IPFamily iptables.Protocol
DNSSyncPeriod int
CertDNSNames []string
CertIPs []net.IP
Expand Down
7 changes: 7 additions & 0 deletions cmd/yurt-tunnel-server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/spf13/pflag"
"k8s.io/client-go/informers"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
"sigs.k8s.io/apiserver-network-proxy/pkg/server"

"github.com/openyurtio/openyurt/cmd/yurt-tunnel-server/app/config"
"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/util/certmanager"
"github.com/openyurtio/openyurt/pkg/util/iptables"
"github.com/openyurtio/openyurt/pkg/yurttunnel/constants"
kubeutil "github.com/openyurtio/openyurt/pkg/yurttunnel/kubernetes"
)
Expand Down Expand Up @@ -136,6 +138,11 @@ func (o *ServerOptions) Config() (*config.Config, error) {
}
}

if utilnet.IsIPv6String(o.BindAddr) {
cfg.IPFamily = iptables.ProtocolIpv6
} else {
cfg.IPFamily = iptables.ProtocolIpv4
}
cfg.ListenAddrForAgent = net.JoinHostPort(o.BindAddr, o.TunnelAgentConnectPort)
cfg.ListenAddrForMaster = net.JoinHostPort(o.BindAddr, o.SecurePort)
cfg.ListenInsecureAddrForMaster = net.JoinHostPort(o.InsecureBindAddr, o.InsecurePort)
Expand Down
5 changes: 3 additions & 2 deletions cmd/yurt-tunnel-server/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@ func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error {
}
// 1. start the IP table manager
if cfg.EnableIptables {
iptablesMgr := iptables.NewIptablesManager(cfg.Client,
iptablesMgr := iptables.NewIptablesManagerWithIPFamily(cfg.Client,
cfg.SharedInformerFactory.Core().V1().Nodes(),
cfg.ListenAddrForMaster,
cfg.ListenInsecureAddrForMaster,
cfg.IptablesSyncPeriod)
cfg.IptablesSyncPeriod,
cfg.IPFamily)
if iptablesMgr == nil {
return fmt.Errorf("fail to create a new IptableManager")
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/certmanager/certmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewYurttunnelServerCertManager(
)

// the ips and dnsNames should be acquired through api-server at the first time, because the informer factory has not started yet.
_ = wait.PollUntil(5*time.Second, func() (bool, error) {
werr := wait.PollUntil(5*time.Second, func() (bool, error) {
dnsNames, ips, err = serveraddr.GetYurttunelServerDNSandIP(clientset)
if err != nil {
klog.Errorf("failed to get yurt tunnel server dns and ip, %v", err)
Expand All @@ -89,7 +89,10 @@ func NewYurttunnelServerCertManager(

return true, nil
}, stopCh)
// add user specified DNS names and IP addresses
if werr != nil {
return nil, werr
}
// add user specified DNS anems and IP addresses
dnsNames = append(dnsNames, clCertNames...)
ips = append(ips, clIPs...)
klog.Infof("subject of tunnel server certificate, ips=%#+v, dnsNames=%#+v", ips, dnsNames)
Expand Down
28 changes: 18 additions & 10 deletions pkg/yurthub/network/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
"k8s.io/utils/exec"
utilnet "k8s.io/utils/net"

"github.com/openyurtio/openyurt/pkg/util/iptables"
)
Expand All @@ -40,6 +41,9 @@ type IptablesManager struct {

func NewIptablesManager(dummyIfIP, dummyIfPort string) *IptablesManager {
protocol := iptables.ProtocolIpv4
if utilnet.IsIPv6String(dummyIfIP) {
protocol = iptables.ProtocolIpv6
}
execer := exec.New()
iptInterface := iptables.New(execer, protocol)

Expand All @@ -52,6 +56,10 @@ func NewIptablesManager(dummyIfIP, dummyIfPort string) *IptablesManager {
}

func makeupIptablesRules(ifIP, ifPort string) []iptablesRule {
loopbackAddr := "127.0.0.1"
if utilnet.IsIPv6String(ifIP) {
loopbackAddr = "::1"
}
return []iptablesRule{
// skip connection track for traffic from container to 169.254.2.1:10261
{iptables.Prepend, iptables.Table("raw"), iptables.ChainPrerouting, []string{"-p", "tcp", "--dport", ifPort, "--destination", ifIP, "-j", "NOTRACK"}},
Expand All @@ -63,16 +71,16 @@ func makeupIptablesRules(ifIP, ifPort string) []iptablesRule {
{iptables.Prepend, iptables.Table("raw"), iptables.ChainOutput, []string{"-p", "tcp", "--sport", ifPort, "-s", ifIP, "-j", "NOTRACK"}},
// accept traffic from 169.254.2.1:10261
{iptables.Prepend, iptables.TableFilter, iptables.ChainOutput, []string{"-p", "tcp", "--sport", ifPort, "-s", ifIP, "-j", "ACCEPT"}},
// skip connection track for traffic from container to 127.0.0.1:10261
{iptables.Prepend, iptables.Table("raw"), iptables.ChainPrerouting, []string{"-p", "tcp", "--dport", ifPort, "--destination", "127.0.0.1", "-j", "NOTRACK"}},
// skip connection track for traffic from host network to 127.0.0.1:10261
{iptables.Prepend, iptables.Table("raw"), iptables.ChainOutput, []string{"-p", "tcp", "--dport", ifPort, "--destination", "127.0.0.1", "-j", "NOTRACK"}},
// accept traffic to 127.0.0.1:10261
{iptables.Prepend, iptables.TableFilter, iptables.ChainInput, []string{"-p", "tcp", "--dport", ifPort, "--destination", "127.0.0.1", "-j", "ACCEPT"}},
// skip connection track for traffic from 127.0.0.1:10261
{iptables.Prepend, iptables.Table("raw"), iptables.ChainOutput, []string{"-p", "tcp", "--sport", ifPort, "-s", "127.0.0.1", "-j", "NOTRACK"}},
// accept traffic from 127.0.0.1:10261
{iptables.Prepend, iptables.TableFilter, iptables.ChainOutput, []string{"-p", "tcp", "--sport", ifPort, "-s", "127.0.0.1", "-j", "ACCEPT"}},
// skip connection track for traffic from container to localhost:10261
{iptables.Prepend, iptables.Table("raw"), iptables.ChainPrerouting, []string{"-p", "tcp", "--dport", ifPort, "--destination", loopbackAddr, "-j", "NOTRACK"}},
// skip connection track for traffic from host network to localhost:10261
{iptables.Prepend, iptables.Table("raw"), iptables.ChainOutput, []string{"-p", "tcp", "--dport", ifPort, "--destination", loopbackAddr, "-j", "NOTRACK"}},
// accept traffic to localhost:10261
{iptables.Prepend, iptables.TableFilter, iptables.ChainInput, []string{"-p", "tcp", "--dport", ifPort, "--destination", loopbackAddr, "-j", "ACCEPT"}},
// skip connection track for traffic from localhost:10261
{iptables.Prepend, iptables.Table("raw"), iptables.ChainOutput, []string{"-p", "tcp", "--sport", ifPort, "-s", loopbackAddr, "-j", "NOTRACK"}},
// accept traffic from localhost:10261
{iptables.Prepend, iptables.TableFilter, iptables.ChainOutput, []string{"-p", "tcp", "--sport", ifPort, "-s", loopbackAddr, "-j", "ACCEPT"}},
}
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/yurttunnel/handlerwrapper/localhostproxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,15 @@ func (plm *localHostProxyMiddleware) WrapHandler(handler http.Handler) http.Hand
req.Header.Set(constants.ProxyHostHeaderKey, nodeName)
}

proxyDest = fmt.Sprintf("127.0.0.1:%s", port)
loopbackAddr := "127.0.0.1"
isIPv6, err := util.IsIPv6Request(req)
if err != nil {
klog.Errorf("failed to parse remote ip: %v", err)
} else if isIPv6 {
loopbackAddr = "::1"
}

proxyDest = net.JoinHostPort(loopbackAddr, port)
oldHost := req.URL.Host
req.Host = proxyDest
req.Header.Set("Host", proxyDest)
Expand Down
18 changes: 14 additions & 4 deletions pkg/yurttunnel/server/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurttunnel/constants"
"github.com/openyurtio/openyurt/pkg/yurttunnel/util"
)

var (
Expand Down Expand Up @@ -64,7 +65,7 @@ func putBufioReader(br *bufio.Reader) {
// prometheus and metric server, setup proxy tunnel to kubelet, sends requests
// through the tunnel and sends responses back to the master
type RequestInterceptor struct {
contextDialer func(addr string, header http.Header, isTLS bool) (net.Conn, error)
contextDialer func(addr string, header http.Header, isTLS, isIPv6 bool) (net.Conn, error)
}

// NewRequestInterceptor creates a interceptor object that intercept request from kube-apiserver
Expand All @@ -74,7 +75,7 @@ func NewRequestInterceptor(udsSockFile string, cfg *tls.Config) *RequestIntercep
}

cfg.InsecureSkipVerify = true
contextDialer := func(addr string, header http.Header, isTLS bool) (net.Conn, error) {
contextDialer := func(addr string, header http.Header, isTLS, isIPv6 bool) (net.Conn, error) {
klog.V(4).Infof("Sending request to %q.", addr)
proxyConn, err := net.Dial("unix", udsSockFile)
if err != nil {
Expand All @@ -88,7 +89,12 @@ func NewRequestInterceptor(udsSockFile string, cfg *tls.Config) *RequestIntercep
}
}

fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s%s\r\n\r\n", addr, "127.0.0.1", connectHeaders)
loopbackAddr := "127.0.0.1"
if isIPv6 {
loopbackAddr = "::1"
}

fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s%s\r\n\r\n", addr, loopbackAddr, connectHeaders)
br := newBufioReader(proxyConn)
defer putBufioReader(br)
res, err := http.ReadResponse(br, nil)
Expand Down Expand Up @@ -140,8 +146,12 @@ func klogAndHTTPError(w http.ResponseWriter, errCode int, format string, i ...in
// ServeHTTP will proxy the request to the tunnel and return response from tunnel back
// to the client
func (ri *RequestInterceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
isIPv6, err := util.IsIPv6Request(r)
if err != nil {
klog.Errorf("failed to check if request from IPv6 client: %v", err)
}
// 1. setup the tunnel
tunnelConn, err := ri.contextDialer(r.Host, r.Header, r.TLS != nil)
tunnelConn, err := ri.contextDialer(r.Host, r.Header, r.TLS != nil, isIPv6)
if err != nil {
klogAndHTTPError(w, http.StatusServiceUnavailable,
"fail to setup the tunnel: %s", err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/yurttunnel/server/serveraddr/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -57,7 +58,7 @@ func GetTunnelServerAddr(clientset kubernetes.Interface) (string, error) {

for _, tmpIP := range ips {
// we use the first non-loopback IP address.
if tmpIP.String() != "127.0.0.1" {
if s := tmpIP.String(); s != "127.0.0.1" && s != "::1" {
ip = tmpIP
break
}
Expand Down Expand Up @@ -86,7 +87,7 @@ func GetTunnelServerAddr(clientset kubernetes.Interface) (string, error) {
return "", errors.New("fail to get the port number")
}

return fmt.Sprintf("%s:%d", host, tcpPort), nil
return net.JoinHostPort(host, strconv.Itoa(int(tcpPort))), nil
}

// GetYurttunelServerDNSandIP gets DNS names and IPS for generating tunnel server certificate.
Expand Down
33 changes: 24 additions & 9 deletions pkg/yurttunnel/trafficforward/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
)

const (
loopbackAddr = "127.0.0.1"
reqReturnComment = "return request to access node directly"
dnatToTunnelComment = "dnat to tunnel for access node"
yurttunnelServerPortChain = "TUNNEL-PORT"
Expand Down Expand Up @@ -84,6 +83,7 @@ type iptablesManager struct {
nodeInformer coreinformer.NodeInformer
iptables iptables.Interface
execer exec.Interface
loopbackAddr string
conntrackPath string
secureDnatDest string
insecureDnatDest string
Expand All @@ -92,18 +92,18 @@ type iptablesManager struct {
syncPeriod int
}

// NewIptablesManager creates an IptablesManager; deletes old chains, if any;
// NewIptablesManagerWithIPFamily creates an IptablesManager; deletes old chains, if any;
// generates new dnat rules based on IPs of current active nodes; and
// appends the rules to the iptable.
func NewIptablesManager(client clientset.Interface,
func NewIptablesManagerWithIPFamily(client clientset.Interface,
nodeInformer coreinformer.NodeInformer,
listenAddr string,
listenInsecureAddr string,
syncPeriod int) IptablesManager {
syncPeriod int,
ipFamily iptables.Protocol) IptablesManager {

protocol := iptables.ProtocolIpv4
execer := exec.New()
iptInterface := iptables.New(execer, protocol)
iptInterface := iptables.New(execer, ipFamily)

if syncPeriod < defaultSyncPeriod {
syncPeriod = defaultSyncPeriod
Expand All @@ -121,6 +121,12 @@ func NewIptablesManager(client clientset.Interface,
syncPeriod: syncPeriod,
}

if ipFamily == iptables.ProtocolIpv6 {
im.loopbackAddr = "::1"
} else {
im.loopbackAddr = "127.0.0.1"
}

// conntrack setting
conntrackPath, err := im.execer.LookPath("conntrack")
if err != nil {
Expand All @@ -135,6 +141,15 @@ func NewIptablesManager(client clientset.Interface,
return im
}

// NewIptablesManager creates an IptablesManager with ipv4 protocol
func NewIptablesManager(client clientset.Interface,
nodeInformer coreinformer.NodeInformer,
listenAddr string,
listenInsecureAddr string,
syncPeriod int) IptablesManager {
return NewIptablesManagerWithIPFamily(client, nodeInformer, listenAddr, listenInsecureAddr, syncPeriod, iptables.ProtocolIpv4)
}

// Run starts the iptablesManager that will updates dnat rules periodically
func (im *iptablesManager) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
Expand Down Expand Up @@ -361,7 +376,7 @@ func (im *iptablesManager) ensurePortIptables(port string, currentIPs, deletedIP
iptables.Prepend,
iptables.TableNAT, portChain, reqReturnPortIptablesArgs...)
if err != nil {
klog.Errorf("could not ensure -j RETURN iptables rule for %s:%s: %v", ip, port, err)
klog.Errorf("could not ensure -j RETURN iptables rule for %s: %v", net.JoinHostPort(ip, port), err)
return err
}
}
Expand All @@ -382,7 +397,7 @@ func (im *iptablesManager) ensurePortIptables(port string, currentIPs, deletedIP
err = im.iptables.DeleteRule(iptables.TableNAT,
portChain, deletedIPIptablesArgs...)
if err != nil {
klog.Errorf("could not delete old iptables rules for %s:%s: %v", ip, port, err)
klog.Errorf("could not delete old iptables rules for %s: %v", net.JoinHostPort(ip, port), err)
return err
}
}
Expand Down Expand Up @@ -509,7 +524,7 @@ func (im *iptablesManager) syncIptableSetting() {
// check if there are new nodes
nodesIP := im.getIPOfNodesWithoutAgent()
nodesChanged, addedNodesIP, deletedNodesIP := im.getAddedAndDeletedNodes(nodesIP)
currentNodesIP := append(nodesIP, loopbackAddr)
currentNodesIP := append(nodesIP, im.loopbackAddr)

// update the iptables setting if necessary
err = im.ensurePortsIptables(currentDnatPorts, deletedDnatPorts, currentNodesIP, deletedNodesIP, portMappings)
Expand Down
10 changes: 10 additions & 0 deletions pkg/yurttunnel/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"

"github.com/openyurtio/openyurt/pkg/profile"
"github.com/openyurtio/openyurt/pkg/projectinfo"
Expand Down Expand Up @@ -176,3 +177,12 @@ func resolvePorts(portsStr, insecurePort string) []string {

return ports
}

// IsIPv6Request returns if request send by an IPv6 client.
func IsIPv6Request(req *http.Request) (bool, error) {
remoteIP, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
return false, err
}
return utilnet.IsIPv6String(remoteIP), nil
}

0 comments on commit 20e67d0

Please sign in to comment.