Skip to content

Commit

Permalink
feat: Add flag enable-policy-event-logs
Browse files Browse the repository at this point in the history
Policy event logging is now disabled by default
  • Loading branch information
mycrEEpy committed Sep 9, 2023
1 parent 55936d4 commit 9a08c59
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 53 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ ENV GOPROXY=direct

WORKDIR /workspace

COPY . ./
COPY go.mod go.sum ./
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download

COPY . ./

RUN make build-linux

# Vmlinux
Expand Down
4 changes: 2 additions & 2 deletions controllers/policyendpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func prometheusRegister() {

// NewPolicyEndpointsReconciler constructs new PolicyEndpointReconciler
func NewPolicyEndpointsReconciler(k8sClient client.Client, log logr.Logger,
enableCloudWatchLogs bool, enableIPv6 bool, enableNetworkPolicy bool) (*PolicyEndpointsReconciler, error) {
enablePolicyEventLogs, enableCloudWatchLogs bool, enableIPv6 bool, enableNetworkPolicy bool) (*PolicyEndpointsReconciler, error) {
r := &PolicyEndpointsReconciler{
k8sClient: k8sClient,
log: log,
Expand All @@ -92,7 +92,7 @@ func NewPolicyEndpointsReconciler(k8sClient client.Client, log logr.Logger,
var err error
if enableNetworkPolicy {
r.ebpfClient, err = ebpf.NewBpfClient(&r.policyEndpointeBPFContext, r.nodeIP,
enableCloudWatchLogs, enableIPv6, conntrackTTL)
enablePolicyEventLogs, enableCloudWatchLogs, enableIPv6, conntrackTTL)

// Start prometheus
prometheusRegister()
Expand Down
2 changes: 1 addition & 1 deletion controllers/policyendpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestDeriveIngressAndEgressFirewallRules(t *testing.T) {

mockClient := mock_client.NewMockClient(ctrl)
policyEndpointReconciler, _ := NewPolicyEndpointsReconciler(mockClient, logr.New(&log.NullLogSink{}),
false, false, false)
false, false, false, false)
var policyEndpointsList []string
policyEndpointsList = append(policyEndpointsList, tt.policyEndpointName)
policyEndpointReconciler.podIdentifierToPolicyEndpointMap.Store(tt.podIdentifier, policyEndpointsList)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func main() {

ctx := ctrl.SetupSignalHandler()
policyEndpointController, err := controllers.NewPolicyEndpointsReconciler(mgr.GetClient(),
ctrl.Log.WithName("controllers").WithName("policyEndpoints"), ctrlConfig.EnableCloudWatchLogs,
ctrl.Log.WithName("controllers").WithName("policyEndpoints"), ctrlConfig.EnablePolicyEventLogs, ctrlConfig.EnableCloudWatchLogs,
ctrlConfig.EnableIPv6, ctrlConfig.EnableNetworkPolicy)
if err != nil {
setupLog.Error(err, "unable to setup controller", "controller", "PolicyEndpoints init failed")
Expand Down
6 changes: 5 additions & 1 deletion pkg/config/controller_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const (
defaultLogLevel = "info"
defaultLogFile = "/var/log/aws-routed-eni/network-policy-agent.log"
defaultMaxConcurrentReconciles = 3
flagEnablePolicyEventLogs = "enable-policy-event-logs"
flagEnableCloudWatchLogs = "enable-cloudwatch-logs"
flagEnableIPv6 = "enable-ipv6"
flagEnableNetworkPolicy = "enable-network-policy"
Expand All @@ -22,6 +23,8 @@ type ControllerConfig struct {
LogFile string
// MaxConcurrentReconciles specifies the max number of reconcile loops
MaxConcurrentReconciles int
// Enable Policy decision logs
EnablePolicyEventLogs bool
// Enable Policy decision logs streaming to CloudWatch
EnableCloudWatchLogs bool
// Enable IPv6 mode
Expand All @@ -39,7 +42,8 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
"Set the controller log file - if not specified logs are written to stdout")
fs.IntVar(&cfg.MaxConcurrentReconciles, flagMaxConcurrentReconciles, defaultMaxConcurrentReconciles, ""+
"Maximum number of concurrent reconcile loops")
fs.BoolVar(&cfg.EnableCloudWatchLogs, flagEnableCloudWatchLogs, false, "If enabled, policy decision logs will be streamed to CloudWatch")
fs.BoolVar(&cfg.EnablePolicyEventLogs, flagEnablePolicyEventLogs, false, "If enabled, policy decision logs will be collected & logged")
fs.BoolVar(&cfg.EnableCloudWatchLogs, flagEnableCloudWatchLogs, false, "If enabled, policy decision logs will be streamed to CloudWatch, requires \"enable-policy-event-logs=true\"")
fs.BoolVar(&cfg.EnableIPv6, flagEnableIPv6, false, "If enabled, Network Policy agent will operate in IPv6 mode")
fs.BoolVar(&cfg.EnableNetworkPolicy, flagEnableNetworkPolicy, false, "If enabled, Network Policy agent will initialize BPF maps and start reconciler")

Expand Down
18 changes: 11 additions & 7 deletions pkg/ebpf/bpf_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type EbpfFirewallRules struct {
L4Info []v1alpha1.Port
}

func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enableCloudWatchLogs bool,
func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePolicyEventLogs, enableCloudWatchLogs bool,
enableIPv6 bool, conntrackTTL time.Duration) (*bpfClient, error) {
var conntrackMap goebpfmaps.BpfMap

Expand Down Expand Up @@ -212,13 +212,17 @@ func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enableClou
ebpfClient.conntrackClient = conntrack.NewConntrackClient(conntrackMap, enableIPv6, ebpfClient.logger)
ebpfClient.logger.Info("Initialized Conntrack client")

err = events.ConfigurePolicyEventsLogging(ebpfClient.logger, enableCloudWatchLogs, eventBufferFD, enableIPv6)
if err != nil {
ebpfClient.logger.Error(err, "unable to initialize event buffer for Policy events, exiting..")
sdkAPIErr.WithLabelValues("ConfigurePolicyEventsLogging").Inc()
return nil, err
if enablePolicyEventLogs {
err = events.ConfigurePolicyEventsLogging(ebpfClient.logger, enableCloudWatchLogs, eventBufferFD, enableIPv6)
if err != nil {
ebpfClient.logger.Error(err, "unable to initialize event buffer for Policy events, exiting..")
sdkAPIErr.WithLabelValues("ConfigurePolicyEventsLogging").Inc()
return nil, err
}
ebpfClient.logger.Info("Configured event logging")
} else {
ebpfClient.logger.Info("Disabled event logging")
}
ebpfClient.logger.Info("Configured event logging")

// Start Conntrack routines
if enableIPv6 {
Expand Down
78 changes: 38 additions & 40 deletions pkg/ebpf/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,48 +175,46 @@ func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCl
// Read from ringbuffer channel, perf buffer support is not there and 5.10 kernel is needed.
go func(ringbufferdata <-chan []byte) {
done := false
for {
if record, ok := <-ringbufferdata; ok {
var logQueue []*cloudwatchlogs.InputLogEvent
var message string
if enableIPv6 {
var rb ringBufferDataV6_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}

protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
} else {
var rb ringBufferDataV4_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
for record := range ringbufferdata {
var logQueue []*cloudwatchlogs.InputLogEvent
var message string
if enableIPv6 {
var rb ringBufferDataV6_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}

if enableCloudWatchLogs {
done = publishDataToCloudwatch(logQueue, message, log)
if done {
break
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
} else {
var rb ringBufferDataV4_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}
protocol := getProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))

log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)

message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
}

if enableCloudWatchLogs {
done = publishDataToCloudwatch(logQueue, message, log)
if done {
break
}
}
}
Expand Down

0 comments on commit 9a08c59

Please sign in to comment.