diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 6c0a7414..8dd4ad93 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -58,13 +58,15 @@ var (
 	# Run a bpftrace inline program on a pod container with a custom image for the bpftrace container that will run your program in the cluster
 	%[1]s trace run pod/nginx nginx -e "tracepoint:syscalls:sys_enter_* { @[probe] = count(); } --imagename=quay.io/custom-bpftrace-image-name"`
 
-	runCommand                    = "run"
-	usageString                   = "(POD | TYPE/NAME)"
-	requiredArgErrString          = fmt.Sprintf("%s is a required argument for the %s command", usageString, runCommand)
-	containerAsArgOrFlagErrString = "specify container inline as argument or via its flag"
-	bpftraceMissingErrString      = "the bpftrace program is mandatory"
-	bpftraceDoubleErrString       = "specify the bpftrace program either via an external file or via a literal string, not both"
-	bpftraceEmptyErrString        = "the bpftrace programm cannot be empty"
+	runCommand                             = "run"
+	usageString                            = "(POD | TYPE/NAME)"
+	requiredArgErrString                   = fmt.Sprintf("%s is a required argument for the %s command", usageString, runCommand)
+	containerAsArgOrFlagErrString          = "specify container inline as argument or via its flag"
+	bpftraceMissingErrString               = "the bpftrace program is mandatory"
+	bpftraceDoubleErrString                = "specify the bpftrace program either via an external file or via a literal string, not both"
+	bpftraceEmptyErrString                 = "the bpftrace programm cannot be empty"
+	bpftracePatchWithoutTypeErrString      = "to use --patch you must also specify the --patch-type argument"
+	bpftracePatchTypeWithoutPatchErrString = "to use --patch-type you must specify the --patch argument"
 )
 
 // RunOptions ...
@@ -91,6 +93,9 @@ type RunOptions struct {
 	podUID   string
 	nodeName string
 
+	patch     string
+	patchType string
+
 	clientConfig *rest.Config
 }
 
@@ -142,6 +147,8 @@ func NewRunCommand(factory cmdutil.Factory, streams genericclioptions.IOStreams)
 	cmd.Flags().BoolVar(&o.fetchHeaders, "fetch-headers", o.fetchHeaders, "Whether to fetch linux headers or not")
 	cmd.Flags().Int64Var(&o.deadline, "deadline", o.deadline, "Maximum time to allow trace to run in seconds")
 	cmd.Flags().Int64Var(&o.deadlineGracePeriod, "deadline-grace-period", o.deadlineGracePeriod, "Maximum wait time to print maps or histograms after deadline, in seconds")
+	cmd.Flags().StringVar(&o.patch, "patch", "", "path of YAML or JSON file used to patch the job definition before creation")
+	cmd.Flags().StringVar(&o.patchType, "patch-type", "", "patch strategy to use: json, merge, or strategic")
 
 	return cmd
 }
@@ -175,6 +182,17 @@ func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error {
 		return fmt.Errorf(bpftraceEmptyErrString)
 	}
 
+	havePatch := cmd.Flag("patch").Changed
+	havePatchType := cmd.Flag("patch-type").Changed
+
+	if havePatch && !havePatchType {
+		return fmt.Errorf(bpftracePatchWithoutTypeErrString)
+	}
+
+	if !havePatch && havePatchType {
+		return fmt.Errorf(bpftracePatchTypeWithoutPatchErrString)
+	}
+
 	return nil
 }
 
@@ -318,6 +336,8 @@ func (o *RunOptions) Run() error {
 		FetchHeaders:        o.fetchHeaders,
 		Deadline:            o.deadline,
 		DeadlineGracePeriod: o.deadlineGracePeriod,
+		Patch:               o.patch,
+		PatchType:           o.patchType,
 	}
 
 	job, err := tc.CreateJob(tj)
diff --git a/pkg/tracejob/job.go b/pkg/tracejob/job.go
index 313b8856..3710c4c0 100644
--- a/pkg/tracejob/job.go
+++ b/pkg/tracejob/job.go
@@ -2,19 +2,24 @@ package tracejob
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"strconv"
 
+	jsonpatch "github.com/evanphx/json-patch"
 	"github.com/iovisor/kubectl-trace/pkg/meta"
 	batchv1 "k8s.io/api/batch/v1"
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	yamlutil "k8s.io/apimachinery/pkg/util/yaml"
 	batchv1typed "k8s.io/client-go/kubernetes/typed/batch/v1"
 	corev1typed "k8s.io/client-go/kubernetes/typed/core/v1"
+	"sigs.k8s.io/yaml"
 )
 
 type TraceJobClient struct {
@@ -41,6 +46,8 @@ type TraceJob struct {
 	DeadlineGracePeriod int64
 	StartTime           *metav1.Time
 	Status              TraceJobStatus
+	Patch               string
+	PatchType           string
 }
 
 // WithOutStream setup a file stream to output trace job operation information
@@ -473,6 +480,16 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
 	if _, err := t.ConfigClient.Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
 		return nil, err
 	}
+
+	// Optionally patch the job before creating it
+	if nj.PatchType != "" && nj.Patch != "" {
+		newJob, err := patchJobFile(job, nj.PatchType, nj.Patch)
+		if err != nil {
+			return nil, err
+		}
+		job = newJob
+	}
+
 	return t.JobClient.Create(context.Background(), job, metav1.CreateOptions{})
 }
 
@@ -548,3 +565,72 @@ func jobStatus(j batchv1.Job) TraceJobStatus {
 	}
 	return TraceJobUnknown
 }
+
+var patchTypes = map[string]types.PatchType{
+	"json":      types.JSONPatchType,
+	"merge":     types.MergePatchType,
+	"strategic": types.StrategicMergePatchType,
+}
+
+func patchJobFile(j *batchv1.Job, patchType, patchPath string) (*batchv1.Job, error) {
+	patchYAML, err := ioutil.ReadFile(patchPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read patch yaml path %v: %s", patchPath, err)
+	}
+	return patchJob(j, patchType, patchYAML)
+}
+
+func patchJob(j *batchv1.Job, patchType string, patchBytes []byte) (*batchv1.Job, error) {
+	var err error
+	patchJSON := patchBytes
+
+	if !json.Valid(patchBytes) {
+		// Convert YAML to JSON for patching
+		patchJSON, err = yamlutil.ToJSON(patchBytes)
+		if err != nil {
+			return nil, fmt.Errorf("converting patch yaml to json: %s", err)
+		}
+	}
+
+	jobYAML, err := yaml.Marshal(j)
+	if err != nil {
+		return nil, fmt.Errorf("marshal job to yaml: %s", err)
+	}
+
+	jobJSON, err := yamlutil.ToJSON(jobYAML)
+	if err != nil {
+		return nil, fmt.Errorf("converting job yaml to json: %s", err)
+	}
+
+	// Patch job JSON
+	typ := patchTypes[patchType]
+	switch typ {
+	case types.JSONPatchType:
+		raw, err := jsonpatch.DecodePatch(patchJSON)
+		if err != nil {
+			return nil, fmt.Errorf("decoding json patch: %s", err)
+		}
+		jobJSON, err = raw.Apply(jobJSON)
+
+	case types.MergePatchType:
+		jobJSON, err = jsonpatch.MergePatch(jobJSON, patchJSON)
+
+	case types.StrategicMergePatchType:
+		jobJSON, err = strategicpatch.StrategicMergePatch(jobJSON, patchJSON, batchv1.Job{})
+
+	default:
+		return nil, fmt.Errorf("%v is an invalid patch type", patchType)
+	}
+
+	if err != nil {
+		return nil, fmt.Errorf("applying %s patch to job: %s", patchType, err)
+	}
+
+	// Unmarshal back to Job object
+	newJob := batchv1.Job{}
+	if err = json.Unmarshal(jobJSON, &newJob); err != nil {
+		return nil, fmt.Errorf("failed to marshal job from patched json: %s", err)
+	}
+
+	return &newJob, nil
+}
path: "/spec/backOffLimit" + value: 123 +- op: add + path: "/spec/template/hostPID" + value: true +- op: remove + path: "/spec/completions" +`) +var patchJSON2 = []byte(`[ + { "op": "replace", "path": "/spec/backOffLimit", "value": 123 }, + { "op": "add", "path": "/spec/template/hostPID", "value": true }, + { "op": "remove", "path": "/spec/completions"} +]`) + +var patchMerge = []byte(` +spec: + backoffLimit: 123 + completions: null + template: + hostPID: true +`) + +var testCases = []patchTest{ + {patchType: "json", patch: patchJSON1}, + {patchType: "json", patch: patchJSON2}, + {patchType: "merge", patch: patchMerge}, + {patchType: "strategic", patch: patchMerge}, +} + +func TestPatchJobJSON(t *testing.T) { + for _, c := range testCases { + job := getJob() + newJob, err := patchJob(job, c.patchType, c.patch) + if err != nil { + t.Error(err) + } + + // Update expected value + job.Spec.BackoffLimit = int32Ptr(123) + job.Spec.Template.Spec.HostPID = true + job.Spec.Completions = nil + + if reflect.DeepEqual(job, newJob) { + t.Errorf("patch %s job does not match expected", c.patchType) + } + } +} + +func getJob() *batchv1.Job { + return &batchv1.Job{ + Spec: batchv1.JobSpec{ + ActiveDeadlineSeconds: int64Ptr(60), + TTLSecondsAfterFinished: int32Ptr(5), + Parallelism: int32Ptr(1), + Completions: int32Ptr(1), + BackoffLimit: int32Ptr(1), + }, + } +}