From 6b5cf4ba67f89698786f6ddc6c16662410d20b9e Mon Sep 17 00:00:00 2001 From: Shawn Hurley Date: Thu, 1 Mar 2018 12:47:20 -0500 Subject: [PATCH] Broker client dao (#795) * initial commit * adding DAO implementation for CRDs * adding CRDs to deploy template * adding CRDs to k8s templates. * working CRD dao implementation. * Implement error checking for IsNotFound error int he dao impl * implement errors returns for conversion methods --- pkg/app/app.go | 35 +- pkg/broker/broker.go | 22 +- pkg/clients/clients.go | 2 + pkg/clients/crd.go | 55 ++ pkg/dao/crd/conversion.go | 485 ++++++++++++++++++ pkg/dao/crd/dao.go | 349 +++++++++++++ pkg/dao/dao.go | 11 +- pkg/dao/etcd/dao.go | 5 + ...eploy-ansible-service-broker.template.yaml | 240 +++++++++ templates/k8s-ansible-service-broker.yaml.j2 | 242 +++++++++ 10 files changed, 1418 insertions(+), 28 deletions(-) create mode 100644 pkg/clients/crd.go create mode 100644 pkg/dao/crd/conversion.go create mode 100644 pkg/dao/crd/dao.go diff --git a/pkg/app/app.go b/pkg/app/app.go index 7ef17e3cb9..a6ccac0725 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -193,7 +193,7 @@ func CreateApp() App { } log.Debug("Connecting Dao") - app.dao, err = dao.NewDao() + app.dao, err = dao.NewDao(app.config) if err != nil { log.Error(err.Error()) os.Exit(1) @@ -390,26 +390,29 @@ func initClients(c *config.Config) error { // method on the app. Forces developers at authorship time to think about // dependencies / make sure things are ready. log.Notice("Initializing clients...") - log.Debug("Trying to connect to etcd") - // Intialize the etcd configuration - clients.InitEtcdConfig(c) - etcdClient, err := clients.Etcd() - if err != nil { - return err - } + if strings.ToLower(c.GetString("dao.type")) != "crd" { - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() + log.Debug("Trying to connect to etcd") + // Intialize the etcd configuration + clients.InitEtcdConfig(c) + etcdClient, err := clients.Etcd() + if err != nil { + return err + } - version, err := etcdClient.GetVersion(ctx) - if err != nil { - return err - } + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() - log.Infof("Etcd Version [Server: %s, Cluster: %s]", version.Server, version.Cluster) + version, err := etcdClient.GetVersion(ctx) + if err != nil { + return err + } + + log.Infof("Etcd Version [Server: %s, Cluster: %s]", version.Server, version.Cluster) + } - _, err = clients.Kubernetes() + _, err := clients.Kubernetes() if err != nil { return err } diff --git a/pkg/broker/broker.go b/pkg/broker/broker.go index b28ddbbb5d..c947b95403 100644 --- a/pkg/broker/broker.go +++ b/pkg/broker/broker.go @@ -25,7 +25,6 @@ import ( "regexp" "strings" - "github.com/coreos/etcd/client" "github.com/openshift/ansible-service-broker/pkg/apb" "github.com/openshift/ansible-service-broker/pkg/config" "github.com/openshift/ansible-service-broker/pkg/dao" @@ -152,7 +151,7 @@ func NewAnsibleBroker(dao dao.Dao, func (a AnsibleBroker) GetServiceInstance(instanceUUID uuid.UUID) (apb.ServiceInstance, error) { instance, err := a.dao.GetServiceInstance(instanceUUID.String()) if err != nil { - if client.IsKeyNotFound(err) { + if a.dao.IsNotFoundError(err) { log.Infof("Could not find a service instance in dao - %v", err) return apb.ServiceInstance{}, ErrorNotFound } @@ -167,7 +166,7 @@ func (a AnsibleBroker) GetServiceInstance(instanceUUID uuid.UUID) (apb.ServiceIn func (a AnsibleBroker) GetBindInstance(bindUUID uuid.UUID) (apb.BindInstance, error) { instance, err := a.dao.GetBindInstance(bindUUID.String()) if err != nil { - if client.IsKeyNotFound(err) { + if a.dao.IsNotFoundError(err) { return apb.BindInstance{}, ErrorNotFound } return apb.BindInstance{}, err @@ -316,7 +315,7 @@ func (a AnsibleBroker) Recover() (string, error) { recoverStatuses, err := a.dao.FindJobStateByState(apb.StateInProgress) if err != nil { // no jobs or states to recover, this is OK. - if client.IsKeyNotFound(err) { + if a.dao.IsNotFoundError(err) { log.Info("No jobs to recover") return "", nil } @@ -555,7 +554,7 @@ func (a AnsibleBroker) Provision(instanceUUID uuid.UUID, req *ProvisionRequest, specID := req.ServiceID if spec, err = a.dao.GetSpec(specID); err != nil { // etcd return not found i.e. code 100 - if client.IsKeyNotFound(err) { + if a.dao.IsNotFoundError(err) { return nil, ErrorNotFound } // otherwise unknown error bubble it up @@ -730,6 +729,7 @@ func (a AnsibleBroker) isJobInProgress(instance *apb.ServiceInstance, method apb.JobMethod) (bool, string, error) { allJobs, err := a.dao.GetSvcInstJobsByState(instance.ID.String(), apb.StateInProgress) + log.Infof("All Jobs for instance: %v in state: %v - \n%#v", instance.ID, apb.StateInProgress, allJobs) if err != nil { return false, "", err } @@ -756,7 +756,7 @@ func (a AnsibleBroker) GetBind(instance apb.ServiceInstance, bindingUUID uuid.UU bi, err := a.dao.GetBindInstance(bindingUUID.String()) if err != nil { - if client.IsKeyNotFound(err) { + if a.dao.IsNotFoundError(err) { log.Warningf("id: %v - could not find bind instance - %v", bindingUUID, err) return nil, ErrorNotFound } @@ -856,12 +856,12 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID, switch { // unknown error - case err != nil && !client.IsKeyNotFound(err): + case err != nil && !a.dao.IsNotFoundError(err): return nil, false, err // If there is a job in "succeeded" state, or no job at all, or // the referenced job no longer exists (we assume it got // cleaned up eventually), assume everything is complete. - case createJob.State == apb.StateSucceeded, existingBI.CreateJobKey == "", client.IsKeyNotFound(err): + case createJob.State == apb.StateSucceeded, existingBI.CreateJobKey == "", a.dao.IsNotFoundError(err): log.Debug("already have this binding instance, returning 200") resp, err := NewBindResponse(provExtCreds, bindExtCreds) if err != nil { @@ -882,7 +882,7 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID, // parameters are different log.Info("duplicate binding instance diff params, returning 409 conflict") return nil, false, ErrorDuplicate - } else if !client.IsKeyNotFound(err) { + } else if !a.dao.IsNotFoundError(err) { return nil, false, err } @@ -1175,7 +1175,7 @@ func (a AnsibleBroker) Update(instanceUUID uuid.UUID, req *UpdateRequest, async spec, err := a.dao.GetSpec(si.Spec.ID) if err != nil { // etcd return not found i.e. code 100 - if client.IsKeyNotFound(err) { + if a.dao.IsNotFoundError(err) { return nil, ErrorNotFound } // otherwise unknown error bubble it up @@ -1410,7 +1410,7 @@ func (a AnsibleBroker) AddSpec(spec apb.Spec) (*CatalogResponse, error) { // RemoveSpec - remove the spec specified from the catalog/etcd func (a AnsibleBroker) RemoveSpec(specID string) error { spec, err := a.dao.GetSpec(specID) - if client.IsKeyNotFound(err) { + if a.dao.IsNotFoundError(err) { return ErrorNotFound } if err != nil { diff --git a/pkg/clients/clients.go b/pkg/clients/clients.go index b10ca8959f..d589aaf242 100644 --- a/pkg/clients/clients.go +++ b/pkg/clients/clients.go @@ -29,10 +29,12 @@ var instances struct { Etcd etcd.Client Kubernetes *KubernetesClient Openshift *OpenshiftClient + CRD *CRD } var once struct { Etcd sync.Once Kubernetes sync.Once Openshift sync.Once + CRD sync.Once } diff --git a/pkg/clients/crd.go b/pkg/clients/crd.go new file mode 100644 index 0000000000..6e964eea37 --- /dev/null +++ b/pkg/clients/crd.go @@ -0,0 +1,55 @@ +package clients + +import ( + "errors" + + clientset "github.com/automationbroker/broker-client-go/client/clientset/versioned" + "k8s.io/client-go/rest" + "k8s.io/client-go/util/homedir" +) + +// CRD - Client to interact with automationbroker crd API +type CRD struct { + clientset.Interface +} + +// CRDClient - Create a new kubernetes client if needed, returns reference +func CRDClient() (*CRD, error) { + once.CRD.Do(func() { + client, err := newCRDClient() + if err != nil { + log.Error(err.Error()) + panic(err.Error()) + } + instances.CRD = client + }) + if instances.CRD == nil { + return nil, errors.New("CRDClient client instance is nil") + } + return instances.CRD, nil +} + +func newCRDClient() (*CRD, error) { + // NOTE: Both the external and internal client object are using the same + // clientset library. Internal clientset normally uses a different + // library + clientConfig, err := rest.InClusterConfig() + if err != nil { + log.Warning("Failed to create a InternalClientSet: %v.", err) + + log.Debug("Checking for a local Cluster Config") + clientConfig, err = createClientConfigFromFile(homedir.HomeDir() + "/.kube/config") + if err != nil { + log.Error("Failed to create LocalClientSet") + return nil, err + } + } + + clientset, err := clientset.NewForConfig(clientConfig) + if err != nil { + log.Error("Failed to create LocalClientSet") + return nil, err + } + c := &CRD{clientset} + return c, err +} diff --git a/pkg/dao/crd/conversion.go b/pkg/dao/crd/conversion.go new file mode 100644 index 0000000000..966b4b3b08 --- /dev/null +++ b/pkg/dao/crd/conversion.go @@ -0,0 +1,485 @@ +package dao + +import ( + "encoding/json" + "fmt" + "reflect" + + "github.com/automationbroker/broker-client-go/pkg/apis/automationbroker.io/v1" + "github.com/openshift/ansible-service-broker/pkg/apb" + "github.com/pborman/uuid" +) + +type arrayErrors []error + +func (a arrayErrors) Error() string { + return fmt.Sprintf("%#v", a) +} + +func specToBundle(spec *apb.Spec) (v1.BundleSpec, error) { + // encode the metadata as string + b, err := json.Marshal(spec.Metadata) + if err != nil { + log.Errorf("unable to marshal the metadata for spec to a json byte array - %v", err) + return v1.BundleSpec{}, err + } + plans := []v1.Plan{} + errs := arrayErrors{} + for _, specPlan := range spec.Plans { + plan, err := convertPlanToCRD(specPlan) + if err != nil { + errs = append(errs, err) + } + plans = append(plans, plan) + } + if len(errs) > 0 { + return v1.BundleSpec{}, errs + } + + return v1.BundleSpec{ + Runtime: spec.Runtime, + Version: spec.Version, + FQName: spec.FQName, + Image: spec.Image, + Tags: spec.Tags, + Bindable: spec.Bindable, + Description: spec.Description, + Async: convertToAsyncType(spec.Async), + Metadata: string(b), + Plans: plans, + }, nil +} + +func convertToAsyncType(s string) v1.AsyncType { + switch s { + case "optional": + return v1.OptionalAsync + case "required": + return v1.RequiredAsync + case "unsupported": + return v1.Unsupported + default: + // Defaulting should never happen but defaulting to + // required because Bundles by default should be run in async + // because they will take time to spin up the new pod. + return v1.RequiredAsync + + } +} + +func convertPlanToCRD(plan apb.Plan) (v1.Plan, error) { + b, err := json.Marshal(plan.Metadata) + if err != nil { + log.Errorf("unable to marshal the metadata for plan to a json byte array - %v", err) + return v1.Plan{}, err + } + + bindParams := []v1.Parameters{} + params := []v1.Parameters{} + errs := arrayErrors{} + for _, p := range plan.Parameters { + param, err := convertParametersToCRD(p) + if err != nil { + errs = append(errs, err) + continue + } + params = append(params, param) + } + + for _, p := range plan.BindParameters { + param, err := convertParametersToCRD(p) + if err != nil { + errs = append(errs, err) + continue + } + bindParams = append(bindParams, param) + } + if len(errs) > 0 { + return v1.Plan{}, err + } + return v1.Plan{ + ID: plan.ID, + Name: plan.Name, + Description: plan.Description, + Metadata: string(b), + Free: plan.Free, + Bindable: plan.Bindable, + UpdatesTo: plan.UpdatesTo, + Parameters: params, + BindParameters: bindParams, + }, nil +} + +func convertParametersToCRD(param apb.ParameterDescriptor) (v1.Parameters, error) { + b, err := json.Marshal(map[string]interface{}{"default": param.Default}) + if err != nil { + log.Errorf("unable to marshal the default for parameter to a json byte array - %v", err) + return v1.Parameters{}, err + } + + var v1Max *v1.NilableNumber + if param.Maximum != nil { + n := v1.NilableNumber(reflect.ValueOf(param.Maximum).Float()) + v1Max = &n + } + var v1exMax *v1.NilableNumber + if param.ExclusiveMaximum != nil { + n := v1.NilableNumber(reflect.ValueOf(param.ExclusiveMaximum).Float()) + v1exMax = &n + } + var v1Min *v1.NilableNumber + if param.Minimum != nil { + n := v1.NilableNumber(reflect.ValueOf(param.Minimum).Float()) + v1Min = &n + } + var v1exMin *v1.NilableNumber + if param.ExclusiveMinimum != nil { + n := v1.NilableNumber(reflect.ValueOf(param.ExclusiveMinimum).Float()) + v1exMin = &n + } + + return v1.Parameters{ + Name: param.Name, + Title: param.Title, + Type: param.Type, + Description: param.Description, + Default: string(b), + DeprecatedMaxLength: param.DeprecatedMaxlength, + MaxLength: param.MaxLength, + MinLength: param.MinLength, + Pattern: param.Pattern, + MultipleOf: param.MultipleOf, + Maximum: v1Max, + ExclusiveMaximum: v1exMax, + ExclusiveMinimum: v1exMin, + Minimum: v1Min, + Enum: param.Enum, + Required: param.Required, + Updatable: param.Updatable, + DisplayType: param.DisplayType, + DisplayGroup: param.DisplayGroup, + }, nil +} + +func convertServiceInstanceToCRD(si *apb.ServiceInstance) (v1.ServiceInstanceSpec, error) { + var b []byte + if si.Parameters != nil { + by, err := json.Marshal(si.Parameters) + if err != nil { + log.Errorf("unable to convert parameters to encoded json byte array -%v", err) + return v1.ServiceInstanceSpec{}, err + } + b = by + } + + bindingIDs := []string{} + for key := range si.BindingIDs { + bindingIDs = append(bindingIDs, key) + } + + return v1.ServiceInstanceSpec{ + BundleID: si.Spec.ID, + Context: v1.Context{ + Namespace: si.Context.Namespace, + Plateform: si.Context.Platform, + }, + Parameters: string(b), + BindingIDs: bindingIDs, + }, nil +} + +func convertServiceBindingToCRD(bi *apb.BindInstance) (v1.ServiceBindingSpec, error) { + var b []byte + if bi.Parameters != nil { + by, err := json.Marshal(bi.Parameters) + if err != nil { + log.Errorf("Unable to marshal parameters to json byte array - %v", err) + return v1.ServiceBindingSpec{}, err + } + b = by + } + return v1.ServiceBindingSpec{ + ServiceInstanceID: bi.ServiceID.String(), + Parameters: string(b), + JobToken: bi.CreateJobKey, + }, nil +} + +func convertJobStateToCRD(js *apb.JobState) (v1.JobStateSpec, error) { + return v1.JobStateSpec{ + State: convertStateToCRD(js.State), + Method: convertJobMethodToCRD(js.Method), + PodName: js.Podname, + Error: js.Error, + Description: js.Description, + }, nil +} + +func convertJobMethodToCRD(j apb.JobMethod) v1.JobMethod { + switch j { + case apb.JobMethodProvision: + return v1.JobMethodProvision + case apb.JobMethodDeprovision: + return v1.JobMethodDeprovision + case apb.JobMethodBind: + return v1.JobMethodBind + case apb.JobMethodUnbind: + return v1.JobMethodUnbind + case apb.JobMethodUpdate: + return v1.JobMethodUpdate + } + log.Errorf("unable to find the job method - %v", j) + // This should never be called as all cases should already be covered. + return v1.JobMethodProvision +} + +func convertStateToCRD(s apb.State) v1.State { + switch s { + case apb.StateNotYetStarted: + return v1.StateNotYetStarted + case apb.StateInProgress: + return v1.StateInProgress + case apb.StateSucceeded: + return v1.StateSucceeded + case apb.StateFailed: + return v1.StateFailed + } + // all cases should be coverd. we should never hit this code path. + log.Errorf("Job state not found: %v", s) + return v1.StateFailed +} + +func bundleToSpec(spec v1.BundleSpec, id string) (*apb.Spec, error) { + // encode the metadata as string + m := map[string]interface{}{} + err := json.Unmarshal([]byte(spec.Metadata), &m) + if err != nil { + log.Errorf("unable to unmarshal the metadata for spec - %v", err) + return &apb.Spec{}, err + } + plans := []apb.Plan{} + errs := arrayErrors{} + for _, specPlan := range spec.Plans { + plan, err := convertPlanToAPB(specPlan) + if err != nil { + errs = append(errs, err) + continue + } + plans = append(plans, plan) + } + + if len(errs) > 0 { + return &apb.Spec{}, errs + } + + return &apb.Spec{ + ID: id, + Runtime: spec.Runtime, + Version: spec.Version, + FQName: spec.FQName, + Image: spec.Image, + Tags: spec.Tags, + Bindable: spec.Bindable, + Description: spec.Description, + Async: convertAsyncTypeToString(spec.Async), + Metadata: m, + Plans: plans, + }, nil +} + +func convertAsyncTypeToString(a v1.AsyncType) string { + switch a { + case v1.OptionalAsync: + return "optional" + case v1.RequiredAsync: + return "required" + case v1.Unsupported: + return "unsupported" + } + log.Errorf("unable to find the async type - %v", a) + return "required" +} + +func convertPlanToAPB(plan v1.Plan) (apb.Plan, error) { + m := map[string]interface{}{} + err := json.Unmarshal([]byte(plan.Metadata), &m) + if err != nil { + log.Errorf("unable to unmarshal the metadata for plan - %v", err) + return apb.Plan{}, err + } + + bindParams := []apb.ParameterDescriptor{} + params := []apb.ParameterDescriptor{} + errs := arrayErrors{} + for _, p := range plan.Parameters { + param, err := convertParametersToAPB(p) + if err != nil { + errs = append(errs, err) + continue + } + params = append(params, param) + } + + for _, p := range plan.BindParameters { + param, err := convertParametersToAPB(p) + if err != nil { + errs = append(errs, err) + continue + } + bindParams = append(bindParams, param) + } + return apb.Plan{ + ID: plan.ID, + Name: plan.Name, + Description: plan.Description, + Metadata: m, + Free: plan.Free, + Bindable: plan.Bindable, + UpdatesTo: plan.UpdatesTo, + Parameters: params, + BindParameters: bindParams, + }, nil +} + +func convertParametersToAPB(param v1.Parameters) (apb.ParameterDescriptor, error) { + m := map[string]interface{}{} + err := json.Unmarshal([]byte(param.Default), &m) + if err != nil { + log.Errorf("unable to unmarshal the default for parameter - %v", err) + return apb.ParameterDescriptor{}, err + } + + b := m["default"] + + var v1Max *apb.NilableNumber + if param.Maximum != nil { + n := apb.NilableNumber(reflect.ValueOf(param.Maximum).Float()) + v1Max = &n + } + var v1exMax *apb.NilableNumber + if param.ExclusiveMaximum != nil { + n := apb.NilableNumber(reflect.ValueOf(param.ExclusiveMaximum).Float()) + v1exMax = &n + } + var v1Min *apb.NilableNumber + if param.Minimum != nil { + n := apb.NilableNumber(reflect.ValueOf(param.Minimum).Float()) + v1Min = &n + } + var v1exMin *apb.NilableNumber + if param.ExclusiveMinimum != nil { + n := apb.NilableNumber(reflect.ValueOf(param.ExclusiveMinimum).Float()) + v1exMin = &n + } + + return apb.ParameterDescriptor{ + Name: param.Name, + Title: param.Title, + Type: param.Type, + Description: param.Description, + Default: b, + DeprecatedMaxlength: param.DeprecatedMaxLength, + MaxLength: param.MaxLength, + MinLength: param.MinLength, + Pattern: param.Pattern, + MultipleOf: param.MultipleOf, + Maximum: v1Max, + ExclusiveMaximum: v1exMax, + ExclusiveMinimum: v1exMin, + Minimum: v1Min, + Enum: param.Enum, + Required: param.Required, + Updatable: param.Updatable, + DisplayType: param.DisplayType, + DisplayGroup: param.DisplayGroup, + }, nil +} + +func convertServiceInstanceToAPB(si v1.ServiceInstanceSpec, spec *apb.Spec, id string) (*apb.ServiceInstance, error) { + parameters := &apb.Parameters{} + if si.Parameters != "" { + err := json.Unmarshal([]byte(si.Parameters), parameters) + if err != nil { + log.Errorf("unable to convert parameters to unmarshaled apb parameters -%v", err) + return &apb.ServiceInstance{}, err + } + } + + bindingIDs := map[string]bool{} + for _, val := range si.BindingIDs { + bindingIDs[val] = true + } + + return &apb.ServiceInstance{ + ID: uuid.Parse(id), + Spec: spec, + Context: &apb.Context{ + Namespace: si.Context.Namespace, + Platform: si.Context.Plateform, + }, + Parameters: parameters, + BindingIDs: bindingIDs, + }, nil +} + +func convertServiceBindingToAPB(bi v1.ServiceBindingSpec, id string) (*apb.BindInstance, error) { + parameters := &apb.Parameters{} + if bi.Parameters != "" { + err := json.Unmarshal([]byte(bi.Parameters), parameters) + if err != nil { + log.Errorf("Unable to unmarshal parameters to apb parameters- %v", err) + return &apb.BindInstance{}, err + } + } + return &apb.BindInstance{ + ID: uuid.Parse(id), + ServiceID: uuid.Parse(bi.ServiceInstanceID), + Parameters: parameters, + CreateJobKey: bi.JobToken, + }, nil +} + +func convertJobStateToAPB(js v1.JobStateSpec, id string) (*apb.JobState, error) { + return &apb.JobState{ + Token: id, + State: convertStateToAPB(js.State), + Method: convertJobMethodToAPB(js.Method), + Podname: js.PodName, + Error: js.Error, + Description: js.Description, + }, nil +} + +func convertJobMethodToAPB(j v1.JobMethod) apb.JobMethod { + switch j { + case v1.JobMethodProvision: + return apb.JobMethodProvision + case v1.JobMethodDeprovision: + return apb.JobMethodDeprovision + case v1.JobMethodBind: + return apb.JobMethodBind + case v1.JobMethodUnbind: + return apb.JobMethodUnbind + case v1.JobMethodUpdate: + return apb.JobMethodUpdate + } + // We should have already covered all the cases above + log.Errorf("Unable to find job method from - %v", j) + return apb.JobMethodProvision +} + +func convertStateToAPB(s v1.State) apb.State { + switch s { + case v1.StateNotYetStarted: + return apb.StateNotYetStarted + case v1.StateInProgress: + return apb.StateInProgress + case v1.StateSucceeded: + return apb.StateSucceeded + case v1.StateFailed: + return apb.StateFailed + } + // We should have already covered all the cases above + log.Errorf("Unable to find job state from - %v", s) + return apb.StateFailed +} diff --git a/pkg/dao/crd/dao.go b/pkg/dao/crd/dao.go new file mode 100644 index 0000000000..69696db772 --- /dev/null +++ b/pkg/dao/crd/dao.go @@ -0,0 +1,349 @@ +// +// Copyright (c) 2018 Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package dao + +import ( + "fmt" + + automationbrokerv1 "github.com/automationbroker/broker-client-go/client/clientset/versioned/typed/automationbroker.io/v1" + v1 "github.com/automationbroker/broker-client-go/pkg/apis/automationbroker.io/v1" + "github.com/openshift/ansible-service-broker/pkg/apb" + "github.com/openshift/ansible-service-broker/pkg/clients" + logutil "github.com/openshift/ansible-service-broker/pkg/util/logging" + "github.com/pborman/uuid" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var log = logutil.NewLog() + +const ( + // instanceLabel for the job state to track which instance created it. + jobStateInstanceLabel string = "instanceId" + jobStateLabel string = "state" +) + +// Dao - object to interface with the data store. +type Dao struct { + client automationbrokerv1.AutomationbrokerV1Interface + namespace string +} + +// NewDao - Create a new Dao object +func NewDao(namespace string) (*Dao, error) { + dao := Dao{namespace: namespace} + + crdClient, err := clients.CRDClient() + if err != nil { + return nil, err + } + dao.client = crdClient.AutomationbrokerV1() + return &dao, nil +} + +// GetSpec - Retrieve the spec from the k8s API. +func (d *Dao) GetSpec(id string) (*apb.Spec, error) { + log.Debugf("get spec: %v", id) + s, err := d.client.Bundles(d.namespace).Get(id, metav1.GetOptions{}) + if err != nil { + log.Errorf("unable to get bundle from k8s api - %v", err) + return nil, err + } + return bundleToSpec(s.Spec, s.GetName()) +} + +// SetSpec - set spec for an id in the kvp API. +func (d *Dao) SetSpec(id string, spec *apb.Spec) error { + log.Debugf("set spec: %v", id) + bundleSpec, err := specToBundle(spec) + if err != nil { + return err + } + b := v1.Bundle{ + ObjectMeta: metav1.ObjectMeta{ + Name: id, + Namespace: d.namespace, + }, + Spec: bundleSpec, + } + _, err = d.client.Bundles(d.namespace).Create(&b) + return err +} + +// DeleteSpec - Delete the spec for a given spec id. +func (d *Dao) DeleteSpec(specID string) error { + log.Debug("Dao::DeleteSpec-> [ %s ]", specID) + return d.client.Bundles(d.namespace).Delete(specID, &metav1.DeleteOptions{}) +} + +// BatchSetSpecs - set specs based on SpecManifest in the kvp API. +func (d *Dao) BatchSetSpecs(specs apb.SpecManifest) error { + for id, spec := range specs { + err := d.SetSpec(id, spec) + if err != nil { + return err + } + } + + return nil +} + +// BatchGetSpecs - Retrieve all the specs for dir. +func (d *Dao) BatchGetSpecs(dir string) ([]*apb.Spec, error) { + log.Debugf("Dao::BatchGetSpecs") + l, err := d.client.Bundles(d.namespace).List(metav1.ListOptions{}) + if err != nil { + log.Errorf("unable to get batch specs - %v", err) + return nil, err + } + specs := []*apb.Spec{} + // capture all the errors and still try to save the correct bundles + errs := arrayErrors{} + for _, b := range l.Items { + spec, err := bundleToSpec(b.Spec, b.GetName()) + if err != nil { + errs = append(errs, err) + continue + } + specs = append(specs, spec) + } + if len(errs) > 0 { + return specs, errs + } + return specs, nil +} + +// BatchDeleteSpecs - set specs based on SpecManifest in the kvp API. +func (d *Dao) BatchDeleteSpecs(specs []*apb.Spec) error { + for _, spec := range specs { + err := d.DeleteSpec(spec.ID) + if err != nil { + return err + } + } + return nil +} + +// GetServiceInstance - Retrieve specific service instance from the kvp API. +func (d *Dao) GetServiceInstance(id string) (*apb.ServiceInstance, error) { + log.Debugf("get service instance: %v", id) + servInstance, err := d.client.ServiceInstances(d.namespace).Get(id, metav1.GetOptions{}) + if err != nil { + return nil, err + } + spec, err := d.GetSpec(servInstance.Spec.BundleID) + if err != nil { + return nil, err + } + return convertServiceInstanceToAPB(servInstance.Spec, spec, servInstance.GetName()) +} + +// SetServiceInstance - Set service instance for an id in the kvp API. +func (d *Dao) SetServiceInstance(id string, serviceInstance *apb.ServiceInstance) error { + log.Debugf("set service instance: %v", id) + spec, err := convertServiceInstanceToCRD(serviceInstance) + if err != nil { + return err + } + if si, err := d.client.ServiceInstances(d.namespace).Get(id, metav1.GetOptions{}); err == nil { + log.Debugf("updating service instance: %v", id) + si.Spec = spec + _, err := d.client.ServiceInstances(d.namespace).Update(si) + if err != nil { + log.Errorf("unable to update service instance - %v", err) + return err + } + return nil + } + s := v1.ServiceInstance{ + ObjectMeta: metav1.ObjectMeta{ + Name: id, + Namespace: d.namespace, + }, + Spec: spec, + } + + _, err = d.client.ServiceInstances(d.namespace).Create(&s) + if err != nil { + log.Errorf("unable to save service instance - %v", err) + return err + } + return nil +} + +// DeleteServiceInstance - Delete the service instance for an service instance id. +func (d *Dao) DeleteServiceInstance(id string) error { + log.Debugf("Dao::DeleteServiceInstance -> [ %s ]", id) + return d.client.ServiceInstances(d.namespace).Delete(id, &metav1.DeleteOptions{}) +} + +// GetBindInstance - Retrieve a specific bind instance from the kvp API +func (d *Dao) GetBindInstance(id string) (*apb.BindInstance, error) { + log.Debugf("get binidng instance: %v", id) + bi, err := d.client.ServiceBindings(d.namespace).Get(id, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return convertServiceBindingToAPB(bi.Spec, bi.GetName()) +} + +// SetBindInstance - Set the bind instance for id in the kvp API. +func (d *Dao) SetBindInstance(id string, bindInstance *apb.BindInstance) error { + log.Debugf("set binding instance: %v", id) + b, err := convertServiceBindingToCRD(bindInstance) + if err != nil { + return err + } + bi := v1.ServiceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: id, + Namespace: d.namespace, + }, + Spec: b, + } + _, err = d.client.ServiceBindings(d.namespace).Create(&bi) + if err != nil { + log.Errorf("unable to save service binding - %v", err) + return err + } + return nil +} + +// DeleteBindInstance - Delete the binding instance for an id in the kvp API. +func (d *Dao) DeleteBindInstance(id string) error { + log.Debugf("Dao::DeleteBindInstance -> [ %s ]", id) + err := d.client.ServiceBindings(d.namespace).Delete(id, &metav1.DeleteOptions{}) + return err +} + +// SetState - Set the Job State in the kvp API for id. +func (d *Dao) SetState(instanceID string, state apb.JobState) (string, error) { + log.Debugf("set job state for instance: %v token: %v", instanceID, state.Token) + j, err := convertJobStateToCRD(&state) + if err != nil { + return "", err + } + if js, err := d.client.JobStates(d.namespace).Get(state.Token, metav1.GetOptions{}); err == nil { + js.Spec = j + js.ObjectMeta.Labels[jobStateLabel] = fmt.Sprintf("%v", convertStateToCRD(state.State)) + _, err := d.client.JobStates(d.namespace).Update(js) + if err != nil { + log.Errorf("Unable to update the job state: %v - %v", state.Token, err) + return state.Token, err + } + return state.Token, nil + } + + js := v1.JobState{ + ObjectMeta: metav1.ObjectMeta{ + Name: state.Token, + Namespace: d.namespace, + Labels: map[string]string{jobStateInstanceLabel: instanceID, + jobStateLabel: fmt.Sprintf("%v", convertStateToCRD(state.State)), + }, + }, + Spec: j, + } + + _, err = d.client.JobStates(d.namespace).Create(&js) + if err != nil { + log.Errorf("unable to create the job state - %v", err) + return "", err + } + return state.Token, nil +} + +// GetState - Retrieve a job state from the kvp API for an ID and Token. +func (d *Dao) GetState(id string, token string) (apb.JobState, error) { + js, err := d.client.JobStates(d.namespace).Get(token, metav1.GetOptions{}) + if err != nil { + log.Errorf("unable to get state for token: %v", err) + return apb.JobState{}, err + } + j, err := convertJobStateToAPB(js.Spec, js.GetName()) + if err != nil { + return apb.JobState{}, err + } + return *j, nil +} + +// GetStateByKey - Retrieve a job state from the kvp API for a job key +func (d *Dao) GetStateByKey(key string) (apb.JobState, error) { + return d.GetState("", key) +} + +// FindJobStateByState - Retrieve all the jobs that match the specified state +func (d *Dao) FindJobStateByState(state apb.State) ([]apb.RecoverStatus, error) { + log.Debugf("Dao::FindJobStateByState -> [%v]", state) + jobStates, err := d.client.JobStates(d.namespace).List(metav1.ListOptions{ + LabelSelector: fmt.Sprintf("state=%v", convertStateToCRD(state)), + }) + if err != nil { + log.Errorf("unable to get job states for the state: %v - %v", state, err) + return nil, err + } + + rs := []apb.RecoverStatus{} + errs := arrayErrors{} + for _, js := range jobStates.Items { + j, err := convertJobStateToAPB(js.Spec, js.GetName()) + if err != nil { + errs = append(errs, err) + continue + } + rs = append(rs, apb.RecoverStatus{ + InstanceID: uuid.Parse(js.GetLabels()[jobStateInstanceLabel]), + State: *j, + }) + } + if len(errs) > 0 { + return rs, errs + } + return rs, nil +} + +// GetSvcInstJobsByState - Lookup all jobs of a given state for a specific instance +func (d *Dao) GetSvcInstJobsByState(ID string, state apb.State) ([]apb.JobState, error) { + log.Debugf("Dao::FindJobStateByState -> [%v]", state) + jobStates, err := d.client.JobStates(d.namespace).List(metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%v=%v,%v=%v", jobStateInstanceLabel, ID, jobStateLabel, convertStateToCRD(state)), + }) + if err != nil { + log.Errorf("unable to get job states for the state: %v - %v", state, err) + return nil, err + } + + jss := []apb.JobState{} + errs := arrayErrors{} + for _, js := range jobStates.Items { + job, err := convertJobStateToAPB(js.Spec, js.GetName()) + if err != nil { + errs = append(errs, err) + continue + } + jss = append(jss, *job) + } + if len(errs) > 0 { + return jss, errs + } + return jss, nil +} + +// IsNotFoundError - Will determine if the error is an apimachinary IsNotFound error. +func (d *Dao) IsNotFoundError(err error) bool { + return apierrors.IsNotFound(err) +} diff --git a/pkg/dao/dao.go b/pkg/dao/dao.go index fd1b18af20..744c4502f5 100644 --- a/pkg/dao/dao.go +++ b/pkg/dao/dao.go @@ -18,6 +18,8 @@ package dao import ( "github.com/openshift/ansible-service-broker/pkg/apb" + "github.com/openshift/ansible-service-broker/pkg/config" + crd "github.com/openshift/ansible-service-broker/pkg/dao/crd" etcd "github.com/openshift/ansible-service-broker/pkg/dao/etcd" logutil "github.com/openshift/ansible-service-broker/pkg/util/logging" ) @@ -25,8 +27,12 @@ import ( var log = logutil.NewLog() // NewDao - Create a new Dao object -func NewDao() (Dao, error) { +func NewDao(c *config.Config) (Dao, error) { + if c.GetString("dao.type") == "crd" { + return crd.NewDao(c.GetString("openshift.namespace")) + } return etcd.NewDao() + } // Dao - object to interface with the data store. @@ -81,4 +87,7 @@ type Dao interface { // GetStateByKey - Retrieve a job state from the kvp API for a job key GetStateByKey(key string) (apb.JobState, error) + + // IsNotFoundError - Will determine if the error is a not found error from the DAO implementation. + IsNotFoundError(err error) bool } diff --git a/pkg/dao/etcd/dao.go b/pkg/dao/etcd/dao.go index 5a52a71f10..ce6109174a 100644 --- a/pkg/dao/etcd/dao.go +++ b/pkg/dao/etcd/dao.go @@ -333,6 +333,11 @@ func (d *Dao) GetStateByKey(key string) (apb.JobState, error) { return state, nil } +// IsNotFoundError - Will determine if an error is a key is not found error. +func (d *Dao) IsNotFoundError(err error) bool { + return client.IsKeyNotFound(err) +} + func (d *Dao) getObject(key string, data interface{}) error { raw, err := d.GetRaw(key) if err != nil { diff --git a/templates/deploy-ansible-service-broker.template.yaml b/templates/deploy-ansible-service-broker.template.yaml index 425b8ec081..dabfe9302c 100644 --- a/templates/deploy-ansible-service-broker.template.yaml +++ b/templates/deploy-ansible-service-broker.template.yaml @@ -93,6 +93,10 @@ objects: attributeRestrictions: null resources: ["networkpolicies"] verbs: ["create", "delete"] + - apiGroups: ["automationbroker.io"] + attributeRestrictions: null + resources: ["bundles", "jobstates", "servicebindings", "serviceinstances"] + verbs: ["*"] - apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding @@ -403,6 +407,242 @@ objects: ${{BROKER_AUTH}} caBundle: ${BROKER_CA_CERT} +# CRDs for the broker. +- apiVersion: apiextensions.k8s.io/v1beta1 + kind: CustomResourceDefinition + metadata: + name: bundles.automationbroker.io + spec: + group: automationbroker.io + version: v1 + scope: Namespaced + names: + plural: bundles + singular: bundle + kind: Bundle + validation: + # openAPIV3Schema is the schema for validating custom objects. + openAPIV3Schema: + properties: + spec: + properties: + runtime: + type: integer + minimum: 1 + maximum: 2 + version: + type: string + pattern: '^[\d]+.[\d*]+$' + fqName: + type: string + image: + type: string + tags: + type: array + items: + type: string + bindable: + type: boolean + description: + type: string + metadata: + type: string + async: + type: string + pattern: '^(optional|required|unsupported)$' + plans: + type: array + minItems: 1 + items: + type: object + properties: + id: + type: string + name: + type: string + description: + type: string + metadata: + type: string + free: + type: boolean + bindable: + type: boolean + updatesTo: + type: array + items: + type: string + parameters: + type: array + items: + type: object + properties: + name: + type: string + title: + type: string + type: + type: string + description: + type: string + default: + type: string + deprecateMaxLength: + type: integer + maxLength: + type: integer + minLength: + type: integer + pattern: + type: string + multipleOf: + type: float + maximum: + type: float + exclusiveMaximum: + type: float + minimum: + type: float + exclusiveMinimum: + type: float + enum: + type: array + items: + type: string + required: + type: boolean + updatable: + type: boolean + displayType: + type: string + displayGroup: + type: string + bindParameters: + type: array + properties: + name: + type: string + title: + type: string + type: + type: string + description: + type: string + default: + type: string + deprecateMaxLength: + type: integer + maxLength: + type: integer + minLength: + type: integer + pattern: + type: string + multipleOf: + type: float + maximum: + type: float + exclusiveMaximum: + type: float + minimum: + type: float + exclusiveMinimum: + type: float + enum: + type: array + items: + type: string + required: + type: boolean + updatable: + type: boolean + displayType: + type: string + displayGroup: + type: string +- apiVersion: apiextensions.k8s.io/v1beta1 + kind: CustomResourceDefinition + metadata: + name: jobstates.automationbroker.io + spec: + group: automationbroker.io + version: v1 + scope: Namespaced + names: + plural: jobstates + singular: jobstate + kind: JobState + validation: + # openAPIV3Schema is the schema for validating custom objects. + # Token is the name of the resource + openAPIV3Schema: + properties: + state: + type: string + podName: + type: string + method: + type: string + error: + type: string + description: + type: string + +- apiVersion: apiextensions.k8s.io/v1beta1 + kind: CustomResourceDefinition + metadata: + name: servicebindings.automationbroker.io + spec: + group: automationbroker.io + version: v1 + scope: Namespaced + names: + plural: servicebindings + singular: servicebinding + kind: ServiceBinding + validation: + # openAPIV3Schema is the schema for validating custom objects. + openAPIV3Schema: + serviceInstanceId: + type: string + parameters: + type: string + JobToken: + type: string + +- apiVersion: apiextensions.k8s.io/v1beta1 + kind: CustomResourceDefinition + metadata: + name: serviceinstances.automationbroker.io + spec: + group: automationbroker.io + version: v1 + scope: Namespaced + names: + plural: serviceinstances + singular: serviceinstance + kind: ServiceInstance + validation: + # openAPIV3Schema is the schema for validating custom objects. + openAPIV3Schema: + properties: + spec: + properties: + specId: + type: string + context: + type: object + properties: + plateform: + type: string + namespace: + type: string + parameters: + type: string + bindingIds: + type: array + items: + type: string parameters: - description: Service Catalog API Version. Newer service-catalogs use servicecatalog.k8s.io/v1beta1 displayname: Service Catalog API Version. Newer service-catalogs use servicecatalog.k8s.io/v1beta1 diff --git a/templates/k8s-ansible-service-broker.yaml.j2 b/templates/k8s-ansible-service-broker.yaml.j2 index 406def17be..5f5cd121f6 100644 --- a/templates/k8s-ansible-service-broker.yaml.j2 +++ b/templates/k8s-ansible-service-broker.yaml.j2 @@ -319,3 +319,245 @@ metadata: annotations: kubernetes.io/service-account.name: ansibleservicebroker-client type: kubernetes.io/service-account-token + +# CRDs for the broker. +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: bundles.automationbroker.io +spec: + group: automationbroker.io + version: v1 + scope: Namespaced + names: + plural: bundles + singular: bundle + kind: Bundle + validation: + # openAPIV3Schema is the schema for validating custom objects. + openAPIV3Schema: + properties: + spec: + properties: + runtime: + type: integer + minimum: 1 + maximum: 2 + version: + type: string + pattern: '^[\d]+.[\d*]+$' + fqName: + type: string + image: + type: string + tags: + type: array + items: + type: string + bindable: + type: boolean + description: + type: string + metadata: + type: string + async: + type: string + pattern: '^(optional|required|unsupported)$' + plans: + type: array + minItems: 1 + items: + type: object + properties: + id: + type: string + name: + type: string + description: + type: string + metadata: + type: string + free: + type: boolean + bindable: + type: boolean + updatesTo: + type: array + items: + type: string + parameters: + type: array + items: + type: object + properties: + name: + type: string + title: + type: string + type: + type: string + description: + type: string + default: + type: string + deprecateMaxLength: + type: integer + maxLength: + type: integer + minLength: + type: integer + pattern: + type: string + multipleOf: + type: float + maximum: + type: float + exclusiveMaximum: + type: float + minimum: + type: float + exclusiveMinimum: + type: float + enum: + type: array + items: + type: string + required: + type: boolean + updatable: + type: boolean + displayType: + type: string + displayGroup: + type: string + bindParameters: + type: object + properties: + name: + type: string + title: + type: string + type: + type: string + description: + type: string + default: + type: object + deprecateMaxLength: + type: integer + maxLength: + type: integer + minLength: + type: integer + pattern: + type: string + multipleOf: + type: float + maximum: + type: float + exclusiveMaximum: + type: float + minimum: + type: float + exclusiveMinimum: + type: float + enum: + type: array + items: + type: string + required: + type: boolean + updatable: + type: boolean + displayType: + type: string + displayGroup: + type: string + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: jobstates.automationbroker.io +spec: + group: automationbroker.io + version: v1 + scope: Namespaced + names: + plural: jobstates + singular: jobstate + kind: JobState + validation: + # openAPIV3Schema is the schema for validating custom objects. + # Token is the name of the resource + openAPIV3Schema: + properties: + state: + type: string + podName: + type: string + method: + type: string + error: + type: string + description: + type: string + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: servicebindings.automationbroker.io +spec: + group: automationbroker.io + version: v1 + scope: Namespaced + names: + plural: servicebindings + singular: servicebinding + kind: ServiceBinding + validation: + # openAPIV3Schema is the schema for validating custom objects. + openAPIV3Schema: + serviceInstanceId: + type: string + parameters: + type: string + JobToken: + type: string + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: serviceinstances.automationbroker.io +spec: + group: automationbroker.io + version: v1 + scope: Namespaced + names: + plural: serviceinstances + singular: serviceinstance + kind: ServiceInstance + validation: + # openAPIV3Schema is the schema for validating custom objects. + openAPIV3Schema: + properties: + spec: + properties: + specId: + type: string + context: + type: object + properties: + plateform: + type: string + namespace: + type: string + parameters: + type: string + bindingIds: + type: array + items: + type: string