feat: Channel to listen to changes (#3902)
Unfortunately, it seems that polling for log id changes is the only way
to listen to changes in dragonboat.
jvmakine authored Jan 6, 2025
1 parent 0bf7356 commit b0a1843
Showing 6 changed files with 212 additions and 112 deletions.
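The crux of the change: dragonboat has no change-notification hook, so the cluster polls the shard's last raft log index and treats an increase as evidence of a change. The sketch below isolates that detection step, using only the dragonboat calls that appear in the cluster.go diff further down; the lastLogIndex name and the surrounding package are ours, for illustration.

package raftsketch

import (
	"fmt"

	"github.com/lni/dragonboat/v4"
)

// lastLogIndex reports the highest raft log index currently known for a
// shard. An increase between two polls means new entries were committed,
// which this commit uses as a proxy for "the shard state changed".
func lastLogIndex(nh *dragonboat.NodeHost, shardID uint64) (uint64, error) {
	reader, err := nh.GetLogReader(shardID)
	if err != nil {
		return 0, fmt.Errorf("failed to get log reader: %w", err)
	}
	_, last := reader.GetRange()
	return last, nil
}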
cmd/raft-tester/main.go (18 changes: 7 additions & 11 deletions)
@@ -76,27 +76,23 @@ func run(ctx context.Context, shard *raft.ShardHandle[IntEvent, int64, int64]) e
 		}
 	})
 
+	changes, err := shard.Changes(ctx, 1)
+	if err != nil {
+		return fmt.Errorf("failed to get changes: %w", err)
+	}
+
 	wg.Go(func() error {
-		ticker := time.NewTicker(10 * time.Second)
 		for {
 			select {
 			case msg := <-messages:
 				err := shard.Propose(ctx, IntEvent(msg))
 				if err != nil {
 					return fmt.Errorf("failed to propose event: %w", err)
 				}
-			case <-ticker.C:
-				ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
-				defer cancel()
-
-				state, err := shard.Query(ctx, 1)
-				if err != nil {
-					return fmt.Errorf("failed to query shard: %w", err)
-				}
-				cancel()
-				fmt.Println("state: ", state)
 			case <-ctx.Done():
 				return nil
+			case c := <-changes:
+				fmt.Println("state: ", c)
 			}
 		}
 	})
internal/eventstream/eventstream.go (21 changes: 21 additions & 0 deletions)
@@ -16,6 +16,7 @@ type EventView[View any, E Event[View]] interface {
 	View(ctx context.Context) (View, error)
 
 	Publish(ctx context.Context, event E) error
+	Changes(ctx context.Context) (chan View, error)
 }
 
 // EventStream is a stream of events that can be published and subscribed to, that update a materialized view
@@ -71,6 +72,26 @@ func (i *inMemoryEventStream[T, E]) Publish(ctx context.Context, e E) error {
 	return nil
 }
 
+func (i *inMemoryEventStream[T, E]) Changes(ctx context.Context) (chan T, error) {
+	updates := i.Updates().Subscribe(nil)
+	result := make(chan T)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case _, ok := <-updates:
+				if !ok {
+					close(result)
+					return
+				}
+				result <- i.view
+			}
+		}
+	}()
+	return result, nil
+}
+
 func (i *inMemoryEventStream[T, E]) View(ctx context.Context) (T, error) {
 	return i.view, nil
 }
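For context, a consumer of the new EventView.Changes method might look like the sketch below. Only the EventView and Event types and the Changes signature come from the diff above; the watchViews helper, its package, and the module path in the import are assumptions for illustration.

package eventstreamsketch

import (
	"context"
	"fmt"

	"github.com/block/ftl/internal/eventstream"
)

// watchViews prints each materialized view emitted after a publish. Changes
// delivers the current view, not the event that produced it.
func watchViews[View any, E eventstream.Event[View]](ctx context.Context, ev eventstream.EventView[View, E]) error {
	views, err := ev.Changes(ctx)
	if err != nil {
		return fmt.Errorf("failed to subscribe to changes: %w", err)
	}
	for {
		select {
		case <-ctx.Done():
			return nil
		case v, ok := <-views:
			if !ok {
				return nil // underlying subscription closed
			}
			fmt.Println("view updated:", v)
		}
	}
}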
internal/raft/cluster.go (94 changes: 89 additions & 5 deletions)
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"connectrpc.com/connect"
+	"github.com/alecthomas/atomic"
 	"github.com/jpillora/backoff"
 	"github.com/lni/dragonboat/v4"
 	"github.com/lni/dragonboat/v4/client"
@@ -33,6 +34,9 @@ type RaftConfig struct {
 	ControlBind       *url.URL          `help:"Address to listen for control traffic. If empty, no control listener will be started."`
 	ShardReadyTimeout time.Duration     `help:"Timeout for shard to be ready" default:"5s"`
 	Retry             retry.RetryConfig `help:"Connection retry configuration" prefix:"retry-" embed:""`
+	ChangesInterval   time.Duration     `help:"Interval for changes to be checked" default:"10ms"`
+	ChangesTimeout    time.Duration     `help:"Timeout for changes to be checked" default:"1s"`
+
 	// Raft configuration
 	RTT         time.Duration `help:"Estimated average round trip time between nodes" default:"200ms"`
 	ElectionRTT uint64        `help:"Election RTT as a multiple of RTT" default:"10"`
@@ -87,7 +91,9 @@ type Cluster struct {
 	shards        map[uint64]statemachine.CreateStateMachineFunc
 	controlClient *http.Client
 
-	closeControlService context.CancelFunc
+	// runningCtx is cancelled when the cluster is stopped.
+	runningCtx       context.Context
+	runningCtxCancel context.CancelFunc
 }
 
 var _ raftpbconnect.RaftServiceHandler = (*Cluster)(nil)
@@ -122,6 +128,8 @@ type ShardHandle[E Event, Q any, R any] struct {
 	cluster *Cluster
 	session *client.Session
 
+	lastKnownIndex atomic.Value[uint64]
+
 	mu sync.Mutex
 }
 
@@ -183,6 +191,81 @@ func (s *ShardHandle[E, Q, R]) Query(ctx context.Context, query Q) (R, error) {
 	return response, nil
 }
 
+// Changes returns a channel that will receive the result of the query when the
+// shard state changes.
+//
+// This can only be called when the cluster is running.
+//
+// Note that this is not guaranteed to receive an event for every change, but
+// will always receive the latest state of the shard.
+func (s *ShardHandle[E, Q, R]) Changes(ctx context.Context, query Q) (chan R, error) {
+	if s.cluster.nh == nil {
+		panic("cluster not started")
+	}
+
+	result := make(chan R)
+	logger := log.FromContext(ctx).Scope("raft")
+
+	// get the last known index as the starting point
+	reader, err := s.cluster.nh.GetLogReader(s.shardID)
+	if err != nil {
+		logger.Errorf(err, "failed to get log reader")
+	}
+	_, last := reader.GetRange()
+	s.lastKnownIndex.Store(last)
+
+	go func() {
+		// poll, as dragonboat does not have a way to listen to changes directly
+		timer := time.NewTicker(s.cluster.config.ChangesInterval)
+		defer timer.Stop()
+
+		for {
+			select {
+			case <-s.cluster.runningCtx.Done():
+				logger.Infof("changes channel closed")
+				close(result)
+
+				return
+			case <-timer.C:
+				last, err := s.getLastIndex()
+				if err != nil {
+					logger.Warnf("failed to get last index: %s", err)
+				} else if last > s.lastKnownIndex.Load() {
+					logger.Debugf("changes detected, last known index: %d, new index: %d", s.lastKnownIndex.Load(), last)
+
+					s.lastKnownIndex.Store(last)
+
+					ctx, cancel := context.WithTimeout(ctx, s.cluster.config.ChangesTimeout)
+					res, err := s.Query(ctx, query)
+					cancel()
+
+					if err != nil {
+						logger.Errorf(err, "failed to query shard")
+					} else {
+						result <- res
+					}
+				}
+			}
+		}
+	}()
+
+	return result, nil
+}
+
+func (s *ShardHandle[E, Q, R]) getLastIndex() (uint64, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.verifyReady()
+
+	reader, err := s.cluster.nh.GetLogReader(s.shardID)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get log reader: %w", err)
+	}
+	_, last := reader.GetRange()
+	return last, nil
+}
+
 func (s *ShardHandle[E, Q, R]) verifyReady() {
 	if s.cluster == nil {
 		panic("cluster not built")
@@ -263,6 +346,10 @@ func (c *Cluster) start(ctx context.Context, join bool) error {
 		}
 	}
 
+	ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
+	c.runningCtxCancel = cancel
+	c.runningCtx = ctx
+
 	if err := c.startControlServer(ctx); err != nil {
 		return err
 	}
@@ -299,9 +386,6 @@ func (c *Cluster) startControlServer(ctx context.Context) error {
 	logger := log.FromContext(ctx).Scope("raft")
 
-	ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
-	c.closeControlService = cancel
-
 	if c.config.ControlBind == nil {
 		return nil
 	}
@@ -326,8 +410,8 @@ func (c *Cluster) Stop(ctx context.Context) {
 	for shardID := range c.shards {
 		c.removeShardMember(ctx, shardID, c.config.ReplicaID)
 	}
+	c.runningCtxCancel()
 	c.nh.Close()
-	c.closeControlService()
 	c.nh = nil
 	c.shards = nil
 }
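As the doc comment on Changes notes, intermediate states can be skipped: one Query runs per detected index jump, so every value on the channel is a full snapshot of the latest state, never a delta. A consumer should therefore replace its local copy wholesale. In the sketch below, only ShardHandle, Event, and the Changes signature come from the diff above; the watchShard helper, the apply callback, and the module path are assumptions.

package raftsketch

import (
	"context"
	"fmt"

	"github.com/block/ftl/internal/raft"
)

// watchShard applies each snapshot received from Changes. Because values are
// full states, apply must overwrite local state rather than accumulate.
func watchShard[E raft.Event, Q any, R any](ctx context.Context, h *raft.ShardHandle[E, Q, R], query Q, apply func(R)) error {
	snapshots, err := h.Changes(ctx, query)
	if err != nil {
		return fmt.Errorf("failed to subscribe to shard changes: %w", err)
	}
	// The channel is closed when the cluster stops, ending this loop.
	for snapshot := range snapshots {
		apply(snapshot)
	}
	return nil
}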