From e7a41c760f166a16ea65e95c167cc42006402163 Mon Sep 17 00:00:00 2001 From: arnauds5 Date: Sun, 30 Jun 2024 14:46:53 +0200 Subject: [PATCH 1/4] Add some delay before servers are sync --- .../federation/hfederation/federation.go | 50 +++++++++++++++++-- .../federation/hfederation/federation_test.go | 5 +- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/be1-go/internal/handler/channel/federation/hfederation/federation.go b/be1-go/internal/handler/channel/federation/hfederation/federation.go index 9f518f83f0..c322da470b 100644 --- a/be1-go/internal/handler/channel/federation/hfederation/federation.go +++ b/be1-go/internal/handler/channel/federation/hfederation/federation.go @@ -39,6 +39,7 @@ type Hub interface { type Subscribers interface { BroadcastToAllClients(msg mmessage.Message, channel string) error AddChannel(channel string) error + HasChannel(channel string) bool Subscribe(channel string, socket socket.Socket) error SendToAll(buf []byte, channel string) error } @@ -378,6 +379,7 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string, // called => broadcast the result to both federation channels directly. _ = h.db.StoreMessageAndData(remoteChannel, resultMsg) _ = h.subs.BroadcastToAllClients(resultMsg, remoteChannel) + _ = h.subs.BroadcastToAllClients(resultMsg, channelPath) h.log.Info().Msgf("A federation was created with the local LAO %s", federationExpect.LaoId) @@ -398,10 +400,22 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string, } h.log.Info().Msgf("A federation was created with the LAO %s from: %s", federationExpect.LaoId, federationExpect.ServerAddress) + + if h.subs.HasChannel(remoteChannel) { + // If the server was already sync, no need to add a goroutine + return h.subs.BroadcastToAllClients(msg, channelPath) + } + + go func() { + // wait until the remote channel is available to be subscribed on + h.waitSyncOrTimeout(remoteChannel, time.Second*30) + + // broadcast the FederationResult to the local organizer + _ = h.subs.BroadcastToAllClients(resultMsg, channelPath) + }() } - // broadcast the FederationResult to the local organizer - return h.subs.BroadcastToAllClients(resultMsg, channelPath) + return nil } func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error { @@ -445,7 +459,7 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error { // try to get a matching FederationInit, if found then we know that // the local organizer was waiting this result - _, err = h.db.GetFederationInit(organizerPk, result.ChallengeMsg.Sender, federationChallenge, channelPath) + federationInit, err := h.db.GetFederationInit(organizerPk, result.ChallengeMsg.Sender, federationChallenge, channelPath) if err != nil { return err } @@ -455,7 +469,20 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error { return err } - return h.subs.BroadcastToAllClients(msg, channelPath) + remoteChannel := fmt.Sprintf("/root/%s/federation", federationInit.LaoId) + if h.subs.HasChannel(remoteChannel) { + // If the server was already sync, no need to add a goroutine + return h.subs.BroadcastToAllClients(msg, channelPath) + } + + go func() { + // wait until the remote channel is available to be subscribed on + h.waitSyncOrTimeout(remoteChannel, time.Second*30) + + _ = h.subs.BroadcastToAllClients(msg, channelPath) + }() + + return nil } func (h *Handler) handleTokensExchange(msg mmessage.Message, channelPath string) error { @@ -618,3 +645,18 @@ func (h *Handler) publishTo(msg mmessage.Message, channelPath string, socket.Send(publishBytes) return nil } + +// waitSyncOrTimeout will wait at most maxTime or until the channel is created +func (h *Handler) waitSyncOrTimeout(channelPath string, maxTime time.Duration) { + timeout := time.NewTimer(maxTime) + for { + select { + case <-timeout.C: + return + case <-time.After(time.Second): + if h.subs.HasChannel(channelPath) { + return + } + } + } +} diff --git a/be1-go/internal/handler/channel/federation/hfederation/federation_test.go b/be1-go/internal/handler/channel/federation/hfederation/federation_test.go index 0033dd4266..728bc9149c 100644 --- a/be1-go/internal/handler/channel/federation/hfederation/federation_test.go +++ b/be1-go/internal/handler/channel/federation/hfederation/federation_test.go @@ -715,8 +715,6 @@ func Test_handleFederationChallenge(t *testing.T) { require.NoError(t, err) fakeSocket2 := mock2.FakeSocket{Id: "2"} - err = subs.Subscribe(channelPath2, &fakeSocket2) - require.NoError(t, err) rumors.On("SendRumorStateTo", &fakeSocket2).Return(nil) @@ -770,10 +768,9 @@ func Test_handleFederationChallenge(t *testing.T) { err = json.Unmarshal(fakeSocket2.Msg, &publishMsg) require.NoError(t, err) require.Equal(t, mquery.MethodPublish, publishMsg.Method) - require.Equal(t, broadcastMsg.Params.Message, publishMsg.Params.Message) var resultMsg mfederation.FederationResult - err = broadcastMsg.Params.Message.UnmarshalData(&resultMsg) + err = publishMsg.Params.Message.UnmarshalData(&resultMsg) require.NoError(t, err) // it should contain the challenge from organizer, not organizer2 From dc400a6f00e0eafb133c8af8be8590f614163018 Mon Sep 17 00:00:00 2001 From: arnauds5 Date: Sun, 30 Jun 2024 15:20:55 +0200 Subject: [PATCH 2/4] reduce complexity --- .../federation/hfederation/federation.go | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/be1-go/internal/handler/channel/federation/hfederation/federation.go b/be1-go/internal/handler/channel/federation/hfederation/federation.go index c322da470b..7c0e824293 100644 --- a/be1-go/internal/handler/channel/federation/hfederation/federation.go +++ b/be1-go/internal/handler/channel/federation/hfederation/federation.go @@ -383,38 +383,39 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string, h.log.Info().Msgf("A federation was created with the local LAO %s", federationExpect.LaoId) - } else { - // Add the socket to the list of server sockets - h.sockets.Upsert(socket) - - // Send the rumor state directly to avoid delay while syncing - err = h.rumors.SendRumorStateTo(socket) - if err != nil { - return err - } - // publish the FederationResult to the other server - err = h.publishTo(resultMsg, remoteChannel, socket) - if err != nil { - return err - } - h.log.Info().Msgf("A federation was created with the LAO %s from: %s", - federationExpect.LaoId, federationExpect.ServerAddress) + return nil + } - if h.subs.HasChannel(remoteChannel) { - // If the server was already sync, no need to add a goroutine - return h.subs.BroadcastToAllClients(msg, channelPath) - } + // Add the socket to the list of server sockets + h.sockets.Upsert(socket) - go func() { - // wait until the remote channel is available to be subscribed on - h.waitSyncOrTimeout(remoteChannel, time.Second*30) + // Send the rumor state directly to avoid delay while syncing + err = h.rumors.SendRumorStateTo(socket) + if err != nil { + return err + } - // broadcast the FederationResult to the local organizer - _ = h.subs.BroadcastToAllClients(resultMsg, channelPath) - }() + // publish the FederationResult to the other server + err = h.publishTo(resultMsg, remoteChannel, socket) + if err != nil { + return err } + h.log.Info().Msgf("A federation was created with the LAO %s from: %s", + federationExpect.LaoId, federationExpect.ServerAddress) + if h.subs.HasChannel(remoteChannel) { + // If the server was already sync, no need to add a goroutine + return h.subs.BroadcastToAllClients(msg, channelPath) + } + + go func() { + // wait until the remote channel is available to be subscribed on + h.waitSyncOrTimeout(remoteChannel, time.Second*30) + + // broadcast the FederationResult to the local organizer + _ = h.subs.BroadcastToAllClients(resultMsg, channelPath) + }() return nil } From a0736a970dea534cb33a32c9756bb41c9869c8b6 Mon Sep 17 00:00:00 2001 From: arnauds5 Date: Sun, 30 Jun 2024 16:17:31 +0200 Subject: [PATCH 3/4] fix wrong channelPath --- .../federation/hfederation/federation.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/be1-go/internal/handler/channel/federation/hfederation/federation.go b/be1-go/internal/handler/channel/federation/hfederation/federation.go index 7c0e824293..ee3ddf20a9 100644 --- a/be1-go/internal/handler/channel/federation/hfederation/federation.go +++ b/be1-go/internal/handler/channel/federation/hfederation/federation.go @@ -367,16 +367,12 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string, return err } - err = h.db.StoreMessageAndData(channelPath, resultMsg) - if err != nil { - return err - } - remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId) if h.isOnSameServer(federationExpect.ServerAddress) || socket == nil { // In the edge case where the two LAOs are on the same server, the // result message would already be stored and handleResult will not be // called => broadcast the result to both federation channels directly. + _ = h.db.StoreMessageAndData(channelPath, resultMsg) _ = h.db.StoreMessageAndData(remoteChannel, resultMsg) _ = h.subs.BroadcastToAllClients(resultMsg, remoteChannel) _ = h.subs.BroadcastToAllClients(resultMsg, channelPath) @@ -410,8 +406,12 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string, } go func() { + remoteLaoChannel := fmt.Sprintf("/root/%s", federationExpect.LaoId) + // wait until the remote channel is available to be subscribed on - h.waitSyncOrTimeout(remoteChannel, time.Second*30) + h.waitSyncOrTimeout(remoteLaoChannel, time.Second*30) + + _ = h.db.StoreMessageAndData(channelPath, resultMsg) // broadcast the FederationResult to the local organizer _ = h.subs.BroadcastToAllClients(resultMsg, channelPath) @@ -470,15 +470,15 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error { return err } - remoteChannel := fmt.Sprintf("/root/%s/federation", federationInit.LaoId) - if h.subs.HasChannel(remoteChannel) { + remoteLaoChannel := fmt.Sprintf("/root/%s", federationInit.LaoId) + if h.subs.HasChannel(remoteLaoChannel) { // If the server was already sync, no need to add a goroutine return h.subs.BroadcastToAllClients(msg, channelPath) } go func() { // wait until the remote channel is available to be subscribed on - h.waitSyncOrTimeout(remoteChannel, time.Second*30) + h.waitSyncOrTimeout(remoteLaoChannel, time.Second*30) _ = h.subs.BroadcastToAllClients(msg, channelPath) }() @@ -656,6 +656,7 @@ func (h *Handler) waitSyncOrTimeout(channelPath string, maxTime time.Duration) { return case <-time.After(time.Second): if h.subs.HasChannel(channelPath) { + h.log.Info().Msgf("channel %s exists", channelPath) return } } From cf8dd55ab6e83ab5c208c24fcd7fbdf5cf973f47 Mon Sep 17 00:00:00 2001 From: arnauds5 Date: Sun, 30 Jun 2024 16:36:01 +0200 Subject: [PATCH 4/4] Add missing StoreMessageAndData, fix test --- .../channel/federation/hfederation/federation.go | 11 ++++++----- .../channel/federation/hfederation/federation_test.go | 9 +++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/be1-go/internal/handler/channel/federation/hfederation/federation.go b/be1-go/internal/handler/channel/federation/hfederation/federation.go index ee3ddf20a9..dae39a6fcd 100644 --- a/be1-go/internal/handler/channel/federation/hfederation/federation.go +++ b/be1-go/internal/handler/channel/federation/hfederation/federation.go @@ -401,6 +401,8 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string, federationExpect.LaoId, federationExpect.ServerAddress) if h.subs.HasChannel(remoteChannel) { + _ = h.db.StoreMessageAndData(channelPath, resultMsg) + // If the server was already sync, no need to add a goroutine return h.subs.BroadcastToAllClients(msg, channelPath) } @@ -465,13 +467,10 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error { return err } - err = h.db.StoreMessageAndData(channelPath, msg) - if err != nil { - return err - } - remoteLaoChannel := fmt.Sprintf("/root/%s", federationInit.LaoId) if h.subs.HasChannel(remoteLaoChannel) { + _ = h.db.StoreMessageAndData(channelPath, msg) + // If the server was already sync, no need to add a goroutine return h.subs.BroadcastToAllClients(msg, channelPath) } @@ -480,6 +479,8 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error { // wait until the remote channel is available to be subscribed on h.waitSyncOrTimeout(remoteLaoChannel, time.Second*30) + _ = h.db.StoreMessageAndData(channelPath, msg) + _ = h.subs.BroadcastToAllClients(msg, channelPath) }() diff --git a/be1-go/internal/handler/channel/federation/hfederation/federation_test.go b/be1-go/internal/handler/channel/federation/hfederation/federation_test.go index 728bc9149c..33976670a0 100644 --- a/be1-go/internal/handler/channel/federation/hfederation/federation_test.go +++ b/be1-go/internal/handler/channel/federation/hfederation/federation_test.go @@ -702,9 +702,15 @@ func Test_handleFederationChallenge(t *testing.T) { laoID := "lsWUv1bKBQ0t1DqWZTFwb0nhLsP_EtfGoXHny4hsrwA=" laoID2 := "OWY4NmQwODE4ODRjN2Q2NTlhMmZlYWEwYzU1YWQwMQ==" laoPath := fmt.Sprintf("/root/%s", laoID) + laoPath2 := fmt.Sprintf("/root/%s", laoID2) channelPath := fmt.Sprintf("/root/%s/federation", laoID) channelPath2 := fmt.Sprintf("/root/%s/federation", laoID2) + err = subs.AddChannel(laoPath) + require.NoError(t, err) + err = subs.AddChannel(laoPath2) + require.NoError(t, err) + err = subs.AddChannel(channelPath) require.NoError(t, err) err = subs.AddChannel(channelPath2) @@ -812,6 +818,9 @@ func Test_handleFederationResult(t *testing.T) { laoPath := fmt.Sprintf("/root/%s", laoID) channelPath := fmt.Sprintf("/root/%s/federation", laoID) + err = subs.AddChannel(laoPath) + require.NoError(t, err) + err = subs.AddChannel(channelPath) require.NoError(t, err)