Skip to content

Commit

Permalink
notify
Browse files Browse the repository at this point in the history
  • Loading branch information
paragor committed Aug 7, 2024
1 parent 8f2b54f commit 82e36a4
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 24 deletions.
13 changes: 9 additions & 4 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"fmt"
"github.com/paragor/todo/pkg/cron"
"github.com/paragor/todo/pkg/db"
"github.com/paragor/todo/pkg/events"
"github.com/paragor/todo/pkg/httpserver"
"github.com/paragor/todo/pkg/models"
"github.com/paragor/todo/pkg/service"
"github.com/paragor/todo/pkg/telegram"
"github.com/spf13/cobra"
Expand All @@ -19,12 +21,15 @@ var serverCmd = &cobra.Command{
Use: "server",
Short: "Run todolist server",
Run: func(cmd *cobra.Command, args []string) {
repo := db.NewInMemoryTasksRepository(cfg.Server.DatabaseFile)

runner := service.NewRunner()
runnable := []service.Runnable{
repo,
runnable := []service.Runnable{}
var repo models.Repository
{
originRepo := db.NewInMemoryTasksRepository(cfg.Server.DatabaseFile)
repo = events.NewSpyRepository(originRepo)
runnable = append(runnable, originRepo)
}

authConfig := &httpserver.AuthChainConfig{
AuthBaseConfig: nil,
AuthTelegramConfig: nil,
Expand Down
55 changes: 55 additions & 0 deletions pkg/events/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package events

import (
"github.com/google/uuid"
"github.com/paragor/todo/pkg/models"
"slices"
"sync"
)

var onDatabaseChangeSubscribers []DatabaseChangeSubscriber

type DatabaseChangeSubscriber interface {
OnDatabaseChange()
}

var m sync.Mutex

func RegisterOnDatabaseChangeSubscriber(subscriber DatabaseChangeSubscriber) {
m.Lock()
defer m.Unlock()
onDatabaseChangeSubscribers = append(onDatabaseChangeSubscribers, subscriber)
}
func UnRegisterOnDatabaseChangeSubscriber(subscriber DatabaseChangeSubscriber) {
m.Lock()
defer m.Unlock()
onDatabaseChangeSubscribers = slices.DeleteFunc(onDatabaseChangeSubscribers, func(existing DatabaseChangeSubscriber) bool {
return existing == subscriber
})
}

type spyRepository struct {
db models.Repository
}

func NewSpyRepository(db models.Repository) *spyRepository {
return &spyRepository{db: db}
}

func (s *spyRepository) Get(UUID uuid.UUID) (*models.Task, error) {
return s.db.Get(UUID)
}

func (s *spyRepository) Insert(t *models.Task) error {
err := s.db.Insert(t)
if err == nil {
for _, subscriber := range onDatabaseChangeSubscribers {
subscriber.OnDatabaseChange()
}
}
return err
}

func (s *spyRepository) All() ([]*models.Task, error) {
return s.db.All()
}
54 changes: 36 additions & 18 deletions pkg/telegram/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,66 @@ import (
"fmt"
"github.com/google/uuid"
"github.com/paragor/todo/pkg/cron"
"github.com/paragor/todo/pkg/events"
"github.com/paragor/todo/pkg/models"
"log"
"sync"
"time"
)

type Notifier struct {
notifyRefreshTime time.Duration
notifyState map[uuid.UUID]*cron.Cron
notifyErrChan chan error
db models.Repository
telegram *TelegramServer
notifyState map[uuid.UUID]*cron.Cron
notifyErrChan chan error
refreshErrChan chan error
db models.Repository
telegram *TelegramServer
m sync.Mutex
}

func newNotifier(refreshTime time.Duration, db models.Repository, telegram *TelegramServer) *Notifier {
func newNotifier(db models.Repository, telegram *TelegramServer) *Notifier {
return &Notifier{
notifyRefreshTime: refreshTime,
notifyState: map[uuid.UUID]*cron.Cron{},
notifyErrChan: make(chan error, 1000),
db: db,
telegram: telegram,
notifyState: map[uuid.UUID]*cron.Cron{},
notifyErrChan: make(chan error, 1000),
refreshErrChan: make(chan error, 1),
db: db,
telegram: telegram,
}
}

func (n *Notifier) start(ctx context.Context) error {
func (n *Notifier) Start(ctx context.Context) error {
err := n.refreshState()
if err != nil {
n.close()
return err
}
events.RegisterOnDatabaseChangeSubscriber(n)
defer events.UnRegisterOnDatabaseChangeSubscriber(n)
for {
err := n.refreshState()
if err != nil {
n.close()
return err
}
select {
case <-time.After(n.notifyRefreshTime):
case err := <-n.notifyErrChan:
if err != nil {
return fmt.Errorf("cron job return error: %w", err)
}
case err := <-n.refreshErrChan:
if err != nil {
return fmt.Errorf("refresh state return error: %w", err)
}
case <-ctx.Done():
return ctx.Err()
}
}
}

func (n *Notifier) OnDatabaseChange() {
err := n.refreshState()
if err != nil {
n.notifyErrChan <- err
}
}

func (n *Notifier) refreshState() error {
n.m.Lock()
defer n.m.Unlock()
tasks, err := n.db.All()
if err != nil {
return fmt.Errorf("cant get task list: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/telegram/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func (t *TelegramServer) Start(ctx context.Context, stopper chan<- error) error
stopper <- fmt.Errorf("stop telegram")
}()

notifier := newNotifier(time.Second*63, t.db, t)
notifier := newNotifier(t.db, t)
go func() {
err := notifier.start(ctx)
err := notifier.Start(ctx)
stopper <- fmt.Errorf("stop telegram.notifier: %w", err)
}()
go func() {
Expand Down

0 comments on commit 82e36a4

Please sign in to comment.