Skip to content

Commit

Permalink
Merge pull request #1970 from dedis/work-be1-arnauds5-delay-federatio…
Browse files Browse the repository at this point in the history
…nresult-after-sync

Add some delay before servers are sync
  • Loading branch information
arnauds5 authored Jun 30, 2024
2 parents 239d7b5 + cf8dd55 commit 4901524
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 32 deletions.
101 changes: 73 additions & 28 deletions be1-go/internal/handler/channel/federation/hfederation/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Hub interface {
type Subscribers interface {
BroadcastToAllClients(msg mmessage.Message, channel string) error
AddChannel(channel string) error
HasChannel(channel string) bool
Subscribe(channel string, socket socket.Socket) error
SendToAll(buf []byte, channel string) error
}
Expand Down Expand Up @@ -366,42 +367,58 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string,
return err
}

err = h.db.StoreMessageAndData(channelPath, resultMsg)
if err != nil {
return err
}

remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId)
if h.isOnSameServer(federationExpect.ServerAddress) || socket == nil {
// In the edge case where the two LAOs are on the same server, the
// result message would already be stored and handleResult will not be
// called => broadcast the result to both federation channels directly.
_ = h.db.StoreMessageAndData(channelPath, resultMsg)
_ = h.db.StoreMessageAndData(remoteChannel, resultMsg)
_ = h.subs.BroadcastToAllClients(resultMsg, remoteChannel)
_ = h.subs.BroadcastToAllClients(resultMsg, channelPath)

h.log.Info().Msgf("A federation was created with the local LAO %s",
federationExpect.LaoId)
} else {
// Add the socket to the list of server sockets
h.sockets.Upsert(socket)

// Send the rumor state directly to avoid delay while syncing
err = h.rumors.SendRumorStateTo(socket)
if err != nil {
return err
}

// publish the FederationResult to the other server
err = h.publishTo(resultMsg, remoteChannel, socket)
if err != nil {
return err
}
h.log.Info().Msgf("A federation was created with the LAO %s from: %s",
federationExpect.LaoId, federationExpect.ServerAddress)
return nil
}

// Add the socket to the list of server sockets
h.sockets.Upsert(socket)

// Send the rumor state directly to avoid delay while syncing
err = h.rumors.SendRumorStateTo(socket)
if err != nil {
return err
}

// publish the FederationResult to the other server
err = h.publishTo(resultMsg, remoteChannel, socket)
if err != nil {
return err
}
h.log.Info().Msgf("A federation was created with the LAO %s from: %s",
federationExpect.LaoId, federationExpect.ServerAddress)

if h.subs.HasChannel(remoteChannel) {
_ = h.db.StoreMessageAndData(channelPath, resultMsg)

// If the server was already sync, no need to add a goroutine
return h.subs.BroadcastToAllClients(msg, channelPath)
}

// broadcast the FederationResult to the local organizer
return h.subs.BroadcastToAllClients(resultMsg, channelPath)
go func() {
remoteLaoChannel := fmt.Sprintf("/root/%s", federationExpect.LaoId)

// wait until the remote channel is available to be subscribed on
h.waitSyncOrTimeout(remoteLaoChannel, time.Second*30)

_ = h.db.StoreMessageAndData(channelPath, resultMsg)

// broadcast the FederationResult to the local organizer
_ = h.subs.BroadcastToAllClients(resultMsg, channelPath)
}()
return nil
}

func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error {
Expand Down Expand Up @@ -445,17 +462,29 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error {

// try to get a matching FederationInit, if found then we know that
// the local organizer was waiting this result
_, err = h.db.GetFederationInit(organizerPk, result.ChallengeMsg.Sender, federationChallenge, channelPath)
federationInit, err := h.db.GetFederationInit(organizerPk, result.ChallengeMsg.Sender, federationChallenge, channelPath)
if err != nil {
return err
}

err = h.db.StoreMessageAndData(channelPath, msg)
if err != nil {
return err
remoteLaoChannel := fmt.Sprintf("/root/%s", federationInit.LaoId)
if h.subs.HasChannel(remoteLaoChannel) {
_ = h.db.StoreMessageAndData(channelPath, msg)

// If the server was already sync, no need to add a goroutine
return h.subs.BroadcastToAllClients(msg, channelPath)
}

return h.subs.BroadcastToAllClients(msg, channelPath)
go func() {
// wait until the remote channel is available to be subscribed on
h.waitSyncOrTimeout(remoteLaoChannel, time.Second*30)

_ = h.db.StoreMessageAndData(channelPath, msg)

_ = h.subs.BroadcastToAllClients(msg, channelPath)
}()

return nil
}

func (h *Handler) handleTokensExchange(msg mmessage.Message, channelPath string) error {
Expand Down Expand Up @@ -618,3 +647,19 @@ func (h *Handler) publishTo(msg mmessage.Message, channelPath string,
socket.Send(publishBytes)
return nil
}

// waitSyncOrTimeout will wait at most maxTime or until the channel is created
func (h *Handler) waitSyncOrTimeout(channelPath string, maxTime time.Duration) {
timeout := time.NewTimer(maxTime)
for {
select {
case <-timeout.C:
return
case <-time.After(time.Second):
if h.subs.HasChannel(channelPath) {
h.log.Info().Msgf("channel %s exists", channelPath)
return
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,15 @@ func Test_handleFederationChallenge(t *testing.T) {
laoID := "lsWUv1bKBQ0t1DqWZTFwb0nhLsP_EtfGoXHny4hsrwA="
laoID2 := "OWY4NmQwODE4ODRjN2Q2NTlhMmZlYWEwYzU1YWQwMQ=="
laoPath := fmt.Sprintf("/root/%s", laoID)
laoPath2 := fmt.Sprintf("/root/%s", laoID2)
channelPath := fmt.Sprintf("/root/%s/federation", laoID)
channelPath2 := fmt.Sprintf("/root/%s/federation", laoID2)

err = subs.AddChannel(laoPath)
require.NoError(t, err)
err = subs.AddChannel(laoPath2)
require.NoError(t, err)

err = subs.AddChannel(channelPath)
require.NoError(t, err)
err = subs.AddChannel(channelPath2)
Expand All @@ -715,8 +721,6 @@ func Test_handleFederationChallenge(t *testing.T) {
require.NoError(t, err)

fakeSocket2 := mock2.FakeSocket{Id: "2"}
err = subs.Subscribe(channelPath2, &fakeSocket2)
require.NoError(t, err)

rumors.On("SendRumorStateTo", &fakeSocket2).Return(nil)

Expand Down Expand Up @@ -770,10 +774,9 @@ func Test_handleFederationChallenge(t *testing.T) {
err = json.Unmarshal(fakeSocket2.Msg, &publishMsg)
require.NoError(t, err)
require.Equal(t, mquery.MethodPublish, publishMsg.Method)
require.Equal(t, broadcastMsg.Params.Message, publishMsg.Params.Message)

var resultMsg mfederation.FederationResult
err = broadcastMsg.Params.Message.UnmarshalData(&resultMsg)
err = publishMsg.Params.Message.UnmarshalData(&resultMsg)
require.NoError(t, err)

// it should contain the challenge from organizer, not organizer2
Expand Down Expand Up @@ -815,6 +818,9 @@ func Test_handleFederationResult(t *testing.T) {
laoPath := fmt.Sprintf("/root/%s", laoID)
channelPath := fmt.Sprintf("/root/%s/federation", laoID)

err = subs.AddChannel(laoPath)
require.NoError(t, err)

err = subs.AddChannel(channelPath)
require.NoError(t, err)

Expand Down

0 comments on commit 4901524

Please sign in to comment.