Skip to content

Commit

Permalink
Add event bus for scheduler-internal events (#99)
Browse files Browse the repository at this point in the history
* Add bus-based event hub

Use mustafaturan/bus as a seemingly well-featured bus implementation.
Use an atomic counter as its sequence generator, to avoid adding further
dependencies for a simple construct.
The sequence generator is per hub to minimise the risk of shared state
during testing.

* Embed event bus in hub for method use

* Register topics for hub on creation

* Add source field to event hub logger

* Add docstring for NewEventHub

* Add sync -> async adapter func for model event message handling

This function attempts to solve two issues:
* Limit leaking implementation details about the use of mustafaturan/bus to the wider application
* Provide a simple interface for users with an asynchronous approach

* Refactor model event message handler creation

This should be slightly more legible, but more importantly gives a
name to the actual handle function, to improve logging.

* Use bus-based event in scheduler's main package

* Use bus-based event hub in in-memory store

* Use bus-based event hub in XDS incremental processor

* Make XDS processor's sync-handling internal

* Use bus-based event hub in agent server + make handling of syncs internal

* Use bus-based event hub in version cleaner + make handling of syncs internal

* Use bus-based event hub in scheduler server + make handling of events/listeners internal

* Add comment re use of Goroutines in agent server

* Add comment re (graceful) shutdown of scheduler

* Fix typos in event hub implementation

* Format of long lines

* Add model event publish method for event hub

Also add comment on use of goroutines.
The only place writing into the event hub previously was memory_status.go.
That chose to publish events from goroutines, but that may not be the
best policy - we should consider this.

* Replace event hub publish calls when updating model state

These used to be 'trigger' calls to the old event hub,
but are now 'publish' calls to the bus-based event hub.

* Remove unused methods for ModelEventHub

* Fix typos & build issues

* Add comment re closing channels for event hub

* Fix store package tests re use of event hub

* Fix scheduler server package tests re use of event hub

* Fix agent server package tests re use of event hub

* Remove ModelEventHub as fully replaced

* Refactor event hub types to separate file within package

* Use consistent handler keys for event hub handlers

Rather than using free-form text as keys, this change uses dot-separated fields.
The fields identify the handling type's purpose first, followed by the event's semantics.
For example, the scheduler server listens to model events but treats them in two
different ways: as model events, but also as events that can affect servers.

* Refactor event hub handler names to package-level constants

* Add logging for event publishing failures

* Refactor event source names to package-level constants

* Add mechanism for closing event hub & its handler channels

* Stop processing messages of invalid type

* Remove exposure of 'bus' library from event hub interface

* Fix typos in channel consumption

* Refactor to abstract creation of event hub handlers from 'bus' implementation

* Make event publishing synchronous

This avoids potential reordering of events caused by the use of goroutines.
Instead, callers have explicit control over sync/async operation.

* Make event hub topic name private

* Add test for event hub creation

* Use 'require' for assertion on error in test

* Add test cases for registering handlers

* Simplify event handler test to use single counter with atomic increments

* Add test cases for publishing events

* Add test cases for closing event hub
  • Loading branch information
agrski authored Feb 25, 2022
1 parent 54ccba6 commit 3f1faac
Show file tree
Hide file tree
Showing 17 changed files with 713 additions and 174 deletions.
29 changes: 16 additions & 13 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ func main() {
flag.Parse()

// Create event Hub
eventHub := &coordinator.ModelEventHub{}
eventHub, err := coordinator.NewEventHub(logger)
if err != nil {
log.WithError(err).Fatal("Unable to create event hub")
}
defer eventHub.Close()

// Create a cache
cache := cache.NewSnapshotCache(false, cache.IDHash{}, logger)
Expand All @@ -93,34 +97,33 @@ func main() {
}()

ss := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
es := processor.NewIncrementalProcessor(cache, nodeID, logger, ss, eventHub)
sched := scheduler.NewSimpleScheduler(logger,
_ = processor.NewIncrementalProcessor(cache, nodeID, logger, ss, eventHub)
sched := scheduler.NewSimpleScheduler(
logger,
ss,
scheduler.DefaultSchedulerConfig())
scheduler.DefaultSchedulerConfig(),
)
as := agent.NewAgentServer(logger, ss, sched, eventHub)

go as.ListenForSyncs() // Start agent syncs
go es.ListenForSyncs() // Start envoy syncs

s := server2.NewSchedulerServer(logger, ss, sched, eventHub)
go s.ListenForModelEvents()
go s.ListenForServerEvents()
go func() {
err := s.StartGrpcServer(schedulerPort)
if err != nil {
log.WithError(err).Fatalf("Scheduler start server error")
}
}()

versionCleaner := cleaner.NewVersionCleaner(ss, logger, eventHub)
go versionCleaner.ListenForEvents()
_ = cleaner.NewVersionCleaner(ss, logger, eventHub)

err := as.StartGrpcServer(agentPort)
// TODO - it's subtle (and thus fragile) to use the fact that this method
// is blocking to await shutdown.
// We should instead use a done channel (as elsewhere) and defer stops/shutdowns
// OR use a wait-group as defers runs sequentially.
err = as.StartGrpcServer(agentPort)
if err != nil {
log.Fatalf("Failed to start agent grpc server %s", err.Error())
}

s.StopSendModelEvents()
s.StopSendServerEvents()
eventHub.Close()
}
1 change: 1 addition & 0 deletions scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/jarcoal/httpmock v1.0.8
github.com/mustafaturan/bus/v3 v3.0.3
github.com/onsi/gomega v1.16.0
github.com/otiai10/copy v1.7.0
github.com/prometheus/client_golang v1.12.1
Expand Down
2 changes: 2 additions & 0 deletions scheduler/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mustafaturan/bus/v3 v3.0.3 h1:PMEUVKpfI9FOUw32o3wAHRaBS1XGxh6cFCy/VHktPfo=
github.com/mustafaturan/bus/v3 v3.0.3/go.mod h1:JVCyq6Pb6S/IGI6LrzKH5vlBZ9ifsd1Js+wd4Y2+7Xg=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
Expand Down
34 changes: 22 additions & 12 deletions scheduler/pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
)

const (
grpcMaxConcurrentStreams = 1_000_000
grpcMaxConcurrentStreams = 1_000_000
pendingSyncsQueueSize int = 10
modelEventHandlerName = "agent.server.models"
)

type ServerKey struct {
Expand All @@ -32,7 +34,6 @@ type Server struct {
logger log.FieldLogger
agents map[ServerKey]*AgentSubscriber
store store.SchedulerStore
source chan coordinator.ModelEventMsg
scheduler scheduler.Scheduler
}

Expand All @@ -46,27 +47,36 @@ type AgentSubscriber struct {
stream pb.AgentService_SubscribeServer
}

func NewAgentServer(logger log.FieldLogger,
func NewAgentServer(
logger log.FieldLogger,
store store.SchedulerStore,
scheduler scheduler.Scheduler,
hub *coordinator.ModelEventHub) *Server {
hub *coordinator.EventHub,
) *Server {
s := &Server{
logger: logger.WithField("source", "AgentServer"),
agents: make(map[ServerKey]*AgentSubscriber),
store: store,
source: make(chan coordinator.ModelEventMsg, 1),
scheduler: scheduler,
}
hub.AddListener(s.source)

hub.RegisterHandler(
modelEventHandlerName,
pendingSyncsQueueSize,
s.logger,
s.handleSyncs,
)

return s
}

func (s *Server) ListenForSyncs() {
for evt := range s.source {
s.logger.Infof("Received sync for model %s", evt.String())
modelEvtMsg := evt
go s.Sync(modelEvtMsg.ModelName)
}
func (s *Server) handleSyncs(event coordinator.ModelEventMsg) {
logger := s.logger.WithField("func", "handleSyncs")
logger.Infof("Received sync for model %s", event.String())

// TODO - Should this spawn a goroutine?
// Surely we're risking reordering of events, e.g. load/unload -> unload/load?
go s.Sync(event.ModelName)
}

func (s *Server) StartGrpcServer(agentPort uint) error {
Expand Down
3 changes: 2 additions & 1 deletion scheduler/pkg/agent/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ func TestSync(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logger := log.New()
eventHub := &coordinator.ModelEventHub{}
eventHub, err := coordinator.NewEventHub(logger)
g.Expect(err).To(BeNil())
server := NewAgentServer(logger, test.store, nil, eventHub)
server.agents = test.agents
server.Sync(test.modelName)
Expand Down
148 changes: 122 additions & 26 deletions scheduler/pkg/coordinator/hub.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,143 @@
package coordinator

import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"

busV3 "github.com/mustafaturan/bus/v3"
log "github.com/sirupsen/logrus"
)

const (
topicModelEvents = "model.event"
)

type ModelEventMsg struct {
ModelName string
ModelVersion uint32
type SequenceGenerator struct {
counter int64
}

func (m ModelEventMsg) String() string {
return fmt.Sprintf("%s:%d", m.ModelName, m.ModelVersion)
func (g *SequenceGenerator) Generate() string {
next := atomic.AddInt64(&g.counter, 1)
return fmt.Sprintf("%d", next)
}

var _ busV3.IDGenerator = (*SequenceGenerator)(nil)

type EventHub struct {
bus *busV3.Bus
logger log.FieldLogger
modelEventHandlerChannels []chan ModelEventMsg
lock sync.RWMutex
closed bool
}

// NewEventHub creates a new EventHub with topics pre-registered.
// The logger l does not need fields preset.
func NewEventHub(l log.FieldLogger) (*EventHub, error) {
generator := &SequenceGenerator{}
bus, err := busV3.NewBus(generator)
if err != nil {
return nil, err
}

hub := EventHub{
logger: l.WithField("source", "EventHub"),
bus: bus,
}

hub.bus.RegisterTopics(topicModelEvents)

return &hub, nil
}

type ModelEventHub struct {
mu sync.RWMutex
closed bool
listeners []chan<- ModelEventMsg
func (h *EventHub) Close() {
h.lock.Lock()
defer h.lock.Unlock()

h.closed = true

for _, c := range h.modelEventHandlerChannels {
close(c)
}
}

func (h *ModelEventHub) AddListener(c chan<- ModelEventMsg) {
h.mu.Lock()
defer h.mu.Unlock()
h.listeners = append(h.listeners, c)
func (h *EventHub) RegisterHandler(
name string,
queueSize int,
logger log.FieldLogger,
handle func(event ModelEventMsg),
) {
events := make(chan ModelEventMsg, queueSize)
h.addModelEventHandlerChannel(events)

go func() {
for e := range events {
handle(e)
}
}()

handler := h.newModelEventHandler(logger, events, handle)
h.bus.RegisterHandler(name, handler)
}

func (h *ModelEventHub) TriggerModelEvent(event ModelEventMsg) {
h.mu.RLock()
defer h.mu.RUnlock()
if h.closed {
return
func (h *EventHub) newModelEventHandler(
logger log.FieldLogger,
events chan ModelEventMsg,
handle func(event ModelEventMsg),
) busV3.Handler {
handleModelEventMessage := func(_ context.Context, e busV3.Event) {
l := logger.WithField("func", "handleModelEventMessage")
l.Debugf("Received event on %s from %s (ID: %s, TxID: %s)", e.Topic, e.Source, e.ID, e.TxID)

me, ok := e.Data.(ModelEventMsg)
if !ok {
l.Warnf(
"Event (ID %s, TxID %s) on topic %s from %s is not a ModelEventMsg: %s",
e.ID,
e.TxID,
e.Topic,
e.Source,
reflect.TypeOf(e.Data).String(),
)
return
}

h.lock.RLock()
if h.closed {
return
}
events <- me
h.lock.RUnlock()
}
for _, listener := range h.listeners {
listener <- event

return busV3.Handler{
Matcher: topicModelEvents,
Handle: handleModelEventMessage,
}
}

func (h *ModelEventHub) Close() {
h.mu.Lock()
defer h.mu.Unlock()
for _, listener := range h.listeners {
close(listener)
func (h *EventHub) addModelEventHandlerChannel(c chan ModelEventMsg) {
h.lock.Lock()
defer h.lock.Unlock()

h.modelEventHandlerChannels = append(h.modelEventHandlerChannels, c)
}

func (h *EventHub) PublishModelEvent(source string, event ModelEventMsg) {
err := h.bus.EmitWithOpts(
context.Background(),
topicModelEvents,
event,
busV3.WithSource(source),
)
if err != nil {
h.logger.WithError(err).Errorf(
"unable to publish model event message from %s to %s",
source,
topicModelEvents,
)
}
h.closed = true
}
Loading

0 comments on commit 3f1faac

Please sign in to comment.