diff --git a/pkg/apb/executor.go b/pkg/apb/executor.go index fac5a47f50..fe04a36d04 100644 --- a/pkg/apb/executor.go +++ b/pkg/apb/executor.go @@ -207,10 +207,10 @@ func createExtraVars(context *Context, parameters *Parameters) (string, error) { } if context != nil { - paramsCopy["namespace"] = context.Namespace + paramsCopy[NamespaceKey] = context.Namespace } - paramsCopy["cluster"] = runtime.Provider.GetRuntime() + paramsCopy[ClusterKey] = runtime.Provider.GetRuntime() extraVars, err := json.Marshal(paramsCopy) return string(extraVars), err } diff --git a/pkg/apb/types.go b/pkg/apb/types.go index 76263e0d65..7a87176281 100644 --- a/pkg/apb/types.go +++ b/pkg/apb/types.go @@ -18,6 +18,7 @@ package apb import ( "encoding/json" + "reflect" "github.com/openshift/ansible-service-broker/pkg/config" logutil "github.com/openshift/ansible-service-broker/pkg/util/logging" @@ -193,6 +194,15 @@ const ( // ApbContainerName - The name of the apb container ApbContainerName = "apb" + + // ProvisionCredentialsKey parameter name passed to APBs + ProvisionCredentialsKey = "_apb_provision_creds" + // BindCredentialsKey parameter name passed to APBs + BindCredentialsKey = "_apb_bind_creds" + // ClusterKey parameter name passed to APBs + ClusterKey = "cluster" + // NamespaceKey parameter name passed to APBs + NamespaceKey = "namespace" ) // SpecLogDump - log spec for debug @@ -274,9 +284,38 @@ func (si *ServiceInstance) RemoveBinding(bindingUUID uuid.UUID) { // BindInstance - Binding Instance describes a completed binding type BindInstance struct { - ID uuid.UUID `json:"id"` - ServiceID uuid.UUID `json:"service_id"` - Parameters *Parameters `json:"parameters"` + ID uuid.UUID `json:"id"` + ServiceID uuid.UUID `json:"service_id"` + Parameters *Parameters `json:"parameters"` + CreateJobKey string +} + +// UserParameters - returns the Parameters field with any keys and values +// removed that are typically added by the broker itself. The return value +// should represent what a OSB API client provides in a bind request. +func (bi *BindInstance) UserParameters() Parameters { + if bi.Parameters == nil { + return nil + } + userparams := make(Parameters) + for key, value := range *bi.Parameters { + switch key { + // Do not copy keys that are generally added by the broker itself. + case ClusterKey, NamespaceKey, ProvisionCredentialsKey: + continue + } + userparams[key] = value + } + return userparams +} + +// IsEqual - Determines if two BindInstances are equal, omitting any Parameters +// that generally get added by the broker itself. +func (bi *BindInstance) IsEqual(newbi *BindInstance) bool { + if !uuid.Equal(bi.ID, newbi.ID) || !uuid.Equal(bi.ServiceID, newbi.ServiceID) { + return false + } + return reflect.DeepEqual(bi.UserParameters(), newbi.UserParameters()) } // LoadJSON - Generic function to unmarshal json diff --git a/pkg/apb/types_test.go b/pkg/apb/types_test.go index b917490ade..7453f246eb 100644 --- a/pkg/apb/types_test.go +++ b/pkg/apb/types_test.go @@ -24,6 +24,7 @@ import ( "testing" ft "github.com/openshift/ansible-service-broker/pkg/fusortest" + "github.com/pborman/uuid" yaml "gopkg.in/yaml.v2" ) @@ -239,3 +240,92 @@ func TestEncodedParameters(t *testing.T) { ft.AssertEqual(t, sitelang.Pattern, "") ft.AssertEqual(t, len(sitelang.Enum), 0) } + +func TestBindInstanceUserParamsNil(t *testing.T) { + a := BindInstance{ + ID: uuid.NewUUID(), + ServiceID: uuid.NewUUID(), + } + up := a.UserParameters() + ft.AssertTrue(t, up == nil) +} + +func TestBindInstanceUserParams(t *testing.T) { + a := BindInstance{ + ID: uuid.NewUUID(), + ServiceID: uuid.NewUUID(), + } + a.Parameters = &Parameters{ + "foo": "bar", + "cluster": "mycluster", + "namespace": "mynamespace", + "_apb_provision_creds": "letmein", + } + + up := a.UserParameters() + + // Make sure the "foo" key is still included + ft.AssertTrue(t, up["foo"] == "bar") + + // Make sure all of these got filtered out + for _, key := range []string{"cluster", "namespace", "_apb_provision_creds"} { + _, ok := up[key] + ft.AssertFalse(t, ok) + } + +} + +func TestBindInstanceEqual(t *testing.T) { + a := BindInstance{ + ID: uuid.NewUUID(), + ServiceID: uuid.NewUUID(), + Parameters: &Parameters{"foo": "bar"}, + } + b := BindInstance{ + ID: a.ID, + ServiceID: a.ServiceID, + Parameters: &Parameters{"foo": "bar"}, + } + ft.AssertTrue(t, a.IsEqual(&b)) + ft.AssertTrue(t, b.IsEqual(&a)) +} + +func TestBindInstanceNotEqual(t *testing.T) { + + a := BindInstance{ + ID: uuid.Parse(uuid.New()), + ServiceID: uuid.Parse(uuid.New()), + Parameters: &Parameters{"foo": "bar"}, + } + + data := map[string]BindInstance{ + "different parameters": BindInstance{ + ID: a.ID, + ServiceID: a.ServiceID, + Parameters: &Parameters{"foo": "notbar"}, + }, + "different ID": BindInstance{ + ID: uuid.Parse(uuid.New()), + ServiceID: a.ServiceID, + Parameters: &Parameters{"foo": "bar"}, + }, + "different ServiceID": BindInstance{ + ID: a.ID, + ServiceID: uuid.Parse(uuid.New()), + Parameters: &Parameters{"foo": "bar"}, + }, + "no parameters": BindInstance{ + ID: a.ID, + ServiceID: a.ServiceID, + }, + } + + for key, binding := range data { + if a.IsEqual(&binding) { + t.Errorf("bindings were equal for case: %s", key) + } + if binding.IsEqual(&a) { + t.Errorf("bindings were equal for case: %s", key) + } + } +} diff --git a/pkg/broker/binding_subscriber.go b/pkg/broker/binding_subscriber.go index 75f499813a..6f74c1cf2a 100644 --- a/pkg/broker/binding_subscriber.go +++ b/pkg/broker/binding_subscriber.go @@ -48,7 +48,7 @@ func (b *BindingWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) { log.Errorf("failed to set extracted credentials after bind %v", err) } } - if err := b.dao.SetState(msg.InstanceUUID, msg.State); err != nil { + if _, err := b.dao.SetState(msg.InstanceUUID, msg.State); err != nil { log.Errorf("failed to set state after provision %v", err) } } diff --git a/pkg/broker/broker.go b/pkg/broker/broker.go index fa4c584623..88ebaa5ec9 100644 --- a/pkg/broker/broker.go +++ b/pkg/broker/broker.go @@ -68,10 +68,6 @@ var ( ) const ( - // provisionCredentialsKey - Key used to pass credentials to apb. - provisionCredentialsKey = "_apb_provision_creds" - // bindCredentialsKey - Key used to pas bind credentials to apb. - bindCredentialsKey = "_apb_bind_creds" // fqNameRegex - regular expression used when forming FQName. fqNameRegex = "[/.:-]" ) @@ -823,6 +819,7 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID, if err != nil { // etcd return not found i.e. code 100 if client.IsKeyNotFound(err) { + log.Debug("Plan not found") return nil, false, ErrorNotFound } // otherwise unknown error bubble it up @@ -864,36 +861,60 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID, log.Warningf("unable to retrieve provision time credentials - %v", err) return nil, false, err } - if bi, err := a.dao.GetBindInstance(bindingUUID.String()); err == nil { - if uuid.Equal(bi.ID, bindingInstance.ID) { - if reflect.DeepEqual(bi.Parameters, bindingInstance.Parameters) { - bindExtCreds, err := a.dao.GetExtractedCredentials(bi.ID.String()) - if err != nil && !client.IsKeyNotFound(err) { - return nil, false, err - } + + if existingBI, err := a.dao.GetBindInstance(bindingUUID.String()); err == nil { + if existingBI.IsEqual(bindingInstance) { + bindExtCreds, err := a.dao.GetExtractedCredentials(existingBI.ID.String()) + // It's ok if there aren't any bind credentials yet. + if err != nil && !client.IsKeyNotFound(err) { + return nil, false, err + } + var createJob apb.JobState + if existingBI.CreateJobKey != "" { + createJob, err = a.dao.GetStateByKey(existingBI.CreateJobKey) + } + + switch { + // unknown error + case err != nil && !client.IsKeyNotFound(err): + return nil, false, err + // If there is a job in "succeeded" state, or no job at all, or + // the referenced job no longer exists (we assume it got + // cleaned up eventually), assume everything is complete. + case createJob.State == apb.StateSucceeded, existingBI.CreateJobKey == "", client.IsKeyNotFound(err): log.Debug("already have this binding instance, returning 200") - // since we have this already, we can set async to false resp, err := NewBindResponse(provExtCreds, bindExtCreds) if err != nil { return nil, false, err } return resp, false, ErrorBindingExists + // If there is a job in any other state, send client through async flow. + case len(createJob.State) > 0: + return &BindResponse{Operation: createJob.Token}, true, nil + // This should not happen unless there is bad data in the data store. + default: + err = errors.New("found a JobState with no value for field State") + log.Error(err.Error()) + return nil, false, err } - - // parameters are different - log.Info("duplicate binding instance diff params, returning 409 conflict") - return nil, false, ErrorDuplicate } + + // parameters are different + log.Info("duplicate binding instance diff params, returning 409 conflict") + return nil, false, ErrorDuplicate + } else if !client.IsKeyNotFound(err) { + return nil, false, err } + // No existing BindInstance was found above, so proceed with saving this one if err := a.dao.SetBindInstance(bindingUUID.String(), bindingInstance); err != nil { return nil, false, err } - // Add the DB Credentials this will allow the apb to use these credentials + // Add the DB Credentials. This will allow the apb to use these credentials // if it so chooses. if provExtCreds != nil { - params[provisionCredentialsKey] = provExtCreds.Credentials + params[apb.ProvisionCredentialsKey] = provExtCreds.Credentials } // NOTE: We are currently disabling running an APB on bind via @@ -915,6 +936,20 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID, log.Error("Failed to start new job for async binding\n%s", err.Error()) return nil, false, err } + + stateKey, err := a.dao.SetState(instance.ID.String(), apb.JobState{ + Token: token, + State: apb.StateInProgress, + Method: apb.JobMethodBind, + }) + if err != nil { + log.Errorf("failed to set initial jobstate for %v, %v", token, err.Error()) + return nil, false, err + } + bindingInstance.CreateJobKey = stateKey + if err := a.dao.SetBindInstance(bindingUUID.String(), bindingInstance); err != nil { + return nil, false, err + } return &BindResponse{Operation: token}, true, nil } else if a.brokerConfig.LaunchApbOnBind { // we are synchronous mode @@ -974,10 +1009,10 @@ func (a AnsibleBroker) Unbind( instance.ID, bindInstance.ID) } if provExtCreds != nil { - params[provisionCredentialsKey] = provExtCreds.Credentials + params[apb.ProvisionCredentialsKey] = provExtCreds.Credentials } if bindExtCreds != nil { - params[bindCredentialsKey] = bindExtCreds.Credentials + params[apb.BindCredentialsKey] = bindExtCreds.Credentials } serviceInstance, err := a.GetServiceInstance(instance.ID) if err != nil { @@ -1001,6 +1036,15 @@ func (a AnsibleBroker) Unbind( log.Error("Failed to start new job for async unbind\n%s", jerr.Error()) return nil, false, jerr } + + if _, err := a.dao.SetState(serviceInstance.ID.String(), apb.JobState{ + Token: token, + State: apb.StateInProgress, + Method: apb.JobMethodUnbind, + }); err != nil { + log.Errorf("failed to set initial jobstate for %v, %v", token, err.Error()) + } + } else if a.brokerConfig.LaunchApbOnBind { // only launch apb if we are always launching the APB. if skipApbExecution { diff --git a/pkg/broker/deprovision_subscriber.go b/pkg/broker/deprovision_subscriber.go index d67e125e97..10946bdb3e 100644 --- a/pkg/broker/deprovision_subscriber.go +++ b/pkg/broker/deprovision_subscriber.go @@ -40,7 +40,7 @@ func (d *DeprovisionWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) { for msg := range msgBuffer { log.Debug("received deprovision message from buffer") - if err := d.dao.SetState(msg.InstanceUUID, msg.State); err != nil { + if _, err := d.dao.SetState(msg.InstanceUUID, msg.State); err != nil { log.Errorf("failed to set state after deprovision %v", err) continue } @@ -70,7 +70,7 @@ func (d *DeprovisionWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) { func setFailedDeprovisionJob(dao dao.Dao, dmsg JobMsg) { // have to set the state here manually as the logic that triggers this is in the subscriber dmsg.State.State = apb.StateFailed - if err := dao.SetState(dmsg.InstanceUUID, dmsg.State); err != nil { + if _, err := dao.SetState(dmsg.InstanceUUID, dmsg.State); err != nil { log.Errorf("failed to set state after deprovision %v", err) } } diff --git a/pkg/broker/provision_subscriber.go b/pkg/broker/provision_subscriber.go index 06c3fb7ca5..976dd2d5e7 100644 --- a/pkg/broker/provision_subscriber.go +++ b/pkg/broker/provision_subscriber.go @@ -43,7 +43,7 @@ func (p *ProvisionWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) { log.Errorf("failed to set extracted credentials after provision %v", err) } } - if err := p.dao.SetState(msg.InstanceUUID, msg.State); err != nil { + if _, err := p.dao.SetState(msg.InstanceUUID, msg.State); err != nil { log.Errorf("failed to set state after provision %v", err) } } diff --git a/pkg/broker/provision_subscriber_test.go b/pkg/broker/provision_subscriber_test.go index edc33a441a..586afd6c67 100644 --- a/pkg/broker/provision_subscriber_test.go +++ b/pkg/broker/provision_subscriber_test.go @@ -166,16 +166,16 @@ func (mp *mockProvisionSubscriberDAO) SetExtractedCredentials(id string, extCred return mp.err } -func (mp *mockProvisionSubscriberDAO) SetState(id string, state apb.JobState) error { +func (mp *mockProvisionSubscriberDAO) SetState(id string, state apb.JobState) (string, error) { assert := mp.assertOn["SetState"] if nil != assert { if err := assert(id, state); err != nil { mp.assertErr = append(mp.assertErr, err) - return err + return "", err } } mp.calls["SetState"]++ - return mp.err + return "", mp.err } diff --git a/pkg/broker/types.go b/pkg/broker/types.go index 53f58f0214..238befc78e 100644 --- a/pkg/broker/types.go +++ b/pkg/broker/types.go @@ -300,5 +300,5 @@ func (jm JobMsg) Render() string { // SubscriberDAO defines the interface subscribers use when persisting state type SubscriberDAO interface { SetExtractedCredentials(id string, extCreds *apb.ExtractedCredentials) error - SetState(id string, state apb.JobState) error + SetState(id string, state apb.JobState) (string, error) } diff --git a/pkg/broker/unbinding_subscriber.go b/pkg/broker/unbinding_subscriber.go index d18c8c9eea..7ea4623f0f 100644 --- a/pkg/broker/unbinding_subscriber.go +++ b/pkg/broker/unbinding_subscriber.go @@ -41,7 +41,7 @@ func (b *UnbindingWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) { log.Info("Listening for binding messages") for msg := range msgBuffer { log.Debug("Processed binding message from buffer") - if err := b.dao.SetState(msg.InstanceUUID, msg.State); err != nil { + if _, err := b.dao.SetState(msg.InstanceUUID, msg.State); err != nil { log.Errorf("failed to set state after deprovision %v", err) } } diff --git a/pkg/broker/update_subscriber.go b/pkg/broker/update_subscriber.go index fa341a96af..ad221f6d7b 100644 --- a/pkg/broker/update_subscriber.go +++ b/pkg/broker/update_subscriber.go @@ -43,7 +43,7 @@ func (u *UpdateWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) { log.Errorf("failed to set extracted credentials after update %v", err) } } - if err := u.dao.SetState(msg.InstanceUUID, msg.State); err != nil { + if _, err := u.dao.SetState(msg.InstanceUUID, msg.State); err != nil { log.Errorf("failed to set state after update %v", err) } } diff --git a/pkg/dao/dao.go b/pkg/dao/dao.go index 8296678ccb..b5a9558105 100644 --- a/pkg/dao/dao.go +++ b/pkg/dao/dao.go @@ -83,11 +83,14 @@ type Dao interface { DeleteExtractedCredentials(string) error // SetState - Set the Job State in the kvp API for id. - SetState(string, apb.JobState) error + SetState(string, apb.JobState) (string, error) // GetState - Retrieve a job state from the kvp API for an ID and Token. GetState(string, string) (apb.JobState, error) + // GetStateByKey - Retrieve a job state from the kvp API for a job key + GetStateByKey(key string) (apb.JobState, error) + // BatchSetPlanNames - set plannames based on PlanNameManifest in the kvp API. BatchSetPlanNames(map[string]string) error diff --git a/pkg/dao/etcd/dao.go b/pkg/dao/etcd/dao.go index 866a7a9113..8ce966637d 100644 --- a/pkg/dao/etcd/dao.go +++ b/pkg/dao/etcd/dao.go @@ -334,14 +334,20 @@ func (d *Dao) DeleteExtractedCredentials(id string) error { } // SetState - Set the Job State in the kvp API for id. -func (d *Dao) SetState(id string, state apb.JobState) error { - return d.setObject(stateKey(id, state.Token), state) +func (d *Dao) SetState(id string, state apb.JobState) (string, error) { + key := stateKey(id, state.Token) + return key, d.setObject(key, state) } // GetState - Retrieve a job state from the kvp API for an ID and Token. func (d *Dao) GetState(id string, token string) (apb.JobState, error) { + return d.GetStateByKey(stateKey(id, token)) +} + +// GetStateByKey - Retrieve a job state from the kvp API for a job key +func (d *Dao) GetStateByKey(key string) (apb.JobState, error) { state := apb.JobState{} - if err := d.getObject(stateKey(id, token), &state); err != nil { + if err := d.getObject(key, &state); err != nil { return apb.JobState{State: apb.StateFailed}, err } return state, nil