feat: Add flag enable-policy-event-logs #48

Merged · 4 commits · Sep 15, 2023

Changes from all commits
4 changes: 3 additions & 1 deletion Dockerfile

```diff
@@ -8,11 +8,13 @@ ENV GOPROXY=direct
 
 WORKDIR /workspace
 
-COPY . ./
+COPY go.mod go.sum ./
 # cache deps before building and copying source so that we don't need to re-download as much
 # and so that source changes don't invalidate our downloaded layer
 RUN go mod download
 
+COPY . ./
+
 RUN make build-linux
 
 # Vmlinux
```
12 changes: 12 additions & 0 deletions README.md

````diff
@@ -31,6 +31,16 @@ Default: false
 
 Set this flag to `true` to enable the Network Policy feature support.
 
+#### `enable-policy-event-logs`
+
+Type: Boolean
+
+Default: false
+
+Set this flag to `true` to enable the collection & logging of policy decision logs.
+
+> Notice: Enabling this feature requires one CPU core per node.
+
 #### `enable-cloudwatch-logs`
 
 Type: Boolean
@@ -39,6 +49,8 @@ Default: false
 
 Network Policy Agent provides an option to stream policy decision logs to CloudWatch. For EKS clusters, the policy logs will be located under `/aws/eks/<cluster-name>/cluster/` and for self-managed K8S clusters, the logs will be placed under `/aws/k8s-cluster/cluster/`. By default, Network Policy Agent will log policy decision information for individual flows to a file on the local node (`/var/run/aws-routed-eni/network-policy-agent.log`).
 
+This feature also requires the `enable-policy-event-logs` flag to be enabled.
+
 This feature requires you to provide the relevant CloudWatch permissions to the `aws-node` pod via the below policy.
 
 ```
````
4 changes: 2 additions & 2 deletions controllers/policyendpoints_controller.go

```diff
@@ -77,7 +77,7 @@ func prometheusRegister() {
 
 // NewPolicyEndpointsReconciler constructs new PolicyEndpointReconciler
 func NewPolicyEndpointsReconciler(k8sClient client.Client, log logr.Logger,
-	enableCloudWatchLogs bool, enableIPv6 bool, enableNetworkPolicy bool) (*PolicyEndpointsReconciler, error) {
+	enablePolicyEventLogs, enableCloudWatchLogs bool, enableIPv6 bool, enableNetworkPolicy bool) (*PolicyEndpointsReconciler, error) {
 	r := &PolicyEndpointsReconciler{
 		k8sClient: k8sClient,
 		log:       log,
@@ -92,7 +92,7 @@ func NewPolicyEndpointsReconciler(k8sClient client.Client, log logr.Logger,
 	var err error
 	if enableNetworkPolicy {
 		r.ebpfClient, err = ebpf.NewBpfClient(&r.policyEndpointeBPFContext, r.nodeIP,
-			enableCloudWatchLogs, enableIPv6, conntrackTTL)
+			enablePolicyEventLogs, enableCloudWatchLogs, enableIPv6, conntrackTTL)
 
 		// Start prometheus
 		prometheusRegister()
```
2 changes: 1 addition & 1 deletion controllers/policyendpoints_controller_test.go

```diff
@@ -329,7 +329,7 @@ func TestDeriveIngressAndEgressFirewallRules(t *testing.T) {
 
 			mockClient := mock_client.NewMockClient(ctrl)
 			policyEndpointReconciler, _ := NewPolicyEndpointsReconciler(mockClient, logr.New(&log.NullLogSink{}),
-				false, false, false)
+				false, false, false, false)
 			var policyEndpointsList []string
 			policyEndpointsList = append(policyEndpointsList, tt.policyEndpointName)
 			policyEndpointReconciler.podIdentifierToPolicyEndpointMap.Store(tt.podIdentifier, policyEndpointsList)
```
2 changes: 1 addition & 1 deletion main.go

```diff
@@ -91,7 +91,7 @@ func main() {
 
 	ctx := ctrl.SetupSignalHandler()
 	policyEndpointController, err := controllers.NewPolicyEndpointsReconciler(mgr.GetClient(),
-		ctrl.Log.WithName("controllers").WithName("policyEndpoints"), ctrlConfig.EnableCloudWatchLogs,
+		ctrl.Log.WithName("controllers").WithName("policyEndpoints"), ctrlConfig.EnablePolicyEventLogs, ctrlConfig.EnableCloudWatchLogs,
 		ctrlConfig.EnableIPv6, ctrlConfig.EnableNetworkPolicy)
 	if err != nil {
 		setupLog.Error(err, "unable to setup controller", "controller", "PolicyEndpoints init failed")
```
6 changes: 5 additions & 1 deletion pkg/config/controller_config.go

```diff
@@ -9,6 +9,7 @@ const (
 	defaultLogLevel                = "info"
 	defaultLogFile                 = "/var/log/aws-routed-eni/network-policy-agent.log"
 	defaultMaxConcurrentReconciles = 3
+	flagEnablePolicyEventLogs      = "enable-policy-event-logs"
 	flagEnableCloudWatchLogs       = "enable-cloudwatch-logs"
 	flagEnableIPv6                 = "enable-ipv6"
 	flagEnableNetworkPolicy        = "enable-network-policy"
@@ -22,6 +23,8 @@ type ControllerConfig struct {
 	LogFile string
 	// MaxConcurrentReconciles specifies the max number of reconcile loops
 	MaxConcurrentReconciles int
+	// Enable Policy decision logs
+	EnablePolicyEventLogs bool
 	// Enable Policy decision logs streaming to CloudWatch
 	EnableCloudWatchLogs bool
 	// Enable IPv6 mode
@@ -39,7 +42,8 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
 		"Set the controller log file - if not specified logs are written to stdout")
 	fs.IntVar(&cfg.MaxConcurrentReconciles, flagMaxConcurrentReconciles, defaultMaxConcurrentReconciles, ""+
 		"Maximum number of concurrent reconcile loops")
-	fs.BoolVar(&cfg.EnableCloudWatchLogs, flagEnableCloudWatchLogs, false, "If enabled, policy decision logs will be streamed to CloudWatch")
+	fs.BoolVar(&cfg.EnablePolicyEventLogs, flagEnablePolicyEventLogs, false, "If enabled, policy decision logs will be collected & logged")
+	fs.BoolVar(&cfg.EnableCloudWatchLogs, flagEnableCloudWatchLogs, false, "If enabled, policy decision logs will be streamed to CloudWatch, requires \"enable-policy-event-logs=true\"")
 	fs.BoolVar(&cfg.EnableIPv6, flagEnableIPv6, false, "If enabled, Network Policy agent will operate in IPv6 mode")
 	fs.BoolVar(&cfg.EnableNetworkPolicy, flagEnableNetworkPolicy, false, "If enabled, Network Policy agent will initialize BPF maps and start reconciler")
 
```

Review comments on the `fs.BoolVar(&cfg.EnablePolicyEventLogs, ...)` line:

Contributor: Flow-specific logs will be rather useful for debugging. Since this is an initial release, it will be helpful to have it turned on by default for the first few releases. It should only log an entry per flow and not per packet (for accepted flows). Obviously, this change allows a user to turn it off if it becomes too verbose for them.

mycrEEpy (Contributor, Author), Sep 7, 2023: The problem with turning it on by default is not the amount of logs being produced, but the fact that the busy loop which is collecting the decision logs consumes a full CPU core on every node. From a user's point of view this is unexpected and undesired default behavior.

Contributor: Sorry for the delay. We had some internal discussions on what the default for this flag should be, and we agreed on disabling it by default considering the agent runs as a DaemonSet. So we should be good with the current PR. We will look to enable it by default once we find a way to optimize the CPU usage. Thanks for the PR.
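For readers following the flag plumbing, here is a minimal, self-contained sketch of how the two `BoolVar` bindings above behave on the command line. The flag-set name, the sample arguments, and the dependency check are illustrative assumptions, not code from this PR:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	var enablePolicyEventLogs, enableCloudWatchLogs bool

	// Bind the two booleans the same way ControllerConfig.BindFlags does.
	fs := pflag.NewFlagSet("network-policy-agent", pflag.ExitOnError)
	fs.BoolVar(&enablePolicyEventLogs, "enable-policy-event-logs", false,
		"If enabled, policy decision logs will be collected & logged")
	fs.BoolVar(&enableCloudWatchLogs, "enable-cloudwatch-logs", false,
		"If enabled, policy decision logs will be streamed to CloudWatch, requires \"enable-policy-event-logs=true\"")

	// e.g. the daemonset could pass: --enable-policy-event-logs=true
	_ = fs.Parse([]string{"--enable-policy-event-logs=true"})

	// Hypothetical dependency check mirroring the README text; the PR
	// itself enforces this implicitly by only configuring event logging
	// (and therefore CloudWatch streaming) when the first flag is set.
	if enableCloudWatchLogs && !enablePolicyEventLogs {
		fmt.Println("enable-cloudwatch-logs requires enable-policy-event-logs=true")
	}
	fmt.Printf("event logs: %v, cloudwatch: %v\n", enablePolicyEventLogs, enableCloudWatchLogs)
}
```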
18 changes: 11 additions & 7 deletions pkg/ebpf/bpf_client.go

```diff
@@ -105,7 +105,7 @@ type EbpfFirewallRules struct {
 	L4Info []v1alpha1.Port
 }
 
-func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enableCloudWatchLogs bool,
+func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePolicyEventLogs, enableCloudWatchLogs bool,
 	enableIPv6 bool, conntrackTTL time.Duration) (*bpfClient, error) {
 	var conntrackMap goebpfmaps.BpfMap
 
@@ -212,13 +212,17 @@ func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enableClou
 	ebpfClient.conntrackClient = conntrack.NewConntrackClient(conntrackMap, enableIPv6, ebpfClient.logger)
 	ebpfClient.logger.Info("Initialized Conntrack client")
 
-	err = events.ConfigurePolicyEventsLogging(ebpfClient.logger, enableCloudWatchLogs, eventBufferFD, enableIPv6)
-	if err != nil {
-		ebpfClient.logger.Error(err, "unable to initialize event buffer for Policy events, exiting..")
-		sdkAPIErr.WithLabelValues("ConfigurePolicyEventsLogging").Inc()
-		return nil, err
+	if enablePolicyEventLogs {
+		err = events.ConfigurePolicyEventsLogging(ebpfClient.logger, enableCloudWatchLogs, eventBufferFD, enableIPv6)
+		if err != nil {
+			ebpfClient.logger.Error(err, "unable to initialize event buffer for Policy events, exiting..")
+			sdkAPIErr.WithLabelValues("ConfigurePolicyEventsLogging").Inc()
+			return nil, err
+		}
+		ebpfClient.logger.Info("Configured event logging")
+	} else {
+		ebpfClient.logger.Info("Disabled event logging")
 	}
-	ebpfClient.logger.Info("Configured event logging")
 
 	// Start Conntrack routines
 	if enableIPv6 {
```
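`ConfigurePolicyEventsLogging` attaches a consumer to the eBPF event buffer, and the events.go change in the next file decodes each raw ring buffer record with `binary.Read` into a fixed-layout struct. Here is a minimal sketch of that decoding step; `demoRecord` is a hypothetical layout for illustration only, while the agent's real `ringBufferDataV4_t` carries source/dest IPs, ports, protocol, and verdict:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// demoRecord is a hypothetical, simplified record layout. Field order
// and sizes must exactly match the byte layout the kernel side emits.
type demoRecord struct {
	SourcePort uint16
	DestPort   uint16
	Verdict    uint32
}

func main() {
	// Raw bytes as they might arrive from the ring buffer
	// (little-endian, matching the kernel side).
	raw := []byte{0xD2, 0x04, 0xBB, 0x01, 0x01, 0x00, 0x00, 0x00} // 1234, 443, 1

	var rec demoRecord
	if err := binary.Read(bytes.NewBuffer(raw), binary.LittleEndian, &rec); err != nil {
		fmt.Println("failed to decode record:", err)
		return
	}
	fmt.Printf("src port %d -> dst port %d, verdict %d\n", rec.SourcePort, rec.DestPort, rec.Verdict)
}
```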
78 changes: 38 additions & 40 deletions pkg/ebpf/events/events.go

```diff
@@ -175,48 +175,46 @@ func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCl
 	// Read from ringbuffer channel, perf buffer support is not there and 5.10 kernel is needed.
 	go func(ringbufferdata <-chan []byte) {
 		done := false
-		for {
-			if record, ok := <-ringbufferdata; ok {
-				var logQueue []*cloudwatchlogs.InputLogEvent
-				var message string
-				if enableIPv6 {
-					var rb ringBufferDataV6_t
-					buf := bytes.NewBuffer(record)
-					if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
-						log.Info("Failed to read from Ring buf", err)
-						continue
-					}
-
-					protocol := getProtocol(int(rb.Protocol))
-					verdict := getVerdict(int(rb.Verdict))
-
-					log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort,
-						"Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort,
-						"Proto", protocol, "Verdict", verdict)
-
-					message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
-				} else {
-					var rb ringBufferDataV4_t
-					buf := bytes.NewBuffer(record)
-					if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
-						log.Info("Failed to read from Ring buf", err)
-						continue
-					}
-					protocol := getProtocol(int(rb.Protocol))
-					verdict := getVerdict(int(rb.Verdict))
-
-					log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort,
-						"Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort,
-						"Proto", protocol, "Verdict", verdict)
-
-					message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
-				}
-
-				if enableCloudWatchLogs {
-					done = publishDataToCloudwatch(logQueue, message, log)
-					if done {
-						break
-					}
-				}
-			}
-		}
+		for record := range ringbufferdata {
+			var logQueue []*cloudwatchlogs.InputLogEvent
+			var message string
+			if enableIPv6 {
+				var rb ringBufferDataV6_t
+				buf := bytes.NewBuffer(record)
+				if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
+					log.Info("Failed to read from Ring buf", err)
+					continue
+				}
+
+				protocol := getProtocol(int(rb.Protocol))
+				verdict := getVerdict(int(rb.Verdict))
+
+				log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort,
+					"Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort,
+					"Proto", protocol, "Verdict", verdict)
+
+				message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
+			} else {
+				var rb ringBufferDataV4_t
+				buf := bytes.NewBuffer(record)
+				if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
+					log.Info("Failed to read from Ring buf", err)
+					continue
+				}
+				protocol := getProtocol(int(rb.Protocol))
+				verdict := getVerdict(int(rb.Verdict))
+
+				log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort,
+					"Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort,
+					"Proto", protocol, "Verdict", verdict)
+
+				message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
+			}
+
+			if enableCloudWatchLogs {
+				done = publishDataToCloudwatch(logQueue, message, log)
+				if done {
+					break
+				}
+			}
+		}
```

Review comment on the `for record := range ringbufferdata {` line:

mycrEEpy (Contributor, Author): Also changed this line to wait for data from the channel instead of dead-spinning the CPU in the for loop.
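To make the loop change concrete, here is a standalone sketch (my illustration, not code from the PR) of the failure mode `range` avoids. A receive written as `record, ok := <-ch` inside a bare `for` stops blocking once the channel is closed and returns immediately with `ok == false` on every pass, spinning a core, whereas `for record := range ch` blocks while the channel is empty and exits cleanly when it is closed:

```go
package main

import (
	"fmt"
	"time"
)

// busyConsume mirrors the old pattern: a bare for-loop around an
// ok-checked receive. Once ch is closed, <-ch stops blocking and
// returns (nil, false) on every iteration, so this goroutine spins
// at 100% CPU instead of exiting.
func busyConsume(ch <-chan []byte) {
	for {
		if record, ok := <-ch; ok {
			fmt.Println("flow record:", len(record), "bytes")
		}
	}
}

// rangeConsume mirrors the new pattern: range blocks until data
// arrives and returns when the channel is closed.
func rangeConsume(ch <-chan []byte) {
	for record := range ch {
		fmt.Println("flow record:", len(record), "bytes")
	}
}

func main() {
	ch := make(chan []byte)
	go rangeConsume(ch)

	ch <- []byte{0xde, 0xad}
	close(ch) // rangeConsume's loop ends here; busyConsume would spin

	time.Sleep(50 * time.Millisecond)
}
```

Together with gating event collection behind `enable-policy-event-logs`, this is why the rewrite removes the idle CPU burn the review thread discusses and lets the consumer goroutine terminate when the ring buffer channel shuts down.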
Expand Down