From 100f6fcb893b1ad17c42eaad6da7746a77e488e8 Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Wed, 29 Nov 2023 14:21:12 +0200 Subject: [PATCH 1/5] Introduce room pool (matrix) --- internal/matrix/matrix_messenger.go | 1 + internal/matrix/msg_assembler.go | 11 +++-- internal/matrix/room_handler.go | 72 ++++++++++++++++++++++------- scripts/sendXRequests.sh | 9 +++- 4 files changed, 71 insertions(+), 22 deletions(-) diff --git a/internal/matrix/matrix_messenger.go b/internal/matrix/matrix_messenger.go index 0de9f7d8..4b86f096 100644 --- a/internal/matrix/matrix_messenger.go +++ b/internal/matrix/matrix_messenger.go @@ -83,6 +83,7 @@ func (m *messenger) StartReceiver() (string, error) { if evt.GetStateKey() == m.client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite { _, err := m.client.JoinRoomByID(evt.RoomID) if err == nil { + m.roomHandler.CacheRoom(id.UserID(evt.Sender.String()), evt.RoomID) // add room to cache m.logger.Info("Joined room after invite", zap.String("room_id", evt.RoomID.String()), zap.String("inviter", evt.Sender.String())) diff --git a/internal/matrix/msg_assembler.go b/internal/matrix/msg_assembler.go index b1f37056..6ea4d75d 100644 --- a/internal/matrix/msg_assembler.go +++ b/internal/matrix/msg_assembler.go @@ -24,9 +24,10 @@ type messageAssembler struct { func NewMessageAssembler(logger *zap.SugaredLogger) MessageAssembler { return &messageAssembler{logger: logger, partialMessages: make(map[string][]CaminoMatrixMessage)} } -func (a *messageAssembler) AssembleMessage(msg CaminoMatrixMessage) (CaminoMatrixMessage, error, bool) { +func (a *messageAssembler) AssembleMessage(msg CaminoMatrixMessage) (decompressedCaminoMsg CaminoMatrixMessage, err error, completed bool) { + // standalone message if msg.Metadata.NumberOfChunks == 1 { - decompressedCaminoMsg, err := assembleAndDecompressCaminoMatrixMessages([]CaminoMatrixMessage{msg}) + decompressedCaminoMsg, err = assembleAndDecompressCaminoMatrixMessages([]CaminoMatrixMessage{msg}) return decompressedCaminoMsg, err, true } a.mu.Lock() @@ -36,11 +37,13 @@ func (a *messageAssembler) AssembleMessage(msg CaminoMatrixMessage) (CaminoMatri a.partialMessages[id] = []CaminoMatrixMessage{} } + // add chunk to partial message slice a.partialMessages[id] = append(a.partialMessages[id], msg) + // check if message is complete if len(a.partialMessages[id]) == int(msg.Metadata.NumberOfChunks) { - decompressedCaminoMsg, err := assembleAndDecompressCaminoMatrixMessages(a.partialMessages[id]) + decompressedCaminoMsg, err = assembleAndDecompressCaminoMatrixMessages(a.partialMessages[id]) delete(a.partialMessages, id) return decompressedCaminoMsg, err, true } - return CaminoMatrixMessage{}, nil, false + return decompressedCaminoMsg, nil, false } diff --git a/internal/matrix/room_handler.go b/internal/matrix/room_handler.go index 5c231e07..2a239827 100644 --- a/internal/matrix/room_handler.go +++ b/internal/matrix/room_handler.go @@ -1,6 +1,7 @@ package matrix import ( + "math/rand" "sync" "go.uber.org/zap" @@ -9,21 +10,28 @@ import ( "maunium.net/go/mautrix/id" ) +const RoomPoolSize = 10 + type RoomHandler interface { GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomID, error) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, error) EnableEncryptionForRoom(roomID id.RoomID) error GetEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool) + CacheRoom(recipient id.UserID, roomID id.RoomID) +} + +type roomPool struct { + rooms []id.RoomID } type roomHandler struct { client *mautrix.Client logger *zap.SugaredLogger - rooms map[id.UserID]id.RoomID + rooms map[id.UserID]*roomPool mu sync.RWMutex } func NewRoomHandler(client *mautrix.Client, logger *zap.SugaredLogger) RoomHandler { - return &roomHandler{client: client, logger: logger, rooms: make(map[id.UserID]id.RoomID)} + return &roomHandler{client: client, logger: logger, rooms: make(map[id.UserID]*roomPool)} } func (r *roomHandler) GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomID, error) { @@ -34,14 +42,16 @@ func (r *roomHandler) GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomI var err error // if not create room and invite recipient if !found { - roomID, err = r.CreateRoomAndInviteUser(recipient) - if err != nil { - return "", err - } - // enable encryption for room - err = r.EnableEncryptionForRoom(roomID) - if err != nil { - return "", err + for i := 0; i < RoomPoolSize; i++ { + roomID, err = r.CreateRoomAndInviteUser(recipient) + if err != nil { + return "", err + } + // enable encryption for room + err = r.EnableEncryptionForRoom(roomID) + if err != nil { + return "", err + } } } @@ -60,7 +70,7 @@ func (r *roomHandler) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, erro if err != nil { return "", err } - r.cacheRoom(userID, resp.RoomID) + r.CacheRoom(userID, resp.RoomID) return resp.RoomID, nil } @@ -81,6 +91,8 @@ func (r *roomHandler) GetEncryptedRoomForRecipient(recipient id.UserID) (id.Room if err != nil { return "", false } + + createdRooms := 0 for _, roomID := range rooms.JoinedRooms { if !r.client.StateStore.IsEncrypted(roomID) { continue @@ -92,8 +104,25 @@ func (r *roomHandler) GetEncryptedRoomForRecipient(recipient id.UserID) (id.Room _, found := members.Joined[recipient] if found { - r.cacheRoom(recipient, roomID) - return roomID, found + r.CacheRoom(recipient, roomID) + createdRooms++ + } + if createdRooms == RoomPoolSize { + return r.fetchCachedRoom(recipient), true + } + } + + // add rooms to pool until pool is full + for i := createdRooms; i < RoomPoolSize; i++ { + roomID, err = r.CreateRoomAndInviteUser(recipient) + if err != nil { + r.logger.Debugf("Failed to create room for recipient %v", recipient) + // enable encryption for room + } else { + err = r.EnableEncryptionForRoom(roomID) + if err != nil { + r.logger.Debugf("Failed to enable encryption for room %v", roomID) + } } } return "", false @@ -102,10 +131,21 @@ func (r *roomHandler) GetEncryptedRoomForRecipient(recipient id.UserID) (id.Room func (r *roomHandler) fetchCachedRoom(recipient id.UserID) id.RoomID { r.mu.RLock() defer r.mu.RUnlock() - return r.rooms[recipient] + if _, found := r.rooms[recipient]; found { + return r.rooms[recipient].rooms[rand.Intn(RoomPoolSize)] + } + return "" } -func (r *roomHandler) cacheRoom(recipient id.UserID, roomID id.RoomID) { +func (r *roomHandler) CacheRoom(recipient id.UserID, roomID id.RoomID) { r.mu.Lock() defer r.mu.Unlock() - r.rooms[recipient] = roomID + if _, found := r.rooms[recipient]; !found { + r.rooms[recipient] = &roomPool{rooms: make([]id.RoomID, RoomPoolSize)} + } + for i := 0; i < RoomPoolSize; i++ { + if r.rooms[recipient].rooms[i] == "" { + r.rooms[recipient].rooms[i] = roomID + return + } + } } diff --git a/scripts/sendXRequests.sh b/scripts/sendXRequests.sh index 4c6e9bfb..3a12edb7 100755 --- a/scripts/sendXRequests.sh +++ b/scripts/sendXRequests.sh @@ -1,5 +1,10 @@ #!/bin/bash +if ! [[ "$0" =~ scripts/sendXRequests.sh ]]; then + echo "must be run from repository root" + exit 255 +fi + # Check if the number of arguments provided is correct if [ "$#" -ne 1 ]; then echo "Usage: $0 " @@ -10,12 +15,12 @@ fi times_to_run=$1 # Change the path to your Go file below -go_file_path="../examples/rpc/client.go" +go_file_path="examples/rpc/client.go" # Loop to run the Go file X times in parallel for ((i=1; i<=$times_to_run; i++)) do - echo "Sending $i request..." +# echo "Sending $i request..." go run $go_file_path & done From 625b68d3b2f404b413a45810a036079967dffb71 Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Thu, 30 Nov 2023 14:43:22 +0200 Subject: [PATCH 2/5] Introduce botmode - configurable flag to determine distributor/provider/all bot mode --- config/config.go | 1 + config/flag_keys.go | 1 + config/flags.go | 1 + examples/rpc/client.go | 32 ++++++++++-- internal/app/app.go | 9 +++- internal/matrix/matrix_messenger.go | 76 ++++++++++++++++++++--------- internal/matrix/room_handler.go | 2 +- internal/messaging/messenger.go | 2 +- internal/messaging/processor.go | 2 +- internal/metadata/metadata.go | 6 +++ internal/rpc/server/server.go | 24 +++++++-- 11 files changed, 122 insertions(+), 34 deletions(-) diff --git a/config/config.go b/config/config.go index 472eabd1..f34656de 100644 --- a/config/config.go +++ b/config/config.go @@ -19,6 +19,7 @@ type SupportedRequestTypesFlag []string type AppConfig struct { DeveloperMode bool `mapstructure:"developer_mode"` SupportedRequestTypes SupportedRequestTypesFlag `mapstructure:"supported_request_types"` + BotMode uint `mapstructure:"bot_mode"` // 0 both, 1 request, 2 response } type MatrixConfig struct { Key string `mapstructure:"matrix_key"` // TODO @evlekht I'd suggest to add some parsed config, so we'll see on config read if some fields are invalid diff --git a/config/flag_keys.go b/config/flag_keys.go index 6ab42f27..e8dd9cea 100644 --- a/config/flag_keys.go +++ b/config/flag_keys.go @@ -15,4 +15,5 @@ const ( PartnerPluginCAFileKey = "partner_plugin_ca_file" MessengerTimeoutKey = "messenger_timeout" SupportedRequestTypesKey = "supported_request_types" + BotModeKey = "bot_mode" ) diff --git a/config/flags.go b/config/flags.go index b8b8f725..2e82cd68 100644 --- a/config/flags.go +++ b/config/flags.go @@ -5,6 +5,7 @@ import "flag" func readAppConfig(cfg AppConfig, fs *flag.FlagSet) { fs.BoolVar(&cfg.DeveloperMode, DeveloperMode, false, "Sets developer mode") fs.Var(&cfg.SupportedRequestTypes, SupportedRequestTypesKey, "The list of supported request types") + fs.UintVar(&cfg.BotMode, BotModeKey, 0, "The bot mode") flag.Parse() } diff --git a/examples/rpc/client.go b/examples/rpc/client.go index e3866b4e..b9b00188 100644 --- a/examples/rpc/client.go +++ b/examples/rpc/client.go @@ -1,15 +1,16 @@ package main import ( - typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1" "context" "fmt" "log" "os" + "sort" "time" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" accommodationv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/accommodation/v1alpha1" + typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1" internalmetadata "github.com/chain4travel/camino-messenger-bot/internal/metadata" "go.uber.org/zap" "google.golang.org/grpc" @@ -64,23 +65,46 @@ func main() { panic(err) } md := metadata.New(map[string]string{ - "recipient": "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", + "recipient": "@t-kopernikus1dry573dcz6jefshfxgya68jd6s07xezpm27ng9:matrix.camino.network", }) ctx := metadata.NewOutgoingContext(context.Background(), md) ass := accommodationv1alpha1grpc.NewAccommodationSearchServiceClient(c.ClientConn) - begin := time.Now() var header metadata.MD + begin := time.Now() resp, err := ass.AccommodationSearch(ctx, request, grpc.Header(&header)) if err != nil { log.Fatal(err) } + totalTime := time.Since(begin) + fmt.Printf("Total time|%s|%s\n", resp.Metadata.SearchId, totalTime) metadata := &internalmetadata.Metadata{} err = metadata.FromGrpcMD(header) if err != nil { fmt.Print("error extracting metadata") } - fmt.Printf("Received response after %s => ID: %s\n", time.Since(begin), resp.Metadata.SearchId) + + var entries []struct { + Key string + Value int64 + } + // Populate the slice with map entries + for key, value := range metadata.Timestamps { + entries = append(entries, struct { + Key string + Value int64 + }{Key: key, Value: value}) + } + + // Sort the slice based on values + sort.Slice(entries, func(i, j int) bool { + return entries[i].Value < entries[j].Value + }) + lastValue := int64(0) + for _, entry := range entries { + fmt.Printf("%s|%s|%d|%d|%f\n", entry.Key, resp.Metadata.SearchId, entry.Value, entry.Value-lastValue, float32(entry.Value-lastValue)/float32(totalTime.Milliseconds())) + lastValue = entry.Value + } c.Shutdown() } diff --git a/internal/app/app.go b/internal/app/app.go index a35ba6fc..71465ef8 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -56,11 +56,16 @@ func (a *App) Run(ctx context.Context) error { return nil }) + if a.cfg.BotMode > 2 { + a.logger.Error("Invalid bot mode") + return nil + } + messenger := matrix.NewMessenger(&a.cfg.MatrixConfig, a.logger) userIDUpdated := make(chan string) // Channel to pass the userID g.Go(func() error { - a.logger.Info("Starting message receiver...") - userID, err := messenger.StartReceiver() + a.logger.Infof("Starting message receiver with botmode %d ...", a.cfg.BotMode) + userID, err := messenger.StartReceiver(a.cfg.BotMode) if err != nil { panic(err) } diff --git a/internal/matrix/matrix_messenger.go b/internal/matrix/matrix_messenger.go index 4b86f096..9f620958 100644 --- a/internal/matrix/matrix_messenger.go +++ b/internal/matrix/matrix_messenger.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "sync" + "time" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/formatting" @@ -22,7 +23,8 @@ import ( var _ messaging.Messenger = (*messenger)(nil) -var C4TMessage = event.Type{Type: "m.room.c4t-msg", Class: event.MessageEventType} +var C4TMessageRequest = event.Type{Type: "m.room.c4t-msg-request", Class: event.MessageEventType} +var C4TMessageResponse = event.Type{Type: "m.room.c4t-msg-response", Class: event.MessageEventType} type client struct { *mautrix.Client @@ -38,6 +40,7 @@ type messenger struct { client client roomHandler RoomHandler msgAssembler MessageAssembler + mu sync.Mutex } func NewMessenger(cfg *config.MatrixConfig, logger *zap.SugaredLogger) *messenger { @@ -58,27 +61,49 @@ func (m *messenger) Checkpoint() string { return "messenger-gateway" } -func (m *messenger) StartReceiver() (string, error) { +func (m *messenger) StartReceiver(botMode uint) (string, error) { syncer := m.client.Syncer.(*mautrix.DefaultSyncer) - event.TypeMap[C4TMessage] = reflect.TypeOf(CaminoMatrixMessage{}) // custom message event types have to be registered properly + event.TypeMap[C4TMessageRequest] = reflect.TypeOf(CaminoMatrixMessage{}) // custom message event types have to be registered properly + event.TypeMap[C4TMessageResponse] = reflect.TypeOf(CaminoMatrixMessage{}) // custom message event types have to be registered properly - syncer.OnEventType(C4TMessage, func(source mautrix.EventSource, evt *event.Event) { + processCamMsg := func(source mautrix.EventSource, evt *event.Event) { msg := evt.Content.Parsed.(*CaminoMatrixMessage) - completeMsg, err, completed := m.msgAssembler.AssembleMessage(*msg) - if err != nil { - m.logger.Errorf("failed to assemble message: %v", err) - return - } - if !completed { - return // partial messages are not passed down to the msgChannel - } - completeMsg.Metadata.Stamp(fmt.Sprintf("%s-%s", m.Checkpoint(), "received")) - m.msgChannel <- messaging.Message{ - Metadata: completeMsg.Metadata, - Content: completeMsg.Content, - Type: messaging.MessageType(msg.MsgType), - } - }) + + go func() { + t := time.Now() + completeMsg, err, completed := m.msgAssembler.AssembleMessage(*msg) + if err != nil { + m.logger.Errorf("failed to assemble message: %v", err) + return + } + if !completed { + return // partial messages are not passed down to the msgChannel + } + fmt.Printf("received-message: |%s|%d\n", completeMsg.Metadata.RequestID, t.UnixMilli()) + completeMsg.Metadata.StampOn(fmt.Sprintf("%s-%s", m.Checkpoint(), "received"), t.UnixMilli()) + completeMsg.Metadata.Stamp(fmt.Sprintf("%s-%s", m.Checkpoint(), "assembled")) + + m.mu.Lock() + m.msgChannel <- messaging.Message{ + Metadata: completeMsg.Metadata, + Content: completeMsg.Content, + Type: messaging.MessageType(msg.MsgType), + } + m.mu.Unlock() + }() + } + switch botMode { + case 0: + syncer.OnEventType(C4TMessageResponse, processCamMsg) + syncer.OnEventType(C4TMessageRequest, processCamMsg) + case 1: + syncer.OnEventType(C4TMessageRequest, processCamMsg) + case 2: + syncer.OnEventType(C4TMessageResponse, processCamMsg) + default: + return "", fmt.Errorf("invalid bot mode: %d", botMode) + } + syncer.OnEventType(event.StateMember, func(source mautrix.EventSource, evt *event.Event) { if evt.GetStateKey() == m.client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite { _, err := m.client.JoinRoomByID(evt.RoomID) @@ -150,7 +175,7 @@ func (m *messenger) StopReceiver() error { } func (m *messenger) SendAsync(_ context.Context, msg messaging.Message) error { - m.logger.Info("Sending async message", zap.String("msg", msg.Metadata.RequestID)) + //m.logger.Info("Sending async message", zap.String("msg", msg.Metadata.RequestID)) roomID, err := m.roomHandler.GetOrCreateRoomForRecipient(id.UserID(msg.Metadata.Recipient)) if err != nil { @@ -162,13 +187,20 @@ func (m *messenger) SendAsync(_ context.Context, msg messaging.Message) error { return err } - return m.sendMessageEvents(roomID, C4TMessage, messages) + switch msg.Type.Category() { + case messaging.Request: + return m.sendMessageEvents(roomID, C4TMessageRequest, messages) + case messaging.Response: + return m.sendMessageEvents(roomID, C4TMessageResponse, messages) + default: + return fmt.Errorf("no message category defined for type: %s", msg.Type) + } } func (m *messenger) sendMessageEvents(roomID id.RoomID, eventType event.Type, messages []CaminoMatrixMessage) error { //TODO add retry logic? for _, msg := range messages { - _, err := m.client.SendMessageEvent(roomID, eventType, msg) + _, err := m.client.SendMessageEvent(roomID, eventType, msg, mautrix.ReqSendEvent{TransactionID: msg.Metadata.RequestID}) if err != nil { return err } diff --git a/internal/matrix/room_handler.go b/internal/matrix/room_handler.go index 2a239827..e92a4b22 100644 --- a/internal/matrix/room_handler.go +++ b/internal/matrix/room_handler.go @@ -77,7 +77,7 @@ func (r *roomHandler) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, erro func (r *roomHandler) EnableEncryptionForRoom(roomID id.RoomID) error { r.logger.Debugf("Enabling encryption for room %s", roomID) _, err := r.client.SendStateEvent(roomID, event.StateEncryption, "", - event.EncryptionEventContent{Algorithm: id.AlgorithmMegolmV1}) + event.EncryptionEventContent{Algorithm: id.AlgorithmMegolmV1, RotationPeriodMessages: 10000}) return err } diff --git a/internal/messaging/messenger.go b/internal/messaging/messenger.go index d443f777..d785cfb8 100644 --- a/internal/messaging/messenger.go +++ b/internal/messaging/messenger.go @@ -12,7 +12,7 @@ type APIMessageResponse struct { } type Messenger interface { metadata.Checkpoint - StartReceiver() (string, error) // start receiving messages. Returns the user id + StartReceiver(botMode uint) (string, error) // start receiving messages. Returns the user id StopReceiver() error // stop receiving messages SendAsync(ctx context.Context, m Message) error // asynchronous call (fire and forget) Inbound() chan Message // channel where incoming messages are written diff --git a/internal/messaging/processor.go b/internal/messaging/processor.go index 80e4d22e..648729bd 100644 --- a/internal/messaging/processor.go +++ b/internal/messaging/processor.go @@ -146,7 +146,7 @@ func (p *processor) Request(ctx context.Context, msg Message) (Message, error) { if response.Metadata.RequestID == msg.Metadata.RequestID { return response, nil } - //p.logger.Debugf("Ignoring response message with request id: %s, expecting: %s", response.Metadata.RequestID, msg.Metadata.RequestID) + p.logger.Debugf("Ignoring response message with request id: %s, expecting: %s", response.Metadata.RequestID, msg.Metadata.RequestID) case <-ctx.Done(): return Message{}, fmt.Errorf("response exceeded configured timeout of %v seconds for request: %s", p.timeout, msg.Metadata.RequestID) } diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index e39d0ddb..9fc8cb1f 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -82,3 +82,9 @@ func (m *Metadata) Stamp(checkpoint string) { } m.Timestamps[checkpoint] = time.Now().UnixMilli() } +func (m *Metadata) StampOn(checkpoint string, t int64) { + if m.Timestamps == nil { + m.Timestamps = make(map[string]int64) + } + m.Timestamps[checkpoint] = t +} diff --git a/internal/rpc/server/server.go b/internal/rpc/server/server.go index 9dbf23a3..88e1f9e6 100644 --- a/internal/rpc/server/server.go +++ b/internal/rpc/server/server.go @@ -17,7 +17,6 @@ import ( partnerv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/partner/v1alpha1" pingv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/ping/v1alpha1" transportv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/transport/v1alpha1" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" accommodationv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/accommodation/v1alpha1" "github.com/chain4travel/camino-messenger-bot/config" @@ -27,6 +26,7 @@ import ( "github.com/google/uuid" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/reflection" ) var ( @@ -72,6 +72,7 @@ func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor } server := &server{cfg: cfg, logger: logger, processor: processor, serviceRegistry: serviceRegistry} server.grpcServer = createGrpcServerAndRegisterServices(server, opts...) + reflection.Register(server.grpcServer) return server } @@ -152,11 +153,28 @@ func (s *server) processExternalRequest(ctx context.Context, requestType messagi Metadata: md, } response, err := s.processor.ProcessOutbound(ctx, *m) - response.Metadata.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "processed")) + response.Metadata.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "response")) grpc.SendHeader(ctx, response.Metadata.ToGrpcMD()) return response.Content.ResponseContent, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ? } +func (s *server) parseMetadata(ctx context.Context) (error, metadata.Metadata) { + requestID, err := uuid.NewRandom() + if err != nil { + return nil, metadata.Metadata{} + } + + md := metadata.Metadata{ + RequestID: requestID.String(), + } + md.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "request")) + err = md.ExtractMetadata(ctx) + if err != nil && md.RequestID == "" { + return fmt.Errorf("no recipient assigned"), md + } + return err, md +} + func (s *server) processMetadata(ctx context.Context) (error, metadata.Metadata) { requestID, err := uuid.NewRandom() if err != nil { @@ -166,7 +184,7 @@ func (s *server) processMetadata(ctx context.Context) (error, metadata.Metadata) md := metadata.Metadata{ RequestID: requestID.String(), } - md.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "received")) + md.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "request")) err = md.ExtractMetadata(ctx) return err, md } From 314db2ed0a816812c24172c4444931dd86456882 Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Wed, 13 Dec 2023 09:39:19 +0200 Subject: [PATCH 3/5] Room pool initialization --- examples/rpc/client.go | 6 +- internal/matrix/matrix_messenger.go | 5 +- internal/matrix/room_handler.go | 125 +++++++++++++++------------- internal/matrix/room_pool.go | 40 +++++++++ internal/rpc/server/server.go | 4 +- 5 files changed, 117 insertions(+), 63 deletions(-) create mode 100644 internal/matrix/room_pool.go diff --git a/examples/rpc/client.go b/examples/rpc/client.go index b9b00188..229d3581 100644 --- a/examples/rpc/client.go +++ b/examples/rpc/client.go @@ -1,6 +1,7 @@ package main import ( + typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1" "context" "fmt" "log" @@ -10,7 +11,6 @@ import ( "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" accommodationv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/accommodation/v1alpha1" - typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1" internalmetadata "github.com/chain4travel/camino-messenger-bot/internal/metadata" "go.uber.org/zap" "google.golang.org/grpc" @@ -31,7 +31,7 @@ func main() { argsWithoutProg := os.Args[1:] unencrypted := len(argsWithoutProg) == 0 ppConfig := config.PartnerPluginConfig{ - Host: "localhost", + Host: "127.0.0.1", Port: 9092, Unencrypted: unencrypted, } @@ -65,7 +65,7 @@ func main() { panic(err) } md := metadata.New(map[string]string{ - "recipient": "@t-kopernikus1dry573dcz6jefshfxgya68jd6s07xezpm27ng9:matrix.camino.network", + "recipient": "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", }) ctx := metadata.NewOutgoingContext(context.Background(), md) diff --git a/internal/matrix/matrix_messenger.go b/internal/matrix/matrix_messenger.go index 9f620958..0db34e5c 100644 --- a/internal/matrix/matrix_messenger.go +++ b/internal/matrix/matrix_messenger.go @@ -105,7 +105,7 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) { } syncer.OnEventType(event.StateMember, func(source mautrix.EventSource, evt *event.Event) { - if evt.GetStateKey() == m.client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite { + if evt.GetStateKey() == m.client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite && !m.roomHandler.HasAlreadyJoined(id.UserID(evt.Sender.String()), evt.RoomID) { _, err := m.client.JoinRoomByID(evt.RoomID) if err == nil { m.roomHandler.CacheRoom(id.UserID(evt.Sender.String()), evt.RoomID) // add room to cache @@ -155,6 +155,7 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) { m.client.cancelSync = cancelSync m.client.syncStopWait.Add(1) + m.roomHandler.Init() go func() { err = m.client.SyncWithContext(syncCtx) defer m.client.syncStopWait.Done() @@ -175,7 +176,7 @@ func (m *messenger) StopReceiver() error { } func (m *messenger) SendAsync(_ context.Context, msg messaging.Message) error { - //m.logger.Info("Sending async message", zap.String("msg", msg.Metadata.RequestID)) + m.logger.Info("Sending async message", zap.String("msg", msg.Metadata.RequestID)) roomID, err := m.roomHandler.GetOrCreateRoomForRecipient(id.UserID(msg.Metadata.Recipient)) if err != nil { diff --git a/internal/matrix/room_handler.go b/internal/matrix/room_handler.go index e92a4b22..1645323c 100644 --- a/internal/matrix/room_handler.go +++ b/internal/matrix/room_handler.go @@ -1,65 +1,101 @@ package matrix import ( - "math/rand" - "sync" - "go.uber.org/zap" "maunium.net/go/mautrix" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" + "sync" ) const RoomPoolSize = 10 type RoomHandler interface { + Init() GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomID, error) - CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, error) - EnableEncryptionForRoom(roomID id.RoomID) error - GetEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool) CacheRoom(recipient id.UserID, roomID id.RoomID) + HasAlreadyJoined(recipient id.UserID, roomID id.RoomID) bool } -type roomPool struct { - rooms []id.RoomID -} type roomHandler struct { client *mautrix.Client logger *zap.SugaredLogger - rooms map[id.UserID]*roomPool + rooms map[id.UserID]*RoomPool mu sync.RWMutex } func NewRoomHandler(client *mautrix.Client, logger *zap.SugaredLogger) RoomHandler { - return &roomHandler{client: client, logger: logger, rooms: make(map[id.UserID]*roomPool)} + return &roomHandler{client: client, logger: logger, rooms: make(map[id.UserID]*RoomPool)} +} +func (r *roomHandler) Init() { + rooms, err := r.client.JoinedRooms() + if err != nil { + r.logger.Warn("failed to fetch joined rooms - skipping room handler initialization") + return + } + + // cache all encrypted rooms + for _, roomID := range rooms.JoinedRooms { + r.logger.Debugf("Caching room %v | encrypted: %v", roomID, r.client.StateStore.IsEncrypted(roomID)) + if !r.client.StateStore.IsEncrypted(roomID) { + continue + } + members, err := r.client.JoinedMembers(roomID) + if err != nil { + r.logger.Debugf("failed to fetch members for room %v", roomID) + continue + } + for userID, _ := range members.Joined { + r.CacheRoom(userID, roomID) + } + } } +func (r *roomHandler) CacheRoom(recipient id.UserID, roomID id.RoomID) { + r.mu.Lock() + defer r.mu.Unlock() + if _, found := r.rooms[recipient]; !found { + r.rooms[recipient] = NewRoomPool(RoomPoolSize) + } + pool := r.rooms[recipient] + pool.Add(roomID) +} + +func (r *roomHandler) HasAlreadyJoined(recipient id.UserID, roomID id.RoomID) bool { + if _, found := r.rooms[recipient]; !found { + return false + } + for _, id := range r.rooms[recipient].rooms { + if roomID == id { + return true + } + } + return false +} func (r *roomHandler) GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomID, error) { // check if room already established with recipient - roomID, found := r.GetEncryptedRoomForRecipient(recipient) - - var err error - // if not create room and invite recipient - if !found { - for i := 0; i < RoomPoolSize; i++ { - roomID, err = r.CreateRoomAndInviteUser(recipient) - if err != nil { - return "", err - } - // enable encryption for room - err = r.EnableEncryptionForRoom(roomID) - if err != nil { - return "", err - } + roomID, _ := r.getEncryptedRoomForRecipient(recipient) + + addNewRoomToPool := func() (id.RoomID, error) { + roomID, err := r.createRoomAndInviteUser(recipient) + if err != nil { + return "", err + } else { + err = r.enableEncryptionForRoom(roomID) + return roomID, err } } - // return room id + // even if we have found a cached room, we will still create new rooms + if roomPool, found := r.rooms[recipient]; !found || roomPool.currentSize < RoomPoolSize { + return addNewRoomToPool() + } + return roomID, nil } -func (r *roomHandler) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, error) { +func (r *roomHandler) createRoomAndInviteUser(userID id.UserID) (id.RoomID, error) { r.logger.Debugf("Creating room and inviting user %v", userID) req := mautrix.ReqCreateRoom{ Visibility: "private", @@ -74,14 +110,14 @@ func (r *roomHandler) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, erro return resp.RoomID, nil } -func (r *roomHandler) EnableEncryptionForRoom(roomID id.RoomID) error { +func (r *roomHandler) enableEncryptionForRoom(roomID id.RoomID) error { r.logger.Debugf("Enabling encryption for room %s", roomID) _, err := r.client.SendStateEvent(roomID, event.StateEncryption, "", event.EncryptionEventContent{Algorithm: id.AlgorithmMegolmV1, RotationPeriodMessages: 10000}) return err } -func (r *roomHandler) GetEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool) { +func (r *roomHandler) getEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool) { roomID := r.fetchCachedRoom(recipient) if roomID != "" { return roomID, true @@ -112,18 +148,8 @@ func (r *roomHandler) GetEncryptedRoomForRecipient(recipient id.UserID) (id.Room } } - // add rooms to pool until pool is full - for i := createdRooms; i < RoomPoolSize; i++ { - roomID, err = r.CreateRoomAndInviteUser(recipient) - if err != nil { - r.logger.Debugf("Failed to create room for recipient %v", recipient) - // enable encryption for room - } else { - err = r.EnableEncryptionForRoom(roomID) - if err != nil { - r.logger.Debugf("Failed to enable encryption for room %v", roomID) - } - } + if createdRooms > 0 { + return r.fetchCachedRoom(recipient), true } return "", false } @@ -132,20 +158,7 @@ func (r *roomHandler) fetchCachedRoom(recipient id.UserID) id.RoomID { r.mu.RLock() defer r.mu.RUnlock() if _, found := r.rooms[recipient]; found { - return r.rooms[recipient].rooms[rand.Intn(RoomPoolSize)] + return r.rooms[recipient].Get() } return "" } -func (r *roomHandler) CacheRoom(recipient id.UserID, roomID id.RoomID) { - r.mu.Lock() - defer r.mu.Unlock() - if _, found := r.rooms[recipient]; !found { - r.rooms[recipient] = &roomPool{rooms: make([]id.RoomID, RoomPoolSize)} - } - for i := 0; i < RoomPoolSize; i++ { - if r.rooms[recipient].rooms[i] == "" { - r.rooms[recipient].rooms[i] = roomID - return - } - } -} diff --git a/internal/matrix/room_pool.go b/internal/matrix/room_pool.go new file mode 100644 index 00000000..960119c5 --- /dev/null +++ b/internal/matrix/room_pool.go @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved. + * See the file LICENSE for licensing terms. + */ + +package matrix + +import ( + "math/rand" + "maunium.net/go/mautrix/id" +) + +type RoomPool struct { + rooms []id.RoomID + size int + currentIdx int + currentSize int +} + +func NewRoomPool(size int) *RoomPool { + return &RoomPool{ + rooms: make([]id.RoomID, size), + size: size, + currentIdx: 0, + } +} + +func (q *RoomPool) Add(roomID id.RoomID) { + q.rooms[q.currentIdx] = roomID + q.currentIdx = (q.currentIdx + 1) % q.size + if q.currentSize < q.size { // else we're just replacing old rooms + q.currentSize++ + } +} +func (q *RoomPool) Get() id.RoomID { + if q.currentSize == 0 { + return "" + } + return q.rooms[rand.Intn(q.currentSize)] +} diff --git a/internal/rpc/server/server.go b/internal/rpc/server/server.go index 88e1f9e6..e2f170c1 100644 --- a/internal/rpc/server/server.go +++ b/internal/rpc/server/server.go @@ -7,18 +7,18 @@ import ( "log" "net" + "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha1/activityv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha1/networkv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha1/partnerv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha1/pingv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/transport/v1alpha1/transportv1alpha1grpc" + accommodationv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/accommodation/v1alpha1" activityv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/activity/v1alpha1" networkv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/network/v1alpha1" partnerv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/partner/v1alpha1" pingv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/ping/v1alpha1" transportv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/transport/v1alpha1" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" - accommodationv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/accommodation/v1alpha1" "github.com/chain4travel/camino-messenger-bot/config" "github.com/chain4travel/camino-messenger-bot/internal/messaging" "github.com/chain4travel/camino-messenger-bot/internal/metadata" From 99d6e36f07925627820d2f40fb801fed1ab40b40 Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Tue, 19 Dec 2023 13:23:15 +0200 Subject: [PATCH 4/5] Generate csv report when running example rpc client - add configurable flags for host/port/recipient --- examples/rpc/client.go | 117 +++++++++++++++++++++++----- internal/matrix/matrix_messenger.go | 5 +- scripts/printReport.sh | 42 ++++++++++ scripts/sendXRequests.sh | 11 +-- 4 files changed, 143 insertions(+), 32 deletions(-) mode change 100644 => 100755 examples/rpc/client.go create mode 100755 scripts/printReport.sh diff --git a/examples/rpc/client.go b/examples/rpc/client.go old mode 100644 new mode 100755 index 229d3581..8af1d375 --- a/examples/rpc/client.go +++ b/examples/rpc/client.go @@ -3,10 +3,13 @@ package main import ( typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1" "context" + "encoding/csv" + "flag" "fmt" - "log" "os" "sort" + "strconv" + "sync" "time" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" @@ -21,6 +24,8 @@ import ( ) func main() { + var mu sync.Mutex + var wg sync.WaitGroup var logger *zap.Logger cfg := zap.NewDevelopmentConfig() cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) @@ -28,20 +33,43 @@ func main() { sLogger := logger.Sugar() logger.Sync() - argsWithoutProg := os.Args[1:] - unencrypted := len(argsWithoutProg) == 0 + times := flag.Int("requests", 1, "Repeat the request n times") + host := flag.String("host", "127.0.0.1", "Distributor bot host") + port := flag.Int("port", 9092, "Distributor bot port") + recipient := flag.String("recipient", "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", "Recipient address (format: @t-kopernikus[...]:matrix.camino.network") + caCertFile := flag.String("ca-cert-file", "", "CA certificate file (optional)") + flag.Parse() + ppConfig := config.PartnerPluginConfig{ - Host: "127.0.0.1", - Port: 9092, - Unencrypted: unencrypted, + Host: *host, + Port: *port, + Unencrypted: *caCertFile == "", } - if !unencrypted { - ppConfig.CACertFile = argsWithoutProg[0] + ppConfig.CACertFile = *caCertFile + + loadTestData := make([][]string, *times) + for i := 0; i < *times; i++ { + loadTestData[i] = make([]string, 6) + wg.Add(1) + go func(counter int) { + defer wg.Done() + createClientAndRunRequest(counter, ppConfig, sLogger, *recipient, loadTestData, mu) + }(i) } + + wg.Wait() + + if len(loadTestData) > 1 || len(loadTestData) == 1 && loadTestData[0][0] != "" { // otherwise no data have been recorded + persistToCSV(loadTestData) + } +} + +func createClientAndRunRequest(i int, ppConfig config.PartnerPluginConfig, sLogger *zap.SugaredLogger, recipient string, loadTestData [][]string, mu sync.Mutex) { c := client.NewClient(&ppConfig, sLogger) err := c.Start() if err != nil { - panic(err) + fmt.Errorf("error starting client: %v", err) + return } request := &accommodationv1alpha1.AccommodationSearchRequest{ Header: nil, @@ -60,12 +88,8 @@ func main() { }, } - err = c.Start() - if err != nil { - panic(err) - } md := metadata.New(map[string]string{ - "recipient": "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", + "recipient": recipient, }) ctx := metadata.NewOutgoingContext(context.Background(), md) @@ -74,16 +98,25 @@ func main() { begin := time.Now() resp, err := ass.AccommodationSearch(ctx, request, grpc.Header(&header)) if err != nil { - log.Fatal(err) + sLogger.Errorf("error when performing search: %v", err) + return } totalTime := time.Since(begin) - fmt.Printf("Total time|%s|%s\n", resp.Metadata.SearchId, totalTime) + fmt.Println(totalTime.Milliseconds()) + //fmt.Printf("Total time(ms)|%s|%d\n", resp.Metadata.SearchId.GetValue(), totalTime.Milliseconds()) metadata := &internalmetadata.Metadata{} err = metadata.FromGrpcMD(header) if err != nil { - fmt.Print("error extracting metadata") + sLogger.Errorf("error extracting metadata: %v", err) } + addToDataset(int64(i), totalTime.Milliseconds(), resp, metadata, loadTestData, mu) + + c.Shutdown() +} + +func addToDataset(counter int64, totalTime int64, resp *accommodationv1alpha1.AccommodationSearchResponse, metadata *internalmetadata.Metadata, loadTestData [][]string, mu sync.Mutex) { + var data []string var entries []struct { Key string Value int64 @@ -101,10 +134,56 @@ func main() { return entries[i].Value < entries[j].Value }) lastValue := int64(0) + data = append(data, strconv.FormatInt(counter+1, 10)) + data = append(data, strconv.FormatInt(totalTime, 10)) for _, entry := range entries { - fmt.Printf("%s|%s|%d|%d|%f\n", entry.Key, resp.Metadata.SearchId, entry.Value, entry.Value-lastValue, float32(entry.Value-lastValue)/float32(totalTime.Milliseconds())) + + if entry.Key == "request-gateway-request" { + lastValue = entry.Value + continue //skip + } + if entry.Key == "processor-request" { + + //lastValue = entry.Value + continue //skip + } + fmt.Printf("%d|%s|%s|%d|%.2f\n", entry.Value, entry.Key, resp.Metadata.SearchId.GetValue(), entry.Value-lastValue, float32(entry.Value-lastValue)/float32(totalTime)) + + data = append(data, strconv.FormatInt(entry.Value-lastValue, 10)) lastValue = entry.Value } - c.Shutdown() + mu.Lock() + loadTestData[counter] = data + mu.Unlock() +} +func persistToCSV(dataset [][]string) { + // Open a new CSV file + file, err := os.Create("load_test_data.csv") + if err != nil { + fmt.Println("Error creating CSV file:", err) + return + } + defer file.Close() + + // Create a CSV writer + writer := csv.NewWriter(file) + defer writer.Flush() + + // Write the header row + header := []string{"Request ID", "Total Time", "distributor -> matrix", "matrix -> provider", "provider -> matrix", "matrix -> distributor", "process-response"} + if err := writer.Write(header); err != nil { + fmt.Println("Error writing header:", err) + return + } + + // Write the load test data rows + for _, dataRow := range dataset { + if err := writer.Write(dataRow); err != nil { + fmt.Println("Error writing data row:", err) + return + } + } + + fmt.Println("CSV file created successfully.") } diff --git a/internal/matrix/matrix_messenger.go b/internal/matrix/matrix_messenger.go index 0db34e5c..148f156e 100644 --- a/internal/matrix/matrix_messenger.go +++ b/internal/matrix/matrix_messenger.go @@ -79,9 +79,8 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) { if !completed { return // partial messages are not passed down to the msgChannel } - fmt.Printf("received-message: |%s|%d\n", completeMsg.Metadata.RequestID, t.UnixMilli()) - completeMsg.Metadata.StampOn(fmt.Sprintf("%s-%s", m.Checkpoint(), "received"), t.UnixMilli()) - completeMsg.Metadata.Stamp(fmt.Sprintf("%s-%s", m.Checkpoint(), "assembled")) + completeMsg.Metadata.StampOn(fmt.Sprintf("matrix-sent-%s", completeMsg.MsgType), evt.Timestamp) + completeMsg.Metadata.StampOn(fmt.Sprintf("%s-%s-%s", m.Checkpoint(), "received", completeMsg.MsgType), t.UnixMilli()) m.mu.Lock() m.msgChannel <- messaging.Message{ diff --git a/scripts/printReport.sh b/scripts/printReport.sh new file mode 100755 index 00000000..37fa31ca --- /dev/null +++ b/scripts/printReport.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +if [ $# -eq 0 ]; then + echo "Usage: $0 " + exit 1 +fi + +filename=$1 + +if [ ! -f "$filename" ]; then + echo "File not found: $filename" + exit 1 +fi + +# Read data from the file into an array +mapfile -t data < "$filename" + +# Function to calculate the average of an array +calculate_average() { + local sum=0 + local count=${#data[@]} + for value in "${data[@]}"; do + sum=$((sum + value)) + done + echo "scale=2; $sum / $count" | bc +} + +# Sort the array +sorted_data=($(for i in "${data[@]}"; do echo $i; done | sort -n)) + +# Calculate min, max, median, and average +min=${sorted_data[0]} +max=${sorted_data[-1]} +median=${sorted_data[${#sorted_data[@]}/2]} +average=$(calculate_average) +total=${#data[@]} +# Print the results +echo "Min: $min" +echo "Max: $max" +echo "Median: $median" +echo "Average: $average" +echo "Total: $total" diff --git a/scripts/sendXRequests.sh b/scripts/sendXRequests.sh index 3a12edb7..9e1a4849 100755 --- a/scripts/sendXRequests.sh +++ b/scripts/sendXRequests.sh @@ -16,13 +16,4 @@ times_to_run=$1 # Change the path to your Go file below go_file_path="examples/rpc/client.go" - -# Loop to run the Go file X times in parallel -for ((i=1; i<=$times_to_run; i++)) -do -# echo "Sending $i request..." - go run $go_file_path & -done - -# Wait for all background processes to finish -wait \ No newline at end of file +go run $go_file_path $times_to_run From 43088b852fc316756f6343b5765286fbf5acbfea Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Tue, 19 Dec 2023 15:38:00 +0200 Subject: [PATCH 5/5] Introduce own e2ee encryption using RSA key-pairs and symmetric AES key (envelope enc) - add public key repo and cache - hard-code public keys temporarily - increase room pool size to 10 - fix broken multi-chunk messages transmission --- config/config.go | 1 + config/flag_keys.go | 1 + config/flags.go | 1 + go.sum | 11 -- internal/app/app.go | 9 +- .../{matrix_compressor.go => compress.go} | 6 +- .../{matrix_decompressor.go => decompress.go} | 0 internal/matrix/encryption.go | 85 +++++++++++++++ internal/matrix/matrix_messenger.go | 102 +++++++++++++++--- internal/matrix/room_handler.go | 12 +-- internal/matrix/types.go | 8 +- internal/messaging/messenger.go | 8 +- utils/aes/aes.go | 102 ++++++++++++++++++ utils/rsa/rsa.go | 101 +++++++++++++++++ 14 files changed, 402 insertions(+), 45 deletions(-) rename internal/matrix/{matrix_compressor.go => compress.go} (93%) rename internal/matrix/{matrix_decompressor.go => decompress.go} (100%) create mode 100644 internal/matrix/encryption.go create mode 100644 utils/aes/aes.go create mode 100644 utils/rsa/rsa.go diff --git a/config/config.go b/config/config.go index f34656de..5c41f99a 100644 --- a/config/config.go +++ b/config/config.go @@ -20,6 +20,7 @@ type AppConfig struct { DeveloperMode bool `mapstructure:"developer_mode"` SupportedRequestTypes SupportedRequestTypesFlag `mapstructure:"supported_request_types"` BotMode uint `mapstructure:"bot_mode"` // 0 both, 1 request, 2 response + PrivateRSAFileKey string `mapstructure:"private_rsa_file_key"` } type MatrixConfig struct { Key string `mapstructure:"matrix_key"` // TODO @evlekht I'd suggest to add some parsed config, so we'll see on config read if some fields are invalid diff --git a/config/flag_keys.go b/config/flag_keys.go index e8dd9cea..cd871dbd 100644 --- a/config/flag_keys.go +++ b/config/flag_keys.go @@ -16,4 +16,5 @@ const ( MessengerTimeoutKey = "messenger_timeout" SupportedRequestTypesKey = "supported_request_types" BotModeKey = "bot_mode" + PrivateRSAKey = "private_rsa_key" ) diff --git a/config/flags.go b/config/flags.go index 2e82cd68..102e7131 100644 --- a/config/flags.go +++ b/config/flags.go @@ -6,6 +6,7 @@ func readAppConfig(cfg AppConfig, fs *flag.FlagSet) { fs.BoolVar(&cfg.DeveloperMode, DeveloperMode, false, "Sets developer mode") fs.Var(&cfg.SupportedRequestTypes, SupportedRequestTypesKey, "The list of supported request types") fs.UintVar(&cfg.BotMode, BotModeKey, 0, "The bot mode") + fs.StringVar(&cfg.PrivateRSAFileKey, PrivateRSAKey, "", "The private RSA key file") flag.Parse() } diff --git a/go.sum b/go.sum index fefda482..9f0dc933 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,6 @@ -buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231211091155-5467620e05ed.2 h1:Yy0x91aZhzQOikR33x5eEIFEWS1TZzuzRc+LP8NuCgQ= -buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231211091155-5467620e05ed.2/go.mod h1:xDIPwKMomacOmFbzRICgdUP/gpjEoetNVYVTVr29H0k= -buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231214132539-21b35d953f3d.2 h1:ykl0rTU4nNvPtJRm2lqOCRpgOd93RSt3ev8hNkbozDE= -buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231214132539-21b35d953f3d.2/go.mod h1:tKtDR8xG+DIFkSv8PiW1YM64GxJ/44n3UfZAN+5jfJ8= buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231220001345-2dbff1450b98.2 h1:E1OG6V0s//gFBHK/aAniN4Cb2l/QFYsKdTT11Ymgh6g= buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231220001345-2dbff1450b98.2/go.mod h1:6OlE1AqRT7EzKZ6ukFLo6Qmf7iv4I59YUlYkiJFxly8= -buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.28.1-20231211091155-5467620e05ed.4/go.mod h1:2viX8eSuMFjoDrr8x3FYytCp81PVYkdgfB68aIcGW6c= -buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.28.1-20231214132539-21b35d953f3d.4/go.mod h1:2viX8eSuMFjoDrr8x3FYytCp81PVYkdgfB68aIcGW6c= buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.28.1-20231220001345-2dbff1450b98.4/go.mod h1:2viX8eSuMFjoDrr8x3FYytCp81PVYkdgfB68aIcGW6c= -buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231211091155-5467620e05ed.2 h1:8HbCQyMVfu/+Spx4yOPwWThwJpr0JELRxJgt8Kdoso4= -buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231211091155-5467620e05ed.2/go.mod h1:h8QtMQVd5+WnHrXJrqA/eCt8mGw9efCAmxoHzeORKdw= -buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231214132539-21b35d953f3d.2 h1:HTcdQrjEKtCEizgMVc1kmNtsGSQQ04WTh7fUNEuqCFE= -buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231214132539-21b35d953f3d.2/go.mod h1:h8QtMQVd5+WnHrXJrqA/eCt8mGw9efCAmxoHzeORKdw= buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231220001345-2dbff1450b98.2 h1:Wne/F/pUbrMAIQ874Akd5nXxoXM2tjzM14PdimMB3X8= buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231220001345-2dbff1450b98.2/go.mod h1:h8QtMQVd5+WnHrXJrqA/eCt8mGw9efCAmxoHzeORKdw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= @@ -668,7 +658,6 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/internal/app/app.go b/internal/app/app.go index 71465ef8..15682642 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2,6 +2,7 @@ package app import ( "context" + rsa_util "github.com/chain4travel/camino-messenger-bot/utils/rsa" "github.com/chain4travel/camino-messenger-bot/config" "github.com/chain4travel/camino-messenger-bot/internal/matrix" @@ -60,8 +61,12 @@ func (a *App) Run(ctx context.Context) error { a.logger.Error("Invalid bot mode") return nil } - - messenger := matrix.NewMessenger(&a.cfg.MatrixConfig, a.logger) + privateRSAKey, err := rsa_util.ParseRSAPrivateKeyFromFile(a.cfg.PrivateRSAFileKey) + if err != nil { + a.logger.Error("Error while parsing private RSA key") + return nil + } + messenger := matrix.NewMessenger(&a.cfg.MatrixConfig, a.logger, privateRSAKey) userIDUpdated := make(chan string) // Channel to pass the userID g.Go(func() error { a.logger.Infof("Starting message receiver with botmode %d ...", a.cfg.BotMode) diff --git a/internal/matrix/matrix_compressor.go b/internal/matrix/compress.go similarity index 93% rename from internal/matrix/matrix_compressor.go rename to internal/matrix/compress.go index bc50865a..3154174e 100644 --- a/internal/matrix/matrix_compressor.go +++ b/internal/matrix/compress.go @@ -7,7 +7,6 @@ package matrix import ( "fmt" - "github.com/chain4travel/camino-messenger-bot/internal/compression" "github.com/chain4travel/camino-messenger-bot/internal/messaging" "github.com/chain4travel/camino-messenger-bot/internal/metadata" @@ -21,6 +20,9 @@ func compressAndSplitCaminoMatrixMsg(msg messaging.Message) ([]CaminoMatrixMessa bytes []byte err error ) + if err != nil { + return nil, err + } switch msg.Type.Category() { case messaging.Request, messaging.Response: @@ -53,7 +55,7 @@ func compressAndSplitCaminoMatrixMsg(msg messaging.Message) ([]CaminoMatrixMessa for i, chunk := range splitCompressedContent[1:] { messages = append(messages, CaminoMatrixMessage{ MessageEventContent: event.MessageEventContent{MsgType: event.MessageType(msg.Type)}, - Metadata: metadata.Metadata{RequestID: msg.Metadata.RequestID, NumberOfChunks: uint(len(splitCompressedContent)), ChunkIndex: uint(i + 1)}, + Metadata: metadata.Metadata{RequestID: msg.Metadata.RequestID, Recipient: msg.Metadata.Recipient, NumberOfChunks: uint(len(splitCompressedContent)), ChunkIndex: uint(i + 1)}, CompressedContent: chunk, }) } diff --git a/internal/matrix/matrix_decompressor.go b/internal/matrix/decompress.go similarity index 100% rename from internal/matrix/matrix_decompressor.go rename to internal/matrix/decompress.go diff --git a/internal/matrix/encryption.go b/internal/matrix/encryption.go new file mode 100644 index 00000000..649e8103 --- /dev/null +++ b/internal/matrix/encryption.go @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved. + * See the file LICENSE for licensing terms. + */ + +package matrix + +import ( + "crypto/rsa" + "encoding/base64" + "fmt" + aes_util "github.com/chain4travel/camino-messenger-bot/utils/aes" + rsa_util "github.com/chain4travel/camino-messenger-bot/utils/rsa" + "sync" +) + +type EncryptionKeyRepository struct { + pubKeyCache map[string]*rsa.PublicKey + symmetricKeyCache map[string][]byte + mu sync.Mutex +} + +func NewEncryptionKeyRepository() *EncryptionKeyRepository { + return &EncryptionKeyRepository{pubKeyCache: make(map[string]*rsa.PublicKey), symmetricKeyCache: make(map[string][]byte)} +} +func (p *EncryptionKeyRepository) getPublicKeyForRecipient(recipient string) (*rsa.PublicKey, error) { + pkey := p.fetchKeyFromCache(recipient) + if pkey != nil { + return pkey, nil + } + var encodedPubKey string + //TODO for now it's all keys are hardcoded. Later we need to get the keys from the key server + switch recipient { + case "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network": + encodedPubKey = "LS0tLS1CRUdJTiBSU0EgUFVCTElDIEtFWS0tLS0tCk1JSUJpZ0tDQVlFQWpwR2R4NlloMkJnR3V2QUdQQTZiMnAzd1dBWnZYalQvWTBQR1ViRlR0U1lVY1hydEc1eEIKcy9DYVo4NGtmTEpHRnhEV3d3d3E4bzRoNHBzOGd0aHJkaG5QMUFIOEtGRFFCTzJNbDY5ZmFZYWd4ajdtVUxnSQpqTWIzUEorVjQzZUQrRktHZis2R0E5aHpXd09RZDhvVWdZVnhSQ2xMU0tMYi82WXFqaU81LzFxK3plTWowZWF2CmJQY2VsK2V6UVlpQmE3UzNJcHFGUFdhL0N0TTd3Qi90UWI2MnAzWFRkU0pnenR1SlJpTU5MeFI2NFU3WWlHcGsKR0VYSjFyd2lPZFhPMjJaRyt4UkxmOEl3ZFF2dEUxR1VnL1llTEtSOWd5blI5WTNiZzA5UWErRkpYQ1FrTjVHUApJY2E0S2R4UGpjQ0xHTklGVlVSTnNkTjFrZnJzcXpLTXNQOVgwQkFUMHNWYTk3WTd5RnAxUTFKTmU4Uy96T1FWCm9XOHJpSVFvWGRqSDNES2Q3cERQekN2TEpQRm50dzF5YWRUZ1pLbGs5Y21tT0dDbXh5SUZwMW5mTXk1R1FDM20KS1AxZ2NIV3J5UmFBcG4reG9BSFdIcHErcVNicmpka0h2MEt1MDRaMTRYcWhaK2Ezc3FtM3oreWpNYTF2OExDUApvK2I3OFI4OGpqVDFBZ01CQUFFPQotLS0tLUVORCBSU0EgUFVCTElDIEtFWS0tLS0tCg==" + case "@t-kopernikus15ss2mvy86h0hcwdhfukdx7y3cvuwdqxm6a0aqm:matrix.camino.network": + encodedPubKey = "LS0tLS1CRUdJTiBSU0EgUFVCTElDIEtFWS0tLS0tCk1JSUJpZ0tDQVlFQXI5a1RrWHkyNWlIaTNhai9ib2VER3VFTmNJZ1dqVmlYRHVmUFJUQ1FSV0I0TEt1eCtaWXoKaElWckdPb2l6eDNoR29NTnMwOUlvODFzb2wyd3crQ0tENUtTYTVHVHNJelh6ZytGcEErTmsrcDFOOGlsWFVINQp4d2NFTlRBclVCK2Y0SmU4Vkl0dEc5ZVhHQW9aQ1RYc2FTRWNmVG1Lc24vVUdsSHVQdGs5WHVpTlNTb3k0ZTMvCnpGcGFjZngyaVR6TVJOMjc1Ky9aZjllZ3RtSnVXS0JKcnNOcC9iQ245Q2ErcURheDNMTmJpdG55TUF2eG5rUmgKTWZrQ1lHTHZmSkFoRVVlSVJUUkRLT0xCSEtRVFJpQ1F4SHlXSVVEQkswbkZtbkt5Ti80RUI1RWkzTkg4RkpFKwpaK1NobmExdmlkdWV0R2NtMjhKRFRweXhGRStyZXZQWWs3aXVJZGF3VEZtTUlabkRrTnpRRkxlSStHaXFPN2JNCkRlT0NSa2FBRDhnSkEzT29OeXBmUlRuaEMvVHFvMWk1VjZ1RlV5RU9LT3dvMHk4cEFCSmNTRzBoUVRxQUh3blAKZkZFLzI2REtsMzQzZ1oxV3lBa29QcUUyVk1ESklSVFVUcHhBR09IMk9qZDRnWjBJWk1QTks0RDYyMWk4V2NrZApNTlI5ZEZRQW1mOS9BZ01CQUFFPQotLS0tLUVORCBSU0EgUFVCTElDIEtFWS0tLS0tCg==" + default: + return nil, fmt.Errorf("no public key found for recipient: %s", recipient) + } + pubKeyBytes, err := base64.StdEncoding.DecodeString(encodedPubKey) + if err != nil { + return nil, err + } + pKey, err := rsa_util.ParseRSAPublicKey(pubKeyBytes) + if err != nil { + return nil, err + } + p.cachePublicKey(recipient, pKey) + return pKey, nil +} + +func (p *EncryptionKeyRepository) cachePublicKey(recipient string, key *rsa.PublicKey) { + p.mu.Lock() + defer p.mu.Unlock() + p.pubKeyCache[recipient] = key +} + +func (p *EncryptionKeyRepository) fetchKeyFromCache(recipient string) *rsa.PublicKey { + p.mu.Lock() + defer p.mu.Unlock() + return p.pubKeyCache[recipient] +} + +func (p *EncryptionKeyRepository) getSymmetricKeyForRecipient(recipient string) []byte { + key := p.fetchSymmetricKeyFromCache(recipient) + if key != nil { + return key + } + key = aes_util.GenerateAESKey() + p.cacheSymmetricKey(recipient, key) + return key +} + +func (p *EncryptionKeyRepository) cacheSymmetricKey(recipient string, key []byte) { + p.mu.Lock() + defer p.mu.Unlock() + p.symmetricKeyCache[recipient] = key +} + +func (p *EncryptionKeyRepository) fetchSymmetricKeyFromCache(recipient string) []byte { + p.mu.Lock() + defer p.mu.Unlock() + return p.symmetricKeyCache[recipient] +} diff --git a/internal/matrix/matrix_messenger.go b/internal/matrix/matrix_messenger.go index 148f156e..a9602e46 100644 --- a/internal/matrix/matrix_messenger.go +++ b/internal/matrix/matrix_messenger.go @@ -2,8 +2,11 @@ package matrix import ( "context" + "crypto/rsa" "errors" "fmt" + aes_util "github.com/chain4travel/camino-messenger-bot/utils/aes" + rsa_util "github.com/chain4travel/camino-messenger-bot/utils/rsa" "reflect" "sync" "time" @@ -34,27 +37,70 @@ type client struct { cryptoHelper *cryptohelper.CryptoHelper } type messenger struct { - msgChannel chan messaging.Message - cfg *config.MatrixConfig - logger *zap.SugaredLogger - client client - roomHandler RoomHandler - msgAssembler MessageAssembler - mu sync.Mutex + msgChannel chan messaging.Message + cfg *config.MatrixConfig + logger *zap.SugaredLogger + client client + roomHandler RoomHandler + msgAssembler MessageAssembler + mu sync.Mutex + privateRSAKey *rsa.PrivateKey + encryptionKeyRepository EncryptionKeyRepository } -func NewMessenger(cfg *config.MatrixConfig, logger *zap.SugaredLogger) *messenger { +func (m *messenger) Encrypt(msg *CaminoMatrixMessage) error { + pubKey, err := m.encryptionKeyRepository.getPublicKeyForRecipient(msg.Metadata.Recipient) + if err != nil { + return err + } + + symmetricKey := m.encryptionKeyRepository.getSymmetricKeyForRecipient(msg.Metadata.Recipient) + // encrypt symmetric key with recipient's public key + msg.EncryptedSymmetricKey, err = rsa_util.EncryptWithPublicKey(symmetricKey, pubKey) + if err != nil { + return err + } + // encrypt message with symmetric key + encryptedCompressedContent, err := aes_util.Encrypt(msg.CompressedContent, symmetricKey) + if err != nil { + return err + } + msg.CompressedContent = nil + msg.EncryptedCompressedContent = encryptedCompressedContent + return nil +} + +func (m *messenger) Decrypt(msg *CaminoMatrixMessage) error { + // decrypt symmetric key with private key + symmetricKey, err := rsa_util.DecryptWithPrivateKey(msg.EncryptedSymmetricKey, m.privateRSAKey) + if err != nil { + return err + } + + m.encryptionKeyRepository.cacheSymmetricKey(msg.Metadata.Sender, symmetricKey) + // decrypt message with symmetric key + decryptedCompressedContent, err := aes_util.Decrypt(msg.EncryptedCompressedContent, symmetricKey) + if err != nil { + return err + } + msg.CompressedContent = decryptedCompressedContent + return nil +} + +func NewMessenger(cfg *config.MatrixConfig, logger *zap.SugaredLogger, privateRSAKey *rsa.PrivateKey) *messenger { c, err := mautrix.NewClient(cfg.Host, "", "") if err != nil { panic(err) } return &messenger{ - msgChannel: make(chan messaging.Message), - cfg: cfg, - logger: logger, - client: client{Client: c}, - roomHandler: NewRoomHandler(c, logger), - msgAssembler: NewMessageAssembler(logger), + msgChannel: make(chan messaging.Message), + cfg: cfg, + logger: logger, + client: client{Client: c}, + roomHandler: NewRoomHandler(c, logger), + msgAssembler: NewMessageAssembler(logger), + privateRSAKey: privateRSAKey, + encryptionKeyRepository: *NewEncryptionKeyRepository(), } } func (m *messenger) Checkpoint() string { @@ -68,9 +114,24 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) { processCamMsg := func(source mautrix.EventSource, evt *event.Event) { msg := evt.Content.Parsed.(*CaminoMatrixMessage) - go func() { t := time.Now() + if msg.EncryptedSymmetricKey == nil { // if no symmetric key is provided, it should have been exchanged and cached already + key := m.encryptionKeyRepository.fetchSymmetricKeyFromCache(msg.Metadata.Sender) + if key == nil { + m.logger.Errorf("no symmetric key found for sender: %s [request-id:%s]", msg.Metadata.Sender, msg.Metadata.RequestID) + return + } else { + msg.EncryptedSymmetricKey = key + } + } + err := m.Decrypt(msg) + if err != nil { + m.logger.Errorf("failed to decrypt message: %v", err) + return + } + fmt.Printf("%d|decrypted-message|%s|%d\n", t.UnixMilli(), evt.ID.String(), time.Since(t).Milliseconds()) + t = time.Now() completeMsg, err, completed := m.msgAssembler.AssembleMessage(*msg) if err != nil { m.logger.Errorf("failed to assemble message: %v", err) @@ -82,6 +143,7 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) { completeMsg.Metadata.StampOn(fmt.Sprintf("matrix-sent-%s", completeMsg.MsgType), evt.Timestamp) completeMsg.Metadata.StampOn(fmt.Sprintf("%s-%s-%s", m.Checkpoint(), "received", completeMsg.MsgType), t.UnixMilli()) + t = time.Now() m.mu.Lock() m.msgChannel <- messaging.Message{ Metadata: completeMsg.Metadata, @@ -107,7 +169,7 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) { if evt.GetStateKey() == m.client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite && !m.roomHandler.HasAlreadyJoined(id.UserID(evt.Sender.String()), evt.RoomID) { _, err := m.client.JoinRoomByID(evt.RoomID) if err == nil { - m.roomHandler.CacheRoom(id.UserID(evt.Sender.String()), evt.RoomID) // add room to cache + m.roomHandler.CacheRoom(id.UserID(evt.Sender.String()), evt.RoomID) // add room to pubKeyCache m.logger.Info("Joined room after invite", zap.String("room_id", evt.RoomID.String()), zap.String("inviter", evt.Sender.String())) @@ -200,7 +262,13 @@ func (m *messenger) SendAsync(_ context.Context, msg messaging.Message) error { func (m *messenger) sendMessageEvents(roomID id.RoomID, eventType event.Type, messages []CaminoMatrixMessage) error { //TODO add retry logic? for _, msg := range messages { - _, err := m.client.SendMessageEvent(roomID, eventType, msg, mautrix.ReqSendEvent{TransactionID: msg.Metadata.RequestID}) + t := time.Now() + err := m.Encrypt(&msg) + if err != nil { + return err + } + fmt.Printf("%d|encrypted-message|%d\n", t.UnixMilli(), time.Since(t).Milliseconds()) + _, err = m.client.SendMessageEvent(roomID, eventType, msg) if err != nil { return err } diff --git a/internal/matrix/room_handler.go b/internal/matrix/room_handler.go index 1645323c..d6a852b3 100644 --- a/internal/matrix/room_handler.go +++ b/internal/matrix/room_handler.go @@ -8,7 +8,7 @@ import ( "sync" ) -const RoomPoolSize = 10 +const RoomPoolSize = 50 type RoomHandler interface { Init() @@ -34,12 +34,12 @@ func (r *roomHandler) Init() { return } - // cache all encrypted rooms + // pubKeyCache all encrypted rooms for _, roomID := range rooms.JoinedRooms { - r.logger.Debugf("Caching room %v | encrypted: %v", roomID, r.client.StateStore.IsEncrypted(roomID)) - if !r.client.StateStore.IsEncrypted(roomID) { + if r.client.StateStore.IsEncrypted(roomID) { continue } + r.logger.Debugf("Caching room %v | encrypted: %v", roomID, r.client.StateStore.IsEncrypted(roomID)) members, err := r.client.JoinedMembers(roomID) if err != nil { r.logger.Debugf("failed to fetch members for room %v", roomID) @@ -82,7 +82,7 @@ func (r *roomHandler) GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomI if err != nil { return "", err } else { - err = r.enableEncryptionForRoom(roomID) + //err = r.enableEncryptionForRoom(roomID) return roomID, err } } @@ -130,7 +130,7 @@ func (r *roomHandler) getEncryptedRoomForRecipient(recipient id.UserID) (id.Room createdRooms := 0 for _, roomID := range rooms.JoinedRooms { - if !r.client.StateStore.IsEncrypted(roomID) { + if r.client.StateStore.IsEncrypted(roomID) { continue } members, err := r.client.JoinedMembers(roomID) diff --git a/internal/matrix/types.go b/internal/matrix/types.go index ad434fe4..8ce69e20 100644 --- a/internal/matrix/types.go +++ b/internal/matrix/types.go @@ -10,9 +10,11 @@ import ( // CaminoMatrixMessage is a matrix-specific message format used for communication between the messenger and the service type CaminoMatrixMessage struct { event.MessageEventContent - Content messaging.MessageContent `json:"content"` - CompressedContent []byte `json:"compressed_content"` - Metadata metadata.Metadata `json:"metadata"` + Content messaging.MessageContent `json:"content"` + CompressedContent []byte `json:"compressed_content"` + EncryptedCompressedContent []byte `json:"encrypted_compressed_content"` + EncryptedSymmetricKey []byte `json:"encrypted_symmetric_key"` + Metadata metadata.Metadata `json:"metadata"` } type ByChunkIndex []CaminoMatrixMessage diff --git a/internal/messaging/messenger.go b/internal/messaging/messenger.go index d785cfb8..bb0241a8 100644 --- a/internal/messaging/messenger.go +++ b/internal/messaging/messenger.go @@ -12,8 +12,8 @@ type APIMessageResponse struct { } type Messenger interface { metadata.Checkpoint - StartReceiver(botMode uint) (string, error) // start receiving messages. Returns the user id - StopReceiver() error // stop receiving messages - SendAsync(ctx context.Context, m Message) error // asynchronous call (fire and forget) - Inbound() chan Message // channel where incoming messages are written + StartReceiver(botMode uint) (string, error) // start receiving messages. Returns the user id + StopReceiver() error // stop receiving messages + SendAsync(context.Context, Message) error // asynchronous call (fire and forget) + Inbound() chan Message // channel where incoming messages are written } diff --git a/utils/aes/aes.go b/utils/aes/aes.go new file mode 100644 index 00000000..10332120 --- /dev/null +++ b/utils/aes/aes.go @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved. + * See the file LICENSE for licensing terms. + */ + +package aes_util + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "fmt" +) + +func main() { + // Generate a random AES key + key := GenerateAESKey() + + // The data you want to encrypt + data := []byte("This is a large byte array that needs to be encrypted.") + + // Encrypt the data + encryptedData, err := Encrypt(data, key) + if err != nil { + fmt.Println("Encryption error:", err) + return + } + + fmt.Println("Encrypted data:", encryptedData) + + // Decrypt the data + decryptedData, err := Decrypt(encryptedData, key) + if err != nil { + fmt.Println("Decryption error:", err) + return + } + + fmt.Println("Decrypted data:", string(decryptedData)) +} + +// GenerateAESKey generates a random AES key. +func GenerateAESKey() []byte { + key := make([]byte, 32) // AES-256 key size + _, err := rand.Read(key) + if err != nil { + panic("Error generating AES key: " + err.Error()) + } + return key +} + +// Encrypt encrypts data using AES. +func Encrypt(data, key []byte) ([]byte, error) { + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + + // Create a new AES cipher block mode + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, err + } + + // Generate a random nonce + nonce := make([]byte, gcm.NonceSize()) + if _, err = rand.Read(nonce); err != nil { + return nil, err + } + + // Encrypt the data + ciphertext := gcm.Seal(nil, nonce, data, nil) + return append(nonce, ciphertext...), nil +} + +// Decrypt decrypts data using AES. +func Decrypt(ciphertext, key []byte) ([]byte, error) { + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + + // Create a new AES cipher block mode + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, err + } + + // Extract the nonce from the ciphertext + nonceSize := gcm.NonceSize() + if len(ciphertext) < nonceSize { + return nil, fmt.Errorf("Ciphertext is too short") + } + nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:] + + // Decrypt the data + plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) + if err != nil { + return nil, err + } + + return plaintext, nil +} diff --git a/utils/rsa/rsa.go b/utils/rsa/rsa.go new file mode 100644 index 00000000..6d210a23 --- /dev/null +++ b/utils/rsa/rsa.go @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved. + * See the file LICENSE for licensing terms. + */ + +package rsa_util + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/sha512" + "crypto/x509" + "encoding/base64" + "encoding/pem" + "fmt" + "os" +) + +func main() { + // Replace these file paths with your own private and public key file paths + privateKeyFilePath := "private_key2.pem" + publicKeyFilePath := "public_key2.pem" + + // Read private key + _, err := ParseRSAPrivateKeyFromFile(privateKeyFilePath) + if err != nil { + fmt.Printf("Error reading private key file: %v\n", err) + return + } + + publicKeyBytes, err := os.ReadFile(publicKeyFilePath) + if err != nil { + fmt.Printf("Error reading public key file: %v\n", err) + return + } + + // Use the keys as needed + //fmt.Println("Private Key:", privateKey) + + b := make([]byte, base64.StdEncoding.EncodedLen(len(publicKeyBytes))) + base64.StdEncoding.Encode(b, publicKeyBytes) + fmt.Println("Public Key:", string(b)) + dec := make([]byte, base64.StdEncoding.DecodedLen(len(b))) + base64.StdEncoding.Decode(dec, b) + fmt.Println("Public Key:", string(dec)) + + p, err := ParseRSAPublicKey(dec) + if err != nil { + fmt.Printf("Error parsing public key: %v\n", err) + return + } + fmt.Println("Public Key:", p) +} + +func ParseRSAPrivateKeyFromFile(keyFilePath string) (*rsa.PrivateKey, error) { + privateKeyBytes, err := os.ReadFile(keyFilePath) + if err != nil { + fmt.Printf("Error reading private key file: %v\n", err) + return nil, err + } + return ParseRSAPrivateKey(privateKeyBytes) +} +func ParseRSAPrivateKey(keyBytes []byte) (*rsa.PrivateKey, error) { + block, _ := pem.Decode(keyBytes) + if block == nil || block.Type != "RSA PRIVATE KEY" { + return nil, fmt.Errorf("failed to decode PEM block containing private key") + } + + privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes) + if err != nil { + return nil, fmt.Errorf("failed to parse private key: %v", err) + } + + return privateKey, nil +} + +func ParseRSAPublicKey(keyBytes []byte) (*rsa.PublicKey, error) { + block, _ := pem.Decode(keyBytes) + if block == nil || block.Type != "RSA PUBLIC KEY" { + return nil, fmt.Errorf("failed to decode PEM block containing public key") + } + + publicKey, err := x509.ParsePKCS1PublicKey(block.Bytes) + if err != nil { + return nil, fmt.Errorf("failed to parse public key: %v", err) + } + + return publicKey, nil +} + +// EncryptWithPublicKey encrypts data with public key +func EncryptWithPublicKey(msg []byte, pub *rsa.PublicKey) ([]byte, error) { + hash := sha512.New() + return rsa.EncryptOAEP(hash, rand.Reader, pub, msg, nil) +} + +// DecryptWithPrivateKey decrypts data with private key +func DecryptWithPrivateKey(ciphertext []byte, priv *rsa.PrivateKey) ([]byte, error) { + hash := sha512.New() + return rsa.DecryptOAEP(hash, rand.Reader, priv, ciphertext, nil) +}