Skip to content

Commit

Permalink
Events refactor (#30) - cherry-pick (#33)
Browse files Browse the repository at this point in the history
* Events refactor (#30)

* Remove replace and add comments

* Minor refactor

* Update AL2023 image

* vmlinux generation

* Pin to sdk 1.0
  • Loading branch information
jayanthvn authored Aug 28, 2023
1 parent 3bd91c5 commit 3643136
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 167 deletions.
14 changes: 13 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,19 @@ RUN go mod download

RUN make build-linux

# Vmlinux
FROM public.ecr.aws/amazonlinux/amazonlinux:2023 as vmlinuxbuilder
WORKDIR /vmlinuxbuilder
RUN yum update -y && \
yum install -y iproute procps-ng && \
yum install -y llvm clang make gcc && \
yum install -y kernel-devel elfutils-libelf-devel zlib-devel libbpf-devel bpftool && \
yum clean all
COPY . ./
RUN make vmlinuxh

# Build BPF
FROM public.ecr.aws/eks-distro-build-tooling/eks-distro-base:latest-al23 as bpfbuilder
FROM public.ecr.aws/amazonlinux/amazonlinux:2 as bpfbuilder
WORKDIR /bpfbuilder
RUN yum update -y && \
yum install -y iproute procps-ng && \
Expand All @@ -25,6 +36,7 @@ RUN yum update -y && \
yum clean all

COPY . ./
COPY --from=vmlinuxbuilder /vmlinuxbuilder/pkg/ebpf/c/vmlinux.h .
RUN make build-bpf

FROM public.ecr.aws/eks-distro-build-tooling/eks-distro-base:latest.2
Expand Down
4 changes: 2 additions & 2 deletions pkg/ebpf/bpf_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (l *bpfClient) detachIngressBPFProbe(hostVethName string) error {
l.logger.Info("Attempting to do an Ingress Detach")
var err error
err = goebpf.TCEgressDetach(hostVethName)
if err != nil && !utils.IsInvalidFilterListError(err.Error()) &&
if err != nil &&
!utils.IsMissingFilterError(err.Error()) {
l.logger.Info("Ingress Detach failed:", "error", err)
return err
Expand All @@ -548,7 +548,7 @@ func (l *bpfClient) detachEgressBPFProbe(hostVethName string) error {
l.logger.Info("Attempting to do an Egress Detach")
var err error
err = goebpf.TCIngressDetach(hostVethName)
if err != nil && !utils.IsInvalidFilterListError(err.Error()) &&
if err != nil &&
!utils.IsMissingFilterError(err.Error()) {
l.logger.Info("Ingress Detach failed:", "error", err)
return err
Expand Down
252 changes: 92 additions & 160 deletions pkg/ebpf/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"os"
"strconv"
"sync"
"time"

"github.com/aws/aws-network-policy-agent/pkg/aws"
Expand All @@ -32,7 +31,7 @@ var (
NON_EKS_CW_PATH = "/aws/"
)

type Event_t struct {
type ringBufferDataV4_t struct {
SourceIP uint32
SourcePort uint32
DestIP uint32
Expand All @@ -41,7 +40,7 @@ type Event_t struct {
Verdict uint32
}

type EventV6_t struct {
type ringBufferDataV6_t struct {
SourceIP [16]byte
SourcePort uint32
DestIP [16]byte
Expand All @@ -50,10 +49,6 @@ type EventV6_t struct {
Verdict uint32
}

type EvProgram struct {
wg sync.WaitGroup
}

func ConfigurePolicyEventsLogging(logger logr.Logger, enableCloudWatchLogs bool, mapFD int, enableIPv6 bool) error {
// Enable logging and setup ring buffer
if mapFD <= 0 {
Expand All @@ -69,8 +64,7 @@ func ConfigurePolicyEventsLogging(logger logr.Logger, enableCloudWatchLogs bool,
return err
} else {
logger.Info("Configure Event loop ... ")
p := EvProgram{wg: sync.WaitGroup{}}
p.capturePolicyEvents(eventChanList[mapFD], logger, enableCloudWatchLogs, enableIPv6)
capturePolicyEvents(eventChanList[mapFD], logger, enableCloudWatchLogs, enableIPv6)
if enableCloudWatchLogs {
logger.Info("Cloudwatch log support is enabled")
err = setupCW(logger)
Expand Down Expand Up @@ -112,182 +106,120 @@ func setupCW(logger logr.Logger) error {
return nil
}

func (p *EvProgram) capturePolicyV6Events(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool) {
nodeName := os.Getenv("MY_NODE_NAME")
go func(events <-chan []byte) {
defer p.wg.Done()

for {
if b, ok := <-events; ok {
var logQueue []*cloudwatchlogs.InputLogEvent
func getProtocol(protocolNum int) string {
protocolStr := "UNKNOWN"
if protocolNum == utils.TCP_PROTOCOL_NUMBER {
protocolStr = "TCP"
} else if protocolNum == utils.UDP_PROTOCOL_NUMBER {
protocolStr = "UDP"
} else if protocolNum == utils.SCTP_PROTOCOL_NUMBER {
protocolStr = "SCTP"
} else if protocolNum == utils.ICMP_PROTOCOL_NUMBER {
protocolStr = "ICMP"
}
return protocolStr
}

var ev EventV6_t
buf := bytes.NewBuffer(b)
if err := binary.Read(buf, binary.LittleEndian, &ev); err != nil {
log.Info("Read Ring buf", "Failed ", err)
continue
}
func getVerdict(verdict int) string {
verdictStr := "DENY"
if verdict == utils.ACCEPT.Index() {
verdictStr = "ACCEPT"
} else if verdict == utils.EXPIRED_DELETED.Index() {
verdictStr = "EXPIRED/DELETED"
}
return verdictStr
}

protocol := "UNKNOWN"
if int(ev.Protocol) == utils.TCP_PROTOCOL_NUMBER {
protocol = "TCP"
} else if int(ev.Protocol) == utils.UDP_PROTOCOL_NUMBER {
protocol = "UDP"
} else if int(ev.Protocol) == utils.SCTP_PROTOCOL_NUMBER {
protocol = "SCTP"
} else if int(ev.Protocol) == utils.ICMP_PROTOCOL_NUMBER {
protocol = "ICMP"
}
func publishDataToCloudwatch(logQueue []*cloudwatchlogs.InputLogEvent, message string, log logr.Logger) bool {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending logs to CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

verdict := "DENY"
if ev.Verdict == 1 {
verdict = "ACCEPT"
} else if ev.Verdict == 2 {
verdict = "EXPIRED/DELETED"
}
if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(ev.SourceIP).String(), "Src Port", ev.SourcePort,
"Dest IP", utils.ConvByteToIPv6(ev.DestIP).String(), "Dest Port", ev.DestPort,
"Proto", protocol, "Verdict", verdict)
input = *input.SetLogStreamName(logStreamName)

message := "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(ev.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(ev.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(ev.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(ev.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Push log events", "Failed ", err)
}

if enableCloudWatchLogs {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

input = *input.SetLogStreamName(logStreamName)

resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Kprobe", "Failed ", err)
}

if resp != nil {
sequenceToken = *resp.NextSequenceToken
}

logQueue = []*cloudwatchlogs.InputLogEvent{}
} else {
break
}
}
}
if resp != nil {
sequenceToken = *resp.NextSequenceToken
}
}(events)

logQueue = []*cloudwatchlogs.InputLogEvent{}
return false
}
return true
}

func (p *EvProgram) capturePolicyV4Events(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool) {
func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCloudWatchLogs bool, enableIPv6 bool) {
nodeName := os.Getenv("MY_NODE_NAME")
go func(events <-chan []byte) {
defer p.wg.Done()

// Read from ringbuffer channel, perf buffer support is not there and 5.10 kernel is needed.
go func(ringbufferdata <-chan []byte) {
done := false
for {
if b, ok := <-events; ok {
if record, ok := <-ringbufferdata; ok {
var logQueue []*cloudwatchlogs.InputLogEvent
var message string
if enableIPv6 {
var rb ringBufferDataV6_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}

var ev Event_t
buf := bytes.NewBuffer(b)
if err := binary.Read(buf, binary.LittleEndian, &ev); err != nil {
log.Info("Read Ring buf", "Failed ", err)
continue
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

protocol := "UNKNOWN"
if int(ev.Protocol) == utils.TCP_PROTOCOL_NUMBER {
protocol = "TCP"
} else if int(ev.Protocol) == utils.UDP_PROTOCOL_NUMBER {
protocol = "UDP"
} else if int(ev.Protocol) == utils.SCTP_PROTOCOL_NUMBER {
protocol = "SCTP"
} else if int(ev.Protocol) == utils.ICMP_PROTOCOL_NUMBER {
protocol = "ICMP"
}
log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

verdict := "DENY"
if ev.Verdict == 1 {
verdict = "ACCEPT"
} else if ev.Verdict == 2 {
verdict = "EXPIRED/DELETED"
}
message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
} else {
var rb ringBufferDataV4_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(ev.SourceIP), "Src Port", ev.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(ev.DestIP), "Dest Port", ev.DestPort,
"Proto", protocol, "Verdict", verdict)
log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

message := "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(ev.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(ev.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(ev.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(ev.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
}

if enableCloudWatchLogs {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}

if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}

input = *input.SetLogStreamName(logStreamName)

resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Kprobe", "Failed ", err)
}

if resp != nil {
sequenceToken = *resp.NextSequenceToken
}

logQueue = []*cloudwatchlogs.InputLogEvent{}
} else {
done = publishDataToCloudwatch(logQueue, message, log)
if done {
break
}
}
}
}
}(events)
}

func (p *EvProgram) capturePolicyEvents(events <-chan []byte, log logr.Logger, enableCloudWatchLogs bool,
enableIPv6 bool) {
p.wg.Add(1)

if enableIPv6 {
p.capturePolicyV6Events(events, log, enableCloudWatchLogs)
} else {
p.capturePolicyV4Events(events, log, enableCloudWatchLogs)
}
}(ringbufferdata)
}

func ensureLogGroupExists(name string) error {
Expand Down
Loading

0 comments on commit 3643136

Please sign in to comment.