Skip to content

Commit

Permalink
feat(job): add cleanup job
Browse files Browse the repository at this point in the history
  • Loading branch information
ncarlier committed Apr 6, 2019
1 parent c612417 commit 8570769
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 10 deletions.
22 changes: 14 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"syscall"
"time"

"github.com/ncarlier/reader/pkg/job"

eventbroker "github.com/ncarlier/reader/pkg/event-broker"
"github.com/ncarlier/reader/pkg/service"

Expand Down Expand Up @@ -41,22 +43,25 @@ func main() {
}
logger.Configure(level, true, *conf.SentryDSN)

log.Debug().Msg("starting Nunux Reader server...")

// Configure the DB
_db, err := db.Configure(*conf.DB)
if err != nil {
log.Fatal().Err(err).Msg("Could not configure Database")
log.Fatal().Err(err).Msg("could not configure Database")
}

// Configure Event Broker
_, err = eventbroker.Configure(*conf.Broker)
if err != nil {
log.Fatal().Err(err).Msg("Could not configure Event Broker")
log.Fatal().Err(err).Msg("could not configure Event Broker")
}

// Init service registry
service.InitRegistry(_db)

log.Debug().Msg("Starting Nunux Reader server...")
// Start job scheduler
scheduler := job.StartNewScheduler(_db)

server := &http.Server{
Addr: *conf.ListenAddr,
Expand All @@ -69,27 +74,28 @@ func main() {

go func() {
<-quit
log.Debug().Msg("Server is shutting down...")
log.Debug().Msg("server is shutting down...")
scheduler.Shutdown()
api.Shutdown()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

server.SetKeepAlivesEnabled(false)
if err := server.Shutdown(ctx); err != nil {
log.Fatal().Err(err).Msg("Could not gracefully shutdown the server")
log.Fatal().Err(err).Msg("could not gracefully shutdown the server")
}
close(done)
}()

api.Start()

log.Info().Str("listen", *conf.ListenAddr).Msg("Server started")
log.Info().Str("listen", *conf.ListenAddr).Msg("server started")

if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal().Err(err).Str("listen", *conf.ListenAddr).Msg("Could not start the server")
log.Fatal().Err(err).Str("listen", *conf.ListenAddr).Msg("could not start the server")
}

<-done
log.Debug().Msg("Server stopped")
log.Debug().Msg("server stopped")
}
7 changes: 6 additions & 1 deletion pkg/db/article.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package db

import "github.com/ncarlier/reader/pkg/model"
import (
"time"

"github.com/ncarlier/reader/pkg/model"
)

// ArticleRepository is the repository interface to manage Articles
type ArticleRepository interface {
Expand All @@ -9,4 +13,5 @@ type ArticleRepository interface {
CreateOrUpdateArticle(article model.Article) (*model.Article, error)
DeleteArticle(article model.Article) error
MarkAllArticlesAsRead(uid uint, categoryID *uint) (int64, error)
DeleteReadArticlesOlderThan(delay time.Duration) (int64, error)
}
19 changes: 19 additions & 0 deletions pkg/db/postgres/article.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/ncarlier/reader/pkg/model"
Expand Down Expand Up @@ -203,3 +204,21 @@ func (pg *DB) MarkAllArticlesAsRead(uid uint, categoryID *uint) (int64, error) {

return count, err
}

// DeleteReadArticlesOlderThan remove old articles from the DB
func (pg *DB) DeleteReadArticlesOlderThan(delay time.Duration) (int64, error) {
maxAge := time.Now().Add(-delay)
query, args, _ := pg.psql.Delete(
"articles",
).Where(
sq.Eq{"status": "read"},
).Where(
sq.Lt{"updated_at": maxAge},
).ToSql()

result, err := pg.db.Exec(query, args...)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
47 changes: 47 additions & 0 deletions pkg/job/clean-db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package job

import (
"time"

"github.com/ncarlier/reader/pkg/db"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

// CleanDatabaseJob is a job to clean the database
type CleanDatabaseJob struct {
db db.DB
ticker *time.Ticker
logger zerolog.Logger
}

// NewCleanDatabaseJob create and start new job to clean the database
func NewCleanDatabaseJob(_db db.DB) *CleanDatabaseJob {
job := &CleanDatabaseJob{
db: _db,
ticker: time.NewTicker(time.Hour),
logger: log.With().Str("job", "clean-db").Logger(),
}
go job.start()
return job
}

func (cdj *CleanDatabaseJob) start() {
cdj.logger.Debug().Msg("job started")
for range cdj.ticker.C {
cdj.logger.Debug().Msg("running job...")
nb, err := cdj.db.DeleteReadArticlesOlderThan(48 * time.Hour)
if err != nil {
cdj.logger.Error().Err(err).Msg("unable to clean the database")
break
}
cdj.logger.Info().Int64("removed_articles", nb).Msg("cleanup done")
}
}

// Stop job
func (cdj *CleanDatabaseJob) Stop() {
cdj.ticker.Stop()
cdj.logger.Debug().Msg("job stopped")
}
20 changes: 20 additions & 0 deletions pkg/job/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package job

import "github.com/ncarlier/reader/pkg/db"

// Scheduler is a job scheduler
type Scheduler struct {
cleanDatabaseJob *CleanDatabaseJob
}

// StartNewScheduler create and start new job scheduler
func StartNewScheduler(_db db.DB) *Scheduler {
return &Scheduler{
cleanDatabaseJob: NewCleanDatabaseJob(_db),
}
}

// Shutdown job scheduler
func (s *Scheduler) Shutdown() {
s.cleanDatabaseJob.Stop()
}
2 changes: 1 addition & 1 deletion pkg/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ func Lookup() *Registry {
if instance != nil {
return instance
}
log.Fatal().Msg("Service registry not initialized")
log.Fatal().Msg("service registry not initialized")
return nil
}

0 comments on commit 8570769

Please sign in to comment.