chore: when closing a nodehost, self deregister from all shards (#3887)
Also adds retries for timeouts that occur while an election is in progress.
jvmakine authored Jan 2, 2025
1 parent 17e177c commit e0932a1
Showing 3 changed files with 49 additions and 34 deletions.
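
With this change `Stop` takes a context: before closing the dragonboat node host it removes the local replica from every shard's membership, and that membership change has to be committed by the remaining members. A minimal caller-side sketch of the new shutdown path (the `cluster` variable and the 10-second deadline are illustrative; only the `Stop(ctx)` signature comes from this diff):

```go
// Bound the self-deregistration with a deadline: Stop(ctx) removes this replica
// from every shard's membership (a committed change) before closing the node host.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := cluster.Stop(ctx); err != nil {
	return fmt.Errorf("raft shutdown: %w", err)
}
```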
56 changes: 36 additions & 20 deletions internal/raft/cluster.go
@@ -24,7 +24,7 @@ type RaftConfig struct {
ShardReadyTimeout time.Duration `help:"Timeout for shard to be ready" default:"5s"`
// Raft configuration
RTT time.Duration `help:"Estimated average round trip time between nodes" default:"200ms"`
ElectionRTT uint64 `help:"Election RTT as a multiple of RTT" default:"10"`
ElectionTimeoutRTT uint64 `help:"Election timeout RTT as a multiple of RTT" default:"10"`
HeartbeatRTT uint64 `help:"Heartbeat RTT as a multiple of RTT" default:"1"`
SnapshotEntries uint64 `help:"Snapshot entries" default:"10"`
CompactionOverhead uint64 `help:"Compaction overhead" default:"100"`
@@ -106,7 +106,7 @@ func (s *ShardHandle[E, Q, R]) Propose(ctx context.Context, msg E) error {
s.session = s.cluster.nh.GetNoOPSession(s.shardID)
}

if err := s.cluster.withRetry(ctx, s.shardID, s.cluster.config.ReplicaID, func() error {
if err := s.cluster.withRetry(ctx, s.shardID, s.cluster.config.ReplicaID, func(ctx context.Context) error {
_, err := s.cluster.nh.SyncPropose(ctx, s.session, msgBytes)
return err //nolint:wrapcheck
}); err != nil {
@@ -177,7 +177,7 @@ func (c *Cluster) start(ctx context.Context, join bool) error {
ReplicaID: c.config.ReplicaID,
ShardID: shardID,
CheckQuorum: true,
ElectionRTT: c.config.ElectionRTT,
ElectionRTT: c.config.ElectionTimeoutRTT,
HeartbeatRTT: c.config.HeartbeatRTT,
SnapshotEntries: c.config.SnapshotEntries,
CompactionOverhead: c.config.CompactionOverhead,
@@ -208,11 +208,17 @@ func (c *Cluster) start(ctx context.Context, join bool) error {
}

// Stop the node host and all shards.
func (c *Cluster) Stop() error {
func (c *Cluster) Stop(ctx context.Context) error {
if c.nh == nil {
return nil
}

for shardID := range c.shards {
if err := c.removeShardMember(ctx, shardID, c.config.ReplicaID); err != nil {
return fmt.Errorf("failed to remove shard (%d) member: %w", shardID, err)
}
}

c.nh.Close()
c.nh = nil

@@ -225,35 +231,31 @@ func (c *Cluster) AddMember(ctx context.Context, shardID uint64, replicaID uint6
logger := log.FromContext(ctx).Scope("raft")
logger.Infof("adding member %s to shard %d on replica %d", address, shardID, replicaID)

if err := c.withRetry(ctx, shardID, replicaID, func() error {
if err := c.withRetry(ctx, shardID, replicaID, func(ctx context.Context) error {
return c.nh.SyncRequestAddReplica(ctx, shardID, replicaID, address, 0)
}); err != nil {
return fmt.Errorf("failed to add member: %w", err)
}
return nil
}

// RemoveMember from the cluster. This needs to be called on an existing running cluster member,
// before the member is stopped.
func (c *Cluster) RemoveMember(ctx context.Context, shardID uint64, replicaID uint64) error {
// removeShardMember from the given shard. This removes the given member from the membership group
// and blocks until the change has been committed
func (c *Cluster) removeShardMember(ctx context.Context, shardID uint64, replicaID uint64) error {
logger := log.FromContext(ctx).Scope("raft")
logger.Infof("removing member from shard %d on replica %d", shardID, replicaID)
logger.Infof("removing replica %d from shard %d", shardID, replicaID)

if err := c.withRetry(ctx, shardID, replicaID, func() error {
if err := c.withRetry(ctx, shardID, replicaID, func(ctx context.Context) error {
return c.nh.SyncRequestDeleteReplica(ctx, shardID, replicaID, 0)
}); err != nil {
return fmt.Errorf("failed to remove member: %w", err)
}
// if we removed the leader, we wait for the shard to be ready again
if err := c.waitReady(ctx, shardID); err != nil {
return fmt.Errorf("failed to wait for shard %d to be ready on replica %d: %w", shardID, replicaID, err)
}
return nil
}

// withRetry runs an async dragonboat call and blocks until it succeeds or the context is cancelled.
// the call is retried if the request is dropped or times out, which can happen when the leader is not available.
func (c *Cluster) withRetry(ctx context.Context, shardID, replicaID uint64, f func() error) error {
func (c *Cluster) withRetry(ctx context.Context, shardID, replicaID uint64, f func(ctx context.Context) error) error {
retry := backoff.Backoff{
Min: c.config.RTT,
Max: c.config.ShardReadyTimeout,
@@ -263,16 +265,30 @@ func (c *Cluster) withRetry(ctx context.Context, shardID, replicaID uint64, f fu
logger := log.FromContext(ctx).Scope("raft")

for {
err := f()
// Timeout for the proposal to reach the leader and reach a quorum.
// If the leader is not available, the proposal will time out, in which case
// we retry the operation.
timeout := time.Duration(c.config.ElectionTimeoutRTT) * c.config.RTT
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

err := f(ctx)
duration := retry.Duration()
if errors.Is(err, dragonboat.ErrShardNotReady) {
logger.Debugf("shard not ready, retrying in %s", retry.Duration())
time.Sleep(retry.Duration())
logger.Debugf("shard not ready, retrying in %s", duration)
time.Sleep(duration)
if _, ok := <-ctx.Done(); ok {
return fmt.Errorf("context cancelled")
}
continue
}
if err != nil {
} else if errors.Is(err, dragonboat.ErrTimeout) {
logger.Debugf("timeout, retrying in %s", duration)
time.Sleep(duration)
if _, ok := <-ctx.Done(); ok {
return fmt.Errorf("context cancelled")
}
continue
} else if err != nil {
return fmt.Errorf("failed to submit request to shard %d on replica %d: %w", shardID, replicaID, err)
}
return nil
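
The reworked `withRetry` gives each attempt its own timeout of `ElectionTimeoutRTT * RTT` (2s with the defaults above), so a proposal stuck behind a leader election fails fast and is retried with backoff instead of hanging until the outer deadline. A standalone, standard-library-only sketch of that shape; the error classification, backoff doubling, and cancellation check below are illustrative, not the repository's actual helper:

```go
package raftutil

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errRetryable stands in for transient dragonboat errors such as
// ErrShardNotReady and ErrTimeout.
var errRetryable = errors.New("retryable")

// retryWithTimeout runs f with a fresh per-attempt timeout and backs off between
// attempts, giving up as soon as the outer context is cancelled.
func retryWithTimeout(ctx context.Context, attemptTimeout time.Duration, f func(context.Context) error) error {
	backoff := attemptTimeout / 4 // illustrative starting backoff
	for {
		attemptCtx, cancel := context.WithTimeout(ctx, attemptTimeout)
		err := f(attemptCtx)
		cancel() // release the attempt's timer before the next iteration

		if err == nil {
			return nil
		}
		if !errors.Is(err, errRetryable) {
			return fmt.Errorf("request failed: %w", err)
		}

		// Non-blocking cancellation check so a cancelled caller stops retrying promptly.
		select {
		case <-ctx.Done():
			return fmt.Errorf("retry cancelled: %w", ctx.Err())
		case <-time.After(backoff):
		}
		if backoff < attemptTimeout {
			backoff *= 2
		}
	}
}
```

The actual helper in the hunk above retries specifically on `dragonboat.ErrShardNotReady` and `dragonboat.ErrTimeout`.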
23 changes: 11 additions & 12 deletions internal/raft/cluster_test.go
@@ -64,8 +64,8 @@ func TestCluster(t *testing.T) {
wg.Go(func() error { return cluster1.Start(wctx) })
wg.Go(func() error { return cluster2.Start(wctx) })
assert.NoError(t, wg.Wait())
defer cluster1.Stop()
defer cluster2.Stop()
defer cluster1.Stop(ctx) //nolint:errcheck
defer cluster2.Stop(ctx) //nolint:errcheck

assert.NoError(t, shard1_1.Propose(ctx, IntEvent(1)))
assert.NoError(t, shard2_1.Propose(ctx, IntEvent(2)))
@@ -97,8 +97,8 @@ func TestJoiningExistingCluster(t *testing.T) {
wg.Go(func() error { return cluster1.Start(wctx) })
wg.Go(func() error { return cluster2.Start(wctx) })
assert.NoError(t, wg.Wait())
defer cluster1.Stop()
defer cluster2.Stop()
defer cluster1.Stop(ctx) //nolint:errcheck
defer cluster2.Stop(ctx) //nolint:errcheck

t.Log("join to the existing cluster as a new member")
builder3 := testBuilder(t, nil, 3, members[2].String())
@@ -108,7 +108,7 @@
assert.NoError(t, cluster1.AddMember(ctx, 1, 3, members[2].String()))

assert.NoError(t, cluster3.Join(ctx))
defer cluster3.Stop()
defer cluster3.Stop(ctx) //nolint:errcheck

assert.NoError(t, shard3.Propose(ctx, IntEvent(1)))

@@ -121,7 +121,7 @@

assert.NoError(t, cluster3.AddMember(ctx, 1, 4, members[3].String()))
assert.NoError(t, cluster4.Join(ctx))
defer cluster4.Stop()
defer cluster4.Stop(ctx) //nolint:errcheck

assert.NoError(t, shard4.Propose(ctx, IntEvent(1)))

@@ -151,17 +151,16 @@ func TestLeavingCluster(t *testing.T) {
wg.Go(func() error { return cluster2.Start(wctx) })
wg.Go(func() error { return cluster3.Start(wctx) })
assert.NoError(t, wg.Wait())
defer cluster1.Stop() //nolint:errcheck
defer cluster2.Stop() //nolint:errcheck
defer cluster3.Stop() //nolint:errcheck
defer cluster1.Stop(ctx) //nolint:errcheck
defer cluster2.Stop(ctx) //nolint:errcheck
defer cluster3.Stop(ctx) //nolint:errcheck

t.Log("proposing event")
assert.NoError(t, shard1.Propose(ctx, IntEvent(1)))
assertShardValue(ctx, t, 1, shard1, shard2, shard3)

t.Log("removing member")
assert.NoError(t, cluster2.RemoveMember(ctx, 1, 1))
assert.NoError(t, cluster1.Stop())
assert.NoError(t, cluster1.Stop(ctx))

t.Log("proposing event after removal")
assert.NoError(t, shard2.Propose(ctx, IntEvent(1)))
@@ -180,7 +179,7 @@ func testBuilder(t *testing.T, addresses []*net.TCPAddr, id uint64, address stri
DataDir: t.TempDir(),
InitialMembers: members,
HeartbeatRTT: 1,
ElectionRTT: 10,
ElectionTimeoutRTT: 10,
SnapshotEntries: 10,
CompactionOverhead: 10,
RTT: 10 * time.Millisecond,
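
Both the config struct and the test builder rename `ElectionRTT` to `ElectionTimeoutRTT`; it still expresses the election timeout as a multiple of the estimated round-trip time, and after this change it also sizes the per-attempt timeout inside `withRetry`. A hedged example of populating the struct (the `raft` package alias and the argument values are assumptions, and the initial-membership fields from the test builder are omitted; field names and defaults come from the diff):

```go
// newRaftConfig sketches the renamed field in context; InitialMembers and other
// membership/address wiring from the test builder are left to the caller.
func newRaftConfig(replicaID uint64, dataDir string) raft.RaftConfig {
	return raft.RaftConfig{
		ReplicaID:          replicaID,
		DataDir:            dataDir,
		ShardReadyTimeout:  5 * time.Second,        // default from the struct tags
		RTT:                200 * time.Millisecond, // estimated inter-node round trip
		ElectionTimeoutRTT: 10,                     // election timeout = 10 × RTT = 2s; also withRetry's per-attempt timeout
		HeartbeatRTT:       1,
		SnapshotEntries:    10,
		CompactionOverhead: 100,
	}
}
```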
4 changes: 2 additions & 2 deletions internal/raft/eventview_test.go
@@ -63,8 +63,8 @@ func TestEventView(t *testing.T) {
eg.Go(func() error { return cluster1.Start(wctx) })
eg.Go(func() error { return cluster2.Start(wctx) })
assert.NoError(t, eg.Wait())
defer cluster1.Stop()
defer cluster2.Stop()
defer cluster1.Stop(ctx) //nolint:errcheck
defer cluster2.Stop(ctx) //nolint:errcheck

assert.NoError(t, view1.Publish(ctx, IntStreamEvent{Value: 1}))

