Skip to content

Commit

Permalink
feat: Automatically join the raft cluster (#3895)
Browse files Browse the repository at this point in the history
Changes `Join` to take a cluster control endpoint to send the add member
request when joining as a new member.

Changes the raft tester to support joining.

Still, it seems that recovering from crashed nodes is not working
properly, fixing that as a followup.
  • Loading branch information
jvmakine authored Jan 5, 2025
1 parent 5fd23c6 commit 0c35cea
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 136 deletions.
103 changes: 44 additions & 59 deletions cmd/raft-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,84 +2,65 @@ package main

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"os"
"os/signal"
"net/url"
"time"

"github.com/alecthomas/kong"
"github.com/lni/dragonboat/v4"
"golang.org/x/exp/rand"
"golang.org/x/sync/errgroup"

"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/raft"
)

var cli struct {
RaftConfig raft.RaftConfig `embed:"" prefix:"raft-"`
}

type IntStateMachine struct {
sum int64
}

type IntEvent int64

func (i *IntEvent) UnmarshalBinary(data []byte) error { //nolint:unparam
*i = IntEvent(binary.BigEndian.Uint64(data))
return nil
Start startCmd `cmd:"" help:"Start the raft tester cluster."`
Join joinCmd `cmd:"" help:"Join the raft tester cluster."`
}

func (i IntEvent) MarshalBinary() ([]byte, error) { //nolint:unparam
return binary.BigEndian.AppendUint64([]byte{}, uint64(i)), nil
}
type startCmd struct{}

var _ raft.StateMachine[int64, int64, IntEvent, *IntEvent] = &IntStateMachine{}

func (s IntStateMachine) Lookup(key int64) (int64, error) {
return s.sum, nil
}

func (s *IntStateMachine) Update(msg IntEvent) error {
s.sum += int64(msg)
return nil
}
func (s *startCmd) Run() error {
ctx := log.ContextWithNewDefaultLogger(context.Background())

func (s IntStateMachine) Close() error {
return nil
}
builder := raft.NewBuilder(&cli.RaftConfig)
shard := raft.AddShard(ctx, builder, 1, &IntStateMachine{})
cluster := builder.Build(ctx)

func (s IntStateMachine) Recover(reader io.Reader) error {
err := binary.Read(reader, binary.BigEndian, &s.sum)
if err != nil {
return fmt.Errorf("failed to recover from snapshot: %w", err)
if err := cluster.Start(ctx); err != nil {
return fmt.Errorf("failed to start cluster: %w", err)
}
return nil
defer cluster.Stop(ctx)

return run(ctx, shard)
}

func (s IntStateMachine) Save(writer io.Writer) error {
err := binary.Write(writer, binary.BigEndian, s.sum)
if err != nil {
return fmt.Errorf("failed to save snapshot: %w", err)
}
return nil
type joinCmd struct {
ControlAddress *url.URL `help:"Control address to use to join the cluster."`
}

func main() {
kctx := kong.Parse(&cli)
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
func (j *joinCmd) Run() error {
ctx := log.ContextWithNewDefaultLogger(context.Background())

builder := raft.NewBuilder(&cli.RaftConfig)
shard := raft.AddShard(ctx, builder, 1, &IntStateMachine{})
cluster := builder.Build(ctx)

wg, ctx := errgroup.WithContext(ctx)
if err := cluster.Join(ctx, j.ControlAddress.String()); err != nil {
return fmt.Errorf("failed to join cluster: %w", err)
}
defer cluster.Stop(ctx)

return run(ctx, shard)
}

func run(ctx context.Context, shard *raft.ShardHandle[IntEvent, int64, int64]) error {
messages := make(chan int)

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
defer close(messages)
// send a random number every 10 seconds
Expand All @@ -94,39 +75,43 @@ func main() {
}
}
})
wg.Go(func() error {
return cluster.Start(ctx)
})

wg.Go(func() error {
ticker := time.NewTicker(10 * time.Second)
for {
select {
case msg := <-messages:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

err := shard.Propose(ctx, IntEvent(msg))
if errors.Is(err, dragonboat.ErrShardNotReady) {
log.Println("shard not ready")
} else if err != nil {
if err != nil {
return fmt.Errorf("failed to propose event: %w", err)
}
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

state, err := shard.Query(ctx, 1)
if err != nil {
return fmt.Errorf("failed to query shard: %w", err)
}
log.Println("state: ", state)
cancel()
fmt.Println("state: ", state)
case <-ctx.Done():
return nil
}
}
})

if err := wg.Wait(); err != nil {
return fmt.Errorf("failed to run: %w", err)
}

return nil
}

func main() {
kctx := kong.Parse(&cli)

if err := kctx.Run(); err != nil {
kctx.FatalIfErrorf(err)
}
}
55 changes: 55 additions & 0 deletions cmd/raft-tester/statemachine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"encoding/binary"
"fmt"
"io"

"github.com/block/ftl/internal/raft"
)

type IntStateMachine struct {
sum int64
}

type IntEvent int64

func (i *IntEvent) UnmarshalBinary(data []byte) error { //nolint:unparam
*i = IntEvent(binary.BigEndian.Uint64(data))
return nil
}

func (i IntEvent) MarshalBinary() ([]byte, error) { //nolint:unparam
return binary.BigEndian.AppendUint64([]byte{}, uint64(i)), nil
}

var _ raft.StateMachine[int64, int64, IntEvent, *IntEvent] = &IntStateMachine{}

func (s IntStateMachine) Lookup(key int64) (int64, error) {
return s.sum, nil
}

func (s *IntStateMachine) Update(msg IntEvent) error {
s.sum += int64(msg)
return nil
}

func (s IntStateMachine) Close() error {
return nil
}

func (s IntStateMachine) Recover(reader io.Reader) error {
err := binary.Read(reader, binary.BigEndian, &s.sum)
if err != nil {
return fmt.Errorf("failed to recover from snapshot: %w", err)
}
return nil
}

func (s IntStateMachine) Save(writer io.Writer) error {
err := binary.Write(writer, binary.BigEndian, s.sum)
if err != nil {
return fmt.Errorf("failed to save snapshot: %w", err)
}
return nil
}
Loading

0 comments on commit 0c35cea

Please sign in to comment.