Skip to content

Commit

Permalink
[CSM] Fix busybox support for cws-instrumentation (#27863)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gui774ume authored Jul 24, 2024
1 parent 856cf4a commit f05553a
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 76 deletions.
12 changes: 12 additions & 0 deletions cmd/cws-instrumentation/subcommands/healthcmd/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package healthcmd holds the health command of CWS injector
package healthcmd

const (
// HealthCommandOutput is the expected output of the health check
HealthCommandOutput = "OK"
)
27 changes: 27 additions & 0 deletions cmd/cws-instrumentation/subcommands/healthcmd/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build linux

package healthcmd

import (
"fmt"
"github.com/spf13/cobra"
)

// Command returns the commands for the setup subcommand
func Command() []*cobra.Command {
healthCmd := &cobra.Command{
Use: "health",
Short: "Prints OK to stdout",
RunE: func(cmd *cobra.Command, args []string) error {
fmt.Print(HealthCommandOutput)
return nil
},
}

return []*cobra.Command{healthCmd}
}
2 changes: 2 additions & 0 deletions cmd/cws-instrumentation/subcommands/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package subcommands

import (
"github.com/DataDog/datadog-agent/cmd/cws-instrumentation/command"
"github.com/DataDog/datadog-agent/cmd/cws-instrumentation/subcommands/healthcmd"
"github.com/DataDog/datadog-agent/cmd/cws-instrumentation/subcommands/injectcmd"
"github.com/DataDog/datadog-agent/cmd/cws-instrumentation/subcommands/setupcmd"
"github.com/DataDog/datadog-agent/cmd/cws-instrumentation/subcommands/tracecmd"
Expand All @@ -22,5 +23,6 @@ func CWSInjectorSubcommands() []command.SubcommandFactory {
setupcmd.Command,
injectcmd.Command,
tracecmd.Command,
healthcmd.Command,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/metrics"
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/common"
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/cwsinstrumentation/k8scp"
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/cwsinstrumentation/k8sexec"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/security/resolvers/usersessions"
"github.com/DataDog/datadog-agent/pkg/util/containers"
Expand Down Expand Up @@ -63,6 +64,7 @@ const (
cwsNilCommandReason = "nil_command"
cwsClusterAgentServiceAccountReason = "cluster_agent_service_account"
cwsClusterAgentKubectlCPReason = "cluster_agent_kubectl_cp"
cwsClusterAgentKubectlExecHealthReason = "cluster_agent_kubectl_exec_health"
cwsExcludedResourceReason = "excluded_resource"
cwsDescribePodErrorReason = "describe_pod_error"
cwsExcludedByAnnotationReason = "excluded_by_annotation"
Expand Down Expand Up @@ -433,7 +435,7 @@ func (ci *CWSInstrumentation) injectCWSCommandInstrumentation(exec *corev1.PodEx
}

// fall back in case the service account filter somehow didn't work
if len(exec.Command) > len(k8scp.CWSRemoteCopyCommand) && slices.Equal(exec.Command[0:len(k8scp.CWSRemoteCopyCommand)], k8scp.CWSRemoteCopyCommand) {
if len(exec.Command) >= len(k8scp.CWSRemoteCopyCommand) && slices.Equal(exec.Command[0:len(k8scp.CWSRemoteCopyCommand)], k8scp.CWSRemoteCopyCommand) {
log.Debugf("Ignoring kubectl cp requests to %s from the cluster agent", name)
metrics.CWSExecInstrumentationAttempts.Observe(1, ci.mode.String(), "false", cwsClusterAgentKubectlCPReason)
return false, nil
Expand Down Expand Up @@ -500,6 +502,14 @@ func (ci *CWSInstrumentation) injectCWSCommandInstrumentation(exec *corev1.PodEx
}
}

// Now that we have computed the remote path of cws-instrumentation, we can make sure the current command isn't
// remote health command from the cluster-agent (in which case we should simply ignore this request)
if len(exec.Command) >= 2 && slices.Equal(exec.Command[0:2], []string{cwsInstrumentationRemotePath, k8sexec.CWSHealthCommand}) {
log.Debugf("Ignoring kubectl health check exec requests to %s from the cluster agent", name)
metrics.CWSExecInstrumentationAttempts.Observe(1, ci.mode.String(), "false", cwsClusterAgentKubectlExecHealthReason)
return false, nil
}

arch, err := ci.resolveNodeArch(pod.Spec.NodeName, apiClient)
if err != nil {
log.Errorf("Ignoring exec request into %s: %v", common.PodString(pod), err)
Expand Down Expand Up @@ -585,7 +595,13 @@ func (ci *CWSInstrumentation) injectCWSCommandInstrumentationRemoteCopy(pod *cor
}

cp := k8scp.NewCopy(apiclient)
return cp.CopyToPod(cwsInstrumentationLocalPath, cwsInstrumentationRemotePath, pod, container)
if err = cp.CopyToPod(cwsInstrumentationLocalPath, cwsInstrumentationRemotePath, pod, container); err != nil {
return err
}

// check cws-instrumentation was properly copied by running "cws-instrumentation health"
health := k8sexec.NewHealthCommand(apiclient)
return health.Run(cwsInstrumentationRemotePath, pod, container)
}

func (ci *CWSInstrumentation) injectForPod(request *admission.MutateRequest) ([]byte, error) {
Expand Down
86 changes: 12 additions & 74 deletions pkg/clusteragent/admission/mutate/cwsinstrumentation/k8scp/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,32 @@
package k8scp

import (
"bytes"
"context"
"fmt"
"io"
"os"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"

"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/cwsinstrumentation/k8sexec"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
)

var (
// CWSRemoteCopyCommand is the command used to copy cws-instrumentation, arguments are split on purpose
// to try to differentiate this kubectl cp from others
CWSRemoteCopyCommand = []string{"tar", "-x", "-m", "-f", "-", "--totals"}
CWSRemoteCopyCommand = []string{"tar", "-x", "-m", "-f", "-"}
)

// StreamOptions contains option for the remote stream
type StreamOptions struct {
Stdin bool
genericiooptions.IOStreams
}

// Copy perform remote copy operations
type Copy struct {
Container string
Namespace string

apiClient *apiserver.APIClient
in *bytes.Buffer
out *bytes.Buffer
errOut *bytes.Buffer
k8sexec.Exec
}

// NewCopy creates a Command instance
// NewCopy creates a Copy instance
func NewCopy(apiClient *apiserver.APIClient) *Copy {
return &Copy{
in: &bytes.Buffer{},
out: &bytes.Buffer{},
errOut: &bytes.Buffer{},
apiClient: apiClient,
Exec: k8sexec.NewExec(apiClient),
}
}

Expand Down Expand Up @@ -94,16 +73,16 @@ func (o *Copy) CopyToPod(localFile string, remoteFile string, pod *corev1.Pod, c
}
}(srcFile, destFile, writer)

streamOptions := StreamOptions{
streamOptions := k8sexec.StreamOptions{
IOStreams: genericiooptions.IOStreams{
In: reader,
Out: o.out,
ErrOut: o.errOut,
Out: o.Out,
ErrOut: o.ErrOut,
},
Stdin: true,
}

if err := o.execute(pod, o.prepareCommand(destFile), streamOptions); err != nil {
if err := o.Execute(pod, o.prepareCommand(destFile), streamOptions); err != nil {
return err
}

Expand All @@ -115,50 +94,9 @@ func (o *Copy) CopyToPod(localFile string, remoteFile string, pod *corev1.Pod, c
}

// check stdout and stderr from tar
outData := o.errOut.String() + o.out.String()
if !strings.HasPrefix(outData, "Total bytes read:") {
return fmt.Errorf("unexpected output: %s", outData)
outData := o.ErrOut.String() + o.Out.String()
if len(outData) > 0 {
return fmt.Errorf("unexpected output: '%s' (%d)", outData, len(outData))
}
return nil
}

func (o *Copy) execute(pod *corev1.Pod, command []string, streamOptions StreamOptions) error {
restClient, err := o.apiClient.RESTClient(
"/api",
&schema.GroupVersion{Version: "v1"},
serializer.WithoutConversionCodecFactory{CodecFactory: scheme.Codecs},
)
if err != nil {
return err
}

req := restClient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Container: o.Container,
Command: command,
Stdin: streamOptions.Stdin,
Stdout: streamOptions.Out != nil,
Stderr: streamOptions.ErrOut != nil,
}, scheme.ParameterCodec)

exec, err := o.apiClient.NewSPDYExecutor(
"/api",
&schema.GroupVersion{Version: "v1"},
serializer.WithoutConversionCodecFactory{CodecFactory: scheme.Codecs},
"POST",
req.URL(),
)
if err != nil {
return err
}

return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: streamOptions.In,
Stdout: streamOptions.Out,
Stderr: streamOptions.ErrOut,
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver

// Package k8sexec implements the necessary methods to run commands remotely
package k8sexec

import (
"bytes"
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
)

// StreamOptions contains option for the remote stream
type StreamOptions struct {
Stdin bool
genericiooptions.IOStreams
}

// Exec performs remote exec operations
type Exec struct {
Container string
Namespace string

APIClient *apiserver.APIClient
In *bytes.Buffer
Out *bytes.Buffer
ErrOut *bytes.Buffer
}

// NewExec creates a Exec instance
func NewExec(apiClient *apiserver.APIClient) Exec {
return Exec{
In: &bytes.Buffer{},
Out: &bytes.Buffer{},
ErrOut: &bytes.Buffer{},
APIClient: apiClient,
}
}

// Execute runs the exec command
func (e Exec) Execute(pod *corev1.Pod, command []string, streamOptions StreamOptions) error {
restClient, err := e.APIClient.RESTClient(
"/api",
&schema.GroupVersion{Version: "v1"},
serializer.WithoutConversionCodecFactory{CodecFactory: scheme.Codecs},
)
if err != nil {
return err
}

req := restClient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Container: e.Container,
Command: command,
Stdin: streamOptions.Stdin,
Stdout: streamOptions.Out != nil,
Stderr: streamOptions.ErrOut != nil,
}, scheme.ParameterCodec)

exec, err := e.APIClient.NewSPDYExecutor(
"/api",
&schema.GroupVersion{Version: "v1"},
serializer.WithoutConversionCodecFactory{CodecFactory: scheme.Codecs},
"POST",
req.URL(),
)
if err != nil {
return err
}

return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: streamOptions.In,
Stdout: streamOptions.Out,
Stderr: streamOptions.ErrOut,
})
}
Loading

0 comments on commit f05553a

Please sign in to comment.