Skip to content

Commit

Permalink
Merge branch 'persist-2' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
TrekkieCoder authored Jan 1, 2025
2 parents ddf7def + e289ba6 commit 190a1e9
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 41 deletions.
54 changes: 34 additions & 20 deletions pkg/loxinet/dpbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package loxinet

import (
"fmt"
cmn "github.com/loxilb-io/loxilb/common"
tk "github.com/loxilb-io/loxilib"
"net"
"os"
"runtime/debug"
"sync"
"time"

tk "github.com/loxilb-io/loxilib"

cmn "github.com/loxilb-io/loxilb/common"
)

// man names constants
Expand All @@ -43,6 +41,10 @@ const (
MapNameFw4 = "FW4"
)

const (
UseRPCPeer = false
)

// error codes
const (
DpErrBase = iota - 103000
Expand Down Expand Up @@ -449,6 +451,8 @@ type DpHookInterface interface {
DpCtDel(w *DpCtInfo) int
DpSockVIPAdd(w *SockVIPDpWorkQ) int
DpSockVIPDel(w *SockVIPDpWorkQ) int
DpCnodeAdd(w *PeerDpWorkQ) int
DpCnodeDel(w *PeerDpWorkQ) int
DpTableGC()
DpCtGetAsync()
DpGetLock()
Expand Down Expand Up @@ -514,6 +518,8 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int {
var ret int
var err error

return 0

dp.SyncMtx.Lock()
defer dp.SyncMtx.Unlock()

Expand Down Expand Up @@ -769,27 +775,35 @@ func (dp *DpH) DpWorkOnSockVIP(vsWq *SockVIPDpWorkQ) DpRetT {
// DpWorkOnPeerOp - routine to work on a peer request for clustering
func (dp *DpH) DpWorkOnPeerOp(pWq *PeerDpWorkQ) DpRetT {
if pWq.Work == DpCreate {
var newPeer DpPeer
for _, pe := range dp.Peers {
if pe.Peer.Equal(pWq.PeerIP) {
return DpCreateErr
if UseRPCPeer {
var newPeer DpPeer
for _, pe := range dp.Peers {
if pe.Peer.Equal(pWq.PeerIP) {
return DpCreateErr
}
}
newPeer.Peer = pWq.PeerIP
dp.Peers = append(dp.Peers, newPeer)
tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String())
return 0
} else {
return dp.DpHooks.DpCnodeAdd(pWq)
}
newPeer.Peer = pWq.PeerIP
dp.Peers = append(dp.Peers, newPeer)
tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String())
return 0
} else if pWq.Work == DpRemove {
for idx := range dp.Peers {
pe := &dp.Peers[idx]
if pe.Peer.Equal(pWq.PeerIP) {
if pe.Client != nil {
dp.RPC.RPCHooks.RPCClose(pe)
if UseRPCPeer {
for idx := range dp.Peers {
pe := &dp.Peers[idx]
if pe.Peer.Equal(pWq.PeerIP) {
if pe.Client != nil {
dp.RPC.RPCHooks.RPCClose(pe)
}
dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...)
tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String())
return 0
}
dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...)
tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String())
return 0
}
} else {
return dp.DpHooks.DpCnodeDel(pWq)
}
}

Expand Down
62 changes: 55 additions & 7 deletions pkg/loxinet/dpebpf_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,30 @@ func DpEbpfSetLogLevel(logLevel tk.LogLevelT) {
}

// DpEbpfInit - initialize the ebpf dp subsystem
func DpEbpfInit(clusterEn, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH {
func DpEbpfInit(clusterNodes string, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH {
var cfg C.struct_ebpfcfg

if clusterEn {
cfg.have_mtrace = 1
} else {
cfg.have_mtrace = 0
}
//cNodes := strings.Split(clusterNodes, ",")
//for i, cNode := range cNodes {
// addr := net.ParseIP(cNode)
// if addr == nil {
// continue
// }
// if utils.IsIPHostAddr(cNode) {
// continue
// }
// if i == 0 {
// cfg.cluster1 = C.CString(cNode)
// } else if i == 1 {
// cfg.cluster2 = C.CString(cNode)
// }
//}

//if len(clusterEn) > 0 {
// cfg.have_mtrace = 1
//} else {
// cfg.have_mtrace = 0
//}
if egrHooks {
cfg.egr_hooks = 1
} else {
Expand Down Expand Up @@ -1064,7 +1080,7 @@ func DpLBRuleMod(w *LBDpWorkQ) int {
nxfa.inactive = 1
}

dat.nxfrm = C.uchar(len(w.endPoints))
dat.nxfrm = C.ushort(len(w.endPoints))
if w.CsumDis {
dat.cdis = 1
} else {
Expand Down Expand Up @@ -1927,6 +1943,38 @@ func (e *DpEbpfH) DpSockVIPDel(w *SockVIPDpWorkQ) int {
return ec
}

// DpCnodeAdd - routine to work on adding a cnode
func (e *DpEbpfH) DpCnodeAdd(w *PeerDpWorkQ) int {
cnode := w.PeerIP.String()

cnodeStr := C.CString(cnode)
defer C.free(unsafe.Pointer(cnodeStr))

ec := int(C.llb_add_cnode(cnodeStr))

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / udp-lb-sanity-ubuntu-22

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / sctp-lb-sanity-ubuntu-22

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / udp-lb-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / advanced-lb-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / basic-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / sctp-lb-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / scale-sanity-ubuntu-22

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / ipsec-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / nat66-lb-sanity-ubuntu-22

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / advanced-lb-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / scale-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / tcp-lb-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / liveness-lb-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / ipsec-sanity-ubuntu-22

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / nat66-lb-sanity

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / tcp-lb-sanity-ubuntu-22

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / basic-sanity-ubuntu-22

could not determine kind of name for C.llb_add_cnode

Check failure on line 1953 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / k3s-base-sanity

could not determine kind of name for C.llb_add_cnode
if ec != 0 {
*w.Status = DpCreateErr
} else {
*w.Status = 0
}
return ec
}

// DpCnodeDel - routine to work on deleting a cnode
func (e *DpEbpfH) DpCnodeDel(w *PeerDpWorkQ) int {
cnode := w.PeerIP.String()

cnodeStr := C.CString(cnode)
defer C.free(unsafe.Pointer(cnodeStr))

ec := int(C.llb_delete_cnode(cnodeStr))

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / udp-lb-sanity-ubuntu-22

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / sctp-lb-sanity-ubuntu-22

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / udp-lb-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / advanced-lb-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / basic-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / sctp-lb-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / scale-sanity-ubuntu-22

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / ipsec-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / nat66-lb-sanity-ubuntu-22

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / advanced-lb-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / scale-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / tcp-lb-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / liveness-lb-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / ipsec-sanity-ubuntu-22

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / nat66-lb-sanity

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / tcp-lb-sanity-ubuntu-22

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / basic-sanity-ubuntu-22

could not determine kind of name for C.llb_delete_cnode

Check failure on line 1969 in pkg/loxinet/dpebpf_linux.go

View workflow job for this annotation

GitHub Actions / k3s-base-sanity

could not determine kind of name for C.llb_delete_cnode
if ec != 0 {
*w.Status = DpRemoveErr
} else {
*w.Status = 0
}
return ec
}

//export goMapNotiHandler
func goMapNotiHandler(m *mapNoti) {

Expand Down
2 changes: 1 addition & 1 deletion pkg/loxinet/loxinet.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func loxiNetInit() {
RunCommand(MkMountCG2, false)
}
// Initialize the ebpf datapath subsystem
mh.dpEbpf = DpEbpfInit(clusterMode, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel)
mh.dpEbpf = DpEbpfInit(opts.Opts.ClusterNodes, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel)
mh.dp = DpBrokerInit(mh.dpEbpf, rpcMode)

// Initialize the security zone subsystem
Expand Down
38 changes: 25 additions & 13 deletions pkg/loxinet/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ const (

// constants
const (
MaxLBEndPoints = 24
MaxLBEndPoints = 1500
MaxLBEndPointsRR = 32
DflLbaInactiveTries = 2 // Default number of inactive tries before LB arm is turned off
MaxDflLbaInactiveTries = 100 // Max number of inactive tries before LB arm is turned off
DflLbaCheckTimeout = 10 // Default timeout for checking LB arms
Expand All @@ -90,7 +91,7 @@ const (
LbMaxInactiveTimeout = 24 * 3600 // Maximum inactive timeout for established sessions
MaxEndPointCheckers = 4 // Maximum helpers to check endpoint health
EndPointCheckerDuration = 2 // Duration at which ep-helpers will run
MaxEndPointSweeps = 20 // Maximum end-point sweeps per round
MaxEndPointSweeps = 40 // Maximum end-point sweeps per round
VIPSweepDuration = 30 // Duration of periodic VIP maintenance
DefaultPersistTimeOut = 10800 // Default persistent LB session timeout
SnatFwMark = 0x80000000 // Snat Marker
Expand Down Expand Up @@ -910,7 +911,8 @@ func (R *RuleH) modNatEpHost(r *ruleEnt, endpoints []ruleLBEp, doAddOp bool, liv
pType = HostProbeConnectTCP
pPort = nep.xPort
} else if r.tuples.l4Prot.val == 17 {
pType = HostProbeConnectUDP
//pType = HostProbeConnectUDP
pType = HostProbeConnectTCP // FIXME
pPort = nep.xPort
} else if r.tuples.l4Prot.val == 1 {
pType = HostProbePing
Expand Down Expand Up @@ -1215,7 +1217,7 @@ func (R *RuleH) mkHostAssocs(r *ruleEnt) bool {
}

for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && !ipnet.IP.IsUnspecified() {
// check if IPv4 or IPv6 is not nil
if ipnet.IP.To4() != nil || ipnet.IP.To16() != nil {
if tk.IsNetIPv4(ipnet.IP.String()) && r.tuples.l3Dst.addr.IP.String() != ipnet.IP.String() {
Expand Down Expand Up @@ -1259,7 +1261,8 @@ func (R *RuleH) syncEPHostState2Rule(rule *ruleEnt, checkNow bool) bool {
if rule.tuples.l4Prot.val == 6 {
sType = HostProbeConnectTCP
} else if rule.tuples.l4Prot.val == 17 {
sType = HostProbeConnectUDP
//sType = HostProbeConnectUDP
sType = HostProbeConnectTCP // FIXME
} else if rule.tuples.l4Prot.val == 1 {
sType = HostProbePing
} else if rule.tuples.l4Prot.val == 132 {
Expand Down Expand Up @@ -1571,6 +1574,12 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al
return RuleEpCountErr, errors.New("endpoints-range error")
}

if (serv.Sel == cmn.LbSelRr || serv.Sel == cmn.LbSelLeastConnections ||
serv.Sel == cmn.LbSelPrio || serv.Sel == cmn.LbSelN2 || serv.Sel == cmn.LbSelN3) &&
len(servEndPoints) > MaxLBEndPointsRR {
return RuleEpCountErr, errors.New("endpoints-range1 error")
}

// Validate persist timeout
if serv.Sel == cmn.LbSelRrPersist {
if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 {
Expand Down Expand Up @@ -1855,6 +1864,7 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al
r.ci = cmn.CIDefault
r.privIP = privIP
r.pTO = serv.PersistTimeout

r.locIPs = make(map[string]struct{})

if !serv.Snat {
Expand Down Expand Up @@ -2920,19 +2930,19 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int {
if at.sel == cmn.LbSelPrio {
j := 0
k := 0
var small [MaxLBEndPoints]int
var neps [MaxLBEndPoints]ruleLBEp
var small [MaxLBEndPointsRR]int
var neps [MaxLBEndPointsRR]ruleLBEp
for i, ep := range at.endPoints {
if ep.inActiveEP {
continue
}
oEp := &at.endPoints[i]
sw := (int(ep.weight) * MaxLBEndPoints) / 100
sw := (int(ep.weight) * MaxLBEndPointsRR) / 100
if sw == 0 {
small[k] = i
k++
}
for x := 0; x < sw && j < MaxLBEndPoints; x++ {
for x := 0; x < sw && j < MaxLBEndPointsRR; x++ {
neps[j].xIP = oEp.xIP
neps[j].rIP = oEp.rIP
neps[j].xPort = oEp.xPort
Expand All @@ -2945,12 +2955,12 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int {
j++
}
}
if j < MaxLBEndPoints {
if j < MaxLBEndPointsRR {
v := 0
if k == 0 {
k = len(at.endPoints)
}
for j < MaxLBEndPoints {
for j < MaxLBEndPointsRR {
idx := small[v%k]
oEp := &at.endPoints[idx]
neps[j].xIP = oEp.xIP
Expand Down Expand Up @@ -3009,8 +3019,10 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int {
return -1
}

mh.dp.ToDpCh <- nWork
r.VIP2DP(nWork.Work)
if !nWork.ServiceIP.IsUnspecified() {
mh.dp.ToDpCh <- nWork
r.VIP2DP(nWork.Work)
}

if mode == cmn.LBModeHostOneArm {
for locIP := range r.locIPs {
Expand Down
5 changes: 5 additions & 0 deletions pkg/loxinet/xsync_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/rpc"
"os"
"runtime/debug"
"time"

opts "github.com/loxilb-io/loxilb/options"
tk "github.com/loxilb-io/loxilib"
Expand Down Expand Up @@ -213,6 +214,10 @@ func LoxiXsyncMain(mode string) {
return
}

for {
time.Sleep(1 * time.Second)
}

// Stack trace logger
defer func() {
if e := recover(); e != nil {
Expand Down

0 comments on commit 190a1e9

Please sign in to comment.