-
Notifications
You must be signed in to change notification settings - Fork 405
/
provision.go
492 lines (422 loc) · 19.4 KB
/
provision.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
package broker
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/kubernetes-sigs/service-catalog/pkg/apis/servicecatalog/v1beta1"
"github.com/pkg/errors"
osb "github.com/pmorie/go-open-service-broker-client/v2"
"github.com/sirupsen/logrus"
istioauthenticationalpha1 "istio.io/api/authentication/v1alpha1"
istiov1alpha1 "istio.io/client-go/pkg/apis/authentication/v1alpha1"
istioversionedclient "istio.io/client-go/pkg/clientset/versioned"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1"
"github.com/kyma-project/kyma/components/application-broker/internal"
"github.com/kyma-project/kyma/components/application-broker/internal/access"
"github.com/kyma-project/kyma/components/application-broker/internal/knative"
"github.com/kyma-project/kyma/components/application-broker/pkg/apis/applicationconnector/v1alpha1"
v1client "github.com/kyma-project/kyma/components/application-broker/pkg/client/clientset/versioned/typed/applicationconnector/v1alpha1"
)
const (
integrationNamespace = "kyma-integration"
serviceCatalogAPIVersion = "servicecatalog.k8s.io/v1beta1"
// knativeEventingInjectionLabelKey used for enabling Knative eventing default broker for a given namespace
knativeEventingInjectionLabelKey = "knative-eventing-injection"
knativeEventingInjectionLabelValueEnabled = "enabled"
// applicationNameLabelKey is used to selected Knative Channels and Subscriptions
applicationNameLabelKey = "application-name"
// brokerNamespaceLabelKey is used to selected Knative Subscriptions
brokerNamespaceLabelKey = "broker-namespace"
// knSubscriptionNamePrefix is the prefix used for the generated Knative Subscription name
knSubscriptionNamePrefix = "brokersub"
// brokerTargetSelectorName used for targeting the default-broker svc while creating an istio policy
brokerTargetSelectorName = "default-broker"
// filterTargetSelectorName used for targeting the default-broker-filter svc while creating an istio policy
filterTargetSelectorName = "default-broker-filter"
istioMtlsPermissiveMode = 1
policyNameSuffix = "-broker"
)
// NewProvisioner creates provisioner
func NewProvisioner(instanceInserter instanceInserter, instanceGetter instanceGetter,
instanceStateGetter instanceStateGetter, operationInserter operationInserter, operationUpdater operationUpdater,
accessChecker access.ProvisionChecker, appSvcFinder appSvcFinder, serviceInstanceGetter serviceInstanceGetter,
eaClient v1client.ApplicationconnectorV1alpha1Interface, knClient knative.Client,
istioClient istioversionedclient.Interface, iStateUpdater instanceStateUpdater,
operationIDProvider func() (internal.OperationID, error), log logrus.FieldLogger) *ProvisionService {
return &ProvisionService{
instanceInserter: instanceInserter,
instanceGetter: instanceGetter,
instanceStateGetter: instanceStateGetter,
instanceStateUpdater: iStateUpdater,
operationInserter: operationInserter,
operationUpdater: operationUpdater,
operationIDProvider: operationIDProvider,
accessChecker: accessChecker,
appSvcFinder: appSvcFinder,
eaClient: eaClient,
knClient: knClient,
istioClient: istioClient,
serviceInstanceGetter: serviceInstanceGetter,
maxWaitTime: time.Minute,
log: log.WithField("service", "provisioner"),
}
}
// ProvisionService performs provisioning action
type ProvisionService struct {
instanceInserter instanceInserter
instanceGetter instanceGetter
instanceStateUpdater instanceStateUpdater
operationInserter operationInserter
operationUpdater operationUpdater
instanceStateGetter instanceStateGetter
operationIDProvider func() (internal.OperationID, error)
appSvcFinder appSvcFinder
eaClient v1client.ApplicationconnectorV1alpha1Interface
accessChecker access.ProvisionChecker
serviceInstanceGetter serviceInstanceGetter
knClient knative.Client
istioClient istioversionedclient.Interface
mu sync.Mutex
maxWaitTime time.Duration
log logrus.FieldLogger
asyncHook func()
}
// Provision action
func (svc *ProvisionService) Provision(ctx context.Context, osbCtx osbContext, req *osb.ProvisionRequest) (*osb.ProvisionResponse, *osb.HTTPStatusCodeError) {
if len(req.Parameters) > 0 {
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusBadRequest, ErrorMessage: strPtr("application-broker does not support configuration options for provisioning")}
}
if !req.AcceptsIncomplete {
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusBadRequest, ErrorMessage: strPtr("asynchronous operation mode required")}
}
svc.mu.Lock()
defer svc.mu.Unlock()
iID := internal.InstanceID(req.InstanceID)
switch state, err := svc.instanceStateGetter.IsProvisioned(iID); true {
case err != nil:
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusInternalServerError, ErrorMessage: strPtr(fmt.Sprintf("while checking if instance is already provisioned: %v", err))}
case state:
return &osb.ProvisionResponse{Async: false}, nil
}
switch opIDInProgress, inProgress, err := svc.instanceStateGetter.IsProvisioningInProgress(iID); true {
case err != nil:
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusInternalServerError, ErrorMessage: strPtr(fmt.Sprintf("while checking if instance is being provisioned: %v", err))}
case inProgress:
opKeyInProgress := osb.OperationKey(opIDInProgress)
return &osb.ProvisionResponse{Async: true, OperationKey: &opKeyInProgress}, nil
}
id, err := svc.operationIDProvider()
if err != nil {
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusInternalServerError, ErrorMessage: strPtr(fmt.Sprintf("while generating ID for operation: %v", err))}
}
op := internal.InstanceOperation{
InstanceID: iID,
OperationID: id,
Type: internal.OperationTypeCreate,
State: internal.OperationStateInProgress,
}
if err := svc.operationInserter.Insert(&op); err != nil {
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusInternalServerError, ErrorMessage: strPtr(fmt.Sprintf("while inserting instance operation to storage: %v", err))}
}
svcID := internal.ServiceID(req.ServiceID)
svcPlanID := internal.ServicePlanID(req.PlanID)
app, err := svc.appSvcFinder.FindOneByServiceID(internal.ApplicationServiceID(req.ServiceID))
if err != nil {
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusInternalServerError, ErrorMessage: strPtr(fmt.Sprintf("while getting application with id: %s to storage: %v", req.ServiceID, err))}
}
namespace := internal.Namespace(osbCtx.BrokerNamespace)
service, err := getSvcByID(app.Services, internal.ApplicationServiceID(req.ServiceID))
if err != nil {
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusBadRequest, ErrorMessage: strPtr(fmt.Sprintf("while getting service [%s] from Application [%s]: %v", req.ServiceID, app.Name, err))}
}
i := internal.Instance{
ID: iID,
Namespace: namespace,
ServiceID: svcID,
ServicePlanID: svcPlanID,
State: internal.InstanceStatePending,
}
if err = svc.instanceInserter.Insert(&i); err != nil {
return nil, &osb.HTTPStatusCodeError{StatusCode: http.StatusBadRequest, ErrorMessage: strPtr(fmt.Sprintf("while inserting instance to storage: %v", err))}
}
opKey := osb.OperationKey(op.OperationID)
resp := &osb.ProvisionResponse{
OperationKey: &opKey,
Async: true,
}
svc.doAsync(iID, id, app.Name, getApplicationServiceID(req), namespace, service.EventProvider, service.DisplayName)
return resp, nil
}
func getApplicationServiceID(req *osb.ProvisionRequest) internal.ApplicationServiceID {
return internal.ApplicationServiceID(req.ServiceID)
}
func (svc *ProvisionService) doAsync(iID internal.InstanceID, opID internal.OperationID, appName internal.ApplicationName, appID internal.ApplicationServiceID, ns internal.Namespace, eventProvider bool, displayName string) {
go svc.do(iID, opID, appName, appID, ns, eventProvider, displayName)
}
func (svc *ProvisionService) do(iID internal.InstanceID, opID internal.OperationID, appName internal.ApplicationName, appID internal.ApplicationServiceID, ns internal.Namespace, eventProvider bool, displayName string) {
if svc.asyncHook != nil {
defer svc.asyncHook()
}
instanceState := internal.InstanceStateSucceeded
opState := internal.OperationStateSucceeded
opDesc := internal.OperationDescriptionProvisioningSucceeded
canProvisionOutput, err := svc.accessChecker.CanProvision(iID, appID, ns, svc.maxWaitTime)
svc.log.Infof("Access checker: canProvisionInstance(appName=[%s], appID=[%s], ns=[%s]) returned: canProvisionOutput=[%+v], error=[%v]", appName, appID, ns, canProvisionOutput, err)
if err != nil {
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("provisioning failed on error: %s", err)
svc.updateStates(iID, opID, instanceState, opState, opDesc)
return
}
if !canProvisionOutput.Allowed {
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("Forbidden provisioning instance [%s] for application [name: %s, id: %s] in namespace: [%s]. Reason: [%s]", iID, appName, appID, ns, canProvisionOutput.Reason)
svc.updateStates(iID, opID, instanceState, opState, opDesc)
return
}
if !eventProvider {
svc.updateStates(iID, opID, instanceState, opState, opDesc)
return
}
// create Kyma EventActivation
if err := svc.createEaOnSuccessProvision(appName, appID, ns, displayName, iID); err != nil {
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("provisioning failed while creating EventActivation on error: %s", err)
svc.updateStates(iID, opID, instanceState, opState, opDesc)
return
}
// persist Knative Subscription
if err := svc.persistKnativeSubscription(appName, ns); err != nil {
svc.log.Printf("Error persisting Knative Subscription: %v", err)
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("provisioning failed while persisting Knative Subscription for application: %s namespace: %s on error: %s", appName, ns, err)
svc.updateStates(iID, opID, instanceState, opState, opDesc)
return
}
// enable the namespace default Knative Broker
if err := svc.enableDefaultKnativeBroker(ns); err != nil {
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("provisioning failed while enabling default Knative Broker for namespace: %s"+
" on error: %s", ns, err)
svc.updateStates(iID, opID, instanceState, opState, opDesc)
return
}
// Create istio policy
if err := svc.createIstioPolicy(ns); err != nil {
svc.log.Errorf("Error creating istio policy: %v", err)
instanceState = internal.InstanceStateFailed
opState = internal.OperationStateFailed
opDesc = fmt.Sprintf("provisioning failed while creating an istio policy for application: %s"+
" namespace: %s on error: %s", appName, ns, err)
svc.updateStates(iID, opID, instanceState, opState, opDesc)
return
}
svc.updateStates(iID, opID, instanceState, opState, opDesc)
}
func (svc *ProvisionService) updateStates(iID internal.InstanceID, opID internal.OperationID,
instanceState internal.InstanceState, opState internal.OperationState, opDesc string) {
if err := svc.instanceStateUpdater.UpdateState(iID, instanceState); err != nil {
svc.log.Errorf("Cannot update state of the stored instance [%s]: [%v]", iID, err)
}
if err := svc.operationUpdater.UpdateStateDesc(iID, opID, opState, &opDesc); err != nil {
svc.log.Errorf("Cannot update state for ServiceInstance [%s]: [%v]", iID, err)
}
}
func (svc *ProvisionService) createEaOnSuccessProvision(appName internal.ApplicationName,
appID internal.ApplicationServiceID, ns internal.Namespace, displayName string, iID internal.InstanceID) error {
// instance ID is the serviceInstance.Spec.ExternalID
si, err := svc.serviceInstanceGetter.GetByNamespaceAndExternalID(string(ns), string(iID))
if err != nil {
return errors.Wrapf(err, "while getting service instance with external id: %q in namespace: %q", iID, ns)
}
ea := &v1alpha1.EventActivation{
TypeMeta: v1.TypeMeta{
Kind: "EventActivation",
APIVersion: "applicationconnector.kyma-project.io/v1alpha1",
},
ObjectMeta: v1.ObjectMeta{
Name: string(appID),
Namespace: string(ns),
OwnerReferences: []v1.OwnerReference{
{
APIVersion: serviceCatalogAPIVersion,
Kind: "ServiceInstance",
Name: si.Name,
UID: si.UID,
},
},
},
Spec: v1alpha1.EventActivationSpec{
DisplayName: displayName,
SourceID: string(appName),
},
}
_, err = svc.eaClient.EventActivations(string(ns)).Create(ea)
switch {
case err == nil:
svc.log.Infof("Created EventActivation: [%s], in namespace: [%s]", appID, ns)
case apiErrors.IsAlreadyExists(err):
// We perform update action to adjust OwnerReference of the EventActivation after the backup restore.
if err = svc.ensureEaUpdate(string(appID), string(ns), si); err != nil {
return errors.Wrapf(err, "while ensuring update on EventActivation")
}
svc.log.Infof("Updated EventActivation: [%s], in namespace: [%s]", appID, ns)
default:
return errors.Wrapf(err, "while creating EventActivation with name: %q in namespace: %q", appID, ns)
}
return nil
}
func (svc *ProvisionService) ensureEaUpdate(appID, ns string, si *v1beta1.ServiceInstance) error {
ea, err := svc.eaClient.EventActivations(ns).Get(appID, v1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "while getting EventActivation with name: %q from namespace: %q", appID, ns)
}
ea.OwnerReferences = []v1.OwnerReference{
{
APIVersion: serviceCatalogAPIVersion,
Kind: "ServiceInstance",
Name: si.Name,
UID: si.UID,
},
}
_, err = svc.eaClient.EventActivations(ns).Update(ea)
if err != nil {
return errors.Wrapf(err, "while updating EventActivation with name: %q in namespace: %q", appID, ns)
}
return nil
}
// enableDefaultKnativeBroker enables the Knative Eventing default broker for the given namespace
// by adding the proper label to the namespace.
func (svc *ProvisionService) enableDefaultKnativeBroker(ns internal.Namespace) error {
// get the namespace
namespace, err := svc.knClient.GetNamespace(string(ns))
if err != nil {
return errors.Wrap(err, "namespace not found")
}
// check if the namespace has the injection label
if val, ok := namespace.Labels[knativeEventingInjectionLabelKey]; ok && val == knativeEventingInjectionLabelValueEnabled {
svc.log.Printf("the default Knative Eventing Broker is already enabled for the namespace: [%s]", namespace.Name)
return nil
}
// add the injection label to the namespace
if len(namespace.Labels) == 0 {
namespace.Labels = make(map[string]string, 1)
}
namespace.Labels[knativeEventingInjectionLabelKey] = knativeEventingInjectionLabelValueEnabled
// update the namespace
_, err = svc.knClient.UpdateNamespace(namespace)
if err != nil {
svc.log.Printf("error enabling the default Knative Eventing Broker for namespace: [%v] [%v]", namespace, err)
}
return err
}
// persistKnativeSubscription will get a Knative Subscription given application name and namespace and will
// update and persist it. If there is no Knative Subscription found, a new one will be created.
func (svc *ProvisionService) persistKnativeSubscription(applicationName internal.ApplicationName, ns internal.Namespace) error {
// construct the default broker URI using the given namespace.
defaultBrokerURI := knative.GetDefaultBrokerURI(ns)
// get the Knative channel for the application
channel, err := svc.channelForApp(applicationName)
if err != nil {
return errors.Wrapf(err, "getting the Knative channel for the application [%v]", applicationName)
}
// subscription selector labels
labels := map[string]string{
brokerNamespaceLabelKey: string(ns),
applicationNameLabelKey: string(applicationName),
}
// get Knative subscription by labels
subscription, err := svc.knClient.GetSubscriptionByLabels(integrationNamespace, labels)
switch {
case apiErrors.IsNotFound(err):
// subscription not found, create a new one
newSubscription := knative.Subscription(knSubscriptionNamePrefix, integrationNamespace).Spec(channel, defaultBrokerURI).Labels(labels).Build()
if _, err := svc.knClient.CreateSubscription(newSubscription); err != nil {
return errors.Wrapf(err, "creating Subscription %s", newSubscription.Name)
}
svc.log.Printf("created Knative Subscription: [%v]", newSubscription.Name)
return nil
case err != nil:
return errors.Wrapf(err, "getting Subscription by labels [%v]", labels)
}
// update Knative Subscription
subscription = knative.FromSubscription(subscription).Spec(channel, defaultBrokerURI).Labels(labels).Build()
subscription, err = svc.knClient.UpdateSubscription(subscription)
if err != nil {
return errors.Wrapf(err, "updating existing Knative Subscription with labels [%v] for channel: [%v]", labels, channel.Name)
}
svc.log.Printf("Updated Knative Subscription: [%v]", subscription.Name)
return nil
}
func (svc *ProvisionService) channelForApp(applicationName internal.ApplicationName) (*messagingv1alpha1.Channel, error) {
labels := map[string]string{
applicationNameLabelKey: string(applicationName),
}
return svc.knClient.GetChannelByLabels(integrationNamespace, labels)
}
// Create a new policy
func (svc *ProvisionService) createIstioPolicy(ns internal.Namespace) error {
policyName := fmt.Sprintf("%s%s", ns, policyNameSuffix)
policyLabels := make(map[string]string)
policyLabels["eventing.knative.dev/broker"] = "default"
svc.log.Infof("Creating Policy %s in namespace: %s", policyName, string(ns))
brokerTargetSelector := &istioauthenticationalpha1.TargetSelector{
Name: brokerTargetSelectorName,
}
filterTargetSelector := &istioauthenticationalpha1.TargetSelector{
Name: filterTargetSelectorName,
}
mtls := &istioauthenticationalpha1.MutualTls{
Mode: istioMtlsPermissiveMode,
}
peerAuthenticationMethod := &istioauthenticationalpha1.PeerAuthenticationMethod{
Params: &istioauthenticationalpha1.PeerAuthenticationMethod_Mtls{
Mtls: mtls,
},
}
policy := &istiov1alpha1.Policy{
ObjectMeta: v1.ObjectMeta{
Name: policyName,
Namespace: string(ns),
Labels: policyLabels,
},
Spec: istioauthenticationalpha1.Policy{
Targets: []*istioauthenticationalpha1.TargetSelector{brokerTargetSelector, filterTargetSelector},
Peers: []*istioauthenticationalpha1.PeerAuthenticationMethod{peerAuthenticationMethod},
},
}
_, err := svc.istioClient.AuthenticationV1alpha1().Policies(string(ns)).Create(policy)
if err != nil {
if apiErrors.IsAlreadyExists(err) {
if _, err := svc.istioClient.AuthenticationV1alpha1().Policies(string(ns)).Update(policy); err != nil {
return errors.Wrapf(err, "while updating istio policy with name: %q in namespace: %q", policyName, ns)
}
return nil
}
return errors.Wrapf(err, "while creating istio policy with name: %q in namespace: %q", policyName, ns)
}
return nil
}
func getNamespaceFromContext(contextProfile map[string]interface{}) (internal.Namespace, error) {
return internal.Namespace(contextProfile["namespace"].(string)), nil
}
func getSvcByID(services []internal.Service, id internal.ApplicationServiceID) (internal.Service, error) {
for _, svc := range services {
if svc.ID == id {
return svc, nil
}
}
return internal.Service{}, errors.Errorf("cannot find service with ID [%s]", id)
}
func strPtr(str string) *string {
return &str
}