Skip to content

Commit

Permalink
Broker client dao (openshift#795)
Browse files Browse the repository at this point in the history
* initial commit

* adding DAO implementation for CRDs

* adding CRDs to deploy template

* adding CRDs to k8s templates.

* working CRD dao implementation.

* Implement error checking for IsNotFound error int he dao impl

* implement errors returns for conversion methods
  • Loading branch information
shawn-hurley authored Mar 1, 2018
1 parent 2d57b60 commit 6b5cf4b
Show file tree
Hide file tree
Showing 10 changed files with 1,418 additions and 28 deletions.
35 changes: 19 additions & 16 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func CreateApp() App {
}

log.Debug("Connecting Dao")
app.dao, err = dao.NewDao()
app.dao, err = dao.NewDao(app.config)
if err != nil {
log.Error(err.Error())
os.Exit(1)
Expand Down Expand Up @@ -390,26 +390,29 @@ func initClients(c *config.Config) error {
// method on the app. Forces developers at authorship time to think about
// dependencies / make sure things are ready.
log.Notice("Initializing clients...")
log.Debug("Trying to connect to etcd")

// Intialize the etcd configuration
clients.InitEtcdConfig(c)
etcdClient, err := clients.Etcd()
if err != nil {
return err
}
if strings.ToLower(c.GetString("dao.type")) != "crd" {

ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
log.Debug("Trying to connect to etcd")
// Intialize the etcd configuration
clients.InitEtcdConfig(c)
etcdClient, err := clients.Etcd()
if err != nil {
return err
}

version, err := etcdClient.GetVersion(ctx)
if err != nil {
return err
}
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

log.Infof("Etcd Version [Server: %s, Cluster: %s]", version.Server, version.Cluster)
version, err := etcdClient.GetVersion(ctx)
if err != nil {
return err
}

log.Infof("Etcd Version [Server: %s, Cluster: %s]", version.Server, version.Cluster)
}

_, err = clients.Kubernetes()
_, err := clients.Kubernetes()
if err != nil {
return err
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"regexp"
"strings"

"github.com/coreos/etcd/client"
"github.com/openshift/ansible-service-broker/pkg/apb"
"github.com/openshift/ansible-service-broker/pkg/config"
"github.com/openshift/ansible-service-broker/pkg/dao"
Expand Down Expand Up @@ -152,7 +151,7 @@ func NewAnsibleBroker(dao dao.Dao,
func (a AnsibleBroker) GetServiceInstance(instanceUUID uuid.UUID) (apb.ServiceInstance, error) {
instance, err := a.dao.GetServiceInstance(instanceUUID.String())
if err != nil {
if client.IsKeyNotFound(err) {
if a.dao.IsNotFoundError(err) {
log.Infof("Could not find a service instance in dao - %v", err)
return apb.ServiceInstance{}, ErrorNotFound
}
Expand All @@ -167,7 +166,7 @@ func (a AnsibleBroker) GetServiceInstance(instanceUUID uuid.UUID) (apb.ServiceIn
func (a AnsibleBroker) GetBindInstance(bindUUID uuid.UUID) (apb.BindInstance, error) {
instance, err := a.dao.GetBindInstance(bindUUID.String())
if err != nil {
if client.IsKeyNotFound(err) {
if a.dao.IsNotFoundError(err) {
return apb.BindInstance{}, ErrorNotFound
}
return apb.BindInstance{}, err
Expand Down Expand Up @@ -316,7 +315,7 @@ func (a AnsibleBroker) Recover() (string, error) {
recoverStatuses, err := a.dao.FindJobStateByState(apb.StateInProgress)
if err != nil {
// no jobs or states to recover, this is OK.
if client.IsKeyNotFound(err) {
if a.dao.IsNotFoundError(err) {
log.Info("No jobs to recover")
return "", nil
}
Expand Down Expand Up @@ -555,7 +554,7 @@ func (a AnsibleBroker) Provision(instanceUUID uuid.UUID, req *ProvisionRequest,
specID := req.ServiceID
if spec, err = a.dao.GetSpec(specID); err != nil {
// etcd return not found i.e. code 100
if client.IsKeyNotFound(err) {
if a.dao.IsNotFoundError(err) {
return nil, ErrorNotFound
}
// otherwise unknown error bubble it up
Expand Down Expand Up @@ -730,6 +729,7 @@ func (a AnsibleBroker) isJobInProgress(instance *apb.ServiceInstance,
method apb.JobMethod) (bool, string, error) {

allJobs, err := a.dao.GetSvcInstJobsByState(instance.ID.String(), apb.StateInProgress)
log.Infof("All Jobs for instance: %v in state: %v - \n%#v", instance.ID, apb.StateInProgress, allJobs)
if err != nil {
return false, "", err
}
Expand All @@ -756,7 +756,7 @@ func (a AnsibleBroker) GetBind(instance apb.ServiceInstance, bindingUUID uuid.UU

bi, err := a.dao.GetBindInstance(bindingUUID.String())
if err != nil {
if client.IsKeyNotFound(err) {
if a.dao.IsNotFoundError(err) {
log.Warningf("id: %v - could not find bind instance - %v", bindingUUID, err)
return nil, ErrorNotFound
}
Expand Down Expand Up @@ -856,12 +856,12 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID,

switch {
// unknown error
case err != nil && !client.IsKeyNotFound(err):
case err != nil && !a.dao.IsNotFoundError(err):
return nil, false, err
// If there is a job in "succeeded" state, or no job at all, or
// the referenced job no longer exists (we assume it got
// cleaned up eventually), assume everything is complete.
case createJob.State == apb.StateSucceeded, existingBI.CreateJobKey == "", client.IsKeyNotFound(err):
case createJob.State == apb.StateSucceeded, existingBI.CreateJobKey == "", a.dao.IsNotFoundError(err):
log.Debug("already have this binding instance, returning 200")
resp, err := NewBindResponse(provExtCreds, bindExtCreds)
if err != nil {
Expand All @@ -882,7 +882,7 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID,
// parameters are different
log.Info("duplicate binding instance diff params, returning 409 conflict")
return nil, false, ErrorDuplicate
} else if !client.IsKeyNotFound(err) {
} else if !a.dao.IsNotFoundError(err) {
return nil, false, err
}

Expand Down Expand Up @@ -1175,7 +1175,7 @@ func (a AnsibleBroker) Update(instanceUUID uuid.UUID, req *UpdateRequest, async
spec, err := a.dao.GetSpec(si.Spec.ID)
if err != nil {
// etcd return not found i.e. code 100
if client.IsKeyNotFound(err) {
if a.dao.IsNotFoundError(err) {
return nil, ErrorNotFound
}
// otherwise unknown error bubble it up
Expand Down Expand Up @@ -1410,7 +1410,7 @@ func (a AnsibleBroker) AddSpec(spec apb.Spec) (*CatalogResponse, error) {
// RemoveSpec - remove the spec specified from the catalog/etcd
func (a AnsibleBroker) RemoveSpec(specID string) error {
spec, err := a.dao.GetSpec(specID)
if client.IsKeyNotFound(err) {
if a.dao.IsNotFoundError(err) {
return ErrorNotFound
}
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/clients/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ var instances struct {
Etcd etcd.Client
Kubernetes *KubernetesClient
Openshift *OpenshiftClient
CRD *CRD
}

var once struct {
Etcd sync.Once
Kubernetes sync.Once
Openshift sync.Once
CRD sync.Once
}
55 changes: 55 additions & 0 deletions pkg/clients/crd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package clients

import (
"errors"

clientset "github.com/automationbroker/broker-client-go/client/clientset/versioned"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/homedir"
)

// CRD - Client to interact with automationbroker crd API
type CRD struct {
clientset.Interface
}

// CRDClient - Create a new kubernetes client if needed, returns reference
func CRDClient() (*CRD, error) {
once.CRD.Do(func() {
client, err := newCRDClient()
if err != nil {
log.Error(err.Error())
panic(err.Error())
}
instances.CRD = client
})
if instances.CRD == nil {
return nil, errors.New("CRDClient client instance is nil")
}
return instances.CRD, nil
}

func newCRDClient() (*CRD, error) {
// NOTE: Both the external and internal client object are using the same
// clientset library. Internal clientset normally uses a different
// library
clientConfig, err := rest.InClusterConfig()
if err != nil {
log.Warning("Failed to create a InternalClientSet: %v.", err)

log.Debug("Checking for a local Cluster Config")
clientConfig, err = createClientConfigFromFile(homedir.HomeDir() + "/.kube/config")
if err != nil {
log.Error("Failed to create LocalClientSet")
return nil, err
}
}

clientset, err := clientset.NewForConfig(clientConfig)
if err != nil {
log.Error("Failed to create LocalClientSet")
return nil, err
}
c := &CRD{clientset}
return c, err
}
Loading

0 comments on commit 6b5cf4b

Please sign in to comment.