Skip to content

Commit

Permalink
feat: audit multiple sockets in a single scra instance
Browse files Browse the repository at this point in the history
This is really useful on for instance Talos Linux, which runs a seperate
containerd for Kubernetes. You'll want to watch both containerd
instances, which SCRA supports now.

Signed-off-by: Jorik Jonker <[email protected]>
  • Loading branch information
jonkerj committed Apr 22, 2022
1 parent 74f2deb commit 3be2029
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 26 deletions.
31 changes: 26 additions & 5 deletions cmd/once.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package cmd

import (
"context"
"fmt"

"github.com/equinix-ms/scra/internal/runtimes/containerd"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -19,13 +23,30 @@ func init() {
}

func once(cmd *cobra.Command, args []string) {
a, err := containerd.NewAuditor(viper.GetString("containerd-address"), viper.GetString("root-prefix"))
if err != nil {
panic(err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

group, gctx := errgroup.WithContext(ctx)
rootPrefix := viper.GetString("root-prefix")

for _, address := range viper.GetStringSlice("containerd-address") {
a, err := containerd.NewAuditor(address, rootPrefix, gctx)
if err != nil {
panic(err)
}

group.Go(func() error {
err = a.AuditOnce()
if err != nil {
return err
}

return nil
})
}

err = a.AuditOnce()
err := group.Wait()
if err != nil {
panic(err)
fmt.Printf("error: %v\n", err)
}
}
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func init() {
viper.SetEnvPrefix("scra")

flags := rootCmd.PersistentFlags()
flags.String("containerd-address", "/run/containerd/containerd.sock", "location of containerd endpoint")
flags.StringSlice("containerd-address", []string{"/run/containerd/containerd.sock"}, "location of containerd endpoint")
flags.String("root-prefix", "", "root prefix when accessing /proc et al from a hostPath mount")

if err := viper.BindPFlags(flags); err != nil {
Expand Down
32 changes: 27 additions & 5 deletions cmd/watch.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package cmd

import (
"context"
"fmt"

"github.com/equinix-ms/scra/internal/runtimes/containerd"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -19,13 +23,31 @@ func init() {
}

func watch(cmd *cobra.Command, args []string) {
a, err := containerd.NewAuditor(viper.GetString("containerd-address"), viper.GetString("root-prefix"))
if err != nil {
panic(err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

group, gctx := errgroup.WithContext(ctx)
rootPrefix := viper.GetString("root-prefix")

for _, address := range viper.GetStringSlice("containerd-address") {
a, err := containerd.NewAuditor(address, rootPrefix, gctx)
if err != nil {
panic(err)
}

group.Go(func() error {
err := a.Watch()
if err != nil {
return err
}

return nil
})

}

err = a.Watch()
err := group.Wait()
if err != nil {
panic(err)
fmt.Printf("error: %v\n", err)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/vishvananda/netlink v1.2.0-beta
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220412071739-889880a91fd5
)

Expand Down Expand Up @@ -52,7 +53,6 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
google.golang.org/grpc v1.45.0 // indirect
Expand Down
8 changes: 5 additions & 3 deletions internal/runtimes/containerd/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ type Auditor struct {
containerdClient *Client
rootPrefix string
logger *zap.Logger
context context.Context
}

func NewAuditor(address string, prefix string) (*Auditor, error) {
client, err := NewClient(address)
func NewAuditor(address string, prefix string, ctx context.Context) (*Auditor, error) {
client, err := NewClient(address, ctx)
if err != nil {
return nil, fmt.Errorf("error creating client: %v", err)
}
Expand All @@ -34,6 +35,7 @@ func NewAuditor(address string, prefix string) (*Auditor, error) {
containerdClient: client,
rootPrefix: prefix,
logger: logger,
context: ctx,
}

return a, nil
Expand Down Expand Up @@ -63,7 +65,7 @@ func (a *Auditor) AuditOnce() error {
}

func (a *Auditor) auditContainer(namespace string, container containerd.Container) error {
ctx := namespaces.WithNamespace(context.Background(), namespace)
ctx := namespaces.WithNamespace(a.context, namespace)
spec, err := container.Spec(ctx)
if err != nil {
return fmt.Errorf("error getting spec: %v", err)
Expand Down
12 changes: 6 additions & 6 deletions internal/runtimes/containerd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@ import (
)

type Client struct {
client *containerd.Client
client *containerd.Client
context context.Context
}

func NewClient(address string) (*Client, error) {
func NewClient(address string, ctx context.Context) (*Client, error) {
client, err := containerd.New(address)
if err != nil {
return nil, fmt.Errorf("error connecting to containerd: %v", err)
}

return &Client{client: client}, nil
return &Client{client: client, context: ctx}, nil
}

func (c *Client) Close() {
c.client.Close()
}

func (c *Client) ListNamespaces() ([]string, error) {
ctx := context.Background()
namespaces := c.client.NamespaceService()
nss, err := namespaces.List(ctx)
nss, err := namespaces.List(c.context)
if err != nil {
return nil, fmt.Errorf("error listing namespaces: %v", err)
}
Expand All @@ -37,7 +37,7 @@ func (c *Client) ListNamespaces() ([]string, error) {
}

func (c *Client) ListContainers(namespace string) ([]containerd.Container, error) {
ctx := namespaces.WithNamespace(context.Background(), namespace)
ctx := namespaces.WithNamespace(c.context, namespace)
containers, err := c.client.Containers(ctx, "")
if err != nil {
return nil, fmt.Errorf("error listing containers: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions internal/runtimes/containerd/watch.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package containerd

import (
"context"

apievents "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
Expand All @@ -12,15 +10,17 @@ import (
)

func (a *Auditor) Watch() error {
ctx := context.Background()
eventStream, errC := a.containerdClient.client.EventService().Subscribe(ctx, `topic=="/tasks/start"`)
eventStream, errC := a.containerdClient.client.EventService().Subscribe(a.context, `topic=="/tasks/start"`)
a.logger.Info("listening for tasks/start events", zap.String("address", a.address))
for {
var (
event *events.Envelope
err error
)
select {
case <-a.context.Done():
a.logger.Info("i have been canceled")
return nil
case err = <-errC:
if err != nil {
a.logger.Warn("received error", zap.Error(err))
Expand All @@ -40,7 +40,7 @@ func (a *Auditor) Watch() error {

switch t := e.(type) {
case *apievents.TaskStart:
nsCtx := namespaces.WithNamespace(ctx, event.Namespace)
nsCtx := namespaces.WithNamespace(a.context, event.Namespace)

container, err := a.containerdClient.client.LoadContainer(nsCtx, t.ContainerID)
if err != nil {
Expand Down

0 comments on commit 3be2029

Please sign in to comment.