diff --git a/pkg/controller/job-controller.go b/pkg/controller/job-controller.go index b43afa7d23e..0c0d8c60d6b 100644 --- a/pkg/controller/job-controller.go +++ b/pkg/controller/job-controller.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "reflect" - "strings" "sync" "time" @@ -30,10 +29,8 @@ import ( stsv1alpha1 "github.com/minio/operator/pkg/apis/sts.min.io/v1alpha1" jobinformers "github.com/minio/operator/pkg/client/informers/externalversions/job.min.io/v1alpha1" joblisters "github.com/minio/operator/pkg/client/listers/job.min.io/v1alpha1" - runtime2 "github.com/minio/operator/pkg/runtime" "github.com/minio/operator/pkg/utils/miniojob" batchjobv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -48,34 +45,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -const ( - commandFilePath = "/temp" - minioJobName = "job.min.io/job-name" - minioJobCRName = "job.min.io/job-cr-name" - // DefaultMCImage - job mc image - DefaultMCImage = "minio/mc:latest" - // MinioJobPhaseError - error - MinioJobPhaseError = "Error" - // MinioJobPhaseSuccess - success - MinioJobPhaseSuccess = "Success" - // MinioJobPhaseRunning - running - MinioJobPhaseRunning = "Running" - // MinioJobPhaseFailed - failed - MinioJobPhaseFailed = "Failed" -) - -var operationAlias = map[string]string{ - "make-bucket": "mb", - "admin/policy/add": "admin/policy/create", -} - -var jobOperation = map[string][]miniojob.FieldsFunc{ - "mb": {miniojob.FLAGS(), miniojob.Sanitize(miniojob.ALIAS(), miniojob.Static("/"), miniojob.Key("name")), miniojob.Static("--ignore-existing")}, - "admin/user/add": {miniojob.ALIAS(), miniojob.Key("user"), miniojob.Key("password")}, - "admin/policy/create": {miniojob.ALIAS(), miniojob.Key("name"), miniojob.File("policy", "json")}, - "admin/policy/attach": {miniojob.ALIAS(), miniojob.Key("policy"), miniojob.OneOf(miniojob.KeyForamt("user", "--user"), miniojob.KeyForamt("group", "--group"))}, -} - // JobController struct watches the Kubernetes API for changes to Tenant resources type JobController struct { namespacesToWatch set.StringSet @@ -155,25 +124,25 @@ func NewJobController( jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, new interface{}) { newJob := new.(*batchjobv1.Job) - jobName, ok := newJob.Labels[minioJobName] + jobName, ok := newJob.Labels[miniojob.MinioJobName] if !ok { return } - jobCRName, ok := newJob.Labels[minioJobCRName] + jobCRName, ok := newJob.Labels[miniojob.MinioJobCRName] if !ok { return } val, ok := globalIntervalJobStatus.Load(fmt.Sprintf("%s/%s", newJob.GetNamespace(), jobCRName)) if ok { - intervalJob := val.(*MinIOIntervalJob) + intervalJob := val.(*miniojob.MinIOIntervalJob) command, ok := intervalJob.CommandMap[jobName] if ok { if newJob.Status.Succeeded > 0 { - command.setStatus(true, "") + command.SetStatus(true, "") } else { for _, condition := range newJob.Status.Conditions { if condition.Type == batchjobv1.JobFailed { - command.setStatus(false, condition.Message) + command.SetStatus(false, condition.Message) break } } @@ -237,8 +206,8 @@ func (c *JobController) SyncHandler(key string) (Result, error) { } return WrapResult(Result{}, err) } - // if job cr is success, do nothing - if jobCR.Status.Phase == MinioJobPhaseSuccess { + // if job cr is Success, do nothing + if jobCR.Status.Phase == miniojob.MinioJobPhaseSuccess { // delete the job status globalIntervalJobStatus.Delete(fmt.Sprintf("%s/%s", jobCR.Namespace, jobCR.Name)) return WrapResult(Result{}, nil) @@ -256,7 +225,7 @@ func (c *JobController) SyncHandler(key string) (Result, error) { } err = c.k8sClient.Get(ctx, client.ObjectKeyFromObject(tenant), tenant) if err != nil { - jobCR.Status.Phase = MinioJobPhaseError + jobCR.Status.Phase = miniojob.MinioJobPhaseError jobCR.Status.Message = fmt.Sprintf("Get tenant %s/%s error:%v", jobCR.Spec.TenantRef.Namespace, jobCR.Spec.TenantRef.Name, err) err = c.updateJobStatus(ctx, &jobCR) return WrapResult(Result{}, err) @@ -282,15 +251,15 @@ func (c *JobController) SyncHandler(key string) (Result, error) { if !saFound { return WrapResult(Result{}, fmt.Errorf("no serviceaccount found")) } - err = intervalJob.createCommandJob(ctx, c.k8sClient) + err = intervalJob.CreateCommandJob(ctx, c.k8sClient) if err != nil { - jobCR.Status.Phase = MinioJobPhaseError + jobCR.Status.Phase = miniojob.MinioJobPhaseError jobCR.Status.Message = fmt.Sprintf("Create job error:%v", err) err = c.updateJobStatus(ctx, &jobCR) return WrapResult(Result{}, err) } // update status - jobCR.Status = intervalJob.getMinioJobStatus(ctx) + jobCR.Status = intervalJob.GetMinioJobStatus(ctx) err = c.updateJobStatus(ctx, &jobCR) return WrapResult(Result{}, err) } @@ -299,276 +268,7 @@ func (c *JobController) updateJobStatus(ctx context.Context, job *v1alpha1.MinIO return c.k8sClient.Status().Update(ctx, job) } -func operationAliasToMC(operation string) (op string, found bool) { - for k, v := range operationAlias { - if k == operation { - return v, true - } - if v == operation { - return v, true - } - } - // operation like admin/policy/attach match nothing. - // but it's a valid operation - if strings.Contains(operation, "/") { - return operation, true - } - // operation like replace match nothing - // it's not a valid operation - return "", false -} - -// MinIOIntervalJobCommandFile - Job run command need a file such as /temp/policy.json -type MinIOIntervalJobCommandFile struct { - Name string - Ext string - Dir string - Content string -} - -// MinIOIntervalJobCommand - Job run command -type MinIOIntervalJobCommand struct { - mutex sync.RWMutex - JobName string - MCOperation string - Command string - DepnedsOn []string - Files []MinIOIntervalJobCommandFile - Succeeded bool - Message string - Created bool -} - -func (jobCommand *MinIOIntervalJobCommand) setStatus(success bool, message string) { - if jobCommand == nil { - return - } - jobCommand.mutex.Lock() - jobCommand.Succeeded = success - jobCommand.Message = message - jobCommand.mutex.Unlock() -} - -func (jobCommand *MinIOIntervalJobCommand) success() bool { - if jobCommand == nil { - return false - } - jobCommand.mutex.Lock() - defer jobCommand.mutex.Unlock() - return jobCommand.Succeeded -} - -func (jobCommand *MinIOIntervalJobCommand) createJob(ctx context.Context, k8sClient client.Client, jobCR *v1alpha1.MinIOJob) error { - if jobCommand == nil { - return nil - } - jobCommand.mutex.RLock() - if jobCommand.Created || jobCommand.Succeeded { - jobCommand.mutex.RUnlock() - return nil - } - jobCommand.mutex.RUnlock() - jobCommands := []string{} - commands := []string{"mc"} - commands = append(commands, strings.SplitN(jobCommand.MCOperation, "/", -1)...) - commands = append(commands, strings.SplitN(jobCommand.Command, " ", -1)...) - for _, command := range commands { - trimCommand := strings.TrimSpace(command) - if trimCommand != "" { - jobCommands = append(jobCommands, trimCommand) - } - } - jobCommands = append(jobCommands, "--insecure") - objs := []client.Object{} - mcImage := jobCR.Spec.MCImage - if mcImage == "" { - mcImage = DefaultMCImage - } - job := &batchjobv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", jobCR.Name, jobCommand.JobName), - Namespace: jobCR.Namespace, - Labels: map[string]string{ - minioJobName: jobCommand.JobName, - minioJobCRName: jobCR.Name, - }, - Annotations: map[string]string{ - "job.min.io/operation": jobCommand.MCOperation, - }, - }, - Spec: batchjobv1.JobSpec{ - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - minioJobName: jobCommand.JobName, - }, - }, - Spec: corev1.PodSpec{ - ServiceAccountName: jobCR.Spec.ServiceAccountName, - Containers: []corev1.Container{ - { - Name: "mc", - Image: mcImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Env: []corev1.EnvVar{ - { - Name: "MC_HOST_myminio", - Value: fmt.Sprintf("https://$(ACCESS_KEY):$(SECRET_KEY)@minio.%s.svc.cluster.local", jobCR.Namespace), - }, - { - Name: "MC_STS_ENDPOINT_myminio", - Value: fmt.Sprintf("https://sts.%s.svc.cluster.local:4223/sts/%s", miniov2.GetNSFromFile(), jobCR.Namespace), - }, - { - Name: "MC_WEB_IDENTITY_TOKEN_FILE_myminio", - Value: "/var/run/secrets/kubernetes.io/serviceaccount/token", - }, - }, - Command: jobCommands, - }, - }, - }, - }, - }, - } - if jobCR.Spec.FailureStrategy == v1alpha1.StopOnFailure { - job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever - } else { - job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure - } - if len(jobCommand.Files) > 0 { - cmName := fmt.Sprintf("%s-%s-cm", jobCR.Name, jobCommand.JobName) - job.Spec.Template.Spec.Containers[0].VolumeMounts = append(job.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ - Name: "file-volume", - ReadOnly: true, - MountPath: jobCommand.Files[0].Dir, - }) - job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, corev1.Volume{ - Name: "file-volume", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: cmName, - }, - }, - }, - }) - configMap := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: cmName, - Namespace: jobCR.Namespace, - Labels: map[string]string{ - "job.min.io/name": jobCR.Name, - }, - }, - Data: map[string]string{}, - } - for _, file := range jobCommand.Files { - configMap.Data[fmt.Sprintf("%s.%s", file.Name, file.Ext)] = file.Content - } - objs = append(objs, configMap) - } - objs = append(objs, job) - for _, obj := range objs { - _, err := runtime2.NewObjectSyncer(ctx, k8sClient, jobCR, func() error { - return nil - }, obj, runtime2.SyncTypeCreateOrUpdate).Sync(ctx) - if err != nil { - return err - } - } - jobCommand.mutex.Lock() - jobCommand.Created = true - jobCommand.mutex.Unlock() - return nil -} - -// MinIOIntervalJob - Interval Job -type MinIOIntervalJob struct { - // to see if that change - JobCR *v1alpha1.MinIOJob - Command []*MinIOIntervalJobCommand - CommandMap map[string]*MinIOIntervalJobCommand -} - -func (intervalJob *MinIOIntervalJob) getMinioJobStatus(ctx context.Context) v1alpha1.MinIOJobStatus { - status := v1alpha1.MinIOJobStatus{} - failed := false - running := false - message := "" - for _, command := range intervalJob.Command { - command.mutex.RLock() - if command.Succeeded { - status.CommandsStatus = append(status.CommandsStatus, v1alpha1.CommandStatus{ - Name: command.JobName, - Result: "success", - Message: command.Message, - }) - } else { - failed = true - message = command.Message - // if success is false and message is empty, it means the job is running - if command.Message == "" { - running = true - status.CommandsStatus = append(status.CommandsStatus, v1alpha1.CommandStatus{ - Name: command.JobName, - Result: "running", - Message: command.Message, - }) - } else { - status.CommandsStatus = append(status.CommandsStatus, v1alpha1.CommandStatus{ - Name: command.JobName, - Result: "failed", - Message: command.Message, - }) - } - } - command.mutex.RUnlock() - } - if running { - status.Phase = MinioJobPhaseRunning - } else { - if failed { - status.Phase = MinioJobPhaseFailed - status.Message = message - } else { - status.Phase = MinioJobPhaseSuccess - } - } - return status -} - -func (intervalJob *MinIOIntervalJob) createCommandJob(ctx context.Context, k8sClient client.Client) error { - for _, command := range intervalJob.Command { - if len(command.DepnedsOn) == 0 { - err := command.createJob(ctx, k8sClient, intervalJob.JobCR) - if err != nil { - return err - } - } else { - allDepsSuccess := true - for _, dep := range command.DepnedsOn { - status, found := intervalJob.CommandMap[dep] - if !found { - return fmt.Errorf("dependent job %s not found", dep) - } - if !status.success() { - allDepsSuccess = false - break - } - } - if allDepsSuccess { - err := command.createJob(ctx, k8sClient, intervalJob.JobCR) - if err != nil { - return err - } - } - } - } - return nil -} - -func checkMinIOJob(jobCR *v1alpha1.MinIOJob) (intervalJob *MinIOIntervalJob, err error) { +func checkMinIOJob(jobCR *v1alpha1.MinIOJob) (intervalJob *miniojob.MinIOIntervalJob, err error) { defer func() { if err != nil { globalIntervalJobStatus.Delete(fmt.Sprintf("%s/%s", jobCR.Namespace, jobCR.Name)) @@ -576,16 +276,16 @@ func checkMinIOJob(jobCR *v1alpha1.MinIOJob) (intervalJob *MinIOIntervalJob, err }() val, found := globalIntervalJobStatus.Load(fmt.Sprintf("%s/%s", jobCR.Namespace, jobCR.Name)) if found { - intervalJob = val.(*MinIOIntervalJob) + intervalJob = val.(*miniojob.MinIOIntervalJob) if reflect.DeepEqual(intervalJob.JobCR.Spec, jobCR.Spec) { intervalJob.JobCR.UID = jobCR.UID return intervalJob, nil } } - intervalJob = &MinIOIntervalJob{ + intervalJob = &miniojob.MinIOIntervalJob{ JobCR: jobCR.DeepCopy(), - Command: []*MinIOIntervalJobCommand{}, - CommandMap: map[string]*MinIOIntervalJobCommand{}, + Command: []*miniojob.MinIOIntervalJobCommand{}, + CommandMap: map[string]*miniojob.MinIOIntervalJobCommand{}, } if jobCR.Spec.TenantRef.Namespace == "" { return intervalJob, fmt.Errorf("tenant namespace is empty") @@ -597,48 +297,20 @@ func checkMinIOJob(jobCR *v1alpha1.MinIOJob) (intervalJob *MinIOIntervalJob, err return intervalJob, fmt.Errorf("serviceaccount name is empty") } for index, val := range jobCR.Spec.Commands { - mcCommand, found := operationAliasToMC(val.Operation) + mcCommand, found := miniojob.OperationAliasToMC(val.Operation) if !found { return intervalJob, fmt.Errorf("operation %s is not supported", val.Operation) } - commands := []string{} - files := []MinIOIntervalJobCommandFile{} - argsFuncs, found := jobOperation[mcCommand] + argsFuncs, found := miniojob.JobOperation[mcCommand] if !found { return intervalJob, fmt.Errorf("operation %s is not supported", mcCommand) } - for _, argsFunc := range argsFuncs { - jobArg, err := argsFunc(val.Args) - if err != nil { - return intervalJob, err - } - if jobArg.IsFile() { - files = append(files, MinIOIntervalJobCommandFile{ - Name: jobArg.FileName, - Ext: jobArg.FileExt, - Dir: commandFilePath, - Content: jobArg.FileContext, - }) - commands = append(commands, fmt.Sprintf("%s/%s.%s", commandFilePath, jobArg.FileName, jobArg.FileExt)) - } else { - if jobArg.Command != "" { - commands = append(commands, jobArg.Command) - } - } - } - jobCommand := MinIOIntervalJobCommand{ - JobName: val.Name, - MCOperation: mcCommand, - Command: strings.Join(commands, " "), - DepnedsOn: val.DependsOn, - Files: files, - } - // some commands need to have a empty name - if jobCommand.JobName == "" { - jobCommand.JobName = fmt.Sprintf("command-%d", index) + jobCommand, err := miniojob.GenerateMinIOIntervalJobCommand(mcCommand, index, val.DependsOn, val.Name, val.Args, argsFuncs) + if err != nil { + return intervalJob, err } - intervalJob.Command = append(intervalJob.Command, &jobCommand) - intervalJob.CommandMap[jobCommand.JobName] = &jobCommand + intervalJob.Command = append(intervalJob.Command, jobCommand) + intervalJob.CommandMap[jobCommand.JobName] = jobCommand } // check all dependon for _, command := range intervalJob.Command { diff --git a/pkg/utils/miniojob/minioJob.go b/pkg/utils/miniojob/minioJob.go index de3303da4b7..38dac0a4127 100644 --- a/pkg/utils/miniojob/minioJob.go +++ b/pkg/utils/miniojob/minioJob.go @@ -22,17 +22,25 @@ import ( "strings" ) +// ArgType - arg type +type ArgType int + +const ( + // ArgTypeKey - key=value print value + ArgTypeKey ArgType = iota + // ArgTypeFile - key=value print /temp/value.ext + ArgTypeFile + // ArgTypeKeyFile - key=value print key="/temp/value.ext" + ArgTypeKeyFile +) + // Arg - parse the arg result type Arg struct { Command string FileName string FileExt string FileContext string -} - -// IsFile - if it is a file -func (arg Arg) IsFile() bool { - return arg.FileName != "" + ArgType ArgType } // FieldsFunc - alias function @@ -74,6 +82,7 @@ func File(fName string, ext string) FieldsFunc { out.FileName = fName out.FileExt = ext out.FileContext = strings.TrimSpace(val) + out.ArgType = ArgTypeFile return out, nil } } @@ -81,6 +90,50 @@ func File(fName string, ext string) FieldsFunc { } } +// KeyValue - match key and putout the key, like endpoint="https://webhook-1.example.net" +func KeyValue(key string) FieldsFunc { + return func(args map[string]string) (out Arg, err error) { + if args == nil { + return out, fmt.Errorf("args is nil") + } + val, ok := args[key] + if !ok { + return out, fmt.Errorf("key %s not found", key) + } + out.Command = fmt.Sprintf(`%s="%s"`, key, val) + return out, nil + } +} + +// KeyFile - match key and putout the key, like client_cert="[here is content]" +func KeyFile(key string, ext string) FieldsFunc { + return func(args map[string]string) (out Arg, err error) { + if args == nil { + return out, fmt.Errorf("args is nil") + } + val, ok := args[key] + if !ok { + return out, fmt.Errorf("key %s not found", key) + } + out.FileName = key + out.FileExt = ext + out.FileContext = strings.TrimSpace(val) + out.ArgType = ArgTypeKeyFile + return out, nil + } +} + +// Option - ignore the error +func Option(opt FieldsFunc) FieldsFunc { + return func(args map[string]string) (out Arg, err error) { + if args == nil { + return out, nil + } + out, _ = opt(args) + return out, nil + } +} + // KeyForamt - match key and get outPut to replace $0 to output the value // if format not contain $0, will add $0 to the end func KeyForamt(key string, format string) FieldsFunc { diff --git a/pkg/utils/miniojob/minioJob_test.go b/pkg/utils/miniojob/minioJob_test.go index 0f850001e29..e95d2b3a742 100644 --- a/pkg/utils/miniojob/minioJob_test.go +++ b/pkg/utils/miniojob/minioJob_test.go @@ -143,26 +143,140 @@ func TestParser(t *testing.T) { for _, tc := range testCase { cmd, err := tc.command(args) if tc.expectError && err == nil { - t.Fatalf("expect error") + t.Fatalf("expectCommand error") } if !tc.expectError && err != nil { - t.Fatalf("expect not a error") + t.Fatalf("expectCommand not a error") } if !tc.expectError { if tc.expect.Command != "" && cmd.Command != tc.expect.Command { - t.Fatalf("expect %s, but got %s", tc.expect, cmd.Command) + t.Fatalf("expectCommand %s, but got %s", tc.expect.Command, cmd.Command) } if tc.expect.FileName != "" { if tc.expect.FileContext != cmd.FileContext { - t.Fatalf("expect %s, but got %s", tc.expect.FileContext, cmd.FileContext) + t.Fatalf("expectCommand %s, but got %s", tc.expect.FileContext, cmd.FileContext) } if tc.expect.FileExt != cmd.FileExt { - t.Fatalf("expect %s, but got %s", tc.expect.FileExt, cmd.FileExt) + t.Fatalf("expectCommand %s, but got %s", tc.expect.FileExt, cmd.FileExt) } if tc.expect.FileName != cmd.FileName { - t.Fatalf("expect %s, but got %s", tc.expect.FileName, cmd.FileName) + t.Fatalf("expectCommand %s, but got %s", tc.expect.FileName, cmd.FileName) } } } } } + +func TestAdminPolicyCreate(t *testing.T) { + mcCommand := "admin/policy/create" + funcs := JobOperation[mcCommand] + testCase := []struct { + name string + args map[string]string + expectError bool + expectCommand string + expectFileNumber int + }{ + { + name: "testFull", + args: map[string]string{ + "name": "mypolicy", + "policy": "JsonContent", + }, + expectCommand: "myminio mypolicy /temp/policy.json", + expectFileNumber: 1, + }, + { + name: "testError1", + args: map[string]string{ + "name": "mypolicy", + }, + expectCommand: "", + expectError: true, + }, + { + name: "testError2", + args: map[string]string{ + "policy": "JsonContent", + }, + expectCommand: "", + expectError: true, + }, + } + for _, tc := range testCase { + command, err := GenerateMinIOIntervalJobCommand(mcCommand, 0, nil, "test", tc.args, funcs) + if !tc.expectError { + if err != nil { + t.Fatal(err) + } + if command.Command != tc.expectCommand { + t.Fatalf("[%s] expectCommand %s, but got %s", tc.name, tc.expectCommand, command.Command) + } + } else { + if err == nil { + t.Fatalf("[%s] expectCommand error", tc.name) + } + } + } +} + +func TestMCConfigSet(t *testing.T) { + mcCommand := "admin/config/set" + funcs := JobOperation[mcCommand] + testCase := []struct { + name string + args map[string]string + expectCommand string + expectError bool + expectFileNumber int + }{ + { + name: "testFull", + args: map[string]string{ + "webhookName": "webhook1", + "endpoint": "endpoint1", + "auth_token": "token1", + "client_cert": "cert1", + "client_key": "key1", + }, + expectCommand: "myminio webhook1 endpoint=\"endpoint1\" auth_token=\"token1\" client_key=\"/temp/client_key.key\" client_cert=\"/temp/client_cert.pem\"", + expectFileNumber: 2, + }, + { + name: "testOptionFile", + args: map[string]string{ + "webhookName": "webhook1", + "endpoint": "endpoint1", + "auth_token": "token1", + "client_key": "key1", + }, + expectCommand: "myminio webhook1 endpoint=\"endpoint1\" auth_token=\"token1\" client_key=\"/temp/client_key.key\"", + expectFileNumber: 1, + }, + { + name: "testOptionKeyValue", + args: map[string]string{ + "webhookName": "webhook1", + "endpoint": "endpoint1", + "client_key": "key1", + }, + expectCommand: "myminio webhook1 endpoint=\"endpoint1\" client_key=\"/temp/client_key.key\"", + expectFileNumber: 1, + }, + } + for _, tc := range testCase { + command, err := GenerateMinIOIntervalJobCommand(mcCommand, 0, nil, "test", tc.args, funcs) + if !tc.expectError { + if err != nil { + t.Fatal(err) + } + if command.Command != tc.expectCommand { + t.Fatalf("[%s] expectCommand %s, but got %s", tc.name, tc.expectCommand, command.Command) + } + } else { + if err == nil { + t.Fatalf("[%s] expectCommand error", tc.name) + } + } + } +} diff --git a/pkg/utils/miniojob/types.go b/pkg/utils/miniojob/types.go new file mode 100644 index 00000000000..b0ad30fd0f8 --- /dev/null +++ b/pkg/utils/miniojob/types.go @@ -0,0 +1,387 @@ +// This file is part of MinIO Operator +// Copyright (c) 2024 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package miniojob + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/minio/operator/pkg/apis/job.min.io/v1alpha1" + miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2" + "github.com/minio/operator/pkg/runtime" + batchjobv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + // DefaultMCImage - job mc image + DefaultMCImage = "minio/mc:latest" + // MinioJobName - job name + MinioJobName = "job.min.io/job-name" + // MinioJobCRName - job cr name + MinioJobCRName = "job.min.io/job-cr-name" + // CommandFilePath - command file path + CommandFilePath = "/temp" + // MinioJobPhaseError - error + MinioJobPhaseError = "Error" + // MinioJobPhaseSuccess - Success + MinioJobPhaseSuccess = "Success" + // MinioJobPhaseRunning - running + MinioJobPhaseRunning = "Running" + // MinioJobPhaseFailed - failed + MinioJobPhaseFailed = "Failed" +) + +var operationAlias = map[string]string{ + "make-bucket": "mb", + "admin/policy/add": "admin/policy/create", +} + +// JobOperation - job operation +var JobOperation = map[string][]FieldsFunc{ + "mb": {FLAGS(), Sanitize(ALIAS(), Static("/"), Key("name")), Static("--ignore-existing")}, + "admin/user/add": {ALIAS(), Key("user"), Key("password")}, + "admin/policy/create": {ALIAS(), Key("name"), File("policy", "json")}, + "admin/policy/attach": {ALIAS(), Key("policy"), OneOf(KeyForamt("user", "--user"), KeyForamt("group", "--group"))}, + "admin/config/set": {ALIAS(), Key("webhookName"), Option(KeyValue("endpoint")), Option(KeyValue("auth_token")), Option(KeyFile("client_key", "key")), Option(KeyFile("client_cert", "pem"))}, +} + +// OperationAliasToMC - convert operation to mc operation +func OperationAliasToMC(operation string) (op string, found bool) { + for k, v := range operationAlias { + if k == operation { + return v, true + } + if v == operation { + return v, true + } + } + // operation like admin/policy/attach match nothing. + // but it's a valid operation + if strings.Contains(operation, "/") { + return operation, true + } + // operation like replace match nothing + // it's not a valid operation + return "", false +} + +// MinIOIntervalJobCommandFile - Job run command need a file such as /temp/policy.json +type MinIOIntervalJobCommandFile struct { + Name string + Ext string + Dir string + Content string +} + +// MinIOIntervalJobCommand - Job run command +type MinIOIntervalJobCommand struct { + mutex sync.RWMutex + JobName string + MCOperation string + Command string + DepnedsOn []string + Files []MinIOIntervalJobCommandFile + Succeeded bool + Message string + Created bool +} + +// SetStatus - set job command status +func (jobCommand *MinIOIntervalJobCommand) SetStatus(success bool, message string) { + if jobCommand == nil { + return + } + jobCommand.mutex.Lock() + jobCommand.Succeeded = success + jobCommand.Message = message + jobCommand.mutex.Unlock() +} + +// Success - check job command status +func (jobCommand *MinIOIntervalJobCommand) Success() bool { + if jobCommand == nil { + return false + } + jobCommand.mutex.Lock() + defer jobCommand.mutex.Unlock() + return jobCommand.Succeeded +} + +// CreateJob - create job +func (jobCommand *MinIOIntervalJobCommand) CreateJob(ctx context.Context, k8sClient client.Client, jobCR *v1alpha1.MinIOJob) error { + if jobCommand == nil { + return nil + } + jobCommand.mutex.RLock() + if jobCommand.Created || jobCommand.Succeeded { + jobCommand.mutex.RUnlock() + return nil + } + jobCommand.mutex.RUnlock() + jobCommands := []string{} + commands := []string{"mc"} + commands = append(commands, strings.SplitN(jobCommand.MCOperation, "/", -1)...) + commands = append(commands, strings.SplitN(jobCommand.Command, " ", -1)...) + for _, command := range commands { + trimCommand := strings.TrimSpace(command) + if trimCommand != "" { + jobCommands = append(jobCommands, trimCommand) + } + } + jobCommands = append(jobCommands, "--insecure") + objs := []client.Object{} + mcImage := jobCR.Spec.MCImage + if mcImage == "" { + mcImage = DefaultMCImage + } + job := &batchjobv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", jobCR.Name, jobCommand.JobName), + Namespace: jobCR.Namespace, + Labels: map[string]string{ + MinioJobName: jobCommand.JobName, + MinioJobCRName: jobCR.Name, + }, + Annotations: map[string]string{ + "job.min.io/operation": jobCommand.MCOperation, + }, + }, + Spec: batchjobv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + MinioJobName: jobCommand.JobName, + }, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: jobCR.Spec.ServiceAccountName, + Containers: []corev1.Container{ + { + Name: "mc", + Image: mcImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + { + Name: "MC_HOST_myminio", + Value: fmt.Sprintf("https://$(ACCESS_KEY):$(SECRET_KEY)@minio.%s.svc.cluster.local", jobCR.Namespace), + }, + { + Name: "MC_STS_ENDPOINT_myminio", + Value: fmt.Sprintf("https://sts.%s.svc.cluster.local:4223/sts/%s", miniov2.GetNSFromFile(), jobCR.Namespace), + }, + { + Name: "MC_WEB_IDENTITY_TOKEN_FILE_myminio", + Value: "/var/run/secrets/kubernetes.io/serviceaccount/token", + }, + }, + Command: jobCommands, + }, + }, + }, + }, + }, + } + if jobCR.Spec.FailureStrategy == v1alpha1.StopOnFailure { + job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever + } else { + job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure + } + if len(jobCommand.Files) > 0 { + cmName := fmt.Sprintf("%s-%s-cm", jobCR.Name, jobCommand.JobName) + job.Spec.Template.Spec.Containers[0].VolumeMounts = append(job.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ + Name: "file-volume", + ReadOnly: true, + MountPath: jobCommand.Files[0].Dir, + }) + job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, corev1.Volume{ + Name: "file-volume", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cmName, + }, + }, + }, + }) + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: jobCR.Namespace, + Labels: map[string]string{ + "job.min.io/name": jobCR.Name, + }, + }, + Data: map[string]string{}, + } + for _, file := range jobCommand.Files { + configMap.Data[fmt.Sprintf("%s.%s", file.Name, file.Ext)] = file.Content + } + objs = append(objs, configMap) + } + objs = append(objs, job) + for _, obj := range objs { + _, err := runtime.NewObjectSyncer(ctx, k8sClient, jobCR, func() error { + return nil + }, obj, runtime.SyncTypeCreateOrUpdate).Sync(ctx) + if err != nil { + return err + } + } + jobCommand.mutex.Lock() + jobCommand.Created = true + jobCommand.mutex.Unlock() + return nil +} + +// MinIOIntervalJob - Interval Job +type MinIOIntervalJob struct { + // to see if that change + JobCR *v1alpha1.MinIOJob + Command []*MinIOIntervalJobCommand + CommandMap map[string]*MinIOIntervalJobCommand +} + +// GetMinioJobStatus - get job status +func (intervalJob *MinIOIntervalJob) GetMinioJobStatus(ctx context.Context) v1alpha1.MinIOJobStatus { + status := v1alpha1.MinIOJobStatus{} + failed := false + running := false + message := "" + for _, command := range intervalJob.Command { + command.mutex.RLock() + if command.Succeeded { + status.CommandsStatus = append(status.CommandsStatus, v1alpha1.CommandStatus{ + Name: command.JobName, + Result: "Success", + Message: command.Message, + }) + } else { + failed = true + message = command.Message + // if Success is false and message is empty, it means the job is running + if command.Message == "" { + running = true + status.CommandsStatus = append(status.CommandsStatus, v1alpha1.CommandStatus{ + Name: command.JobName, + Result: "running", + Message: command.Message, + }) + } else { + status.CommandsStatus = append(status.CommandsStatus, v1alpha1.CommandStatus{ + Name: command.JobName, + Result: "failed", + Message: command.Message, + }) + } + } + command.mutex.RUnlock() + } + if running { + status.Phase = MinioJobPhaseRunning + } else { + if failed { + status.Phase = MinioJobPhaseFailed + status.Message = message + } else { + status.Phase = MinioJobPhaseSuccess + } + } + return status +} + +// CreateCommandJob - create command job +func (intervalJob *MinIOIntervalJob) CreateCommandJob(ctx context.Context, k8sClient client.Client) error { + for _, command := range intervalJob.Command { + if len(command.DepnedsOn) == 0 { + err := command.CreateJob(ctx, k8sClient, intervalJob.JobCR) + if err != nil { + return err + } + } else { + allDepsSuccess := true + for _, dep := range command.DepnedsOn { + status, found := intervalJob.CommandMap[dep] + if !found { + return fmt.Errorf("dependent job %s not found", dep) + } + if !status.Success() { + allDepsSuccess = false + break + } + } + if allDepsSuccess { + err := command.CreateJob(ctx, k8sClient, intervalJob.JobCR) + if err != nil { + return err + } + } + } + } + return nil +} + +// GenerateMinIOIntervalJobCommand - generate command +func GenerateMinIOIntervalJobCommand(mcCommand string, commandIndex int, dependsOn []string, jobName string, args map[string]string, argsFuncs []FieldsFunc) (*MinIOIntervalJobCommand, error) { + commands := []string{} + files := []MinIOIntervalJobCommandFile{} + for _, argsFunc := range argsFuncs { + jobArg, err := argsFunc(args) + if err != nil { + return nil, err + } + switch jobArg.ArgType { + case ArgTypeKey: + if jobArg.Command != "" { + commands = append(commands, jobArg.Command) + } + case ArgTypeFile: + files = append(files, MinIOIntervalJobCommandFile{ + Name: jobArg.FileName, + Ext: jobArg.FileExt, + Dir: CommandFilePath, + Content: jobArg.FileContext, + }) + commands = append(commands, fmt.Sprintf("%s/%s.%s", CommandFilePath, jobArg.FileName, jobArg.FileExt)) + case ArgTypeKeyFile: + files = append(files, MinIOIntervalJobCommandFile{ + Name: jobArg.FileName, + Ext: jobArg.FileExt, + Dir: CommandFilePath, + Content: jobArg.FileContext, + }) + commands = append(commands, fmt.Sprintf(`%s="%s/%s.%s"`, jobArg.FileName, CommandFilePath, jobArg.FileName, jobArg.FileExt)) + } + + } + jobCommand := &MinIOIntervalJobCommand{ + JobName: jobName, + MCOperation: mcCommand, + Command: strings.Join(commands, " "), + DepnedsOn: dependsOn, + Files: files, + } + // some commands need to have a empty name + if jobCommand.JobName == "" { + jobCommand.JobName = fmt.Sprintf("command-%d", commandIndex) + } + return jobCommand, nil +}