Skip to content

Commit

Permalink
Bug 1543521 - fix async bind when job already in-progress (openshift#764
Browse files Browse the repository at this point in the history
)

The broker now stores a reference to a bind job with the BindInstance, so that
it can be looked up in the future if successive equivalent requests are
received.

fixes openshift#670

The solution was discussed here: openshift#680
  • Loading branch information
mhrivnak authored and djzager committed Feb 15, 2018
1 parent 3ebb44b commit 45396f3
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 39 deletions.
4 changes: 2 additions & 2 deletions pkg/apb/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ func createExtraVars(context *Context, parameters *Parameters) (string, error) {
}

if context != nil {
paramsCopy["namespace"] = context.Namespace
paramsCopy[NamespaceKey] = context.Namespace
}

paramsCopy["cluster"] = runtime.Provider.GetRuntime()
paramsCopy[ClusterKey] = runtime.Provider.GetRuntime()
extraVars, err := json.Marshal(paramsCopy)
return string(extraVars), err
}
Expand Down
45 changes: 42 additions & 3 deletions pkg/apb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package apb

import (
"encoding/json"
"reflect"

"github.com/openshift/ansible-service-broker/pkg/config"
logutil "github.com/openshift/ansible-service-broker/pkg/util/logging"
Expand Down Expand Up @@ -193,6 +194,15 @@ const (

// ApbContainerName - The name of the apb container
ApbContainerName = "apb"

// ProvisionCredentialsKey parameter name passed to APBs
ProvisionCredentialsKey = "_apb_provision_creds"
// BindCredentialsKey parameter name passed to APBs
BindCredentialsKey = "_apb_bind_creds"
// ClusterKey parameter name passed to APBs
ClusterKey = "cluster"
// NamespaceKey parameter name passed to APBs
NamespaceKey = "namespace"
)

// SpecLogDump - log spec for debug
Expand Down Expand Up @@ -274,9 +284,38 @@ func (si *ServiceInstance) RemoveBinding(bindingUUID uuid.UUID) {

// BindInstance - Binding Instance describes a completed binding
type BindInstance struct {
ID uuid.UUID `json:"id"`
ServiceID uuid.UUID `json:"service_id"`
Parameters *Parameters `json:"parameters"`
ID uuid.UUID `json:"id"`
ServiceID uuid.UUID `json:"service_id"`
Parameters *Parameters `json:"parameters"`
CreateJobKey string
}

// UserParameters - returns the Parameters field with any keys and values
// removed that are typically added by the broker itself. The return value
// should represent what a OSB API client provides in a bind request.
func (bi *BindInstance) UserParameters() Parameters {
if bi.Parameters == nil {
return nil
}
userparams := make(Parameters)
for key, value := range *bi.Parameters {
switch key {
// Do not copy keys that are generally added by the broker itself.
case ClusterKey, NamespaceKey, ProvisionCredentialsKey:
continue
}
userparams[key] = value
}
return userparams
}

// IsEqual - Determines if two BindInstances are equal, omitting any Parameters
// that generally get added by the broker itself.
func (bi *BindInstance) IsEqual(newbi *BindInstance) bool {
if !uuid.Equal(bi.ID, newbi.ID) || !uuid.Equal(bi.ServiceID, newbi.ServiceID) {
return false
}
return reflect.DeepEqual(bi.UserParameters(), newbi.UserParameters())
}

// LoadJSON - Generic function to unmarshal json
Expand Down
90 changes: 90 additions & 0 deletions pkg/apb/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"

ft "github.com/openshift/ansible-service-broker/pkg/fusortest"
"github.com/pborman/uuid"
yaml "gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -239,3 +240,92 @@ func TestEncodedParameters(t *testing.T) {
ft.AssertEqual(t, sitelang.Pattern, "")
ft.AssertEqual(t, len(sitelang.Enum), 0)
}

func TestBindInstanceUserParamsNil(t *testing.T) {
a := BindInstance{
ID: uuid.NewUUID(),
ServiceID: uuid.NewUUID(),
}
up := a.UserParameters()
ft.AssertTrue(t, up == nil)
}

func TestBindInstanceUserParams(t *testing.T) {
a := BindInstance{
ID: uuid.NewUUID(),
ServiceID: uuid.NewUUID(),
}
a.Parameters = &Parameters{
"foo": "bar",
"cluster": "mycluster",
"namespace": "mynamespace",
"_apb_provision_creds": "letmein",
}

up := a.UserParameters()

// Make sure the "foo" key is still included
ft.AssertTrue(t, up["foo"] == "bar")

// Make sure all of these got filtered out
for _, key := range []string{"cluster", "namespace", "_apb_provision_creds"} {
_, ok := up[key]
ft.AssertFalse(t, ok)
}

}

func TestBindInstanceEqual(t *testing.T) {
a := BindInstance{
ID: uuid.NewUUID(),
ServiceID: uuid.NewUUID(),
Parameters: &Parameters{"foo": "bar"},
}
b := BindInstance{
ID: a.ID,
ServiceID: a.ServiceID,
Parameters: &Parameters{"foo": "bar"},
}
ft.AssertTrue(t, a.IsEqual(&b))
ft.AssertTrue(t, b.IsEqual(&a))
}

func TestBindInstanceNotEqual(t *testing.T) {

a := BindInstance{
ID: uuid.Parse(uuid.New()),
ServiceID: uuid.Parse(uuid.New()),
Parameters: &Parameters{"foo": "bar"},
}

data := map[string]BindInstance{
"different parameters": BindInstance{
ID: a.ID,
ServiceID: a.ServiceID,
Parameters: &Parameters{"foo": "notbar"},
},
"different ID": BindInstance{
ID: uuid.Parse(uuid.New()),
ServiceID: a.ServiceID,
Parameters: &Parameters{"foo": "bar"},
},
"different ServiceID": BindInstance{
ID: a.ID,
ServiceID: uuid.Parse(uuid.New()),
Parameters: &Parameters{"foo": "bar"},
},
"no parameters": BindInstance{
ID: a.ID,
ServiceID: a.ServiceID,
},
}

for key, binding := range data {
if a.IsEqual(&binding) {
t.Errorf("bindings were equal for case: %s", key)
}
if binding.IsEqual(&a) {
t.Errorf("bindings were equal for case: %s", key)
}
}
}
2 changes: 1 addition & 1 deletion pkg/broker/binding_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (b *BindingWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) {
log.Errorf("failed to set extracted credentials after bind %v", err)
}
}
if err := b.dao.SetState(msg.InstanceUUID, msg.State); err != nil {
if _, err := b.dao.SetState(msg.InstanceUUID, msg.State); err != nil {
log.Errorf("failed to set state after provision %v", err)
}
}
Expand Down
84 changes: 64 additions & 20 deletions pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ var (
)

const (
// provisionCredentialsKey - Key used to pass credentials to apb.
provisionCredentialsKey = "_apb_provision_creds"
// bindCredentialsKey - Key used to pas bind credentials to apb.
bindCredentialsKey = "_apb_bind_creds"
// fqNameRegex - regular expression used when forming FQName.
fqNameRegex = "[/.:-]"
)
Expand Down Expand Up @@ -823,6 +819,7 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID,
if err != nil {
// etcd return not found i.e. code 100
if client.IsKeyNotFound(err) {
log.Debug("Plan not found")
return nil, false, ErrorNotFound
}
// otherwise unknown error bubble it up
Expand Down Expand Up @@ -864,36 +861,60 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID,
log.Warningf("unable to retrieve provision time credentials - %v", err)
return nil, false, err
}
if bi, err := a.dao.GetBindInstance(bindingUUID.String()); err == nil {
if uuid.Equal(bi.ID, bindingInstance.ID) {
if reflect.DeepEqual(bi.Parameters, bindingInstance.Parameters) {
bindExtCreds, err := a.dao.GetExtractedCredentials(bi.ID.String())
if err != nil && !client.IsKeyNotFound(err) {
return nil, false, err
}

if existingBI, err := a.dao.GetBindInstance(bindingUUID.String()); err == nil {
if existingBI.IsEqual(bindingInstance) {
bindExtCreds, err := a.dao.GetExtractedCredentials(existingBI.ID.String())
// It's ok if there aren't any bind credentials yet.
if err != nil && !client.IsKeyNotFound(err) {
return nil, false, err
}
var createJob apb.JobState
if existingBI.CreateJobKey != "" {
createJob, err = a.dao.GetStateByKey(existingBI.CreateJobKey)
}

switch {
// unknown error
case err != nil && !client.IsKeyNotFound(err):
return nil, false, err
// If there is a job in "succeeded" state, or no job at all, or
// the referenced job no longer exists (we assume it got
// cleaned up eventually), assume everything is complete.
case createJob.State == apb.StateSucceeded, existingBI.CreateJobKey == "", client.IsKeyNotFound(err):
log.Debug("already have this binding instance, returning 200")
// since we have this already, we can set async to false
resp, err := NewBindResponse(provExtCreds, bindExtCreds)
if err != nil {
return nil, false, err
}
return resp, false, ErrorBindingExists
// If there is a job in any other state, send client through async flow.
case len(createJob.State) > 0:
return &BindResponse{Operation: createJob.Token}, true, nil
// This should not happen unless there is bad data in the data store.
default:
err = errors.New("found a JobState with no value for field State")
log.Error(err.Error())
return nil, false, err
}

// parameters are different
log.Info("duplicate binding instance diff params, returning 409 conflict")
return nil, false, ErrorDuplicate
}

// parameters are different
log.Info("duplicate binding instance diff params, returning 409 conflict")
return nil, false, ErrorDuplicate
} else if !client.IsKeyNotFound(err) {
return nil, false, err
}

// No existing BindInstance was found above, so proceed with saving this one
if err := a.dao.SetBindInstance(bindingUUID.String(), bindingInstance); err != nil {
return nil, false, err
}

// Add the DB Credentials this will allow the apb to use these credentials
// Add the DB Credentials. This will allow the apb to use these credentials
// if it so chooses.
if provExtCreds != nil {
params[provisionCredentialsKey] = provExtCreds.Credentials
params[apb.ProvisionCredentialsKey] = provExtCreds.Credentials
}

// NOTE: We are currently disabling running an APB on bind via
Expand All @@ -915,6 +936,20 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID,
log.Error("Failed to start new job for async binding\n%s", err.Error())
return nil, false, err
}

stateKey, err := a.dao.SetState(instance.ID.String(), apb.JobState{
Token: token,
State: apb.StateInProgress,
Method: apb.JobMethodBind,
})
if err != nil {
log.Errorf("failed to set initial jobstate for %v, %v", token, err.Error())
return nil, false, err
}
bindingInstance.CreateJobKey = stateKey
if err := a.dao.SetBindInstance(bindingUUID.String(), bindingInstance); err != nil {
return nil, false, err
}
return &BindResponse{Operation: token}, true, nil
} else if a.brokerConfig.LaunchApbOnBind {
// we are synchronous mode
Expand Down Expand Up @@ -974,10 +1009,10 @@ func (a AnsibleBroker) Unbind(
instance.ID, bindInstance.ID)
}
if provExtCreds != nil {
params[provisionCredentialsKey] = provExtCreds.Credentials
params[apb.ProvisionCredentialsKey] = provExtCreds.Credentials
}
if bindExtCreds != nil {
params[bindCredentialsKey] = bindExtCreds.Credentials
params[apb.BindCredentialsKey] = bindExtCreds.Credentials
}
serviceInstance, err := a.GetServiceInstance(instance.ID)
if err != nil {
Expand All @@ -1001,6 +1036,15 @@ func (a AnsibleBroker) Unbind(
log.Error("Failed to start new job for async unbind\n%s", jerr.Error())
return nil, false, jerr
}

if _, err := a.dao.SetState(serviceInstance.ID.String(), apb.JobState{
Token: token,
State: apb.StateInProgress,
Method: apb.JobMethodUnbind,
}); err != nil {
log.Errorf("failed to set initial jobstate for %v, %v", token, err.Error())
}

} else if a.brokerConfig.LaunchApbOnBind {
// only launch apb if we are always launching the APB.
if skipApbExecution {
Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/deprovision_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (d *DeprovisionWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) {
for msg := range msgBuffer {
log.Debug("received deprovision message from buffer")

if err := d.dao.SetState(msg.InstanceUUID, msg.State); err != nil {
if _, err := d.dao.SetState(msg.InstanceUUID, msg.State); err != nil {
log.Errorf("failed to set state after deprovision %v", err)
continue
}
Expand Down Expand Up @@ -70,7 +70,7 @@ func (d *DeprovisionWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) {
func setFailedDeprovisionJob(dao dao.Dao, dmsg JobMsg) {
// have to set the state here manually as the logic that triggers this is in the subscriber
dmsg.State.State = apb.StateFailed
if err := dao.SetState(dmsg.InstanceUUID, dmsg.State); err != nil {
if _, err := dao.SetState(dmsg.InstanceUUID, dmsg.State); err != nil {
log.Errorf("failed to set state after deprovision %v", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/broker/provision_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (p *ProvisionWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) {
log.Errorf("failed to set extracted credentials after provision %v", err)
}
}
if err := p.dao.SetState(msg.InstanceUUID, msg.State); err != nil {
if _, err := p.dao.SetState(msg.InstanceUUID, msg.State); err != nil {
log.Errorf("failed to set state after provision %v", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/broker/provision_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,16 @@ func (mp *mockProvisionSubscriberDAO) SetExtractedCredentials(id string, extCred
return mp.err

}
func (mp *mockProvisionSubscriberDAO) SetState(id string, state apb.JobState) error {
func (mp *mockProvisionSubscriberDAO) SetState(id string, state apb.JobState) (string, error) {
assert := mp.assertOn["SetState"]
if nil != assert {
if err := assert(id, state); err != nil {
mp.assertErr = append(mp.assertErr, err)
return err
return "", err
}
}
mp.calls["SetState"]++
return mp.err
return "", mp.err

}

Expand Down
2 changes: 1 addition & 1 deletion pkg/broker/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,5 +300,5 @@ func (jm JobMsg) Render() string {
// SubscriberDAO defines the interface subscribers use when persisting state
type SubscriberDAO interface {
SetExtractedCredentials(id string, extCreds *apb.ExtractedCredentials) error
SetState(id string, state apb.JobState) error
SetState(id string, state apb.JobState) (string, error)
}
2 changes: 1 addition & 1 deletion pkg/broker/unbinding_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (b *UnbindingWorkSubscriber) Subscribe(msgBuffer <-chan JobMsg) {
log.Info("Listening for binding messages")
for msg := range msgBuffer {
log.Debug("Processed binding message from buffer")
if err := b.dao.SetState(msg.InstanceUUID, msg.State); err != nil {
if _, err := b.dao.SetState(msg.InstanceUUID, msg.State); err != nil {
log.Errorf("failed to set state after deprovision %v", err)
}
}
Expand Down
Loading

0 comments on commit 45396f3

Please sign in to comment.