Skip to content

Commit

Permalink
[agent-smith] Enable egress monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Prince Rachit Sinha committed Mar 11, 2022
1 parent b10bdc6 commit 17abe73
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
56 changes: 55 additions & 1 deletion components/ee/agent-smith/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package agent

import (
"context"
"errors"
"fmt"
"net/url"
"os"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -211,12 +213,29 @@ func (agent *Smith) Start(ctx context.Context, callback func(InfringingWorkspace
clo = make(chan classifiedProcess, 50)
)
agent.metrics.RegisterClassificationQueues(cli, clo)

workspaces := make(map[int]*common.Workspace)
wssWriteMutex := &sync.Mutex{}

defer wg.Wait()
for i := 0; i < 25; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := range cli {
// Update the workspaces map if this process belongs to a new workspace
//
// When agent-smith starts, there can be multiple attempts to update
// the workspaces map, because we do not take the lock before reading from it.
// If multiple processes written to the cli chan are read in parallel,
// all of them can pass the check below and write to the map.
if _, ok := workspaces[i.Workspace.PID]; !ok {
wssWriteMutex.Lock()
log.Debugf("adding workspace with pid %d and workspaceId %s to workspaces", i.Workspace.PID, i.Workspace.WorkspaceID)
workspaces[i.Workspace.PID] = i.Workspace
wssWriteMutex.Unlock()
}
// perform classification of the process
class, err := agent.classifier.Matches(i.Path, i.CommandLine)
// optimisation: early out to not block on the CLO chan
if err == nil && class.Level == classifier.LevelNoMatch {
Expand All @@ -227,6 +246,40 @@ func (agent *Smith) Start(ctx context.Context, callback func(InfringingWorkspace
}()
}

egressTicker := time.NewTicker(2 * time.Minute)

go func() {
for {
<-egressTicker.C
for pid, ws := range workspaces {
// check if the workspace is already stopped
fi, err := os.Stat(fmt.Sprintf("/proc/%d", pid))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
wssWriteMutex.Lock()
log.Debugf("deleting workspace with pid %d and workspaceId %s from workspaces", pid, ws.WorkspaceID)
delete(workspaces, pid)
wssWriteMutex.Unlock()
} else {
log.WithError(err).WithFields(log.OWI(ws.OwnerID, ws.WorkspaceID, ws.InstanceID)).Error("could not stat workspace, stale entries will continue to exist in workspaces state")
}
continue
}
infringement, err := agent.checkEgressTrafficCallback(pid, fi.ModTime())
if err != nil {
log.WithError(err).WithFields(log.OWI(ws.OwnerID, ws.WorkspaceID, ws.InstanceID)).Error("error calling checkEgressTrafficCallback")
continue
}
if infringement != nil {
// monitor metric here as we are not penalizing workspaces yet
log.Debugf("found infringing workspace with pid %d and worksapceId %s", pid, ws.WorkspaceID)
agent.metrics.egressViolations.WithLabelValues(string(infringement.Kind)).Inc()
}

}
}
}()

defer log.Info("agent smith main loop ended")

// We want to fill the classifier in a goroutine separate from using the classification
Expand Down Expand Up @@ -265,7 +318,7 @@ func (agent *Smith) Start(ctx context.Context, callback func(InfringingWorkspace
continue
}

agent.Penalize(InfringingWorkspace{
_, _ = agent.Penalize(InfringingWorkspace{
SupervisorPID: proc.Workspace.PID,
Owner: proc.Workspace.OwnerID,
InstanceID: proc.Workspace.InstanceID,
Expand Down Expand Up @@ -394,6 +447,7 @@ func (agent *Smith) checkEgressTrafficCallback(pid int, pidCreationTime time.Tim
return nil, err
}

log.Debugf("total egress bytes %d", resp)
if resp <= 0 {
log.WithField("total egress bytes", resp).Warn("GetEgressTraffic returned <= 0 value")
return nil, nil
Expand Down
10 changes: 10 additions & 0 deletions components/ee/agent-smith/pkg/agent/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type metrics struct {
classificationBackpressureInCount prometheus.GaugeFunc
classificationBackpressureOutCount prometheus.GaugeFunc
classificationBackpressureInDrop prometheus.Counter
egressViolations *prometheus.CounterVec

mu sync.RWMutex
cl []prometheus.Collector
Expand All @@ -41,6 +42,14 @@ func newAgentMetrics() *metrics {
Help: "The total amount of failed attempts that agent-smith is trying to apply a penalty.",
}, []string{"penalty", "reason"},
)
m.egressViolations = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "gitpod",
Subsystem: "agent_smith",
Name: "egress_violations_total",
Help: "The total amount of egress violations that agent-smith discovered.",
}, []string{"severity"},
)
m.classificationBackpressureInDrop = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "gitpod",
Subsystem: "agent_smith",
Expand All @@ -51,6 +60,7 @@ func newAgentMetrics() *metrics {
m.penaltyAttempts,
m.penaltyFailures,
m.classificationBackpressureInDrop,
m.egressViolations,
}
return m
}
Expand Down

0 comments on commit 17abe73

Please sign in to comment.