Skip to content

Commit

Permalink
Room pool initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Kleonikos Kyriakis committed Dec 22, 2023
1 parent 625b68d commit 314db2e
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 63 deletions.
6 changes: 3 additions & 3 deletions examples/rpc/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1"
"context"
"fmt"
"log"
Expand All @@ -10,7 +11,6 @@ import (

"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc"
accommodationv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/accommodation/v1alpha1"
typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1"
internalmetadata "github.com/chain4travel/camino-messenger-bot/internal/metadata"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -31,7 +31,7 @@ func main() {
argsWithoutProg := os.Args[1:]
unencrypted := len(argsWithoutProg) == 0
ppConfig := config.PartnerPluginConfig{
Host: "localhost",
Host: "127.0.0.1",
Port: 9092,
Unencrypted: unencrypted,
}
Expand Down Expand Up @@ -65,7 +65,7 @@ func main() {
panic(err)
}
md := metadata.New(map[string]string{
"recipient": "@t-kopernikus1dry573dcz6jefshfxgya68jd6s07xezpm27ng9:matrix.camino.network",
"recipient": "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

Expand Down
5 changes: 3 additions & 2 deletions internal/matrix/matrix_messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) {
}

syncer.OnEventType(event.StateMember, func(source mautrix.EventSource, evt *event.Event) {
if evt.GetStateKey() == m.client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite {
if evt.GetStateKey() == m.client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite && !m.roomHandler.HasAlreadyJoined(id.UserID(evt.Sender.String()), evt.RoomID) {
_, err := m.client.JoinRoomByID(evt.RoomID)
if err == nil {
m.roomHandler.CacheRoom(id.UserID(evt.Sender.String()), evt.RoomID) // add room to cache
Expand Down Expand Up @@ -155,6 +155,7 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) {
m.client.cancelSync = cancelSync
m.client.syncStopWait.Add(1)

m.roomHandler.Init()
go func() {
err = m.client.SyncWithContext(syncCtx)
defer m.client.syncStopWait.Done()
Expand All @@ -175,7 +176,7 @@ func (m *messenger) StopReceiver() error {
}

func (m *messenger) SendAsync(_ context.Context, msg messaging.Message) error {
//m.logger.Info("Sending async message", zap.String("msg", msg.Metadata.RequestID))
m.logger.Info("Sending async message", zap.String("msg", msg.Metadata.RequestID))

roomID, err := m.roomHandler.GetOrCreateRoomForRecipient(id.UserID(msg.Metadata.Recipient))
if err != nil {
Expand Down
125 changes: 69 additions & 56 deletions internal/matrix/room_handler.go
Original file line number Diff line number Diff line change
@@ -1,65 +1,101 @@
package matrix

import (
"math/rand"
"sync"

"go.uber.org/zap"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
"sync"
)

const RoomPoolSize = 10

type RoomHandler interface {
Init()
GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomID, error)
CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, error)
EnableEncryptionForRoom(roomID id.RoomID) error
GetEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool)
CacheRoom(recipient id.UserID, roomID id.RoomID)
HasAlreadyJoined(recipient id.UserID, roomID id.RoomID) bool
}

type roomPool struct {
rooms []id.RoomID
}
type roomHandler struct {
client *mautrix.Client
logger *zap.SugaredLogger
rooms map[id.UserID]*roomPool
rooms map[id.UserID]*RoomPool
mu sync.RWMutex
}

func NewRoomHandler(client *mautrix.Client, logger *zap.SugaredLogger) RoomHandler {
return &roomHandler{client: client, logger: logger, rooms: make(map[id.UserID]*roomPool)}
return &roomHandler{client: client, logger: logger, rooms: make(map[id.UserID]*RoomPool)}
}
func (r *roomHandler) Init() {
rooms, err := r.client.JoinedRooms()
if err != nil {
r.logger.Warn("failed to fetch joined rooms - skipping room handler initialization")
return
}

// cache all encrypted rooms
for _, roomID := range rooms.JoinedRooms {
r.logger.Debugf("Caching room %v | encrypted: %v", roomID, r.client.StateStore.IsEncrypted(roomID))
if !r.client.StateStore.IsEncrypted(roomID) {
continue
}
members, err := r.client.JoinedMembers(roomID)
if err != nil {
r.logger.Debugf("failed to fetch members for room %v", roomID)
continue
}
for userID, _ := range members.Joined {
r.CacheRoom(userID, roomID)
}
}
}

func (r *roomHandler) CacheRoom(recipient id.UserID, roomID id.RoomID) {
r.mu.Lock()
defer r.mu.Unlock()
if _, found := r.rooms[recipient]; !found {
r.rooms[recipient] = NewRoomPool(RoomPoolSize)
}
pool := r.rooms[recipient]
pool.Add(roomID)
}

func (r *roomHandler) HasAlreadyJoined(recipient id.UserID, roomID id.RoomID) bool {
if _, found := r.rooms[recipient]; !found {
return false
}
for _, id := range r.rooms[recipient].rooms {
if roomID == id {
return true
}
}
return false
}
func (r *roomHandler) GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomID, error) {

// check if room already established with recipient
roomID, found := r.GetEncryptedRoomForRecipient(recipient)

var err error
// if not create room and invite recipient
if !found {
for i := 0; i < RoomPoolSize; i++ {
roomID, err = r.CreateRoomAndInviteUser(recipient)
if err != nil {
return "", err
}
// enable encryption for room
err = r.EnableEncryptionForRoom(roomID)
if err != nil {
return "", err
}
roomID, _ := r.getEncryptedRoomForRecipient(recipient)

addNewRoomToPool := func() (id.RoomID, error) {
roomID, err := r.createRoomAndInviteUser(recipient)
if err != nil {
return "", err
} else {
err = r.enableEncryptionForRoom(roomID)
return roomID, err
}
}

// return room id
// even if we have found a cached room, we will still create new rooms
if roomPool, found := r.rooms[recipient]; !found || roomPool.currentSize < RoomPoolSize {
return addNewRoomToPool()
}

return roomID, nil
}

func (r *roomHandler) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, error) {
func (r *roomHandler) createRoomAndInviteUser(userID id.UserID) (id.RoomID, error) {
r.logger.Debugf("Creating room and inviting user %v", userID)
req := mautrix.ReqCreateRoom{
Visibility: "private",
Expand All @@ -74,14 +110,14 @@ func (r *roomHandler) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, erro
return resp.RoomID, nil
}

func (r *roomHandler) EnableEncryptionForRoom(roomID id.RoomID) error {
func (r *roomHandler) enableEncryptionForRoom(roomID id.RoomID) error {
r.logger.Debugf("Enabling encryption for room %s", roomID)
_, err := r.client.SendStateEvent(roomID, event.StateEncryption, "",
event.EncryptionEventContent{Algorithm: id.AlgorithmMegolmV1, RotationPeriodMessages: 10000})
return err
}

func (r *roomHandler) GetEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool) {
func (r *roomHandler) getEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool) {
roomID := r.fetchCachedRoom(recipient)
if roomID != "" {
return roomID, true
Expand Down Expand Up @@ -112,18 +148,8 @@ func (r *roomHandler) GetEncryptedRoomForRecipient(recipient id.UserID) (id.Room
}
}

// add rooms to pool until pool is full
for i := createdRooms; i < RoomPoolSize; i++ {
roomID, err = r.CreateRoomAndInviteUser(recipient)
if err != nil {
r.logger.Debugf("Failed to create room for recipient %v", recipient)
// enable encryption for room
} else {
err = r.EnableEncryptionForRoom(roomID)
if err != nil {
r.logger.Debugf("Failed to enable encryption for room %v", roomID)
}
}
if createdRooms > 0 {
return r.fetchCachedRoom(recipient), true
}
return "", false
}
Expand All @@ -132,20 +158,7 @@ func (r *roomHandler) fetchCachedRoom(recipient id.UserID) id.RoomID {
r.mu.RLock()
defer r.mu.RUnlock()
if _, found := r.rooms[recipient]; found {
return r.rooms[recipient].rooms[rand.Intn(RoomPoolSize)]
return r.rooms[recipient].Get()
}
return ""
}
func (r *roomHandler) CacheRoom(recipient id.UserID, roomID id.RoomID) {
r.mu.Lock()
defer r.mu.Unlock()
if _, found := r.rooms[recipient]; !found {
r.rooms[recipient] = &roomPool{rooms: make([]id.RoomID, RoomPoolSize)}
}
for i := 0; i < RoomPoolSize; i++ {
if r.rooms[recipient].rooms[i] == "" {
r.rooms[recipient].rooms[i] = roomID
return
}
}
}
40 changes: 40 additions & 0 deletions internal/matrix/room_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved.
* See the file LICENSE for licensing terms.
*/

package matrix

import (
"math/rand"
"maunium.net/go/mautrix/id"
)

type RoomPool struct {
rooms []id.RoomID
size int
currentIdx int
currentSize int
}

func NewRoomPool(size int) *RoomPool {
return &RoomPool{
rooms: make([]id.RoomID, size),
size: size,
currentIdx: 0,
}
}

func (q *RoomPool) Add(roomID id.RoomID) {
q.rooms[q.currentIdx] = roomID
q.currentIdx = (q.currentIdx + 1) % q.size
if q.currentSize < q.size { // else we're just replacing old rooms
q.currentSize++
}
}
func (q *RoomPool) Get() id.RoomID {
if q.currentSize == 0 {
return ""
}
return q.rooms[rand.Intn(q.currentSize)]
}
4 changes: 2 additions & 2 deletions internal/rpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (
"log"
"net"

"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha1/activityv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha1/networkv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha1/partnerv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha1/pingv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/transport/v1alpha1/transportv1alpha1grpc"
accommodationv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/accommodation/v1alpha1"
activityv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/activity/v1alpha1"
networkv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/network/v1alpha1"
partnerv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/partner/v1alpha1"
pingv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/ping/v1alpha1"
transportv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/transport/v1alpha1"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc"
accommodationv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/accommodation/v1alpha1"
"github.com/chain4travel/camino-messenger-bot/config"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
Expand Down

0 comments on commit 314db2e

Please sign in to comment.