Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR- Improved udp probe scalability #29

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/basic-sanity.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ jobs:
- uses: actions/setup-go@v3
with:
go-version: '>=1.17.0'
- run: go test .
- run: go test -bench=.
- run: sudo go test .
- run: sudo go test -bench=.
7 changes: 6 additions & 1 deletion lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"net"
"testing"

"github.com/loxilb-io/sctp"
)

Expand Down Expand Up @@ -1036,4 +1035,10 @@ func TestProber(t *testing.T) {

sOk = L4ServiceProber("udp", "127.0.0.1:12234", "", "", "")
t.Logf("udp prober test2 %v\n", sOk)

sOk = L4ServiceProber("udp", "127.0.0.1:8080", "", "", "")
t.Logf("udp prober test3 %v\n", sOk)

sOk = L4ServiceProber("udp", "192.168.20.55:2234", "", "", "")
t.Logf("udp prober test4 %v\n\n\n", sOk)
}
121 changes: 94 additions & 27 deletions serviceprobe.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,79 @@ import (
"golang.org/x/net/ipv4"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
)

// SvcWait - Channel to wait for service reply
type SvcWait struct {
wait chan bool
}

// SvcKey - Service Key
type SvcKey struct {
Dst string
Port int
}

var (
icmpRunner chan bool
svcLock sync.RWMutex
svcs map[SvcKey]*SvcWait
)

func waitForBoolChannelOrTimeout(ch <-chan bool, timeout time.Duration) (bool, bool) {
select {
case val := <-ch:
return val, true
case <-time.After(timeout):
return false, false
}
}

func listenForICMPUNreachable() {

// Open a raw socket to listen for ICMP messages
rc, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
if err != nil {
os.Exit(1)
}
defer rc.Close()
pktData := make([]byte, 1500)
//rc.SetDeadline(time.Now().Add(5 * time.Second))
icmpRunner <- true
for {
plen, _, err := rc.ReadFrom(pktData)
if err != nil {
continue
}

icmpNr, err := icmp.ParseMessage(1, pktData)
if err != nil {
continue
}
if icmpNr.Code == 3 && plen >= 8+20+8 {
iph, err := ipv4.ParseHeader(pktData[8:])
if err != nil {
continue
}

if iph.Protocol == 17 {
dport := int(binary.BigEndian.Uint16(pktData[30:32]))
svcLock.Lock()
key := SvcKey{Dst: iph.Dst.String(), Port: dport}
if svcWait := svcs[key]; svcWait != nil {
svcWait.wait <- true
}
svcLock.Unlock()
}
}
}
}

// HTTPProber - Do a http probe for given url
// returns true/false depending on whether probing was successful
func HTTPProber(urls string) bool {
Expand All @@ -35,6 +103,16 @@ func HTTPProber(urls string) bool {
// resp is the response expected from server (empty for none)
// returns true/false depending on whether probing was successful
func L4ServiceProber(sType string, sName string, sHint, req, resp string) bool {

svcLock.Lock()
if svcs == nil {
icmpRunner = make(chan bool)
svcs = map[SvcKey]*SvcWait{}
go listenForICMPUNreachable()
<-icmpRunner
}
svcLock.Unlock()

sOk := false
timeout := 1 * time.Second

Expand Down Expand Up @@ -145,47 +223,36 @@ func L4ServiceProber(sType string, sName string, sHint, req, resp string) bool {
return false
}
} else if sType == "udp" {
var lc net.ListenConfig
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(3*time.Second))
defer cancel()
rc, err := lc.ListenPacket(ctx, "ip4:1", "0.0.0.0")
if err != nil {
return sOk

svcLock.Lock()
key := SvcKey{Dst: svcPair[0], Port: svcPort}
svcWait := svcs[key]
if svcWait == nil {
svcWait = &SvcWait{wait: make(chan bool)}
svcs[key] = svcWait
}
defer rc.Close()
svcLock.Unlock()

c.SetDeadline(time.Now().Add(1 * time.Second))
sOk = true
_, err = c.Write([]byte("probe"))
if err != nil {
return false
}
pktData := make([]byte, 1500)
rc.SetDeadline(time.Now().Add(1 * time.Second))
_, err = c.Read(pktData)
if err == nil {
return sOk
}

plen, _, err := rc.ReadFrom(pktData)
if err != nil {
return sOk
}
icmpNr, err := icmp.ParseMessage(1, pktData)
if err != nil {
return sOk
}
if icmpNr.Code == 3 && plen >= 8+20+8 {
iph, err := ipv4.ParseHeader(pktData[8:])
if err != nil {
return sOk
}
if iph.Dst.String() == svcPair[0] && iph.Protocol == 17 {
dport := int(binary.BigEndian.Uint16(pktData[30:32]))
if dport == svcPort {
sOk = false
}
}
_, unRch := waitForBoolChannelOrTimeout(svcWait.wait, 1*time.Second)
if unRch {
sOk = false
}

svcLock.Lock()
delete(svcs, key)
svcLock.Unlock()
}

return sOk
Expand Down
Loading