diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go index 25ebc390926..e6dae29d639 100644 --- a/backend/src/v2/component/importer_launcher.go +++ b/backend/src/v2/component/importer_launcher.go @@ -111,7 +111,7 @@ func (l *ImportLauncher) Execute(ctx context.Context) (err error) { }() // TODO(Bobgy): there's no need to pass any parameters, because pipeline // and pipeline run context have been created by root DAG driver. - pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "") + pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "", "") if err != nil { return err } diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index 92a951c1c12..78749471819 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -160,7 +160,8 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) { if err != nil { return err } - bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.options.Namespace, bucketConfig) + bucketSessionInfo := execution.GetPipeline().GetPipelineBucketSession() + bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.options.Namespace, bucketConfig, bucketSessionInfo) if err != nil { return err } @@ -539,7 +540,7 @@ func fetchNonDefaultBuckets( if err != nil { return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), err) } - nonDefaultBucket, err := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig) + nonDefaultBucket, err := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig, "") if err != nil { return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), err) } diff --git a/backend/src/v2/config/env.go b/backend/src/v2/config/env.go index 3eefcd382e3..6ff13368b07 100644 --- a/backend/src/v2/config/env.go +++ b/backend/src/v2/config/env.go @@ -19,7 +19,9 @@ package config import ( "context" "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "io/ioutil" + "sigs.k8s.io/yaml" "strings" "github.com/golang/glog" @@ -82,3 +84,178 @@ func InPodName() (string, error) { name := string(podName) return strings.TrimSuffix(name, "\n"), nil } + +const ( + configBucketProviders = "providers" + // The endpoint uses Kubernetes service DNS name with namespace: + // https://kubernetes.io/docs/concepts/services-networking/service/#dns + defaultMinioEndpointInMultiUserMode = "minio-service.kubeflow:9000" + minioArtifactSecretName = "mlpipeline-minio-artifact" + minioArtifactSecretKeyKey = "secretkey" + minioArtifactAccessKeyKey = "accesskey" +) + +type BucketProviders struct { + Minio *ProviderConfig `json:"minio"` + S3 *ProviderConfig `json:"s3"` + GCS *ProviderConfig `json:"gcs"` +} + +type ProviderConfig struct { + Endpoint string `json:"endpoint"` + DefaultProviderSecretRef *SecretRef `json:"defaultProviderSecretRef"` + Region string `json:"region"` + // optional + DisableSSL bool `json:"disableSSL"` + // optional, ordered, the auth config for the first matching prefix is used + AuthConfigs []AuthConfig `json:"authConfigs"` +} + +type AuthConfig struct { + BucketName string `json:"bucketName"` + KeyPrefix string `json:"keyPrefix"` + *SecretRef `json:"secretRef"` +} + +type SecretRef struct { + SecretName string `json:"secretName"` + AccessKeyKey string `json:"accessKeyKey"` + SecretKeyKey string `json:"secretKeyKey"` +} + +func (c *Config) GetBucketSessionInfo() (objectstore.SessionInfo, error) { + path := c.DefaultPipelineRoot() + bucketConfig, err := objectstore.ParseBucketConfig(path) + if err != nil { + return objectstore.SessionInfo{}, err + } + bucketName := bucketConfig.BucketName + bucketPrefix := bucketConfig.Prefix + provider := strings.TrimSuffix(bucketConfig.Scheme, "://") + bucketProviders, err := c.getBucketProviders() + if err != nil { + return objectstore.SessionInfo{}, err + } + + // Case 1: No "providers" field in kfp-launcher + if bucketProviders == nil { + // Use default minio if provider is minio, otherwise we default to executor env + if provider == "minio" { + return getDefaultMinioSessionInfo(), nil + } else { + // If not using minio, and no other provider config is provided + // rely on executor env (e.g. IRSA) for authenticating with provider + return objectstore.SessionInfo{}, nil + } + } + + var providerConfig *ProviderConfig + switch provider { + case "minio": + providerConfig = bucketProviders.Minio + break + case "s3": + providerConfig = bucketProviders.S3 + break + case "gs": + providerConfig = bucketProviders.Minio + break + default: + return objectstore.SessionInfo{}, fmt.Errorf("Encountered unsupported provider in BucketProviders %s", provider) + } + + // Case 2: "providers" field is empty {} + if providerConfig == nil { + if provider == "minio" { + return getDefaultMinioSessionInfo(), nil + } else { + return objectstore.SessionInfo{}, nil + } + } + + // Case 3: a provider is specified + endpoint := providerConfig.Endpoint + if endpoint == "" { + if provider == "minio" { + endpoint = objectstore.MinioDefaultEndpoint() + } else { + return objectstore.SessionInfo{}, fmt.Errorf("Invalid provider config, %s.defaultProviderSecretRef is required for this storage provider", provider) + } + } + + // DefaultProviderSecretRef takes precedent over other configs + secretRef := providerConfig.DefaultProviderSecretRef + if secretRef == nil { + if provider == "minio" { + secretRef = &SecretRef{ + SecretName: minioArtifactSecretName, + SecretKeyKey: minioArtifactSecretKeyKey, + AccessKeyKey: minioArtifactAccessKeyKey, + } + } else { + return objectstore.SessionInfo{}, fmt.Errorf("Invalid provider config, %s.defaultProviderSecretRef is required for this storage provider", provider) + } + } + + // if not provided, defaults to false + disableSSL := providerConfig.DisableSSL + + region := providerConfig.Region + if region == "" { + return objectstore.SessionInfo{}, fmt.Errorf("Invalid provider config, missing provider region") + } + + // if another secret is specified for a given bucket/prefix then that takes + // higher precedent over DefaultProviderSecretRef + authConfig := getBucketAuthByPrefix(providerConfig.AuthConfigs, bucketName, bucketPrefix) + if authConfig != nil { + if authConfig.SecretRef == nil || authConfig.SecretRef.SecretKeyKey == "" || authConfig.SecretRef.AccessKeyKey == "" || authConfig.SecretRef.SecretName == "" { + return objectstore.SessionInfo{}, fmt.Errorf("Invalid provider config, %s.AuthConfigs[].secretConfig is missing or invalid", provider) + } + secretRef = authConfig.SecretRef + } + + return objectstore.SessionInfo{ + Region: region, + Endpoint: endpoint, + DisableSSL: disableSSL, + SecretName: secretRef.SecretName, + AccessKeyKey: secretRef.AccessKeyKey, + SecretKeyKey: secretRef.SecretKeyKey, + }, nil +} + +func getDefaultMinioSessionInfo() (sessionInfo objectstore.SessionInfo) { + sess := objectstore.SessionInfo{ + Region: "minio", + Endpoint: objectstore.MinioDefaultEndpoint(), + DisableSSL: true, + SecretName: minioArtifactSecretName, + AccessKeyKey: minioArtifactAccessKeyKey, + SecretKeyKey: minioArtifactSecretKeyKey, + } + return sess +} + +// getBucketProviders gets the provider configuration +func (c *Config) getBucketProviders() (*BucketProviders, error) { + if c == nil || c.data[configBucketProviders] == "" { + return nil, nil + } + bucketProviders := &BucketProviders{} + configAuth := c.data[configBucketProviders] + err := yaml.Unmarshal([]byte(configAuth), bucketProviders) + if err != nil { + return nil, fmt.Errorf("failed to unmarshall kfp bucket providers, ensure that providers config is well formed: %w", err) + } + return bucketProviders, nil +} + +func getBucketAuthByPrefix(authConfigs []AuthConfig, bucketName, prefix string) *AuthConfig { + for _, authConfig := range authConfigs { + if authConfig.BucketName == bucketName && (authConfig.KeyPrefix == prefix) { + return &authConfig + } + } + return nil +} diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index d227855ca32..6cccc1a1f21 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "path" "strconv" "strings" @@ -132,6 +133,7 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio } // TODO(v2): in pipeline spec, rename GCS output directory to pipeline root. pipelineRoot := opts.RuntimeConfig.GetGcsOutputDirectory() + pipelineBucketSessionInfo := objectstore.SessionInfo{} if pipelineRoot != "" { glog.Infof("PipelineRoot=%q", pipelineRoot) } else { @@ -149,9 +151,18 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio } pipelineRoot = cfg.DefaultPipelineRoot() glog.Infof("PipelineRoot=%q from default config", pipelineRoot) + pipelineBucketSessionInfo, err = cfg.GetBucketSessionInfo() + if err != nil { + return nil, err + } + } + bucketSessionInfo, err := json.Marshal(pipelineBucketSessionInfo) + if err != nil { + return nil, err } + bucketSessionInfoEntry := string(bucketSessionInfo) // TODO(Bobgy): fill in run resource. - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, opts.Namespace, "run-resource", pipelineRoot) + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, opts.Namespace, "run-resource", pipelineRoot, bucketSessionInfoEntry) if err != nil { return nil, err } @@ -230,7 +241,7 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl } // TODO(Bobgy): there's no need to pass any parameters, because pipeline // and pipeline run context have been created by root DAG driver. - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return nil, err } @@ -535,7 +546,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } // TODO(Bobgy): there's no need to pass any parameters, because pipeline // and pipeline run context have been created by root DAG driver. - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return nil, err } @@ -1206,7 +1217,7 @@ func createPVC( // Create execution regardless the operation succeeds or not defer func() { if createdExecution == nil { - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return } @@ -1286,7 +1297,7 @@ func createPVC( ecfg.CachedMLMDExecutionID = cachedMLMDExecutionID ecfg.FingerPrint = fingerPrint - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("error getting pipeline from MLMD: %w", err) } @@ -1376,7 +1387,7 @@ func deletePVC( // Create execution regardless the operation succeeds or not defer func() { if createdExecution == nil { - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return } @@ -1406,7 +1417,7 @@ func deletePVC( ecfg.CachedMLMDExecutionID = cachedMLMDExecutionID ecfg.FingerPrint = fingerPrint - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return createdExecution, pb.Execution_FAILED, fmt.Errorf("error getting pipeline from MLMD: %w", err) } diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index adfca087668..e55794071bd 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -77,7 +77,7 @@ var ( ) type ClientInterface interface { - GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string) (*Pipeline, error) + GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot, bucketSessionInfo string) (*Pipeline, error) GetDAG(ctx context.Context, executionID int64) (*DAG, error) PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error) @@ -200,6 +200,18 @@ func (p *Pipeline) GetCtxID() int64 { return p.pipelineCtx.GetId() } +func (p *Pipeline) GetPipelineBucketSession() string { + if p == nil { + return "" + } + props := p.pipelineRunCtx.GetCustomProperties() + bucketSessionInfo, ok := props[keySessionInfoDetails] + if !ok { + return "" + } + return bucketSessionInfo.GetStringValue() +} + func (p *Pipeline) GetPipelineRoot() string { if p == nil { return "" @@ -262,7 +274,7 @@ func (e *Execution) FingerPrint() string { // GetPipeline returns the current pipeline represented by the specified // pipeline name and run ID. -func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string) (*Pipeline, error) { +func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot, bucketSessionInfo string) (*Pipeline, error) { pipelineContext, err := c.getOrInsertContext(ctx, pipelineName, pipelineContextType, nil) if err != nil { return nil, err @@ -272,7 +284,8 @@ func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace keyNamespace: stringValue(namespace), keyResourceName: stringValue(runResource), // pipeline root of this run - keyPipelineRoot: stringValue(strings.TrimRight(pipelineRoot, "/") + "/" + path.Join(pipelineName, runID)), + keyPipelineRoot: stringValue(strings.TrimRight(pipelineRoot, "/") + "/" + path.Join(pipelineName, runID)), + keySessionInfoDetails: stringValue(bucketSessionInfo), } runContext, err := c.getOrInsertContext(ctx, runID, pipelineRunContextType, metadata) glog.Infof("Pipeline Run Context: %+v", runContext) @@ -464,21 +477,22 @@ func (c *Client) PublishExecution(ctx context.Context, execution *Execution, out // metadata keys const ( - keyDisplayName = "display_name" - keyTaskName = "task_name" - keyImage = "image" - keyPodName = "pod_name" - keyPodUID = "pod_uid" - keyNamespace = "namespace" - keyResourceName = "resource_name" - keyPipelineRoot = "pipeline_root" - keyCacheFingerPrint = "cache_fingerprint" - keyCachedExecutionID = "cached_execution_id" - keyInputs = "inputs" - keyOutputs = "outputs" - keyParentDagID = "parent_dag_id" // Parent DAG Execution ID. - keyIterationIndex = "iteration_index" - keyIterationCount = "iteration_count" + keyDisplayName = "display_name" + keyTaskName = "task_name" + keyImage = "image" + keyPodName = "pod_name" + keyPodUID = "pod_uid" + keyNamespace = "namespace" + keyResourceName = "resource_name" + keyPipelineRoot = "pipeline_root" + keySessionInfoDetails = "bucket_session_info" + keyCacheFingerPrint = "cache_fingerprint" + keyCachedExecutionID = "cached_execution_id" + keyInputs = "inputs" + keyOutputs = "outputs" + keyParentDagID = "parent_dag_id" // Parent DAG Execution ID. + keyIterationIndex = "iteration_index" + keyIterationCount = "iteration_count" ) // CreateExecution creates a new MLMD execution under the specified Pipeline. diff --git a/backend/src/v2/metadata/client_fake.go b/backend/src/v2/metadata/client_fake.go index c2887832d83..fe2773ebd52 100644 --- a/backend/src/v2/metadata/client_fake.go +++ b/backend/src/v2/metadata/client_fake.go @@ -32,7 +32,7 @@ func NewFakeClient() *FakeClient { return &FakeClient{} } -func (c *FakeClient) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string) (*Pipeline, error) { +func (c *FakeClient) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string, bucketSessionInfo string) (*Pipeline, error) { return nil, nil } diff --git a/backend/src/v2/metadata/client_test.go b/backend/src/v2/metadata/client_test.go index d384ab20aac..ea3bf34dde1 100644 --- a/backend/src/v2/metadata/client_test.go +++ b/backend/src/v2/metadata/client_test.go @@ -89,7 +89,7 @@ func Test_GetPipeline(t *testing.T) { mlmdClient, err := NewTestMlmdClient() fatalIf(err) - pipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot) + pipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot, "") fatalIf(err) expectPipelineRoot := fmt.Sprintf("%s/get-pipeline-test/%s", pipelineRoot, runId) if pipeline.GetPipelineRoot() != expectPipelineRoot { @@ -138,12 +138,12 @@ func Test_GetPipeline_Twice(t *testing.T) { client, err := metadata.NewClient(testMlmdServerAddress, testMlmdServerPort) fatalIf(err) - pipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot) + pipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot, "") fatalIf(err) // The second call to GetPipeline won't fail because it avoid inserting to MLMD again. - samePipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot) + samePipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot, "") fatalIf(err) - if (pipeline.GetCtxID() != samePipeline.GetCtxID()) { + if pipeline.GetCtxID() != samePipeline.GetCtxID() { t.Errorf("Expect pipeline context ID %d, actual is %d", pipeline.GetCtxID(), samePipeline.GetCtxID()) } } @@ -159,7 +159,7 @@ func Test_GetPipelineFromExecution(t *testing.T) { } client := newLocalClientOrFatal(t) ctx := context.Background() - pipeline, err := client.GetPipeline(ctx, "get-pipeline-from-execution", newUUIDOrFatal(t), "kubeflow", "workflow/abc", "gs://my-bucket/root") + pipeline, err := client.GetPipeline(ctx, "get-pipeline-from-execution", newUUIDOrFatal(t), "kubeflow", "workflow/abc", "gs://my-bucket/root", "") fatalIf(err) execution, err := client.CreateExecution(ctx, pipeline, &metadata.ExecutionConfig{ TaskName: "task1", @@ -193,7 +193,7 @@ func Test_GetPipelineConcurrently(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText, pipelineRoot) + _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText, pipelineRoot, "") if err != nil { t.Error(err) } @@ -205,7 +205,7 @@ func Test_GetPipelineConcurrently(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText, pipelineRoot) + _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText, pipelineRoot, "") if err != nil { t.Error(err) } @@ -220,7 +220,7 @@ func Test_DAG(t *testing.T) { client := newLocalClientOrFatal(t) ctx := context.Background() // These parameters do not matter. - pipeline, err := client.GetPipeline(ctx, "pipeline-name", newUUIDOrFatal(t), "ns1", "workflow/pipeline-1234", pipelineRoot) + pipeline, err := client.GetPipeline(ctx, "pipeline-name", newUUIDOrFatal(t), "ns1", "workflow/pipeline-1234", pipelineRoot, "") if err != nil { t.Fatal(err) } diff --git a/backend/src/v2/objectstore/object_store.go b/backend/src/v2/objectstore/object_store.go index 72fe2a52e4a..c2fe39b1efc 100644 --- a/backend/src/v2/objectstore/object_store.go +++ b/backend/src/v2/objectstore/object_store.go @@ -17,6 +17,7 @@ package objectstore import ( "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -44,35 +45,24 @@ type Config struct { QueryString string } -func OpenBucket(ctx context.Context, k8sClient kubernetes.Interface, namespace string, config *Config) (bucket *blob.Bucket, err error) { +func OpenBucket(ctx context.Context, k8sClient kubernetes.Interface, namespace string, config *Config, bucketSessionInfo string) (bucket *blob.Bucket, err error) { defer func() { if err != nil { err = fmt.Errorf("Failed to open bucket %q: %w", config.BucketName, err) } }() - if config.Scheme == "minio://" { - cred, err := getMinioCredential(ctx, k8sClient, namespace) - if err != nil { - return nil, fmt.Errorf("Failed to get minio credential: %w", err) - } - sess, err := session.NewSession(&aws.Config{ - Credentials: cred, - Region: aws.String("minio"), - Endpoint: aws.String(MinioDefaultEndpoint()), - DisableSSL: aws.Bool(true), - S3ForcePathStyle: aws.Bool(true), - }) - - if err != nil { - return nil, fmt.Errorf("Failed to create session to access minio: %v", err) - } - minioBucket, err := s3blob.OpenBucket(ctx, sess, config.BucketName, nil) + sess, err := createBucketSession(ctx, namespace, bucketSessionInfo, k8sClient) + if err != nil { + return nil, fmt.Errorf("Failed to retrieve credentials for bucket %s: %w", config.BucketName, err) + } + if sess != nil { + openedBucket, err := s3blob.OpenBucket(ctx, sess, config.BucketName, nil) if err != nil { return nil, err } // Directly calling s3blob.OpenBucket does not allow overriding prefix via bucketConfig.BucketURL(). // Therefore, we need to explicitly configure the prefixed bucket. - return blob.PrefixedBucket(minioBucket, config.Prefix), nil + return blob.PrefixedBucket(openedBucket, config.Prefix), nil } return blob.OpenBucket(ctx, config.bucketURL()) @@ -340,3 +330,69 @@ func getMinioCredential(ctx context.Context, clientSet kubernetes.Interface, nam func getAWSCredential() (cred *credentials.Credentials, err error) { return credentials.NewCredentials(&credentials.ChainProvider{}), nil } + +type SessionInfo struct { + Region string + Endpoint string + DisableSSL bool + SecretName string + AccessKeyKey string + SecretKeyKey string +} + +func createBucketSession(ctx context.Context, namespace string, sessionInfoJSON string, client kubernetes.Interface) (*session.Session, error) { + if sessionInfoJSON == "" { + return nil, nil + } + sessionInfo := &SessionInfo{} + err := json.Unmarshal([]byte(sessionInfoJSON), sessionInfo) + if err != nil { + return nil, fmt.Errorf("Encountered error when attempting to unmarshall bucket session properties: %w", err) + } + creds, err := getBucketCredential(ctx, client, namespace, sessionInfo.SecretName, sessionInfo.SecretKeyKey, sessionInfo.AccessKeyKey) + if err != nil { + return nil, err + } + sess, err := session.NewSession(&aws.Config{ + Credentials: creds, + Region: aws.String(sessionInfo.Region), + Endpoint: aws.String(sessionInfo.Endpoint), + DisableSSL: aws.Bool(sessionInfo.DisableSSL), + S3ForcePathStyle: aws.Bool(true), + }) + if err != nil { + return nil, fmt.Errorf("Failed to create session to access minio: %v", err) + } + return sess, nil +} + +func getBucketCredential( + ctx context.Context, + clientSet kubernetes.Interface, + namespace string, + secretName string, + bucketSecretKeyKey string, + bucketAccessKeyKey string, +) (cred *credentials.Credentials, err error) { + defer func() { + if err != nil { + // wrap error before returning + err = fmt.Errorf("Failed to get Bucket credentials from secret name=%q namespace=%q: %w", secretName, namespace, err) + } + }() + secret, err := clientSet.CoreV1().Secrets(namespace).Get( + ctx, + secretName, + metav1.GetOptions{}) + if err != nil { + return nil, err + } + accessKey := string(secret.Data[bucketAccessKeyKey]) + secretKey := string(secret.Data[bucketSecretKeyKey]) + + if accessKey != "" && secretKey != "" { + cred = credentials.NewStaticCredentials(accessKey, secretKey, "") + return cred, err + } + return nil, fmt.Errorf("could not find specified keys '%s' or '%s'", bucketAccessKeyKey, bucketSecretKeyKey) +}