Skip to content
This repository has been archived by the owner on Oct 14, 2020. It is now read-only.

Commit

Permalink
refactor: Define interface for vulnerability scanner
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Pacak <[email protected]>
  • Loading branch information
danielpacak committed Jun 29, 2020
1 parent 2facc70 commit 16d9fbd
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 196 deletions.
202 changes: 33 additions & 169 deletions pkg/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,18 @@ import (
"context"
"fmt"

"github.com/aquasecurity/starboard-security-operator/pkg/scanner/vulnerability"
"github.com/aquasecurity/starboard-security-operator/pkg/scanner/vulnerability/aqua"
"k8s.io/klog"

"github.com/aquasecurity/starboard-security-operator/pkg/aqua"
"github.com/aquasecurity/starboard/pkg/find/vulnerabilities/crd"
starboard "github.com/aquasecurity/starboard/pkg/generated/clientset/versioned"

"github.com/aquasecurity/starboard/pkg/apis/aquasecurity/v1alpha1"
"github.com/aquasecurity/starboard-security-operator/pkg/etc"
"github.com/aquasecurity/starboard/pkg/find/vulnerabilities"
"github.com/aquasecurity/starboard/pkg/kube/pod"
"k8s.io/utils/pointer"

"github.com/aquasecurity/starboard-security-operator/pkg/etc"
"github.com/google/uuid"
batch "k8s.io/api/batch/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"k8s.io/client-go/kubernetes"

"github.com/aquasecurity/starboard/pkg/kube"
Expand All @@ -37,21 +32,23 @@ import (
type Starboard interface {
IsPodScheduled(old, new *core.Pod) bool
SubmitScanJobByPod(ctx context.Context, spec *core.Pod) error
IsScanJobProcessable(old, new *batch.Job) bool
IsScanJobFinished(ctx context.Context, job *batch.Job) (bool, batch.JobConditionType)
ProcessCompleteScanJob(ctx context.Context, scanJob *batch.Job) error
ProcessFailedScanJob(ctx context.Context, scanJob *batch.Job) error
}

// NewStarboard construct a new Starboard action with the specified configuration
// and Kubernetes clientsets.
func NewStarboard(config etc.Config,
kubeClientset kubernetes.Interface,
starboardClientset starboard.Interface) Starboard {
pods := pod.NewPodManager(kubeClientset)
return &action{
config: config,
kubeClientset: kubeClientset,
pods: pod.NewPodManager(kubeClientset),
pods: pods,
writer: crd.NewWriter(starboardClientset),
converter: aqua.NewConverter(config.ScannerAquaCSP),
scanner: aqua.NewScanner(config, pods, aqua.NewConverter(config.ScannerAquaCSP)),
}
}

Expand All @@ -60,7 +57,7 @@ type action struct {
kubeClientset kubernetes.Interface
pods *pod.Manager
writer vulnerabilities.Writer
converter aqua.Converter
scanner vulnerability.Scanner
}

func (a *action) IsPodScheduled(old, new *core.Pod) bool {
Expand All @@ -79,25 +76,26 @@ func (a *action) hasContainersReadyCondition(pod *core.Pod) bool {
return false
}

func (a *action) IsScanJobProcessable(_, new *batch.Job) bool {
func (a *action) IsScanJobFinished(_ context.Context, job *batch.Job) (bool, batch.JobConditionType) {
// TODO Detect scan Jobs scheduled by Scanner CLI
if new.Namespace != a.config.Operator.StarboardNamespace {
return false
if job.Namespace != a.config.Operator.StarboardNamespace {
return false, ""
}

if len(new.Status.Conditions) == 0 {
return false
if len(job.Status.Conditions) == 0 {
return false, ""
}

if new.Status.Conditions[0].Type == batch.JobComplete {
return true
}
return true, job.Status.Conditions[0].Type
}

if new.Status.Conditions[0].Type == batch.JobFailed {
return true
func (a *action) SubmitScanJobByPod(ctx context.Context, pod *core.Pod) error {
job := a.scanner.PrepareScanJob(ctx, pod)
_, err := a.kubeClientset.BatchV1().Jobs(a.config.Operator.StarboardNamespace).Create(ctx, job, meta.CreateOptions{})
if err != nil {
return fmt.Errorf("creating scan job: %w", err)
}

return false
return nil
}

func (a *action) ProcessCompleteScanJob(ctx context.Context, scanJob *batch.Job) error {
Expand All @@ -106,15 +104,9 @@ func (a *action) ProcessCompleteScanJob(ctx context.Context, scanJob *batch.Job)
return fmt.Errorf("getting workload from scan job labels set: %w", err)
}

vulnerabilityReports := make(map[string]v1alpha1.VulnerabilityReport)

for _, container := range scanJob.Spec.Template.Spec.Containers {
vulnerabilityReport, err := a.processVulnerabilityReportByContainer(ctx, scanJob, container.Name)
if err != nil {
klog.Errorf("Error while processing complete scan job by container: %v", err)
continue
}
vulnerabilityReports[container.Name] = vulnerabilityReport
vulnerabilityReports, err := a.scanner.GetVulnerabilityReportsByScanJob(ctx, scanJob)
if err != nil {
return err
}

err = a.writer.Write(ctx, workload, vulnerabilityReports)
Expand All @@ -124,144 +116,16 @@ func (a *action) ProcessCompleteScanJob(ctx context.Context, scanJob *batch.Job)
return nil
}

func (a *action) processVulnerabilityReportByContainer(ctx context.Context, scanJob *batch.Job, container string) (v1alpha1.VulnerabilityReport, error) {
logsReader, err := a.pods.GetContainerLogsByJob(ctx, scanJob, container)
if err != nil {
return v1alpha1.VulnerabilityReport{}, fmt.Errorf("getting logs from container %s of %s/%s: %w", container, scanJob.Namespace, scanJob.Name, err)
}
defer func() {
_ = logsReader.Close()
}()
vulnerabilityReport, err := a.converter.Convert(logsReader)
if err != nil {
return v1alpha1.VulnerabilityReport{}, fmt.Errorf("converting logs to scan report: %w", err)
}

return vulnerabilityReport, nil
}

func (a *action) SubmitScanJobByPod(ctx context.Context, pod *core.Pod) error {
job := a.prepareScanJob(pod)
_, err := a.kubeClientset.BatchV1().Jobs(a.config.Operator.StarboardNamespace).Create(ctx, job, meta.CreateOptions{})
func (a *action) ProcessFailedScanJob(ctx context.Context, scanJob *batch.Job) error {
statuses, err := a.pods.GetTerminatedContainersStatusesByJob(ctx, scanJob)
if err != nil {
return fmt.Errorf("creating scan job: %w", err)
}
return nil
}

func (a *action) prepareScanJob(pod *core.Pod) *batch.Job {
resource := a.GetImmediateOwnerReference(pod)

scanJobContainers := make([]core.Container, len(pod.Spec.Containers))
for i, container := range pod.Spec.Containers {
scanJobContainers[i] = a.newScanJobContainer(container)
}

return &batch.Job{
ObjectMeta: meta.ObjectMeta{
Name: uuid.New().String(),
Namespace: a.config.Operator.StarboardNamespace,
Labels: labels.Set{
kube.LabelResourceKind: string(resource.Kind),
kube.LabelResourceName: resource.Name,
kube.LabelResourceNamespace: resource.Namespace,
},
},
Spec: batch.JobSpec{
BackoffLimit: pointer.Int32Ptr(0),
Template: core.PodTemplateSpec{
Spec: core.PodSpec{
RestartPolicy: core.RestartPolicyNever,
Volumes: []core.Volume{
{
Name: "dockersock",
VolumeSource: core.VolumeSource{
HostPath: &core.HostPathVolumeSource{
Path: "/var/run/docker.sock",
},
},
},
},
Containers: scanJobContainers,
},
},
},
}
}

func (a *action) newScanJobContainer(podContainer core.Container) core.Container {
return core.Container{
Name: podContainer.Name,
Image: fmt.Sprintf("%s/scanner:%s",
a.config.ScannerAquaCSP.RegistryServer,
a.config.ScannerAquaCSP.Version),
ImagePullPolicy: core.PullNever,
Command: []string{
"/bin/sh",
"-c",
fmt.Sprintf("/opt/aquasec/scannercli scan --checkonly --host $(OPERATOR_SCANNER_AQUA_CSP_HOST) --user $(OPERATOR_SCANNER_AQUA_CSP_USER) --password $(OPERATOR_SCANNER_AQUA_CSP_PASSWORD) --local %s 2> %s",
podContainer.Image,
core.TerminationMessagePathDefault),
},
Env: []core.EnvVar{
{
Name: "OPERATOR_SCANNER_AQUA_CSP_HOST",
ValueFrom: &core.EnvVarSource{
SecretKeyRef: &core.SecretKeySelector{
LocalObjectReference: core.LocalObjectReference{
Name: "starboard-security-operator",
},
Key: "OPERATOR_SCANNER_AQUA_CSP_HOST",
},
},
},
{
Name: "OPERATOR_SCANNER_AQUA_CSP_USER",
ValueFrom: &core.EnvVarSource{
SecretKeyRef: &core.SecretKeySelector{
LocalObjectReference: core.LocalObjectReference{
Name: "starboard-security-operator",
},
Key: "OPERATOR_SCANNER_AQUA_CSP_USER",
},
},
},
{
Name: "OPERATOR_SCANNER_AQUA_CSP_PASSWORD",
ValueFrom: &core.EnvVarSource{
SecretKeyRef: &core.SecretKeySelector{
LocalObjectReference: core.LocalObjectReference{
Name: "starboard-security-operator",
},
Key: "OPERATOR_SCANNER_AQUA_CSP_PASSWORD",
},
},
},
},
VolumeMounts: []core.VolumeMount{
{
Name: "dockersock",
MountPath: "/var/run/docker.sock",
},
},
return fmt.Errorf("getting terminated containers statuses: %w", err)
}
}

// TODO Climb up the owners hierarchy and use the root?
// TODO Move such utility function to libstarboard
// TODO Add GetRootOwnerReference to have another option
func (a *action) GetImmediateOwnerReference(pod *core.Pod) kube.Object {
ownerRef := meta.GetControllerOf(pod)
if ownerRef != nil {
return kube.Object{
Namespace: pod.Namespace,
Kind: kube.Kind(ownerRef.Kind),
Name: ownerRef.Name,
for container, status := range statuses {
if status.ExitCode == 0 {
continue
}
klog.Errorf("Scan job container %s %s: %s", container, status.Reason, status.Message)
}
return kube.Object{
Namespace: pod.Namespace,
Kind: kube.KindPod,
Name: pod.Name,
}
return nil
}
30 changes: 22 additions & 8 deletions pkg/controller/job/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,33 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
}

func (c *Controller) jobUpdated(_, new interface{}) {
if pod, ok := new.(*batch.Job); ok {
c.processJob(nil, pod)
if job, ok := new.(*batch.Job); ok {
c.processJob(job)
}
}

func (c *Controller) processJob(old, new *batch.Job) {
if !c.action.IsScanJobProcessable(old, new) {
func (c *Controller) processJob(job *batch.Job) {
var finished bool
var jobCondition batch.JobConditionType

if finished, jobCondition = c.action.IsScanJobFinished(context.Background(), job); !finished {
return
}

klog.Infof("Processing job: %s/%s", new.Namespace, new.Name)
err := c.action.ProcessCompleteScanJob(context.Background(), new)
if err != nil {
klog.Errorf("Error while processing job: %v", err)
switch jobCondition {
case batch.JobComplete:
klog.Infof("Processing complete scan job: %s/%s", job.Namespace, job.Name)
err := c.action.ProcessCompleteScanJob(context.Background(), job)
if err != nil {
klog.Errorf("Error while processing complete scan job: %v", err)
}
case batch.JobFailed:
klog.Infof("Processing failed scan job: %s/%s", job.Namespace, job.Name)
err := c.action.ProcessFailedScanJob(context.Background(), job)
if err != nil {
klog.Errorf("Error while processing failed scan job: %v", err)
}
default:
klog.Warningf("Unrecognized scan job condition: %v", jobCondition)
}
}
Loading

0 comments on commit 16d9fbd

Please sign in to comment.