Skip to content

Commit

Permalink
feat: Jetstream allows multiple dependencies to refer to the same Eve…
Browse files Browse the repository at this point in the history
…nt (#1870)

* selectively perform validation check for STAN and not Jetstream

Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 authored Apr 21, 2022
1 parent 13341ef commit 45e40d8
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 37 deletions.
25 changes: 23 additions & 2 deletions controllers/sensor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/argoproj/argo-events/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -93,7 +96,25 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err
controllerutil.AddFinalizer(sensor, finalizerName)

sensor.Status.InitConditions()
if err := ValidateSensor(sensor); err != nil {

eventBus := &eventbusv1alpha1.EventBus{}
eventBusName := common.DefaultEventBusName
if len(sensor.Spec.EventBusName) > 0 {
eventBusName = sensor.Spec.EventBusName
}
err := r.client.Get(ctx, types.NamespacedName{Namespace: sensor.Namespace, Name: eventBusName}, eventBus)
if err != nil {
if apierrors.IsNotFound(err) {
sensor.Status.MarkDeployFailed("EventBusNotFound", "EventBus not found.")
log.Errorw("EventBus not found", "eventBusName", eventBusName, "error", err)
return errors.Errorf("eventbus %s not found", eventBusName)
}
sensor.Status.MarkDeployFailed("GetEventBusFailed", "Failed to get EventBus.")
log.Errorw("failed to get EventBus", "eventBusName", eventBusName, "error", err)
return err
}

if err := ValidateSensor(sensor, eventBus); err != nil {
log.Errorw("validation error", "error", err)
return err
}
Expand All @@ -106,7 +127,7 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err
common.LabelOwnerName: sensor.Name,
},
}
return Reconcile(r.client, args, log)
return Reconcile(r.client, eventBus, args, log)
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.Sensor) bool {
Expand Down
24 changes: 9 additions & 15 deletions controllers/sensor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/argoproj/argo-events/common"
Expand All @@ -48,28 +47,23 @@ type AdaptorArgs struct {
}

// Reconcile does the real logic
func Reconcile(client client.Client, args *AdaptorArgs, logger *zap.SugaredLogger) error {
func Reconcile(client client.Client, eventBus *eventbusv1alpha1.EventBus, args *AdaptorArgs, logger *zap.SugaredLogger) error {
ctx := context.Background()
sensor := args.Sensor
eventBus := &eventbusv1alpha1.EventBus{}

if eventBus == nil {
sensor.Status.MarkDeployFailed("GetEventBusFailed", "Failed to get EventBus.")
logger.Error("failed to get EventBus")
return errors.New("failed to get EventBus")
}

eventBusName := common.DefaultEventBusName
if len(sensor.Spec.EventBusName) > 0 {
eventBusName = sensor.Spec.EventBusName
}
err := client.Get(ctx, types.NamespacedName{Namespace: sensor.Namespace, Name: eventBusName}, eventBus)
if err != nil {
if apierrors.IsNotFound(err) {
sensor.Status.MarkDeployFailed("EventBusNotFound", "EventBus not found.")
logger.Errorw("EventBus not found", "eventBusName", eventBusName, "error", err)
return errors.Errorf("eventbus %s not found", eventBusName)
}
sensor.Status.MarkDeployFailed("GetEventBusFailed", "Failed to get EventBus.")
logger.Errorw("failed to get EventBus", "eventBusName", eventBusName, "error", err)
return err
}
if !eventBus.Status.IsReady() {
sensor.Status.MarkDeployFailed("EventBusNotReady", "EventBus not ready.")
logger.Errorw("event bus is not in ready status", "eventBusName", eventBusName, "error", err)
logger.Errorw("event bus is not in ready status", "eventBusName", eventBusName)
return errors.New("eventbus not ready")
}

Expand Down
4 changes: 2 additions & 2 deletions controllers/sensor/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestResourceReconcile(t *testing.T) {
Sensor: sensorObj,
Labels: testLabels,
}
err := Reconcile(cl, args, logging.NewArgoEventsLogger())
err := Reconcile(cl, nil, args, logging.NewArgoEventsLogger())
assert.Error(t, err)
assert.False(t, sensorObj.Status.IsReady())
})
Expand All @@ -180,7 +180,7 @@ func TestResourceReconcile(t *testing.T) {
Sensor: sensorObj,
Labels: testLabels,
}
err = Reconcile(cl, args, logging.NewArgoEventsLogger())
err = Reconcile(cl, testBus, args, logging.NewArgoEventsLogger())
assert.Nil(t, err)
assert.True(t, sensorObj.Status.IsReady())

Expand Down
21 changes: 12 additions & 9 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import (
cronlib "github.com/robfig/cron/v3"

"github.com/argoproj/argo-events/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

// ValidateSensor accepts a sensor and performs validation against it
// we return an error so that it can be logged as a message on the sensor status
// the error is ignored by the operation context as subsequent re-queues would produce the same error.
// Exporting this function so that external APIs can use this to validate sensor resource.
func ValidateSensor(s *v1alpha1.Sensor) error {
if err := validateDependencies(s.Spec.Dependencies); err != nil {
func ValidateSensor(s *v1alpha1.Sensor, b *eventbusv1alpha1.EventBus) error {
if err := validateDependencies(s.Spec.Dependencies, b); err != nil {
s.Status.MarkDependenciesNotProvided("InvalidDependencies", err.Error())
return err
}
Expand Down Expand Up @@ -420,10 +421,11 @@ func validateTriggerParameter(parameter *v1alpha1.TriggerParameter) error {
}

// perform a check to see that each event dependency is in correct format and has valid filters set if any
func validateDependencies(eventDependencies []v1alpha1.EventDependency) error {
func validateDependencies(eventDependencies []v1alpha1.EventDependency, b *eventbusv1alpha1.EventBus) error {
if len(eventDependencies) < 1 {
return errors.New("no event dependencies found")
}

comboKeys := make(map[string]bool)
for _, dep := range eventDependencies {
if dep.Name == "" {
Expand All @@ -436,13 +438,14 @@ func validateDependencies(eventDependencies []v1alpha1.EventDependency) error {
if dep.EventName == "" {
return errors.New("event dependency must define the EventName")
}
// EventSourceName + EventName can not be referenced more than once in one Sensor object.
comboKey := fmt.Sprintf("%s-$$$-%s", dep.EventSourceName, dep.EventName)
// todo: conditionally perform this check for STAN and not Jetstream (may need to reverse the order of calls in order to perform validation after getting the EventBus configuration (to know the type))
if _, existing := comboKeys[comboKey]; existing {
return errors.Errorf("%s and %s are referenced more than once in this Sensor object", dep.EventSourceName, dep.EventName)
if b.Spec.NATS != nil {
// For STAN, EventSourceName + EventName can not be referenced more than once in one Sensor object.
comboKey := fmt.Sprintf("%s-$$$-%s", dep.EventSourceName, dep.EventName)
if _, existing := comboKeys[comboKey]; existing {
return errors.Errorf("Event '%s' from EventSource '%s' is referenced for more than one dependency in this Sensor object", dep.EventName, dep.EventSourceName)
}
comboKeys[comboKey] = true
}
comboKeys[comboKey] = true

if err := validateEventFilter(dep.Filters); err != nil {
return err
Expand Down
32 changes: 24 additions & 8 deletions controllers/sensor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"testing"

eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
Expand All @@ -41,35 +42,50 @@ func TestValidateSensor(t *testing.T) {
assert.NoError(t, err)

var sensor *v1alpha1.Sensor
eventBus := &eventbusv1alpha1.EventBus{Spec: eventbusv1alpha1.EventBusSpec{JetStream: &eventbusv1alpha1.JetStreamBus{}}}
err = yaml.Unmarshal(content, &sensor)
assert.NoError(t, err)

err = ValidateSensor(sensor)
err = ValidateSensor(sensor, eventBus)
assert.NoError(t, err)
})
}
}

func TestValidDependencies(t *testing.T) {
/*t.Run("test duplicate deps", func(t *testing.T) {
jetstreamBus := &eventbusv1alpha1.EventBus{Spec: eventbusv1alpha1.EventBusSpec{JetStream: &eventbusv1alpha1.JetStreamBus{}}}
stanBus := &eventbusv1alpha1.EventBus{Spec: eventbusv1alpha1.EventBusSpec{NATS: &eventbusv1alpha1.NATSBus{}}}

t.Run("test duplicate deps fail for STAN", func(t *testing.T) {
sObj := sensorObj.DeepCopy()
sObj.Spec.Dependencies = append(sObj.Spec.Dependencies, v1alpha1.EventDependency{
Name: "fake-dep2",
EventSourceName: "fake-source",
EventName: "fake-one",
})
err := ValidateSensor(sObj)
err := ValidateSensor(sObj, stanBus)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "more than once"))
})*/
assert.Equal(t, true, strings.Contains(err.Error(), "is referenced for more than one dependency"))
})

t.Run("test duplicate deps are fine for Jetstream", func(t *testing.T) {
sObj := sensorObj.DeepCopy()
sObj.Spec.Dependencies = append(sObj.Spec.Dependencies, v1alpha1.EventDependency{
Name: "fake-dep2",
EventSourceName: "fake-source",
EventName: "fake-one",
})
err := ValidateSensor(sObj, jetstreamBus)
assert.Nil(t, err)
})

t.Run("test empty event source name", func(t *testing.T) {
sObj := sensorObj.DeepCopy()
sObj.Spec.Dependencies = append(sObj.Spec.Dependencies, v1alpha1.EventDependency{
Name: "fake-dep2",
EventName: "fake-one",
})
err := ValidateSensor(sObj)
err := ValidateSensor(sObj, jetstreamBus)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "must define the EventSourceName"))
})
Expand All @@ -80,7 +96,7 @@ func TestValidDependencies(t *testing.T) {
Name: "fake-dep2",
EventSourceName: "fake-source",
})
err := ValidateSensor(sObj)
err := ValidateSensor(sObj, jetstreamBus)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "must define the EventName"))
})
Expand All @@ -91,7 +107,7 @@ func TestValidDependencies(t *testing.T) {
EventSourceName: "fake-source2",
EventName: "fake-one2",
})
err := ValidateSensor(sObj)
err := ValidateSensor(sObj, jetstreamBus)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "must define a name"))
})
Expand Down
14 changes: 13 additions & 1 deletion webhook/validator/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package validator

import (
"context"
"fmt"

"github.com/argoproj/argo-events/common"
admissionv1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

sensorcontroller "github.com/argoproj/argo-events/controllers/sensor"
Expand All @@ -30,7 +33,16 @@ func NewSensorValidator(client kubernetes.Interface, ebClient eventbusclient.Int
}

func (s *sensor) ValidateCreate(ctx context.Context) *admissionv1.AdmissionResponse {
if err := sensorcontroller.ValidateSensor(s.newSensor); err != nil {
eventBusName := common.DefaultEventBusName
if len(s.newSensor.Spec.EventBusName) > 0 {
eventBusName = s.newSensor.Spec.EventBusName
}
eventBus, err := s.eventBusClient.ArgoprojV1alpha1().EventBus(s.newSensor.Namespace).Get(ctx, eventBusName, metav1.GetOptions{})
if err != nil {
return DeniedResponse(fmt.Sprintf("failed to get EventBus eventBusName=%s; err=%v", eventBusName, err))
}

if err := sensorcontroller.ValidateSensor(s.newSensor, eventBus); err != nil {
return DeniedResponse(err.Error())
}
return AllowedResponse()
Expand Down
46 changes: 46 additions & 0 deletions webhook/validator/sensor_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,66 @@
package validator

import (
"context"
"fmt"
"io/ioutil"
"testing"

"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-events/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

var (
fakeBus = &eventbusv1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: eventbusv1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: common.DefaultEventBusName,
},
Spec: eventbusv1alpha1.EventBusSpec{
NATS: &eventbusv1alpha1.NATSBus{
Native: &eventbusv1alpha1.NativeStrategy{
Auth: &eventbusv1alpha1.AuthStrategyToken,
},
},
},
Status: eventbusv1alpha1.EventBusStatus{
Config: eventbusv1alpha1.BusConfig{
NATS: &eventbusv1alpha1.NATSConfig{
URL: "nats://xxxx",
Auth: &eventbusv1alpha1.AuthStrategyToken,
AccessSecret: &corev1.SecretKeySelector{
Key: "test-key",
LocalObjectReference: corev1.LocalObjectReference{
Name: "test-name",
},
},
},
},
},
}
)

func TestValidateSensor(t *testing.T) {
dir := "../../examples/sensors"
files, err := ioutil.ReadDir(dir)
assert.Nil(t, err)

testBus := fakeBus.DeepCopy()
testBus.Status.MarkDeployed("test", "test")
testBus.Status.MarkConfigured()
_, err = fakeEventBusClient.ArgoprojV1alpha1().EventBus(testNamespace).Create(context.TODO(), testBus, metav1.CreateOptions{})
assert.Nil(t, err)

for _, file := range files {
content, err := ioutil.ReadFile(fmt.Sprintf("%s/%s", dir, file.Name()))
assert.Nil(t, err)
Expand Down

0 comments on commit 45e40d8

Please sign in to comment.