Skip to content

Commit

Permalink
feat: Allow patches to be applied to the job spec
Browse files Browse the repository at this point in the history
  • Loading branch information
phensley authored and fntlnz committed May 3, 2021
1 parent e3f4d2c commit acf43b0
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 7 deletions.
34 changes: 27 additions & 7 deletions pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ var (
# Run a bpftrace inline program on a pod container with a custom image for the bpftrace container that will run your program in the cluster
%[1]s trace run pod/nginx nginx -e "tracepoint:syscalls:sys_enter_* { @[probe] = count(); } --imagename=quay.io/custom-bpftrace-image-name"`

runCommand = "run"
usageString = "(POD | TYPE/NAME)"
requiredArgErrString = fmt.Sprintf("%s is a required argument for the %s command", usageString, runCommand)
containerAsArgOrFlagErrString = "specify container inline as argument or via its flag"
bpftraceMissingErrString = "the bpftrace program is mandatory"
bpftraceDoubleErrString = "specify the bpftrace program either via an external file or via a literal string, not both"
bpftraceEmptyErrString = "the bpftrace programm cannot be empty"
runCommand = "run"
usageString = "(POD | TYPE/NAME)"
requiredArgErrString = fmt.Sprintf("%s is a required argument for the %s command", usageString, runCommand)
containerAsArgOrFlagErrString = "specify container inline as argument or via its flag"
bpftraceMissingErrString = "the bpftrace program is mandatory"
bpftraceDoubleErrString = "specify the bpftrace program either via an external file or via a literal string, not both"
bpftraceEmptyErrString = "the bpftrace programm cannot be empty"
bpftracePatchWithoutTypeErrString = "to use --patch you must also specify the --patch-type argument"
bpftracePatchTypeWithoutPatchErrString = "to use --patch-type you must specify the --patch argument"
)

// RunOptions ...
Expand All @@ -91,6 +93,9 @@ type RunOptions struct {
podUID string
nodeName string

patch string
patchType string

clientConfig *rest.Config
}

Expand Down Expand Up @@ -142,6 +147,8 @@ func NewRunCommand(factory cmdutil.Factory, streams genericclioptions.IOStreams)
cmd.Flags().BoolVar(&o.fetchHeaders, "fetch-headers", o.fetchHeaders, "Whether to fetch linux headers or not")
cmd.Flags().Int64Var(&o.deadline, "deadline", o.deadline, "Maximum time to allow trace to run in seconds")
cmd.Flags().Int64Var(&o.deadlineGracePeriod, "deadline-grace-period", o.deadlineGracePeriod, "Maximum wait time to print maps or histograms after deadline, in seconds")
cmd.Flags().StringVar(&o.patch, "patch", "", "path of YAML or JSON file used to patch the job definition before creation")
cmd.Flags().StringVar(&o.patchType, "patch-type", "", "patch strategy to use: json, merge, or strategic")

return cmd
}
Expand Down Expand Up @@ -175,6 +182,17 @@ func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error {
return fmt.Errorf(bpftraceEmptyErrString)
}

havePatch := cmd.Flag("patch").Changed
havePatchType := cmd.Flag("patch-type").Changed

if havePatch && !havePatchType {
return fmt.Errorf(bpftracePatchWithoutTypeErrString)
}

if !havePatch && havePatchType {
return fmt.Errorf(bpftracePatchTypeWithoutPatchErrString)
}

return nil
}

Expand Down Expand Up @@ -318,6 +336,8 @@ func (o *RunOptions) Run() error {
FetchHeaders: o.fetchHeaders,
Deadline: o.deadline,
DeadlineGracePeriod: o.deadlineGracePeriod,
Patch: o.patch,
PatchType: o.patchType,
}

job, err := tc.CreateJob(tj)
Expand Down
86 changes: 86 additions & 0 deletions pkg/tracejob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ package tracejob

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"strconv"

jsonpatch "github.com/evanphx/json-patch"
"github.com/iovisor/kubectl-trace/pkg/meta"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
batchv1typed "k8s.io/client-go/kubernetes/typed/batch/v1"
corev1typed "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/yaml"
)

type TraceJobClient struct {
Expand All @@ -41,6 +46,8 @@ type TraceJob struct {
DeadlineGracePeriod int64
StartTime *metav1.Time
Status TraceJobStatus
Patch string
PatchType string
}

// WithOutStream setup a file stream to output trace job operation information
Expand Down Expand Up @@ -473,6 +480,16 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
if _, err := t.ConfigClient.Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
return nil, err
}

// Optionally patch the job before creating it
if nj.PatchType != "" && nj.Patch != "" {
newJob, err := patchJobFile(job, nj.PatchType, nj.Patch)
if err != nil {
return nil, err
}
job = newJob
}

return t.JobClient.Create(context.Background(), job, metav1.CreateOptions{})
}

Expand Down Expand Up @@ -548,3 +565,72 @@ func jobStatus(j batchv1.Job) TraceJobStatus {
}
return TraceJobUnknown
}

var patchTypes = map[string]types.PatchType{
"json": types.JSONPatchType,
"merge": types.MergePatchType,
"strategic": types.StrategicMergePatchType,
}

func patchJobFile(j *batchv1.Job, patchType, patchPath string) (*batchv1.Job, error) {
patchYAML, err := ioutil.ReadFile(patchPath)
if err != nil {
return nil, fmt.Errorf("failed to read patch yaml path %v: %s", patchPath, err)
}
return patchJob(j, patchType, patchYAML)
}

func patchJob(j *batchv1.Job, patchType string, patchBytes []byte) (*batchv1.Job, error) {
var err error
patchJSON := patchBytes

if !json.Valid(patchBytes) {
// Convert YAML to JSON for patching
patchJSON, err = yamlutil.ToJSON(patchBytes)
if err != nil {
return nil, fmt.Errorf("converting patch yaml to json: %s", err)
}
}

jobYAML, err := yaml.Marshal(j)
if err != nil {
return nil, fmt.Errorf("marshal job to yaml: %s", err)
}

jobJSON, err := yamlutil.ToJSON(jobYAML)
if err != nil {
return nil, fmt.Errorf("converting job yaml to json: %s", err)
}

// Patch job JSON
typ := patchTypes[patchType]
switch typ {
case types.JSONPatchType:
raw, err := jsonpatch.DecodePatch(patchJSON)
if err != nil {
return nil, fmt.Errorf("decoding json patch: %s", err)
}
jobJSON, err = raw.Apply(jobJSON)

case types.MergePatchType:
jobJSON, err = jsonpatch.MergePatch(jobJSON, patchJSON)

case types.StrategicMergePatchType:
jobJSON, err = strategicpatch.StrategicMergePatch(jobJSON, patchJSON, batchv1.Job{})

default:
return nil, fmt.Errorf("%v is an invalid patch type", patchType)
}

if err != nil {
return nil, fmt.Errorf("applying %s patch to job: %s", patchType, err)
}

// Unmarshal back to Job object
newJob := batchv1.Job{}
if err = json.Unmarshal(jobJSON, &newJob); err != nil {
return nil, fmt.Errorf("failed to marshal job from patched json: %s", err)
}

return &newJob, nil
}
75 changes: 75 additions & 0 deletions pkg/tracejob/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package tracejob

import (
"reflect"
"testing"

batchv1 "k8s.io/api/batch/v1"
)

type patchTest struct {
patchType string
patch []byte
}

var patchJSON1 = []byte(`
- op: replace
path: "/spec/backOffLimit"
value: 123
- op: add
path: "/spec/template/hostPID"
value: true
- op: remove
path: "/spec/completions"
`)
var patchJSON2 = []byte(`[
{ "op": "replace", "path": "/spec/backOffLimit", "value": 123 },
{ "op": "add", "path": "/spec/template/hostPID", "value": true },
{ "op": "remove", "path": "/spec/completions"}
]`)

var patchMerge = []byte(`
spec:
backoffLimit: 123
completions: null
template:
hostPID: true
`)

var testCases = []patchTest{
{patchType: "json", patch: patchJSON1},
{patchType: "json", patch: patchJSON2},
{patchType: "merge", patch: patchMerge},
{patchType: "strategic", patch: patchMerge},
}

func TestPatchJobJSON(t *testing.T) {
for _, c := range testCases {
job := getJob()
newJob, err := patchJob(job, c.patchType, c.patch)
if err != nil {
t.Error(err)
}

// Update expected value
job.Spec.BackoffLimit = int32Ptr(123)
job.Spec.Template.Spec.HostPID = true
job.Spec.Completions = nil

if reflect.DeepEqual(job, newJob) {
t.Errorf("patch %s job does not match expected", c.patchType)
}
}
}

func getJob() *batchv1.Job {
return &batchv1.Job{
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: int64Ptr(60),
TTLSecondsAfterFinished: int32Ptr(5),
Parallelism: int32Ptr(1),
Completions: int32Ptr(1),
BackoffLimit: int32Ptr(1),
},
}
}

0 comments on commit acf43b0

Please sign in to comment.