Skip to content

Commit

Permalink
feat: Create interface for eval events. (#1288)
Browse files Browse the repository at this point in the history
## This PR

Before this change the eval services used a unexported
struct which prevented creating eval services outside of
this package.

This change creates a new IEvents interface that allows
providing custom impls of flag eval services.

### Related Issues


### Notes
<!-- any additional notes for this PR -->

### Follow-up Tasks
<!-- anything that is related to this PR but not done here should be
noted under this section -->
<!-- if there is a need for a new issue, please link it here -->

### How to test
<!-- if applicable, add testing instructions under this section -->

Signed-off-by: Connor Hindley <[email protected]>
  • Loading branch information
connyay authored Apr 25, 2024
1 parent e1752ba commit 9714215
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 22 deletions.
6 changes: 3 additions & 3 deletions flagd/pkg/service/flag-evaluation/connect_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type ConnectService struct {
logger *logger.Logger
eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration
eventingConfiguration IEvents

server *http.Server
metricsServer *http.Server
Expand Down Expand Up @@ -125,7 +125,7 @@ func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuratio

// Notify emits change event notifications for subscriptions
func (s *ConnectService) Notify(n service.Notification) {
s.eventingConfiguration.emitToAll(n)
s.eventingConfiguration.EmitToAll(n)
}

// nolint: funlen
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *ConnectService) AddMiddleware(mw middleware.IMiddleware) {

func (s *ConnectService) Shutdown() {
s.readinessEnabled = false
s.eventingConfiguration.emitToAll(service.Notification{
s.eventingConfiguration.EmitToAll(service.Notification{
Type: service.Shutdown,
Data: map[string]interface{}{},
})
Expand Down
4 changes: 2 additions & 2 deletions flagd/pkg/service/flag-evaluation/connect_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestConnectServiceNotify(t *testing.T) {

sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.subs["key"] = sChan
eventing.Subscribe("key", sChan)

// notification type
ofType := iservice.ConfigurationChange
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestConnectServiceShutdown(t *testing.T) {

sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.subs["key"] = sChan
eventing.Subscribe("key", sChan)

// notification type
ofType := iservice.Shutdown
Expand Down
15 changes: 11 additions & 4 deletions flagd/pkg/service/flag-evaluation/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,27 @@ import (
iservice "github.com/open-feature/flagd/core/pkg/service"
)

// IEvents is an interface for event subscriptions
type IEvents interface {
Subscribe(id any, notifyChan chan iservice.Notification)
Unsubscribe(id any)
EmitToAll(n iservice.Notification)
}

// eventingConfiguration is a wrapper for notification subscriptions
type eventingConfiguration struct {
mu *sync.RWMutex
subs map[interface{}]chan iservice.Notification
subs map[any]chan iservice.Notification
}

func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) {
func (eventing *eventingConfiguration) Subscribe(id any, notifyChan chan iservice.Notification) {
eventing.mu.Lock()
defer eventing.mu.Unlock()

eventing.subs[id] = notifyChan
}

func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
func (eventing *eventingConfiguration) EmitToAll(n iservice.Notification) {
eventing.mu.RLock()
defer eventing.mu.RUnlock()

Expand All @@ -28,7 +35,7 @@ func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
}
}

func (eventing *eventingConfiguration) unSubscribe(id interface{}) {
func (eventing *eventingConfiguration) Unsubscribe(id any) {
eventing.mu.Lock()
defer eventing.mu.Unlock()

Expand Down
10 changes: 5 additions & 5 deletions flagd/pkg/service/flag-evaluation/eventing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func TestSubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1)

// when
eventing.subscribe(idA, chanA)
eventing.subscribe(idB, chanB)
eventing.Subscribe(idA, chanA)
eventing.Subscribe(idB, chanB)

// then
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
Expand All @@ -43,10 +43,10 @@ func TestUnsubscribe(t *testing.T) {
chanB := make(chan iservice.Notification, 1)

// when
eventing.subscribe(idA, chanA)
eventing.subscribe(idB, chanB)
eventing.Subscribe(idA, chanA)
eventing.Subscribe(idB, chanB)

eventing.unSubscribe(idA)
eventing.Unsubscribe(idA)

// then
require.Empty(t, eventing.subs[idA],
Expand Down
8 changes: 4 additions & 4 deletions flagd/pkg/service/flag-evaluation/flag_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ type OldFlagEvaluationService struct {
logger *logger.Logger
eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration
eventingConfiguration IEvents
flagEvalTracer trace.Tracer
}

// NewOldFlagEvaluationService creates a OldFlagEvaluationService with provided parameters
func NewOldFlagEvaluationService(log *logger.Logger,
eval evaluator.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder,
eval evaluator.IEvaluator, eventingCfg IEvents, metricsRecorder *telemetry.MetricsRecorder,
) *OldFlagEvaluationService {
return &OldFlagEvaluationService{
logger: log,
Expand Down Expand Up @@ -117,8 +117,8 @@ func (s *OldFlagEvaluationService) EventStream(
stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.unSubscribe(req)
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.Unsubscribe(req)

requestNotificationChan <- service.Notification{
Type: service.ProviderReady,
Expand Down
8 changes: 4 additions & 4 deletions flagd/pkg/service/flag-evaluation/flag_evaluator_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ type FlagEvaluationService struct {
logger *logger.Logger
eval evaluator.IEvaluator
metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration
eventingConfiguration IEvents
flagEvalTracer trace.Tracer
}

// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
func NewFlagEvaluationService(log *logger.Logger,
eval evaluator.IEvaluator,
eventingCfg *eventingConfiguration,
eventingCfg IEvents,
metricsRecorder *telemetry.MetricsRecorder,
) *FlagEvaluationService {
return &FlagEvaluationService{
Expand Down Expand Up @@ -116,8 +116,8 @@ func (s *FlagEvaluationService) EventStream(
stream *connect.ServerStream[evalV1.EventStreamResponse],
) error {
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.unSubscribe(req)
s.eventingConfiguration.Subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.Unsubscribe(req)

requestNotificationChan <- service.Notification{
Type: service.ProviderReady,
Expand Down

0 comments on commit 9714215

Please sign in to comment.