diff --git a/cmd/server.go b/cmd/server.go index c01b644..ba1cb1c 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -4,7 +4,9 @@ import ( "fmt" "github.com/paragor/todo/pkg/cron" "github.com/paragor/todo/pkg/db" + "github.com/paragor/todo/pkg/events" "github.com/paragor/todo/pkg/httpserver" + "github.com/paragor/todo/pkg/models" "github.com/paragor/todo/pkg/service" "github.com/paragor/todo/pkg/telegram" "github.com/spf13/cobra" @@ -19,12 +21,15 @@ var serverCmd = &cobra.Command{ Use: "server", Short: "Run todolist server", Run: func(cmd *cobra.Command, args []string) { - repo := db.NewInMemoryTasksRepository(cfg.Server.DatabaseFile) - runner := service.NewRunner() - runnable := []service.Runnable{ - repo, + runnable := []service.Runnable{} + var repo models.Repository + { + originRepo := db.NewInMemoryTasksRepository(cfg.Server.DatabaseFile) + repo = events.NewSpyRepository(originRepo) + runnable = append(runnable, originRepo) } + authConfig := &httpserver.AuthChainConfig{ AuthBaseConfig: nil, AuthTelegramConfig: nil, diff --git a/pkg/events/memory.go b/pkg/events/memory.go new file mode 100644 index 0000000..741849d --- /dev/null +++ b/pkg/events/memory.go @@ -0,0 +1,55 @@ +package events + +import ( + "github.com/google/uuid" + "github.com/paragor/todo/pkg/models" + "slices" + "sync" +) + +var onDatabaseChangeSubscribers []DatabaseChangeSubscriber + +type DatabaseChangeSubscriber interface { + OnDatabaseChange() +} + +var m sync.Mutex + +func RegisterOnDatabaseChangeSubscriber(subscriber DatabaseChangeSubscriber) { + m.Lock() + defer m.Unlock() + onDatabaseChangeSubscribers = append(onDatabaseChangeSubscribers, subscriber) +} +func UnRegisterOnDatabaseChangeSubscriber(subscriber DatabaseChangeSubscriber) { + m.Lock() + defer m.Unlock() + onDatabaseChangeSubscribers = slices.DeleteFunc(onDatabaseChangeSubscribers, func(existing DatabaseChangeSubscriber) bool { + return existing == subscriber + }) +} + +type spyRepository struct { + db models.Repository +} + +func NewSpyRepository(db models.Repository) *spyRepository { + return &spyRepository{db: db} +} + +func (s *spyRepository) Get(UUID uuid.UUID) (*models.Task, error) { + return s.db.Get(UUID) +} + +func (s *spyRepository) Insert(t *models.Task) error { + err := s.db.Insert(t) + if err == nil { + for _, subscriber := range onDatabaseChangeSubscribers { + subscriber.OnDatabaseChange() + } + } + return err +} + +func (s *spyRepository) All() ([]*models.Task, error) { + return s.db.All() +} diff --git a/pkg/telegram/notify.go b/pkg/telegram/notify.go index c924b85..dff9afd 100644 --- a/pkg/telegram/notify.go +++ b/pkg/telegram/notify.go @@ -6,48 +6,66 @@ import ( "fmt" "github.com/google/uuid" "github.com/paragor/todo/pkg/cron" + "github.com/paragor/todo/pkg/events" "github.com/paragor/todo/pkg/models" "log" + "sync" "time" ) type Notifier struct { - notifyRefreshTime time.Duration - notifyState map[uuid.UUID]*cron.Cron - notifyErrChan chan error - db models.Repository - telegram *TelegramServer + notifyState map[uuid.UUID]*cron.Cron + notifyErrChan chan error + refreshErrChan chan error + db models.Repository + telegram *TelegramServer + m sync.Mutex } -func newNotifier(refreshTime time.Duration, db models.Repository, telegram *TelegramServer) *Notifier { +func newNotifier(db models.Repository, telegram *TelegramServer) *Notifier { return &Notifier{ - notifyRefreshTime: refreshTime, - notifyState: map[uuid.UUID]*cron.Cron{}, - notifyErrChan: make(chan error, 1000), - db: db, - telegram: telegram, + notifyState: map[uuid.UUID]*cron.Cron{}, + notifyErrChan: make(chan error, 1000), + refreshErrChan: make(chan error, 1), + db: db, + telegram: telegram, } } -func (n *Notifier) start(ctx context.Context) error { +func (n *Notifier) Start(ctx context.Context) error { + err := n.refreshState() + if err != nil { + n.close() + return err + } + events.RegisterOnDatabaseChangeSubscriber(n) + defer events.UnRegisterOnDatabaseChangeSubscriber(n) for { - err := n.refreshState() - if err != nil { - n.close() - return err - } select { - case <-time.After(n.notifyRefreshTime): case err := <-n.notifyErrChan: if err != nil { return fmt.Errorf("cron job return error: %w", err) } + case err := <-n.refreshErrChan: + if err != nil { + return fmt.Errorf("refresh state return error: %w", err) + } case <-ctx.Done(): return ctx.Err() } } } + +func (n *Notifier) OnDatabaseChange() { + err := n.refreshState() + if err != nil { + n.notifyErrChan <- err + } +} + func (n *Notifier) refreshState() error { + n.m.Lock() + defer n.m.Unlock() tasks, err := n.db.All() if err != nil { return fmt.Errorf("cant get task list: %w", err) diff --git a/pkg/telegram/server.go b/pkg/telegram/server.go index f7406fe..169f698 100644 --- a/pkg/telegram/server.go +++ b/pkg/telegram/server.go @@ -96,9 +96,9 @@ func (t *TelegramServer) Start(ctx context.Context, stopper chan<- error) error stopper <- fmt.Errorf("stop telegram") }() - notifier := newNotifier(time.Second*63, t.db, t) + notifier := newNotifier(t.db, t) go func() { - err := notifier.start(ctx) + err := notifier.Start(ctx) stopper <- fmt.Errorf("stop telegram.notifier: %w", err) }() go func() {