From 179f234e7e9af42592b65e649e9792d964ad8ffd Mon Sep 17 00:00:00 2001 From: Anton Berezhnyi Date: Sun, 12 Nov 2023 22:48:47 +0200 Subject: [PATCH] Deleyed deleter --- ClientControllerInterface.go | 3 + Executor.go | 3 +- Executor_test.go | 4 + ScoreChangedEventHandler_test.go | 45 +++-- ServiceContainer.go | 8 + ServiceContainer_test.go | 15 +- delayedDeleter/Deleter.go | 103 +++++++++++ delayedDeleter/Deleter_test.go | 172 ++++++++++++++++++ .../contracts/DeleteHandlerInterface.go | 5 + delayedDeleter/contracts/DeleteTask.pb.go | 165 +++++++++++++++++ delayedDeleter/contracts/DeleteTask.proto | 12 ++ delayedDeleter/contracts/DeleterInterface.go | 12 ++ mocks/ClientControllerInterface.go | 15 ++ mocks/DeleteHandlerInterface.go | 42 +++++ mocks/DeleterInterface.go | 47 +++++ 15 files changed, 624 insertions(+), 27 deletions(-) create mode 100644 delayedDeleter/Deleter.go create mode 100644 delayedDeleter/Deleter_test.go create mode 100644 delayedDeleter/contracts/DeleteHandlerInterface.go create mode 100644 delayedDeleter/contracts/DeleteTask.pb.go create mode 100644 delayedDeleter/contracts/DeleteTask.proto create mode 100644 delayedDeleter/contracts/DeleterInterface.go create mode 100644 mocks/DeleteHandlerInterface.go create mode 100644 mocks/DeleterInterface.go diff --git a/ClientControllerInterface.go b/ClientControllerInterface.go index 962244e..dbf94c8 100644 --- a/ClientControllerInterface.go +++ b/ClientControllerInterface.go @@ -1,12 +1,15 @@ package framework import ( + delayedDeleterContracts "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts" "github.com/kneu-messenger-pigeon/events" scoreApi "github.com/kneu-messenger-pigeon/score-api" ) type ClientControllerInterface interface { ExecutableInterface + delayedDeleterContracts.DeleteHandlerInterface + ScoreChangedAction(chatId string, previousMessageId string, disciplineScore *scoreApi.DisciplineScore, previousScore *scoreApi.Score) (err error, messageId string) WelcomeAuthorizedAction(event *events.UserAuthorizedEvent) error LogoutFinishedAction(event *events.UserAuthorizedEvent) error diff --git a/Executor.go b/Executor.go index 4a7b6ad..6007f5b 100644 --- a/Executor.go +++ b/Executor.go @@ -25,10 +25,11 @@ func (executor *Executor) Execute() { wg := &sync.WaitGroup{} - wg.Add(3) + wg.Add(4) go executor.serviceContainer.ClientController.Execute(ctx, wg) go executor.serviceContainer.UserAuthorizedEventProcessor.Execute(ctx, wg) go executor.serviceContainer.UserCountMetricsSyncer.Execute(ctx, wg) + go executor.serviceContainer.WelcomeAnonymousDelayedDeleter.Execute(ctx, wg) wg.Add(len(executor.serviceContainer.ScoreChangedEventProcessorPool)) for _, scoreChangedEventProcessor := range executor.serviceContainer.ScoreChangedEventProcessorPool { diff --git a/Executor_test.go b/Executor_test.go index caf22e5..a0a9beb 100644 --- a/Executor_test.go +++ b/Executor_test.go @@ -31,6 +31,9 @@ func TestEventLoopExecute(t *testing.T) { userCountMetricsSyncer := mocks.NewUserCountMetricsSyncerInterface(t) userCountMetricsSyncer.On("Execute", matchContext, matchWaitGroup).Return().Times(1) + deleter := mocks.NewDeleterInterface(t) + deleter.On("Execute", matchContext, matchWaitGroup).Return().Times(1) + redisClient, redisMock := redismock.NewClientMock() redisMock.MatchExpectationsInOrder(true) @@ -49,6 +52,7 @@ func TestEventLoopExecute(t *testing.T) { ScoreChangedEventProcessorPool: processorPool, ClientController: clientController, UserCountMetricsSyncer: userCountMetricsSyncer, + WelcomeAnonymousDelayedDeleter: deleter, }, } diff --git a/ScoreChangedEventHandler_test.go b/ScoreChangedEventHandler_test.go index 231ba09..471ae02 100644 --- a/ScoreChangedEventHandler_test.go +++ b/ScoreChangedEventHandler_test.go @@ -616,6 +616,8 @@ func TestScoreChangedEventHandler_Handle(t *testing.T) { stateStorage := mocks.NewScoreChangedStateStorageInterface(t) stateStorage.On("Get", event.StudentId, event.LessonId).Once().Return(previousState) + clientController := mocks.NewClientControllerInterface(t) + handler := ScoreChangedEventHandler{ out: &bytes.Buffer{}, debugLogger: &DebugLogger{}, @@ -624,15 +626,12 @@ func TestScoreChangedEventHandler_Handle(t *testing.T) { scoreChangedEventComposer: scoreChangeEventComposer, scoreChangedStateStorage: stateStorage, scoreChangedMessageIdStorage: mocks.NewScoreChangedMessageIdStorageInterface(t), - serviceContainer: &ServiceContainer{}, + serviceContainer: &ServiceContainer{ + ClientController: clientController, + }, } - clientController := mocks.NewClientControllerInterface(t) - var err error - - handler.serviceContainer.SetController(clientController) - - err = handler.Handle(&event) + err := handler.Handle(&event) assert.NoError(t, err) runtime.Gosched() time.Sleep(time.Millisecond * 40) @@ -754,6 +753,17 @@ func TestScoreChangedEventHandler_Handle(t *testing.T) { Addr: miniredis.RunT(t).Addr(), }) + firstMessageSendingWait := make(chan time.Time) + + clientController := mocks.NewClientControllerInterface(t) + clientController.On("ScoreChangedAction", chatIds[0], "", &disciplineScore1, &previousScore). + Once().WaitUntil(firstMessageSendingWait). + Return(nil, createdMessageIds[0]) + + clientController.On("ScoreChangedAction", chatIds[1], "", &disciplineScore1, &previousScore). + Once().WaitUntil(firstMessageSendingWait). + Return(nil, createdMessageIds[1]) + handler := ScoreChangedEventHandler{ out: &bytes.Buffer{}, debugLogger: &DebugLogger{}, @@ -773,25 +783,12 @@ func TestScoreChangedEventHandler_Handle(t *testing.T) { redis: redisClient, storageExpire: time.Minute, }, - serviceContainer: &ServiceContainer{}, + serviceContainer: &ServiceContainer{ + ClientController: clientController, + }, } - firstMessageSendingWait := make(chan time.Time) - - clientController := mocks.NewClientControllerInterface(t) - clientController.On("ScoreChangedAction", chatIds[0], "", &disciplineScore1, &previousScore). - Once().WaitUntil(firstMessageSendingWait). - Return(nil, createdMessageIds[0]) - - clientController.On("ScoreChangedAction", chatIds[1], "", &disciplineScore1, &previousScore). - Once().WaitUntil(firstMessageSendingWait). - Return(nil, createdMessageIds[1]) - - var err error - - handler.serviceContainer.SetController(clientController) - - err = handler.Handle(&event1) + err := handler.Handle(&event1) assert.NoError(t, err) runtime.Gosched() time.Sleep(time.Millisecond * 40) diff --git a/ServiceContainer.go b/ServiceContainer.go index 442c812..bcf660a 100644 --- a/ServiceContainer.go +++ b/ServiceContainer.go @@ -2,6 +2,8 @@ package framework import ( "github.com/kneu-messenger-pigeon/authorizer-client" + "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter" + "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts" "github.com/kneu-messenger-pigeon/events" "github.com/kneu-messenger-pigeon/score-client" "github.com/redis/go-redis/v9" @@ -20,6 +22,7 @@ type ServiceContainer struct { ScoreClient *score.Client UserAuthorizedEventProcessor KafkaConsumerProcessorInterface ScoreChangedEventProcessorPool [ScoreChangedEventProcessorCount]KafkaConsumerProcessorInterface + WelcomeAnonymousDelayedDeleter contracts.DeleterInterface Executor *Executor ClientController ClientControllerInterface UserCountMetricsSyncer UserCountMetricsSyncerInterface @@ -134,6 +137,10 @@ func NewServiceContainer(config BaseConfig, out io.Writer) *ServiceContainer { } } + container.WelcomeAnonymousDelayedDeleter = delayedDeleter.NewWelcomeAnonymousMessageDelayedDeleter( + redisClient, out, "welcome_anonymous_message", + ) + container.Executor = &Executor{ out: out, serviceContainer: container, @@ -144,4 +151,5 @@ func NewServiceContainer(config BaseConfig, out io.Writer) *ServiceContainer { func (container *ServiceContainer) SetController(controller ClientControllerInterface) { container.ClientController = controller + container.WelcomeAnonymousDelayedDeleter.SetHandler(controller) } diff --git a/ServiceContainer_test.go b/ServiceContainer_test.go index 369c6ae..f91965c 100644 --- a/ServiceContainer_test.go +++ b/ServiceContainer_test.go @@ -2,6 +2,7 @@ package framework import ( "bytes" + "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter" "github.com/kneu-messenger-pigeon/client-framework/mocks" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" @@ -10,7 +11,7 @@ import ( ) func TestNewServiceContainer(t *testing.T) { - t.Run("succeess", func(t *testing.T) { + t.Run("success", func(t *testing.T) { out := &bytes.Buffer{} config := BaseConfig{ clientName: "test-client", @@ -113,6 +114,9 @@ func TestNewServiceContainer(t *testing.T) { assert.Equal(t, out, scoreChangedMessageIdStorage.out) } + assert.NotNil(t, serviceContainer.WelcomeAnonymousDelayedDeleter) + assert.IsType(t, &delayedDeleter.Deleter{}, serviceContainer.WelcomeAnonymousDelayedDeleter) + assert.NotNil(t, serviceContainer.Executor) assert.IsType(t, &Executor{}, serviceContainer.Executor) assert.Equal(t, serviceContainer, serviceContainer.Executor.serviceContainer) @@ -123,8 +127,15 @@ func TestNewServiceContainer(t *testing.T) { } func TestServiceContainer_SetController(t *testing.T) { - serviceContainer := &ServiceContainer{} controller := mocks.NewClientControllerInterface(t) + + deleter := mocks.NewDeleterInterface(t) + deleter.On("SetHandler", controller).Return() + + serviceContainer := &ServiceContainer{ + WelcomeAnonymousDelayedDeleter: deleter, + } + serviceContainer.SetController(controller) assert.Equal(t, controller, serviceContainer.ClientController) } diff --git a/delayedDeleter/Deleter.go b/delayedDeleter/Deleter.go new file mode 100644 index 0000000..433b325 --- /dev/null +++ b/delayedDeleter/Deleter.go @@ -0,0 +1,103 @@ +package delayedDeleter + +import ( + "context" + "fmt" + "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts" + "github.com/redis/go-redis/v9" + "google.golang.org/protobuf/proto" + "io" + "sync" + "time" +) + +const defaultWaitingTimeout = time.Minute + +type Deleter struct { + handler contracts.DeleteHandlerInterface + redis redis.UniversalClient + out io.Writer + queueName string + waitingTimeout time.Duration +} + +func NewWelcomeAnonymousMessageDelayedDeleter(redis redis.UniversalClient, out io.Writer, name string) *Deleter { + return &Deleter{ + redis: redis, + out: out, + queueName: "deleter_queue_" + name, + waitingTimeout: defaultWaitingTimeout, + } +} + +func (deleter *Deleter) AddToQueue(messageDeleteTask *contracts.DeleteTask) { + taskSerialized, _ := proto.Marshal(messageDeleteTask) + err := deleter.redis.LPush(context.Background(), deleter.queueName, taskSerialized).Err() + deleter.logError("failed write task to redis queue: ", err) +} + +func (deleter *Deleter) SetHandler(handler contracts.DeleteHandlerInterface) { + deleter.handler = handler +} + +func (deleter *Deleter) Execute(ctx context.Context, wg *sync.WaitGroup) { + var waitTime time.Duration + var now int64 + var nextTask contracts.DeleteTask + var taskSerialized []byte + var nextTasksSerialized []string + var unmarshalErr error + var handlerErr error + var dequeueErr error + + dequeue := func() { + dequeueErr = deleter.redis.RPop(context.Background(), deleter.queueName).Err() + deleter.logError("failed dequeue task: ", dequeueErr) + } + + var readNextTask func() + readNextTask = func() { + nextTask.Reset() + nextTasksSerialized = deleter.redis.LRange(context.Background(), deleter.queueName, -1, -1).Val() + if len(nextTasksSerialized) > 0 { + taskSerialized = []byte(nextTasksSerialized[0]) + unmarshalErr = proto.Unmarshal(taskSerialized, &nextTask) + if unmarshalErr != nil { + _, _ = fmt.Fprintln(deleter.out, "failed unmarshal task: ", unmarshalErr) + dequeue() + readNextTask() + } + } + } + + for { + now = time.Now().Unix() + readNextTask() + for nextTask.GetScheduledAt() != 0 && nextTask.GetScheduledAt() <= now { + handlerErr = deleter.handler.HandleDeleteTask(&nextTask) + deleter.logError("handle delete message err: ", handlerErr) + dequeue() + readNextTask() + } + + if nextTask.ScheduledAt > now { + waitTime = time.Duration(nextTask.GetScheduledAt()-now) * time.Second + } else { + waitTime = deleter.waitingTimeout + } + + select { + case <-ctx.Done(): + + wg.Done() + return + case <-time.After(waitTime): + } + } +} + +func (deleter *Deleter) logError(prefix string, err error) { + if err != nil { + _, _ = fmt.Fprintln(deleter.out, prefix, err) + } +} diff --git a/delayedDeleter/Deleter_test.go b/delayedDeleter/Deleter_test.go new file mode 100644 index 0000000..f104831 --- /dev/null +++ b/delayedDeleter/Deleter_test.go @@ -0,0 +1,172 @@ +package delayedDeleter + +import ( + "bytes" + "context" + "errors" + "github.com/alicebob/miniredis/v2" + "github.com/go-redis/redismock/v9" + "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts" + "github.com/kneu-messenger-pigeon/client-framework/mocks" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/protobuf/proto" + "sync" + "testing" + "time" +) + +func TestWelcomeAnonymousMessageDelayedDeleter(t *testing.T) { + matchTask := func(expectedTask *contracts.DeleteTask, actualTask *contracts.DeleteTask) bool { + if expectedTask == nil && actualTask == nil { + return true + } + + return expectedTask.ScheduledAt == actualTask.ScheduledAt && + expectedTask.MessageId == actualTask.MessageId && + expectedTask.ChatId == actualTask.ChatId + } + + t.Run("success", func(t *testing.T) { + task1 := contracts.DeleteTask{ + ScheduledAt: time.Now().Add(time.Second * 2).Unix(), + MessageId: 1, + ChatId: 2, + } + + task2 := contracts.DeleteTask{ + ScheduledAt: time.Now().Add(time.Second * 4).Unix(), + MessageId: 10, + ChatId: 20, + } + + task1FoundCount := 0 + task2FoundCount := 0 + taskNotFoundCount := 0 + + handler := mocks.NewDeleteHandlerInterface(t) + handler.On("HandleDeleteTask", mock.Anything).Times(2).Return( + func(task *contracts.DeleteTask) error { + if matchTask(&task1, task) { + task1FoundCount++ + return nil + } + + if matchTask(&task2, task) { + task2FoundCount++ + return nil + } + + taskNotFoundCount++ + return errors.New("unexpected task") + }, + ) + + out := &bytes.Buffer{} + redisClient := redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: miniredis.RunT(t).Addr(), + }) + + delayedDeleter := NewWelcomeAnonymousMessageDelayedDeleter(redisClient, out, "test") + delayedDeleter.waitingTimeout = time.Second + delayedDeleter.SetHandler(handler) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go delayedDeleter.Execute(ctx, wg) + + delayedDeleter.AddToQueue(&task1) + delayedDeleter.AddToQueue(&task2) + assert.Empty(t, out.String(), "No error should be printed") + + time.Sleep(time.Second * 5) + cancel() + + assert.Equal(t, 1, task1FoundCount, "Task 1 should be found once") + assert.Equal(t, 1, task2FoundCount, "Task 2 should be found once") + assert.Empty(t, taskNotFoundCount, "No unexpected tasks should be found") + assert.Empty(t, out.String(), "No error should be printed") + }) + + t.Run("errorUnmarshal", func(t *testing.T) { + task1 := contracts.DeleteTask{ + ScheduledAt: time.Now().Unix(), + MessageId: 30, + ChatId: 40, + } + + task2 := contracts.DeleteTask{ + ScheduledAt: time.Now().Unix(), + MessageId: 35, + ChatId: 45, + } + + handler := mocks.NewDeleteHandlerInterface(t) + handler.On("HandleDeleteTask", mock.Anything).Times(2).Return(nil) + + redisClient := redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: miniredis.RunT(t).Addr(), + }) + + out := &bytes.Buffer{} + delayedDeleter := NewWelcomeAnonymousMessageDelayedDeleter(redisClient, out, "test-errorDequeue") + delayedDeleter.waitingTimeout = time.Second + delayedDeleter.SetHandler(handler) + + err := redisClient.LPush(context.Background(), delayedDeleter.queueName, "serializations-errors").Err() + assert.NoError(t, err) + + delayedDeleter.AddToQueue(&task1) + delayedDeleter.AddToQueue(&task2) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go delayedDeleter.Execute(ctx, wg) + + time.Sleep(time.Second * 2) + cancel() + + assert.Contains(t, out.String(), "failed unmarshal task: ", "Error should be printed") + }) + + t.Run("errorDequeue", func(t *testing.T) { + expectedErr := errors.New("dequeue error") + task1 := contracts.DeleteTask{ + ScheduledAt: time.Now().Unix(), + MessageId: 50, + ChatId: 60, + } + task1Serialized, _ := proto.Marshal(&task1) + + handler := mocks.NewDeleteHandlerInterface(t) + handler.On("HandleDeleteTask", mock.Anything).Times(1).Return(nil) + + redisClient, redisMock := redismock.NewClientMock() + redisMock.MatchExpectationsInOrder(true) + + out := &bytes.Buffer{} + delayedDeleter := NewWelcomeAnonymousMessageDelayedDeleter(redisClient, out, "test-errorDequeue") + delayedDeleter.waitingTimeout = time.Second + delayedDeleter.SetHandler(handler) + + redisMock.ExpectLRange(delayedDeleter.queueName, -1, -1).SetVal([]string{ + string(task1Serialized), + }) + redisMock.ExpectRPop(delayedDeleter.queueName).SetErr(expectedErr) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go delayedDeleter.Execute(ctx, wg) + + time.Sleep(time.Second * 2) + cancel() + + assert.Contains(t, out.String(), "failed dequeue task: ", "Error should be printed") + }) +} diff --git a/delayedDeleter/contracts/DeleteHandlerInterface.go b/delayedDeleter/contracts/DeleteHandlerInterface.go new file mode 100644 index 0000000..ce497ae --- /dev/null +++ b/delayedDeleter/contracts/DeleteHandlerInterface.go @@ -0,0 +1,5 @@ +package contracts + +type DeleteHandlerInterface interface { + HandleDeleteTask(task *DeleteTask) error +} diff --git a/delayedDeleter/contracts/DeleteTask.pb.go b/delayedDeleter/contracts/DeleteTask.pb.go new file mode 100644 index 0000000..a9a05fd --- /dev/null +++ b/delayedDeleter/contracts/DeleteTask.pb.go @@ -0,0 +1,165 @@ +// protoc --go_out=delayedDeleter delayedDeleter/DeleteTask.proto + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: delayedDeleter/DeleteTask.proto + +package contracts + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type DeleteTask struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ScheduledAt int64 `protobuf:"varint,1,opt,name=scheduledAt,proto3" json:"scheduledAt,omitempty"` + MessageId int32 `protobuf:"varint,2,opt,name=MessageId,proto3" json:"MessageId,omitempty"` + ChatId int64 `protobuf:"varint,3,opt,name=ChatId,proto3" json:"ChatId,omitempty"` +} + +func (x *DeleteTask) Reset() { + *x = DeleteTask{} + if protoimpl.UnsafeEnabled { + mi := &file_delayedDeleter_DeleteTask_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DeleteTask) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteTask) ProtoMessage() {} + +func (x *DeleteTask) ProtoReflect() protoreflect.Message { + mi := &file_delayedDeleter_DeleteTask_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteTask.ProtoReflect.Descriptor instead. +func (*DeleteTask) Descriptor() ([]byte, []int) { + return file_delayedDeleter_DeleteTask_proto_rawDescGZIP(), []int{0} +} + +func (x *DeleteTask) GetScheduledAt() int64 { + if x != nil { + return x.ScheduledAt + } + return 0 +} + +func (x *DeleteTask) GetMessageId() int32 { + if x != nil { + return x.MessageId + } + return 0 +} + +func (x *DeleteTask) GetChatId() int64 { + if x != nil { + return x.ChatId + } + return 0 +} + +var File_delayedDeleter_DeleteTask_proto protoreflect.FileDescriptor + +var file_delayedDeleter_DeleteTask_proto_rawDesc = []byte{ + 0x0a, 0x1f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x72, + 0x2f, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x09, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x77, 0x6f, 0x72, 0x6b, 0x22, 0x64, 0x0a, 0x0a, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x20, 0x0a, 0x0b, 0x73, 0x63, + 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x0b, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1c, 0x0a, 0x09, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x09, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x43, 0x68, + 0x61, 0x74, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x43, 0x68, 0x61, 0x74, + 0x49, 0x64, 0x42, 0x13, 0x5a, 0x11, 0x2e, 0x2f, 0x3b, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_delayedDeleter_DeleteTask_proto_rawDescOnce sync.Once + file_delayedDeleter_DeleteTask_proto_rawDescData = file_delayedDeleter_DeleteTask_proto_rawDesc +) + +func file_delayedDeleter_DeleteTask_proto_rawDescGZIP() []byte { + file_delayedDeleter_DeleteTask_proto_rawDescOnce.Do(func() { + file_delayedDeleter_DeleteTask_proto_rawDescData = protoimpl.X.CompressGZIP(file_delayedDeleter_DeleteTask_proto_rawDescData) + }) + return file_delayedDeleter_DeleteTask_proto_rawDescData +} + +var file_delayedDeleter_DeleteTask_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_delayedDeleter_DeleteTask_proto_goTypes = []interface{}{ + (*DeleteTask)(nil), // 0: framework.DeleteTask +} +var file_delayedDeleter_DeleteTask_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_delayedDeleter_DeleteTask_proto_init() } +func file_delayedDeleter_DeleteTask_proto_init() { + if File_delayedDeleter_DeleteTask_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_delayedDeleter_DeleteTask_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DeleteTask); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_delayedDeleter_DeleteTask_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_delayedDeleter_DeleteTask_proto_goTypes, + DependencyIndexes: file_delayedDeleter_DeleteTask_proto_depIdxs, + MessageInfos: file_delayedDeleter_DeleteTask_proto_msgTypes, + }.Build() + File_delayedDeleter_DeleteTask_proto = out.File + file_delayedDeleter_DeleteTask_proto_rawDesc = nil + file_delayedDeleter_DeleteTask_proto_goTypes = nil + file_delayedDeleter_DeleteTask_proto_depIdxs = nil +} diff --git a/delayedDeleter/contracts/DeleteTask.proto b/delayedDeleter/contracts/DeleteTask.proto new file mode 100644 index 0000000..d8d5d04 --- /dev/null +++ b/delayedDeleter/contracts/DeleteTask.proto @@ -0,0 +1,12 @@ +// protoc --go_out=delayedDeleter delayedDeleter/DeleteTask.proto +syntax="proto3"; + +package framework; + +option go_package="./;delayedDeleter"; + +message DeleteTask { + int64 scheduledAt = 1; + int32 MessageId = 2; + int64 ChatId = 3; +} \ No newline at end of file diff --git a/delayedDeleter/contracts/DeleterInterface.go b/delayedDeleter/contracts/DeleterInterface.go new file mode 100644 index 0000000..42117fa --- /dev/null +++ b/delayedDeleter/contracts/DeleterInterface.go @@ -0,0 +1,12 @@ +package contracts + +import ( + "context" + "sync" +) + +type DeleterInterface interface { + AddToQueue(messageDeleteTask *DeleteTask) + Execute(ctx context.Context, wg *sync.WaitGroup) + SetHandler(handler DeleteHandlerInterface) +} diff --git a/mocks/ClientControllerInterface.go b/mocks/ClientControllerInterface.go index 2a801c2..acecfb8 100644 --- a/mocks/ClientControllerInterface.go +++ b/mocks/ClientControllerInterface.go @@ -5,6 +5,7 @@ package mocks import ( context "context" + contracts "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts" events "github.com/kneu-messenger-pigeon/events" mock "github.com/stretchr/testify/mock" @@ -24,6 +25,20 @@ func (_m *ClientControllerInterface) Execute(ctx context.Context, wg *sync.WaitG _m.Called(ctx, wg) } +// HandleDeleteTask provides a mock function with given fields: task +func (_m *ClientControllerInterface) HandleDeleteTask(task *contracts.DeleteTask) error { + ret := _m.Called(task) + + var r0 error + if rf, ok := ret.Get(0).(func(*contracts.DeleteTask) error); ok { + r0 = rf(task) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // LogoutFinishedAction provides a mock function with given fields: event func (_m *ClientControllerInterface) LogoutFinishedAction(event *events.UserAuthorizedEvent) error { ret := _m.Called(event) diff --git a/mocks/DeleteHandlerInterface.go b/mocks/DeleteHandlerInterface.go new file mode 100644 index 0000000..08a4501 --- /dev/null +++ b/mocks/DeleteHandlerInterface.go @@ -0,0 +1,42 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + contracts "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts" + mock "github.com/stretchr/testify/mock" +) + +// DeleteHandlerInterface is an autogenerated mock type for the DeleteHandlerInterface type +type DeleteHandlerInterface struct { + mock.Mock +} + +// HandleDeleteTask provides a mock function with given fields: task +func (_m *DeleteHandlerInterface) HandleDeleteTask(task *contracts.DeleteTask) error { + ret := _m.Called(task) + + var r0 error + if rf, ok := ret.Get(0).(func(*contracts.DeleteTask) error); ok { + r0 = rf(task) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewDeleteHandlerInterface interface { + mock.TestingT + Cleanup(func()) +} + +// NewDeleteHandlerInterface creates a new instance of DeleteHandlerInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewDeleteHandlerInterface(t mockConstructorTestingTNewDeleteHandlerInterface) *DeleteHandlerInterface { + mock := &DeleteHandlerInterface{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/DeleterInterface.go b/mocks/DeleterInterface.go new file mode 100644 index 0000000..7e09cbb --- /dev/null +++ b/mocks/DeleterInterface.go @@ -0,0 +1,47 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + contracts "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts" + mock "github.com/stretchr/testify/mock" + + sync "sync" +) + +// DeleterInterface is an autogenerated mock type for the DeleterInterface type +type DeleterInterface struct { + mock.Mock +} + +// AddToQueue provides a mock function with given fields: messageDeleteTask +func (_m *DeleterInterface) AddToQueue(messageDeleteTask *contracts.DeleteTask) { + _m.Called(messageDeleteTask) +} + +// Execute provides a mock function with given fields: ctx, wg +func (_m *DeleterInterface) Execute(ctx context.Context, wg *sync.WaitGroup) { + _m.Called(ctx, wg) +} + +// SetHandler provides a mock function with given fields: handler +func (_m *DeleterInterface) SetHandler(handler contracts.DeleteHandlerInterface) { + _m.Called(handler) +} + +type mockConstructorTestingTNewDeleterInterface interface { + mock.TestingT + Cleanup(func()) +} + +// NewDeleterInterface creates a new instance of DeleterInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewDeleterInterface(t mockConstructorTestingTNewDeleterInterface) *DeleterInterface { + mock := &DeleterInterface{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}