diff --git a/pkg/loxinet/dpbroker.go b/pkg/loxinet/dpbroker.go
index 936289f5..3865c58d 100644
--- a/pkg/loxinet/dpbroker.go
+++ b/pkg/loxinet/dpbroker.go
@@ -18,15 +18,13 @@ package loxinet
 
 import (
     "fmt"
+    cmn "github.com/loxilb-io/loxilb/common"
+    tk "github.com/loxilb-io/loxilib"
     "net"
     "os"
     "runtime/debug"
     "sync"
     "time"
-
-    tk "github.com/loxilb-io/loxilib"
-
-    cmn "github.com/loxilb-io/loxilb/common"
 )
 
 // man names constants
@@ -43,6 +41,10 @@ const (
     MapNameFw4 = "FW4"
 )
 
+const (
+    UseRPCPeer = false
+)
+
 // error codes
 const (
     DpErrBase = iota - 103000
@@ -449,6 +451,8 @@ type DpHookInterface interface {
     DpCtDel(w *DpCtInfo) int
     DpSockVIPAdd(w *SockVIPDpWorkQ) int
     DpSockVIPDel(w *SockVIPDpWorkQ) int
+    DpCnodeAdd(w *PeerDpWorkQ) int
+    DpCnodeDel(w *PeerDpWorkQ) int
     DpTableGC()
     DpCtGetAsync()
     DpGetLock()
@@ -514,6 +518,8 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int {
     var ret int
     var err error
 
+    return 0
+
     dp.SyncMtx.Lock()
     defer dp.SyncMtx.Unlock()
 
@@ -769,27 +775,35 @@ func (dp *DpH) DpWorkOnSockVIP(vsWq *SockVIPDpWorkQ) DpRetT {
 // DpWorkOnPeerOp - routine to work on a peer request for clustering
 func (dp *DpH) DpWorkOnPeerOp(pWq *PeerDpWorkQ) DpRetT {
     if pWq.Work == DpCreate {
-        var newPeer DpPeer
-        for _, pe := range dp.Peers {
-            if pe.Peer.Equal(pWq.PeerIP) {
-                return DpCreateErr
+        if UseRPCPeer {
+            var newPeer DpPeer
+            for _, pe := range dp.Peers {
+                if pe.Peer.Equal(pWq.PeerIP) {
+                    return DpCreateErr
+                }
             }
+            newPeer.Peer = pWq.PeerIP
+            dp.Peers = append(dp.Peers, newPeer)
+            tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String())
+            return 0
+        } else {
+            return dp.DpHooks.DpCnodeAdd(pWq)
         }
-        newPeer.Peer = pWq.PeerIP
-        dp.Peers = append(dp.Peers, newPeer)
-        tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String())
-        return 0
     } else if pWq.Work == DpRemove {
-        for idx := range dp.Peers {
-            pe := &dp.Peers[idx]
-            if pe.Peer.Equal(pWq.PeerIP) {
-                if pe.Client != nil {
-                    dp.RPC.RPCHooks.RPCClose(pe)
+        if UseRPCPeer {
+            for idx := range dp.Peers {
+                pe := &dp.Peers[idx]
+                if pe.Peer.Equal(pWq.PeerIP) {
+                    if pe.Client != nil {
+                        dp.RPC.RPCHooks.RPCClose(pe)
+                    }
+                    dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...)
+                    tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String())
+                    return 0
                 }
-                dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...)
-                tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String())
-                return 0
             }
+        } else {
+            return dp.DpHooks.DpCnodeDel(pWq)
         }
     }
 
diff --git a/pkg/loxinet/dpebpf_linux.go b/pkg/loxinet/dpebpf_linux.go
index 40e82308..3ac62691 100644
--- a/pkg/loxinet/dpebpf_linux.go
+++ b/pkg/loxinet/dpebpf_linux.go
@@ -268,14 +268,30 @@ func DpEbpfSetLogLevel(logLevel tk.LogLevelT) {
 }
 
 // DpEbpfInit - initialize the ebpf dp subsystem
-func DpEbpfInit(clusterEn, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH {
+func DpEbpfInit(clusterNodes string, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH {
     var cfg C.struct_ebpfcfg
 
-    if clusterEn {
-        cfg.have_mtrace = 1
-    } else {
-        cfg.have_mtrace = 0
-    }
+    //cNodes := strings.Split(clusterNodes, ",")
+    //for i, cNode := range cNodes {
+    //    addr := net.ParseIP(cNode)
+    //    if addr == nil {
+    //        continue
+    //    }
+    //    if utils.IsIPHostAddr(cNode) {
+    //        continue
+    //    }
+    //    if i == 0 {
+    //        cfg.cluster1 = C.CString(cNode)
+    //    } else if i == 1 {
+    //        cfg.cluster2 = C.CString(cNode)
+    //    }
+    //}
+
+    //if len(clusterEn) > 0 {
+    //    cfg.have_mtrace = 1
+    //} else {
+    //    cfg.have_mtrace = 0
+    //}
     if egrHooks {
         cfg.egr_hooks = 1
     } else {
@@ -1064,7 +1080,7 @@ func DpLBRuleMod(w *LBDpWorkQ) int {
         nxfa.inactive = 1
     }
 
-    dat.nxfrm = C.uchar(len(w.endPoints))
+    dat.nxfrm = C.ushort(len(w.endPoints))
     if w.CsumDis {
         dat.cdis = 1
     } else {
@@ -1927,6 +1943,38 @@ func (e *DpEbpfH) DpSockVIPDel(w *SockVIPDpWorkQ) int {
     return ec
 }
 
+// DpCnodeAdd - routine to work on adding a cnode
+func (e *DpEbpfH) DpCnodeAdd(w *PeerDpWorkQ) int {
+    cnode := w.PeerIP.String()
+
+    cnodeStr := C.CString(cnode)
+    defer C.free(unsafe.Pointer(cnodeStr))
+
+    ec := int(C.llb_add_cnode(cnodeStr))
+    if ec != 0 {
+        *w.Status = DpCreateErr
+    } else {
+        *w.Status = 0
+    }
+    return ec
+}
+
+// DpCnodeDel - routine to work on deleting a cnode
+func (e *DpEbpfH) DpCnodeDel(w *PeerDpWorkQ) int {
+    cnode := w.PeerIP.String()
+
+    cnodeStr := C.CString(cnode)
+    defer C.free(unsafe.Pointer(cnodeStr))
+
+    ec := int(C.llb_delete_cnode(cnodeStr))
+    if ec != 0 {
+        *w.Status = DpRemoveErr
+    } else {
+        *w.Status = 0
+    }
+    return ec
+}
+
 //export goMapNotiHandler
 func goMapNotiHandler(m *mapNoti) {
diff --git a/pkg/loxinet/loxinet.go b/pkg/loxinet/loxinet.go
index 5421c30c..167d6781 100644
--- a/pkg/loxinet/loxinet.go
+++ b/pkg/loxinet/loxinet.go
@@ -292,7 +292,7 @@ func loxiNetInit() {
         RunCommand(MkMountCG2, false)
     }
     // Initialize the ebpf datapath subsystem
-    mh.dpEbpf = DpEbpfInit(clusterMode, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel)
+    mh.dpEbpf = DpEbpfInit(opts.Opts.ClusterNodes, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel)
     mh.dp = DpBrokerInit(mh.dpEbpf, rpcMode)
 
     // Initialize the security zone subsystem
diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go
index db298bb6..52e6bea2 100644
--- a/pkg/loxinet/rules.go
+++ b/pkg/loxinet/rules.go
@@ -78,7 +78,8 @@ const (
 
 // constants
 const (
-    MaxLBEndPoints         = 24
+    MaxLBEndPoints         = 1500
+    MaxLBEndPointsRR       = 32
     DflLbaInactiveTries    = 2   // Default number of inactive tries before LB arm is turned off
     MaxDflLbaInactiveTries = 100 // Max number of inactive tries before LB arm is turned off
     DflLbaCheckTimeout     = 10  // Default timeout for checking LB arms
@@ -90,7 +91,7 @@ const (
     LbMaxInactiveTimeout    = 24 * 3600  // Maximum inactive timeout for established sessions
     MaxEndPointCheckers     = 4          // Maximum helpers to check endpoint health
     EndPointCheckerDuration = 2          // Duration at which ep-helpers will run
-    MaxEndPointSweeps       = 20         // Maximum end-point sweeps per round
+    MaxEndPointSweeps       = 40         // Maximum end-point sweeps per round
     VIPSweepDuration        = 30         // Duration of periodic VIP maintenance
     DefaultPersistTimeOut   = 10800      // Default persistent LB session timeout
     SnatFwMark              = 0x80000000 // Snat Marker
@@ -910,7 +911,8 @@ func (R *RuleH) modNatEpHost(r *ruleEnt, endpoints []ruleLBEp, doAddOp bool, liv
             pType = HostProbeConnectTCP
             pPort = nep.xPort
         } else if r.tuples.l4Prot.val == 17 {
-            pType = HostProbeConnectUDP
+            //pType = HostProbeConnectUDP
+            pType = HostProbeConnectTCP // FIXME
             pPort = nep.xPort
         } else if r.tuples.l4Prot.val == 1 {
             pType = HostProbePing
@@ -1215,7 +1217,7 @@ func (R *RuleH) mkHostAssocs(r *ruleEnt) bool {
     }
 
     for _, addr := range addrs {
-        if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+        if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && !ipnet.IP.IsUnspecified() {
             // check if IPv4 or IPv6 is not nil
             if ipnet.IP.To4() != nil || ipnet.IP.To16() != nil {
                 if tk.IsNetIPv4(ipnet.IP.String()) && r.tuples.l3Dst.addr.IP.String() != ipnet.IP.String() {
@@ -1259,7 +1261,8 @@ func (R *RuleH) syncEPHostState2Rule(rule *ruleEnt, checkNow bool) bool {
         if rule.tuples.l4Prot.val == 6 {
             sType = HostProbeConnectTCP
         } else if rule.tuples.l4Prot.val == 17 {
-            sType = HostProbeConnectUDP
+            //sType = HostProbeConnectUDP
+            sType = HostProbeConnectTCP // FIXME
         } else if rule.tuples.l4Prot.val == 1 {
             sType = HostProbePing
         } else if rule.tuples.l4Prot.val == 132 {
@@ -1571,6 +1574,12 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al
         return RuleEpCountErr, errors.New("endpoints-range error")
     }
 
+    if (serv.Sel == cmn.LbSelRr || serv.Sel == cmn.LbSelLeastConnections ||
+        serv.Sel == cmn.LbSelPrio || serv.Sel == cmn.LbSelN2 || serv.Sel == cmn.LbSelN3) &&
+        len(servEndPoints) > MaxLBEndPointsRR {
+        return RuleEpCountErr, errors.New("endpoints-range1 error")
+    }
+
     // Validate persist timeout
     if serv.Sel == cmn.LbSelRrPersist {
         if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 {
@@ -1855,6 +1864,7 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al
     r.ci = cmn.CIDefault
     r.privIP = privIP
     r.pTO = serv.PersistTimeout
+    r.locIPs = make(map[string]struct{})
 
     if !serv.Snat {
@@ -2920,19 +2930,19 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int {
     if at.sel == cmn.LbSelPrio {
         j := 0
         k := 0
-        var small [MaxLBEndPoints]int
-        var neps [MaxLBEndPoints]ruleLBEp
+        var small [MaxLBEndPointsRR]int
+        var neps [MaxLBEndPointsRR]ruleLBEp
         for i, ep := range at.endPoints {
             if ep.inActiveEP {
                 continue
             }
             oEp := &at.endPoints[i]
-            sw := (int(ep.weight) * MaxLBEndPoints) / 100
+            sw := (int(ep.weight) * MaxLBEndPointsRR) / 100
             if sw == 0 {
                 small[k] = i
                 k++
             }
-            for x := 0; x < sw && j < MaxLBEndPoints; x++ {
+            for x := 0; x < sw && j < MaxLBEndPointsRR; x++ {
                 neps[j].xIP = oEp.xIP
                 neps[j].rIP = oEp.rIP
                 neps[j].xPort = oEp.xPort
@@ -2945,12 +2955,12 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int {
                 j++
             }
         }
-        if j < MaxLBEndPoints {
+        if j < MaxLBEndPointsRR {
             v := 0
             if k == 0 {
                 k = len(at.endPoints)
             }
-            for j < MaxLBEndPoints {
+            for j < MaxLBEndPointsRR {
                 idx := small[v%k]
                 oEp := &at.endPoints[idx]
                 neps[j].xIP = oEp.xIP
@@ -3009,8 +3019,10 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int {
         return -1
     }
 
-    mh.dp.ToDpCh <- nWork
-    r.VIP2DP(nWork.Work)
+    if !nWork.ServiceIP.IsUnspecified() {
+        mh.dp.ToDpCh <- nWork
+        r.VIP2DP(nWork.Work)
+    }
 
     if mode == cmn.LBModeHostOneArm {
         for locIP := range r.locIPs {
diff --git a/pkg/loxinet/xsync_server.go b/pkg/loxinet/xsync_server.go
index 02fca650..d4e9ddee 100644
--- a/pkg/loxinet/xsync_server.go
+++ b/pkg/loxinet/xsync_server.go
@@ -26,6 +26,7 @@ import (
     "net/rpc"
     "os"
     "runtime/debug"
+    "time"
 
     opts "github.com/loxilb-io/loxilb/options"
     tk "github.com/loxilb-io/loxilib"
@@ -213,6 +214,10 @@ func LoxiXsyncMain(mode string) {
         return
     }
 
+    for {
+        time.Sleep(1 * time.Second)
+    }
+
     // Stack trace logger
     defer func() {
         if e := recover(); e != nil {
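A note on the selector-specific cap introduced above in rules.go: the general endpoint limit MaxLBEndPoints is raised to 1500, while round-robin style selectors (rr, least-connections, prio, n2, n3) get a tighter cap of MaxLBEndPointsRR (32), since the prio path in LB2DP expands endpoint weights into a fixed-size slot table. The standalone sketch below is illustrative only, not part of the patch; the endpoint type and expandSlots helper are invented for the example. It mirrors the expansion logic: each active endpoint receives weight*MaxLBEndPointsRR/100 slots, and endpoints whose share rounds down to zero backfill whatever slots remain, round-robin.

// expand_slots_sketch.go - illustrative sketch of the weighted slot expansion
// that rules.go performs for the prio selector, sized by MaxLBEndPointsRR.
package main

import "fmt"

const maxLBEndPointsRR = 32 // same value as MaxLBEndPointsRR in rules.go

type endpoint struct {
    name   string
    weight int // percentage share of traffic
}

// expandSlots builds a fixed-size slot table in which each endpoint appears
// in proportion to its weight; zero-share endpoints backfill leftover slots.
func expandSlots(eps []endpoint) []string {
    slots := make([]string, 0, maxLBEndPointsRR)
    if len(eps) == 0 {
        return slots
    }
    var small []int
    for i, ep := range eps {
        sw := ep.weight * maxLBEndPointsRR / 100
        if sw == 0 {
            small = append(small, i)
        }
        for x := 0; x < sw && len(slots) < maxLBEndPointsRR; x++ {
            slots = append(slots, ep.name)
        }
    }
    // If every endpoint got at least one slot, backfill from all of them.
    if len(small) == 0 {
        for i := range eps {
            small = append(small, i)
        }
    }
    for v := 0; len(slots) < maxLBEndPointsRR; v++ {
        slots = append(slots, eps[small[v%len(small)]].name)
    }
    return slots
}

func main() {
    // A 70/25/5 weight split expands to roughly 23/8/1 of the 32 slots.
    fmt.Println(expandSlots([]endpoint{{"ep1", 70}, {"ep2", 25}, {"ep3", 5}}))
}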