From d22c5ebd72c1743d7623bd9e49ef94ae763a226a Mon Sep 17 00:00:00 2001 From: alwx Date: Mon, 25 Mar 2024 13:32:12 +0100 Subject: [PATCH] Fixes, more data syncing functions --- protocol/common/message_sender.go | 2 +- protocol/messenger_peersyncing.go | 79 ++++++++++++++++--- protocol/messenger_peersyncing_test.go | 2 + protocol/peersyncing/peersyncing.go | 8 +- protocol/peersyncing/peersyncing_test.go | 22 +++--- .../peersyncing/sync_message_persistence.go | 8 +- 6 files changed, 88 insertions(+), 33 deletions(-) diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index dad56cd3bc6..a2c51b2766d 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -748,7 +748,6 @@ func (s *MessageSender) SendPublic( // unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success // returns cloned messages with replaced payloads -// TODO(alwx): peer syncing func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, response *handleMessageResponse) error { datasyncMessage, err := s.datasync.Unwrap( @@ -762,6 +761,7 @@ func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, respo response.DatasyncSender = m.SigPubKey() response.DatasyncAcks = append(response.DatasyncAcks, datasyncMessage.Acks...) response.DatasyncRequests = append(response.DatasyncRequests, datasyncMessage.Requests...) + // TODO(alwx): peer syncing for _, o := range datasyncMessage.GroupOffers { for _, mID := range o.MessageIds { response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{ChatID: o.GroupId, MessageID: mID}) diff --git a/protocol/messenger_peersyncing.go b/protocol/messenger_peersyncing.go index f692e573e61..3f403691f41 100644 --- a/protocol/messenger_peersyncing.go +++ b/protocol/messenger_peersyncing.go @@ -22,7 +22,7 @@ import ( v1protocol "github.com/status-im/status-go/protocol/v1" ) -var peerSyncingLoopInterval time.Duration = 60 * time.Second +var peerSyncingLoopInterval = 60 * time.Second var maxAdvertiseMessages = 40 func (m *Messenger) markDeliveredMessages(acks [][]byte) { @@ -118,12 +118,30 @@ func (m *Messenger) sendDatasyncOffers() error { return nil } - communities, err := m.communitiesManager.Joined() + err = m.sendDatasyncOffersForCommunities() if err != nil { return err } - for _, community := range communities { + err = m.sendDatasyncOffersForChats() + if err != nil { + return err + } + + // Check all the group ids that need to be on offer + // Get all the messages that need to be offered + // Prepare datasync messages + // Dispatch them to the right group + return nil +} + +func (m *Messenger) sendDatasyncOffersForCommunities() error { + joinedCommunities, err := m.communitiesManager.Joined() + if err != nil { + return err + } + + for _, community := range joinedCommunities { var chatIDs [][]byte for id := range community.Chats() { chatIDs = append(chatIDs, []byte(community.IDString()+id)) @@ -133,22 +151,22 @@ func (m *Messenger) sendDatasyncOffers() error { continue } - availableMessages, err := m.peersyncing.AvailableMessagesByGroupIDs(chatIDs, maxAdvertiseMessages) + availableMessages, err := m.peersyncing.AvailableMessagesByChatIDs(chatIDs, maxAdvertiseMessages) if err != nil { return err } availableMessagesMap := make(map[string][][]byte) for _, m := range availableMessages { - groupID := types.Bytes2Hex(m.ChatID) - availableMessagesMap[groupID] = append(availableMessagesMap[groupID], m.ID) + chatID := types.Bytes2Hex(m.ChatID) + availableMessagesMap[chatID] = append(availableMessagesMap[chatID], m.ID) } datasyncMessage := &datasyncproto.Payload{} if len(availableMessages) == 0 { continue } - for groupID, m := range availableMessagesMap { - datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(groupID), MessageIds: m}) + for chatID, m := range availableMessagesMap { + datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m}) } payload, err := proto.Marshal(datasyncMessage) if err != nil { @@ -164,12 +182,48 @@ func (m *Messenger) sendDatasyncOffers() error { if err != nil { return err } + } + return nil +} + +func (m *Messenger) sendDatasyncOffersForChats() error { + for _, chat := range m.Chats() { + availableMessages, err := m.peersyncing.AvailableMessagesByChatID([]byte(chat.ID), maxAdvertiseMessages) + if err != nil { + return err + } + availableMessagesMap := make(map[string][][]byte) + for _, m := range availableMessages { + chatID := types.Bytes2Hex(m.ChatID) + availableMessagesMap[chatID] = append(availableMessagesMap[chatID], m.ID) + } + + datasyncMessage := &datasyncproto.Payload{} + if len(availableMessages) == 0 { + continue + } + for chatID, m := range availableMessagesMap { + datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m}) + } + payload, err := proto.Marshal(datasyncMessage) + if err != nil { + return err + } + publicKey, err := chat.PublicKey() + if err != nil { + return err + } + rawMessage := common.RawMessage{ + Payload: payload, + Ephemeral: true, + SkipApplicationWrap: true, + } + _, err = m.sender.SendPrivate(context.Background(), publicKey, &rawMessage) + if err != nil { + return err + } } - // Check all the group ids that need to be on offer - // Get all the messages that need to be offered - // Prepare datasync messages - // Dispatch them to the right group return nil } @@ -231,7 +285,6 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro return nil } -// TODO(alwx): peer syncing // canSyncMessageWith checks the permission of a message func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) { switch message.Type { diff --git a/protocol/messenger_peersyncing_test.go b/protocol/messenger_peersyncing_test.go index 244082002ee..932b0e6b818 100644 --- a/protocol/messenger_peersyncing_test.go +++ b/protocol/messenger_peersyncing_test.go @@ -140,6 +140,8 @@ func (s *MessengerPeersyncingSuite) thirdPartyTest(community *communities.Commun s.joinCommunity(community, s.owner, s.bob) + // TODO(alwx): test needs to be written + // Bob should now send an offer _, err = WaitOnMessengerResponse( s.bob, diff --git a/protocol/peersyncing/peersyncing.go b/protocol/peersyncing/peersyncing.go index b598d162445..c8331d53d74 100644 --- a/protocol/peersyncing/peersyncing.go +++ b/protocol/peersyncing/peersyncing.go @@ -25,12 +25,12 @@ func (p *PeerSyncing) AvailableMessages() ([]SyncMessage, error) { return p.persistence.All() } -func (p *PeerSyncing) AvailableMessagesByGroupID(groupID []byte, limit int) ([]SyncMessage, error) { - return p.persistence.ByGroupID(groupID, limit) +func (p *PeerSyncing) AvailableMessagesByChatID(groupID []byte, limit int) ([]SyncMessage, error) { + return p.persistence.ByChatID(groupID, limit) } -func (p *PeerSyncing) AvailableMessagesByGroupIDs(groupIDs [][]byte, limit int) ([]SyncMessage, error) { - return p.persistence.ByGroupIDs(groupIDs, limit) +func (p *PeerSyncing) AvailableMessagesByChatIDs(groupIDs [][]byte, limit int) ([]SyncMessage, error) { + return p.persistence.ByChatIDs(groupIDs, limit) } func (p *PeerSyncing) MessagesByIDs(messageIDs [][]byte) ([]SyncMessage, error) { diff --git a/protocol/peersyncing/peersyncing_test.go b/protocol/peersyncing/peersyncing_test.go index b03ccc3283f..dcf5cd76590 100644 --- a/protocol/peersyncing/peersyncing_test.go +++ b/protocol/peersyncing/peersyncing_test.go @@ -29,13 +29,13 @@ func (s *PeerSyncingSuite) SetupTest() { s.p = New(Config{Database: db}) } -var testGroupID = []byte("group-id") +var testCommunityID = []byte("community-id") func (s *PeerSyncingSuite) TestBasic() { syncMessage := SyncMessage{ ID: []byte("test-id"), - ChatID: testGroupID, + ChatID: testCommunityID, Type: SyncMessageCommunityType, Payload: []byte("test"), Timestamp: 1, @@ -48,19 +48,19 @@ func (s *PeerSyncingSuite) TestBasic() { s.Require().NoError(err) s.Require().Len(allMessages, 1) - byGroupID, err := s.p.AvailableMessagesByGroupID(syncMessage.ChatID, 10) + byGroupID, err := s.p.AvailableMessagesByChatID(syncMessage.ChatID, 10) s.Require().NoError(err) s.Require().Len(byGroupID, 1) - byGroupID, err = s.p.AvailableMessagesByGroupID([]byte("random-group-id"), 10) + byGroupID, err = s.p.AvailableMessagesByChatID([]byte("random-group-id"), 10) s.Require().NoError(err) s.Require().Len(byGroupID, 0) newSyncMessage := SyncMessage{ ID: []byte("test-id-2"), - ChatID: testGroupID, + ChatID: testCommunityID, Type: SyncMessageCommunityType, Payload: []byte("test-2"), Timestamp: 2, @@ -77,7 +77,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() { syncMessage1 := SyncMessage{ ID: []byte("test-id-1"), - ChatID: testGroupID, + ChatID: testCommunityID, Type: SyncMessageCommunityType, Payload: []byte("test"), Timestamp: 1, @@ -85,7 +85,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() { syncMessage2 := SyncMessage{ ID: []byte("test-id-2"), - ChatID: testGroupID, + ChatID: testCommunityID, Type: SyncMessageCommunityType, Payload: []byte("test"), Timestamp: 2, @@ -93,7 +93,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() { syncMessage3 := SyncMessage{ ID: []byte("test-id-3"), - ChatID: testGroupID, + ChatID: testCommunityID, Type: SyncMessageCommunityType, Payload: []byte("test"), Timestamp: 3, @@ -101,7 +101,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() { syncMessage4 := SyncMessage{ ID: []byte("test-id-4"), - ChatID: testGroupID, + ChatID: testCommunityID, Type: SyncMessageCommunityType, Payload: []byte("test"), Timestamp: 4, @@ -112,7 +112,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() { s.Require().NoError(s.p.Add(syncMessage3)) s.Require().NoError(s.p.Add(syncMessage4)) - byGroupID, err := s.p.AvailableMessagesByGroupID(testGroupID, 10) + byGroupID, err := s.p.AvailableMessagesByChatID(testCommunityID, 10) s.Require().NoError(err) s.Require().Len(byGroupID, 4) @@ -122,7 +122,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() { s.Require().Equal(syncMessage3.ID, byGroupID[1].ID) s.Require().Equal(syncMessage4.ID, byGroupID[0].ID) - byGroupID, err = s.p.AvailableMessagesByGroupID(testGroupID, 3) + byGroupID, err = s.p.AvailableMessagesByChatID(testCommunityID, 3) s.Require().NoError(err) s.Require().Len(byGroupID, 3) diff --git a/protocol/peersyncing/sync_message_persistence.go b/protocol/peersyncing/sync_message_persistence.go index 35f16f615c1..998ee7b4cc7 100644 --- a/protocol/peersyncing/sync_message_persistence.go +++ b/protocol/peersyncing/sync_message_persistence.go @@ -11,8 +11,8 @@ type SyncMessagePersistence interface { Add(SyncMessage) error All() ([]SyncMessage, error) Complement([]SyncMessage) ([]SyncMessage, error) - ByGroupID([]byte, int) ([]SyncMessage, error) - ByGroupIDs([][]byte, int) ([]SyncMessage, error) + ByChatID([]byte, int) ([]SyncMessage, error) + ByChatIDs([][]byte, int) ([]SyncMessage, error) ByMessageIDs([][]byte) ([]SyncMessage, error) } @@ -54,7 +54,7 @@ func (p *SyncMessageSQLitePersistence) All() ([]SyncMessage, error) { return messages, nil } -func (p *SyncMessageSQLitePersistence) ByGroupID(groupID []byte, limit int) ([]SyncMessage, error) { +func (p *SyncMessageSQLitePersistence) ByChatID(groupID []byte, limit int) ([]SyncMessage, error) { var messages []SyncMessage rows, err := p.db.Query(`SELECT id, type, group_id, payload, timestamp FROM peersyncing_messages WHERE group_id = ? ORDER BY timestamp DESC LIMIT ?`, groupID, limit) if err != nil { @@ -120,7 +120,7 @@ func (p *SyncMessageSQLitePersistence) Complement(messages []SyncMessage) ([]Syn return complement, nil } -func (p *SyncMessageSQLitePersistence) ByGroupIDs(ids [][]byte, limit int) ([]SyncMessage, error) { +func (p *SyncMessageSQLitePersistence) ByChatIDs(ids [][]byte, limit int) ([]SyncMessage, error) { if len(ids) == 0 { return nil, nil }