Skip to content

Commit

Permalink
chore: add a raft library (#3795)
Browse files Browse the repository at this point in the history
This is a typed version of dragonboat APIs. It also does some
simplifications, namely
- No immutable files in snapshots
- No support for cancelling snapshot save / restore
- No active sessions, which can result into duplicate events on timeouts
/ errors.

Also, it does not support cleanly removing members from the cluster yet.

However, it supports
- typed events
- typed queries
- snapshotting
- joining to an existing cluster as a new member.

Also, I did not enable gossip protocol that can be used to retain node
identity over changing addresses, as stateful sets should have a
persistent address in K8s

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jvmakine and github-actions[bot] authored Dec 19, 2024
1 parent e471620 commit eba6400
Show file tree
Hide file tree
Showing 75 changed files with 1,090 additions and 0 deletions.
1 change: 1 addition & 0 deletions backend/admin/testdata/go/dischema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/console/testdata/go/console/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/leases/testdata/go/leases/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/sql/testdata/go/database/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/sql/testdata/go/mysql/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/ingress/testdata/go/httpingress/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/provisioner/testdata/go/echo/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/runner/pubsub/testdata/go/publisher/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/runner/pubsub/testdata/go/subscriber/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/timeline/testdata/go/cron/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/timeline/testdata/go/echo/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/timeline/testdata/go/ingress/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/timeline/testdata/go/publisher/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/timeline/testdata/go/subscriber/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/timeline/testdata/go/time/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 131 additions & 0 deletions cmd/raft-tester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package main

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"os"
"os/signal"
"time"

"github.com/alecthomas/kong"
"github.com/lni/dragonboat/v4"
"golang.org/x/exp/rand"
"golang.org/x/sync/errgroup"

"github.com/block/ftl/internal/raft"
)

var cli struct {
RaftConfig raft.RaftConfig `embed:"" prefix:"raft-"`
}

type IntStateMachine struct {
sum int64
}

type IntEvent int64

func (i *IntEvent) UnmarshalBinary(data []byte) error { //nolint:unparam
*i = IntEvent(binary.BigEndian.Uint64(data))
return nil
}

func (i IntEvent) MarshalBinary() ([]byte, error) { //nolint:unparam
return binary.BigEndian.AppendUint64([]byte{}, uint64(i)), nil
}

var _ raft.StateMachine[int64, int64, IntEvent, *IntEvent] = &IntStateMachine{}

func (s IntStateMachine) Lookup(key int64) (int64, error) {
return s.sum, nil
}

func (s *IntStateMachine) Update(msg IntEvent) error {
s.sum += int64(msg)
return nil
}

func (s IntStateMachine) Close() error {
return nil
}

func (s IntStateMachine) Recover(reader io.Reader) error {
err := binary.Read(reader, binary.BigEndian, &s.sum)
if err != nil {
return fmt.Errorf("failed to recover from snapshot: %w", err)
}
return nil
}

func (s IntStateMachine) Save(writer io.Writer) error {
err := binary.Write(writer, binary.BigEndian, s.sum)
if err != nil {
return fmt.Errorf("failed to save snapshot: %w", err)
}
return nil
}

func main() {
kctx := kong.Parse(&cli)
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)

cluster := raft.New(&cli.RaftConfig)
shard := raft.AddShard(ctx, cluster, 1, &IntStateMachine{})

wg, ctx := errgroup.WithContext(ctx)
messages := make(chan int)

wg.Go(func() error {
defer close(messages)
// send a random number every 10 seconds
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
messages <- rand.Intn(1000)
case <-ctx.Done():
return nil
}
}
})
wg.Go(func() error {
return cluster.Start(ctx)
})
wg.Go(func() error {
ticker := time.NewTicker(10 * time.Second)
for {
select {
case msg := <-messages:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

err := shard.Propose(ctx, IntEvent(msg))
if errors.Is(err, dragonboat.ErrShardNotReady) {
log.Println("shard not ready")
} else if err != nil {
return fmt.Errorf("failed to propose event: %w", err)
}
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

state, err := shard.Query(ctx, 1)
if err != nil {
return fmt.Errorf("failed to query shard: %w", err)
}
log.Println("state: ", state)
case <-ctx.Done():
return nil
}
}
})

if err := wg.Wait(); err != nil {
kctx.FatalIfErrorf(err)
}
}
1 change: 1 addition & 0 deletions common/internal/tests/testdata/go/omitempty/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/internal/tests/testdata/go/runtimereflection/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/go/cron/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/go/echo/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/go/http/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/go/mysql/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/go/postgres/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/go/pubsub/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit eba6400

Please sign in to comment.