Skip to content

Commit

Permalink
Deleyed deleter
Browse files Browse the repository at this point in the history
  • Loading branch information
berejant committed Nov 12, 2023
1 parent 3aa4041 commit 179f234
Show file tree
Hide file tree
Showing 15 changed files with 624 additions and 27 deletions.
3 changes: 3 additions & 0 deletions ClientControllerInterface.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package framework

import (
delayedDeleterContracts "github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts"
"github.com/kneu-messenger-pigeon/events"
scoreApi "github.com/kneu-messenger-pigeon/score-api"
)

type ClientControllerInterface interface {
ExecutableInterface
delayedDeleterContracts.DeleteHandlerInterface

ScoreChangedAction(chatId string, previousMessageId string, disciplineScore *scoreApi.DisciplineScore, previousScore *scoreApi.Score) (err error, messageId string)
WelcomeAuthorizedAction(event *events.UserAuthorizedEvent) error
LogoutFinishedAction(event *events.UserAuthorizedEvent) error
Expand Down
3 changes: 2 additions & 1 deletion Executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ func (executor *Executor) Execute() {

wg := &sync.WaitGroup{}

wg.Add(3)
wg.Add(4)
go executor.serviceContainer.ClientController.Execute(ctx, wg)
go executor.serviceContainer.UserAuthorizedEventProcessor.Execute(ctx, wg)
go executor.serviceContainer.UserCountMetricsSyncer.Execute(ctx, wg)
go executor.serviceContainer.WelcomeAnonymousDelayedDeleter.Execute(ctx, wg)

wg.Add(len(executor.serviceContainer.ScoreChangedEventProcessorPool))
for _, scoreChangedEventProcessor := range executor.serviceContainer.ScoreChangedEventProcessorPool {
Expand Down
4 changes: 4 additions & 0 deletions Executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ func TestEventLoopExecute(t *testing.T) {
userCountMetricsSyncer := mocks.NewUserCountMetricsSyncerInterface(t)
userCountMetricsSyncer.On("Execute", matchContext, matchWaitGroup).Return().Times(1)

deleter := mocks.NewDeleterInterface(t)
deleter.On("Execute", matchContext, matchWaitGroup).Return().Times(1)

redisClient, redisMock := redismock.NewClientMock()
redisMock.MatchExpectationsInOrder(true)

Expand All @@ -49,6 +52,7 @@ func TestEventLoopExecute(t *testing.T) {
ScoreChangedEventProcessorPool: processorPool,
ClientController: clientController,
UserCountMetricsSyncer: userCountMetricsSyncer,
WelcomeAnonymousDelayedDeleter: deleter,
},
}

Expand Down
45 changes: 21 additions & 24 deletions ScoreChangedEventHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,8 @@ func TestScoreChangedEventHandler_Handle(t *testing.T) {
stateStorage := mocks.NewScoreChangedStateStorageInterface(t)
stateStorage.On("Get", event.StudentId, event.LessonId).Once().Return(previousState)

clientController := mocks.NewClientControllerInterface(t)

handler := ScoreChangedEventHandler{
out: &bytes.Buffer{},
debugLogger: &DebugLogger{},
Expand All @@ -624,15 +626,12 @@ func TestScoreChangedEventHandler_Handle(t *testing.T) {
scoreChangedEventComposer: scoreChangeEventComposer,
scoreChangedStateStorage: stateStorage,
scoreChangedMessageIdStorage: mocks.NewScoreChangedMessageIdStorageInterface(t),
serviceContainer: &ServiceContainer{},
serviceContainer: &ServiceContainer{
ClientController: clientController,
},
}

clientController := mocks.NewClientControllerInterface(t)
var err error

handler.serviceContainer.SetController(clientController)

err = handler.Handle(&event)
err := handler.Handle(&event)
assert.NoError(t, err)
runtime.Gosched()
time.Sleep(time.Millisecond * 40)
Expand Down Expand Up @@ -754,6 +753,17 @@ func TestScoreChangedEventHandler_Handle(t *testing.T) {
Addr: miniredis.RunT(t).Addr(),
})

firstMessageSendingWait := make(chan time.Time)

clientController := mocks.NewClientControllerInterface(t)
clientController.On("ScoreChangedAction", chatIds[0], "", &disciplineScore1, &previousScore).
Once().WaitUntil(firstMessageSendingWait).
Return(nil, createdMessageIds[0])

clientController.On("ScoreChangedAction", chatIds[1], "", &disciplineScore1, &previousScore).
Once().WaitUntil(firstMessageSendingWait).
Return(nil, createdMessageIds[1])

handler := ScoreChangedEventHandler{
out: &bytes.Buffer{},
debugLogger: &DebugLogger{},
Expand All @@ -773,25 +783,12 @@ func TestScoreChangedEventHandler_Handle(t *testing.T) {
redis: redisClient,
storageExpire: time.Minute,
},
serviceContainer: &ServiceContainer{},
serviceContainer: &ServiceContainer{
ClientController: clientController,
},
}

firstMessageSendingWait := make(chan time.Time)

clientController := mocks.NewClientControllerInterface(t)
clientController.On("ScoreChangedAction", chatIds[0], "", &disciplineScore1, &previousScore).
Once().WaitUntil(firstMessageSendingWait).
Return(nil, createdMessageIds[0])

clientController.On("ScoreChangedAction", chatIds[1], "", &disciplineScore1, &previousScore).
Once().WaitUntil(firstMessageSendingWait).
Return(nil, createdMessageIds[1])

var err error

handler.serviceContainer.SetController(clientController)

err = handler.Handle(&event1)
err := handler.Handle(&event1)
assert.NoError(t, err)
runtime.Gosched()
time.Sleep(time.Millisecond * 40)
Expand Down
8 changes: 8 additions & 0 deletions ServiceContainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package framework

import (
"github.com/kneu-messenger-pigeon/authorizer-client"
"github.com/kneu-messenger-pigeon/client-framework/delayedDeleter"
"github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts"
"github.com/kneu-messenger-pigeon/events"
"github.com/kneu-messenger-pigeon/score-client"
"github.com/redis/go-redis/v9"
Expand All @@ -20,6 +22,7 @@ type ServiceContainer struct {
ScoreClient *score.Client
UserAuthorizedEventProcessor KafkaConsumerProcessorInterface
ScoreChangedEventProcessorPool [ScoreChangedEventProcessorCount]KafkaConsumerProcessorInterface
WelcomeAnonymousDelayedDeleter contracts.DeleterInterface
Executor *Executor
ClientController ClientControllerInterface
UserCountMetricsSyncer UserCountMetricsSyncerInterface
Expand Down Expand Up @@ -134,6 +137,10 @@ func NewServiceContainer(config BaseConfig, out io.Writer) *ServiceContainer {
}
}

container.WelcomeAnonymousDelayedDeleter = delayedDeleter.NewWelcomeAnonymousMessageDelayedDeleter(
redisClient, out, "welcome_anonymous_message",
)

container.Executor = &Executor{
out: out,
serviceContainer: container,
Expand All @@ -144,4 +151,5 @@ func NewServiceContainer(config BaseConfig, out io.Writer) *ServiceContainer {

func (container *ServiceContainer) SetController(controller ClientControllerInterface) {
container.ClientController = controller
container.WelcomeAnonymousDelayedDeleter.SetHandler(controller)
}
15 changes: 13 additions & 2 deletions ServiceContainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package framework

import (
"bytes"
"github.com/kneu-messenger-pigeon/client-framework/delayedDeleter"
"github.com/kneu-messenger-pigeon/client-framework/mocks"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
Expand All @@ -10,7 +11,7 @@ import (
)

func TestNewServiceContainer(t *testing.T) {
t.Run("succeess", func(t *testing.T) {
t.Run("success", func(t *testing.T) {
out := &bytes.Buffer{}
config := BaseConfig{
clientName: "test-client",
Expand Down Expand Up @@ -113,6 +114,9 @@ func TestNewServiceContainer(t *testing.T) {
assert.Equal(t, out, scoreChangedMessageIdStorage.out)
}

assert.NotNil(t, serviceContainer.WelcomeAnonymousDelayedDeleter)
assert.IsType(t, &delayedDeleter.Deleter{}, serviceContainer.WelcomeAnonymousDelayedDeleter)

assert.NotNil(t, serviceContainer.Executor)
assert.IsType(t, &Executor{}, serviceContainer.Executor)
assert.Equal(t, serviceContainer, serviceContainer.Executor.serviceContainer)
Expand All @@ -123,8 +127,15 @@ func TestNewServiceContainer(t *testing.T) {
}

func TestServiceContainer_SetController(t *testing.T) {
serviceContainer := &ServiceContainer{}
controller := mocks.NewClientControllerInterface(t)

deleter := mocks.NewDeleterInterface(t)
deleter.On("SetHandler", controller).Return()

serviceContainer := &ServiceContainer{
WelcomeAnonymousDelayedDeleter: deleter,
}

serviceContainer.SetController(controller)
assert.Equal(t, controller, serviceContainer.ClientController)
}
103 changes: 103 additions & 0 deletions delayedDeleter/Deleter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package delayedDeleter

import (
"context"
"fmt"
"github.com/kneu-messenger-pigeon/client-framework/delayedDeleter/contracts"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
"io"
"sync"
"time"
)

const defaultWaitingTimeout = time.Minute

type Deleter struct {
handler contracts.DeleteHandlerInterface
redis redis.UniversalClient
out io.Writer
queueName string
waitingTimeout time.Duration
}

func NewWelcomeAnonymousMessageDelayedDeleter(redis redis.UniversalClient, out io.Writer, name string) *Deleter {
return &Deleter{
redis: redis,
out: out,
queueName: "deleter_queue_" + name,
waitingTimeout: defaultWaitingTimeout,
}
}

func (deleter *Deleter) AddToQueue(messageDeleteTask *contracts.DeleteTask) {
taskSerialized, _ := proto.Marshal(messageDeleteTask)
err := deleter.redis.LPush(context.Background(), deleter.queueName, taskSerialized).Err()
deleter.logError("failed write task to redis queue: ", err)
}

func (deleter *Deleter) SetHandler(handler contracts.DeleteHandlerInterface) {
deleter.handler = handler
}

func (deleter *Deleter) Execute(ctx context.Context, wg *sync.WaitGroup) {
var waitTime time.Duration
var now int64
var nextTask contracts.DeleteTask
var taskSerialized []byte
var nextTasksSerialized []string
var unmarshalErr error
var handlerErr error
var dequeueErr error

dequeue := func() {
dequeueErr = deleter.redis.RPop(context.Background(), deleter.queueName).Err()
deleter.logError("failed dequeue task: ", dequeueErr)
}

var readNextTask func()
readNextTask = func() {
nextTask.Reset()
nextTasksSerialized = deleter.redis.LRange(context.Background(), deleter.queueName, -1, -1).Val()
if len(nextTasksSerialized) > 0 {
taskSerialized = []byte(nextTasksSerialized[0])
unmarshalErr = proto.Unmarshal(taskSerialized, &nextTask)
if unmarshalErr != nil {
_, _ = fmt.Fprintln(deleter.out, "failed unmarshal task: ", unmarshalErr)
dequeue()
readNextTask()
}
}
}

for {
now = time.Now().Unix()
readNextTask()
for nextTask.GetScheduledAt() != 0 && nextTask.GetScheduledAt() <= now {
handlerErr = deleter.handler.HandleDeleteTask(&nextTask)
deleter.logError("handle delete message err: ", handlerErr)
dequeue()
readNextTask()
}

if nextTask.ScheduledAt > now {
waitTime = time.Duration(nextTask.GetScheduledAt()-now) * time.Second
} else {
waitTime = deleter.waitingTimeout
}

select {
case <-ctx.Done():

wg.Done()
return
case <-time.After(waitTime):
}
}
}

func (deleter *Deleter) logError(prefix string, err error) {
if err != nil {
_, _ = fmt.Fprintln(deleter.out, prefix, err)
}
}
Loading

0 comments on commit 179f234

Please sign in to comment.