From 9a08c59bce4dd11c4a8e17fe6790006027d273d1 Mon Sep 17 00:00:00 2001 From: Tobias Germer Date: Sat, 2 Sep 2023 14:20:43 +0200 Subject: [PATCH] feat: Add flag enable-policy-event-logs Policy event logging is now disabled by default --- Dockerfile | 4 +- controllers/policyendpoints_controller.go | 4 +- .../policyendpoints_controller_test.go | 2 +- main.go | 2 +- pkg/config/controller_config.go | 6 +- pkg/ebpf/bpf_client.go | 18 +++-- pkg/ebpf/events/events.go | 78 +++++++++---------- 7 files changed, 61 insertions(+), 53 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8be8f0b..e66ebbe 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,11 +8,13 @@ ENV GOPROXY=direct WORKDIR /workspace -COPY . ./ +COPY go.mod go.sum ./ # cache deps before building and copying source so that we don't need to re-download as much # and so that source changes don't invalidate our downloaded layer RUN go mod download +COPY . ./ + RUN make build-linux # Vmlinux diff --git a/controllers/policyendpoints_controller.go b/controllers/policyendpoints_controller.go index dc01a42..3fb0f6d 100644 --- a/controllers/policyendpoints_controller.go +++ b/controllers/policyendpoints_controller.go @@ -77,7 +77,7 @@ func prometheusRegister() { // NewPolicyEndpointsReconciler constructs new PolicyEndpointReconciler func NewPolicyEndpointsReconciler(k8sClient client.Client, log logr.Logger, - enableCloudWatchLogs bool, enableIPv6 bool, enableNetworkPolicy bool) (*PolicyEndpointsReconciler, error) { + enablePolicyEventLogs, enableCloudWatchLogs bool, enableIPv6 bool, enableNetworkPolicy bool) (*PolicyEndpointsReconciler, error) { r := &PolicyEndpointsReconciler{ k8sClient: k8sClient, log: log, @@ -92,7 +92,7 @@ func NewPolicyEndpointsReconciler(k8sClient client.Client, log logr.Logger, var err error if enableNetworkPolicy { r.ebpfClient, err = ebpf.NewBpfClient(&r.policyEndpointeBPFContext, r.nodeIP, - enableCloudWatchLogs, enableIPv6, conntrackTTL) + enablePolicyEventLogs, enableCloudWatchLogs, enableIPv6, conntrackTTL) // Start prometheus prometheusRegister() diff --git a/controllers/policyendpoints_controller_test.go b/controllers/policyendpoints_controller_test.go index 9931156..115d30d 100644 --- a/controllers/policyendpoints_controller_test.go +++ b/controllers/policyendpoints_controller_test.go @@ -329,7 +329,7 @@ func TestDeriveIngressAndEgressFirewallRules(t *testing.T) { mockClient := mock_client.NewMockClient(ctrl) policyEndpointReconciler, _ := NewPolicyEndpointsReconciler(mockClient, logr.New(&log.NullLogSink{}), - false, false, false) + false, false, false, false) var policyEndpointsList []string policyEndpointsList = append(policyEndpointsList, tt.policyEndpointName) policyEndpointReconciler.podIdentifierToPolicyEndpointMap.Store(tt.podIdentifier, policyEndpointsList) diff --git a/main.go b/main.go index 46e7e36..f99b232 100644 --- a/main.go +++ b/main.go @@ -91,7 +91,7 @@ func main() { ctx := ctrl.SetupSignalHandler() policyEndpointController, err := controllers.NewPolicyEndpointsReconciler(mgr.GetClient(), - ctrl.Log.WithName("controllers").WithName("policyEndpoints"), ctrlConfig.EnableCloudWatchLogs, + ctrl.Log.WithName("controllers").WithName("policyEndpoints"), ctrlConfig.EnablePolicyEventLogs, ctrlConfig.EnableCloudWatchLogs, ctrlConfig.EnableIPv6, ctrlConfig.EnableNetworkPolicy) if err != nil { setupLog.Error(err, "unable to setup controller", "controller", "PolicyEndpoints init failed") diff --git a/pkg/config/controller_config.go b/pkg/config/controller_config.go index 59bac53..ea24617 100644 --- a/pkg/config/controller_config.go +++ b/pkg/config/controller_config.go @@ -9,6 +9,7 @@ const ( defaultLogLevel = "info" defaultLogFile = "/var/log/aws-routed-eni/network-policy-agent.log" defaultMaxConcurrentReconciles = 3 + flagEnablePolicyEventLogs = "enable-policy-event-logs" flagEnableCloudWatchLogs = "enable-cloudwatch-logs" flagEnableIPv6 = "enable-ipv6" flagEnableNetworkPolicy = "enable-network-policy" @@ -22,6 +23,8 @@ type ControllerConfig struct { LogFile string // MaxConcurrentReconciles specifies the max number of reconcile loops MaxConcurrentReconciles int + // Enable Policy decision logs + EnablePolicyEventLogs bool // Enable Policy decision logs streaming to CloudWatch EnableCloudWatchLogs bool // Enable IPv6 mode @@ -39,7 +42,8 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) { "Set the controller log file - if not specified logs are written to stdout") fs.IntVar(&cfg.MaxConcurrentReconciles, flagMaxConcurrentReconciles, defaultMaxConcurrentReconciles, ""+ "Maximum number of concurrent reconcile loops") - fs.BoolVar(&cfg.EnableCloudWatchLogs, flagEnableCloudWatchLogs, false, "If enabled, policy decision logs will be streamed to CloudWatch") + fs.BoolVar(&cfg.EnablePolicyEventLogs, flagEnablePolicyEventLogs, false, "If enabled, policy decision logs will be collected & logged") + fs.BoolVar(&cfg.EnableCloudWatchLogs, flagEnableCloudWatchLogs, false, "If enabled, policy decision logs will be streamed to CloudWatch, requires \"enable-policy-event-logs=true\"") fs.BoolVar(&cfg.EnableIPv6, flagEnableIPv6, false, "If enabled, Network Policy agent will operate in IPv6 mode") fs.BoolVar(&cfg.EnableNetworkPolicy, flagEnableNetworkPolicy, false, "If enabled, Network Policy agent will initialize BPF maps and start reconciler") diff --git a/pkg/ebpf/bpf_client.go b/pkg/ebpf/bpf_client.go index 9e34867..82a6df9 100644 --- a/pkg/ebpf/bpf_client.go +++ b/pkg/ebpf/bpf_client.go @@ -105,7 +105,7 @@ type EbpfFirewallRules struct { L4Info []v1alpha1.Port } -func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enableCloudWatchLogs bool, +func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePolicyEventLogs, enableCloudWatchLogs bool, enableIPv6 bool, conntrackTTL time.Duration) (*bpfClient, error) { var conntrackMap goebpfmaps.BpfMap @@ -212,13 +212,17 @@ func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enableClou ebpfClient.conntrackClient = conntrack.NewConntrackClient(conntrackMap, enableIPv6, ebpfClient.logger) ebpfClient.logger.Info("Initialized Conntrack client") - err = events.ConfigurePolicyEventsLogging(ebpfClient.logger, enableCloudWatchLogs, eventBufferFD, enableIPv6) - if err != nil { - ebpfClient.logger.Error(err, "unable to initialize event buffer for Policy events, exiting..") - sdkAPIErr.WithLabelValues("ConfigurePolicyEventsLogging").Inc() - return nil, err + if enablePolicyEventLogs { + err = events.ConfigurePolicyEventsLogging(ebpfClient.logger, enableCloudWatchLogs, eventBufferFD, enableIPv6) + if err != nil { + ebpfClient.logger.Error(err, "unable to initialize event buffer for Policy events, exiting..") + sdkAPIErr.WithLabelValues("ConfigurePolicyEventsLogging").Inc() + return nil, err + } + ebpfClient.logger.Info("Configured event logging") + } else { + ebpfClient.logger.Info("Disabled event logging") } - ebpfClient.logger.Info("Configured event logging") // Start Conntrack routines if enableIPv6 { diff --git a/pkg/ebpf/events/events.go b/pkg/ebpf/events/events.go index e6a5ce4..28e8222 100644 --- a/pkg/ebpf/events/events.go +++ b/pkg/ebpf/events/events.go @@ -175,48 +175,46 @@ func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCl // Read from ringbuffer channel, perf buffer support is not there and 5.10 kernel is needed. go func(ringbufferdata <-chan []byte) { done := false - for { - if record, ok := <-ringbufferdata; ok { - var logQueue []*cloudwatchlogs.InputLogEvent - var message string - if enableIPv6 { - var rb ringBufferDataV6_t - buf := bytes.NewBuffer(record) - if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil { - log.Info("Failed to read from Ring buf", err) - continue - } - - protocol := getProtocol(int(rb.Protocol)) - verdict := getVerdict(int(rb.Verdict)) - - log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort, - "Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort, - "Proto", protocol, "Verdict", verdict) - - message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict - } else { - var rb ringBufferDataV4_t - buf := bytes.NewBuffer(record) - if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil { - log.Info("Failed to read from Ring buf", err) - continue - } - protocol := getProtocol(int(rb.Protocol)) - verdict := getVerdict(int(rb.Verdict)) - - log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort, - "Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort, - "Proto", protocol, "Verdict", verdict) - - message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict + for record := range ringbufferdata { + var logQueue []*cloudwatchlogs.InputLogEvent + var message string + if enableIPv6 { + var rb ringBufferDataV6_t + buf := bytes.NewBuffer(record) + if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil { + log.Info("Failed to read from Ring buf", err) + continue } - if enableCloudWatchLogs { - done = publishDataToCloudwatch(logQueue, message, log) - if done { - break - } + protocol := getProtocol(int(rb.Protocol)) + verdict := getVerdict(int(rb.Verdict)) + + log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort, + "Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort, + "Proto", protocol, "Verdict", verdict) + + message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict + } else { + var rb ringBufferDataV4_t + buf := bytes.NewBuffer(record) + if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil { + log.Info("Failed to read from Ring buf", err) + continue + } + protocol := getProtocol(int(rb.Protocol)) + verdict := getVerdict(int(rb.Verdict)) + + log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort, + "Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort, + "Proto", protocol, "Verdict", verdict) + + message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict + } + + if enableCloudWatchLogs { + done = publishDataToCloudwatch(logQueue, message, log) + if done { + break } } }