Skip to content

Commit

Permalink
feat(job): periodic JWKS refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
ncarlier committed Sep 24, 2023
1 parent bdc1a77 commit f1b3e82
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 74 deletions.
7 changes: 2 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/ncarlier/readflow/pkg/db"
"github.com/ncarlier/readflow/pkg/exporter"
"github.com/ncarlier/readflow/pkg/exporter/pdf"
"github.com/ncarlier/readflow/pkg/job"
"github.com/ncarlier/readflow/pkg/logger"
"github.com/ncarlier/readflow/pkg/metric"
"github.com/ncarlier/readflow/pkg/server"
Expand Down Expand Up @@ -98,9 +97,6 @@ func main() {
exporter.Register("pdf", pdf.NewPDFExporter(conf.Integration.PDFGeneratorURL))
}

// start job scheduler
scheduler := job.StartNewScheduler(database)

// create HTTP server
httpServer := server.NewHTTPServer(conf)

Expand All @@ -124,7 +120,6 @@ func main() {
go func() {
<-quit
log.Debug().Msg("shutting down readflow...")
scheduler.Shutdown()
api.Shutdown()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -145,6 +140,8 @@ func main() {
}
}

service.Shutdown()

if err := downloadCache.Close(); err != nil {
log.Error().Err(err).Msg("unable to gracefully shutdown the cache storage")
}
Expand Down
28 changes: 13 additions & 15 deletions pkg/job/clean-db.go → pkg/db/cleanup-job.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package job
package db

import (
"time"

"github.com/ncarlier/readflow/pkg/db"

"github.com/ncarlier/readflow/pkg/job"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand All @@ -14,33 +13,32 @@ const (
maximumDeviceInactivityDuration = 30 * 24 * time.Hour
)

// CleanDatabaseJob is a job to clean the database
type CleanDatabaseJob struct {
db db.DB
// CleanupDatabaseJob is a job to clean the database
type CleanupDatabaseJob struct {
db DB
ticker *time.Ticker
logger zerolog.Logger
}

// NewCleanDatabaseJob create and start new job to clean the database
func NewCleanDatabaseJob(_db db.DB) *CleanDatabaseJob {
job := &CleanDatabaseJob{
db: _db,
// NewCleanupDatabaseJob create and start new job to clean the database
func NewCleanupDatabaseJob(db DB) job.Job {
job := &CleanupDatabaseJob{
db: db,
ticker: time.NewTicker(time.Hour),
logger: log.With().Str("component", "scheduler").Str("job", "clean-db").Logger(),
}
go job.start()
return job
}

func (cdj *CleanDatabaseJob) start() {
// Start the cleanup job
func (cdj *CleanupDatabaseJob) Start() {
cdj.logger.Debug().Msg("job started")
for range cdj.ticker.C {
cdj.logger.Debug().Msg("running job...")
nb, err := cdj.db.DeleteReadArticlesOlderThan(maximumArticleRetentionDuration)
if err != nil {
cdj.logger.Error().Err(err).Msg("unable to clean old articles from the database")
break

}
// Using info level only for effective cleanup
evt := cdj.logger.Debug()
Expand All @@ -62,8 +60,8 @@ func (cdj *CleanDatabaseJob) start() {
}
}

// Stop job
func (cdj *CleanDatabaseJob) Stop() {
// Stop the cleanup job
func (cdj *CleanupDatabaseJob) Stop() {
cdj.ticker.Stop()
cdj.logger.Debug().Msg("job stopped")
}
27 changes: 19 additions & 8 deletions pkg/job/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package job

import "github.com/ncarlier/readflow/pkg/db"

// Scheduler is a job scheduler
type Scheduler struct {
cleanDatabaseJob *CleanDatabaseJob
jobs []Job
}

// StartNewScheduler create and start new job scheduler
func StartNewScheduler(_db db.DB) *Scheduler {
return &Scheduler{
cleanDatabaseJob: NewCleanDatabaseJob(_db),
// NewScheduler create new job scheduler
func NewScheduler(jobs ...Job) *Scheduler {
result := &Scheduler{
jobs: jobs,
}
for _, job := range result.jobs {
go job.Start()
}

return result
}

// Add and start job
func (s *Scheduler) Add(job Job) {
job.Start()
s.jobs = append(s.jobs, job)
}

// Shutdown job scheduler
func (s *Scheduler) Shutdown() {
s.cleanDatabaseJob.Stop()
for _, job := range s.jobs {
job.Stop()
}
}
7 changes: 7 additions & 0 deletions pkg/job/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package job

// Job interface
type Job interface {
Start()
Stop()
}
6 changes: 5 additions & 1 deletion pkg/middleware/oidc-jwt-auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ func OpenIDConnectJWTAuth(authority string) Middleware {
if err != nil {
log.Fatal().Err(err).Str("authority", authority).Msg("unable to get OIDC configuration from authority")
}
keystore := oidc.NewOIDCKeystore(cfg)
keystore, err := oidc.NewOIDCKeystore(cfg)
if err != nil {
log.Fatal().Err(err).Str("authority", authority).Msg("unable to get OIDC keys from JWKS endpoint")
}
service.Lookup().AddJob(keystore)
return func(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
Expand Down
20 changes: 3 additions & 17 deletions pkg/oidc/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,16 @@ import (
func GetOIDCConfiguration(authority string) (Configuration, error) {
var cfg = Configuration{}

r1, err := http.Get(authority + "/.well-known/openid-configuration")
resp, err := http.Get(authority + "/.well-known/openid-configuration")
if err != nil {
return cfg, err
}
defer r1.Body.Close()
defer resp.Body.Close()

err = json.NewDecoder(r1.Body).Decode(&cfg)
err = json.NewDecoder(resp.Body).Decode(&cfg)
if err != nil {
return cfg, err
}

r2, err := http.Get(cfg.JwksURI)
if err != nil {
return cfg, err
}
defer r2.Body.Close()

var jwks = JSONWebKeySet{}
err = json.NewDecoder(r2.Body).Decode(&jwks)
if err != nil {
return cfg, err
}

cfg.JSONWebKeySet = jwks.Keys

return cfg, err
}
53 changes: 48 additions & 5 deletions pkg/oidc/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,50 @@ package oidc

import (
"crypto/rsa"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

"github.com/dgrijalva/jwt-go"
)

// Keystore OIDC keystore
type Keystore struct {
conf Configuration
store sync.Map
conf Configuration
keys []JSONWebKey
store sync.Map
ticker *time.Ticker
}

// NewOIDCKeystore create a new OIDC keystore
func NewOIDCKeystore(conf Configuration) *Keystore {
return &Keystore{
func NewOIDCKeystore(conf Configuration) (*Keystore, error) {
ks := &Keystore{
conf: conf,
}
if err := ks.refresh(); err != nil {
return nil, err
}
return ks, nil
}

func (k *Keystore) refresh() error {
resp, err := http.Get(k.conf.JwksURI)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode >= 300 {
return fmt.Errorf("bad status code: %d", resp.StatusCode)
}

err = json.NewDecoder(resp.Body).Decode(&k.keys)
if err != nil {
return err
}
return nil
}

// GetKey retrieve a key from the keystore
Expand All @@ -37,7 +64,7 @@ func (k *Keystore) GetKey(id string) (*rsa.PublicKey, error) {

func (k *Keystore) getKey(id string) (*rsa.PublicKey, error) {
var pem = ""
for _, jwk := range k.conf.JSONWebKeySet {
for _, jwk := range k.keys {
if jwk.Kid == id {
var err error
pem, err = jwkToPEM(jwk)
Expand All @@ -54,3 +81,19 @@ func (k *Keystore) getKey(id string) (*rsa.PublicKey, error) {

return jwt.ParseRSAPublicKeyFromPEM([]byte(pem))
}

// Start keystore periodic refresh job
func (k *Keystore) Start() {
if k.ticker != nil {
return
}
k.ticker = time.NewTicker(time.Hour)
for range k.ticker.C {
k.refresh()
}
}

// Stop keystore periodic refresh job
func (k *Keystore) Stop() {
k.ticker.Stop()
}
45 changes: 22 additions & 23 deletions pkg/oidc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,28 @@ package oidc

// Configuration is the result from an OIDC discovery endpoint
type Configuration struct {
Issuer string `json:"issuer"`
JwksURI string `json:"jwks_uri"`
AuthorizationEndpoint string `json:"authorization_endpoint"`
TokenEndpoint string `json:"token_endpoint"`
UserinfoEndpoint string `json:"userinfo_endpoint"`
EndSessionEndpoint string `json:"end_session_endpoint"`
CheckSessionIframe string `json:"check_session_iframe"`
RevocationEndpoint string `json:"revocation_endpoint"`
IntrospectionEndpoint string `json:"introspection_endpoint"`
FrontchannelLogoutSupported bool `json:"frontchannel_logout_supported"`
FrontchannelLogoutSessionSupported bool `json:"frontchannel_logout_session_supported"`
BackchannelLogoutSupported bool `json:"backchannel_logout_supported"`
BackchannelLogoutSessionSupported bool `json:"backchannel_logout_session_supported"`
ScopesSupported []string `json:"scopes_supported"`
ClaimsSupported []string `json:"claims_supported"`
GrantTypesSupported []string `json:"grant_types_supported"`
ResponseTypesSupported []string `json:"response_types_supported"`
ResponseModesSupported []string `json:"response_modes_supported"`
TokenEndpointAuthMethodsSupported []string `json:"token_endpoint_auth_methods_supported"`
SubjectTypesSupported []string `json:"subject_types_supported"`
IDTokenSigningAlgValuesSupported []string `json:"id_token_signing_alg_values_supported"`
CodeChallengeMethodsSupported []string `json:"code_challenge_methods_supported"`
JSONWebKeySet []JSONWebKey `json:"-"`
Issuer string `json:"issuer"`
JwksURI string `json:"jwks_uri"`
AuthorizationEndpoint string `json:"authorization_endpoint"`
TokenEndpoint string `json:"token_endpoint"`
UserinfoEndpoint string `json:"userinfo_endpoint"`
EndSessionEndpoint string `json:"end_session_endpoint"`
CheckSessionIframe string `json:"check_session_iframe"`
RevocationEndpoint string `json:"revocation_endpoint"`
IntrospectionEndpoint string `json:"introspection_endpoint"`
FrontchannelLogoutSupported bool `json:"frontchannel_logout_supported"`
FrontchannelLogoutSessionSupported bool `json:"frontchannel_logout_session_supported"`
BackchannelLogoutSupported bool `json:"backchannel_logout_supported"`
BackchannelLogoutSessionSupported bool `json:"backchannel_logout_session_supported"`
ScopesSupported []string `json:"scopes_supported"`
ClaimsSupported []string `json:"claims_supported"`
GrantTypesSupported []string `json:"grant_types_supported"`
ResponseTypesSupported []string `json:"response_types_supported"`
ResponseModesSupported []string `json:"response_modes_supported"`
TokenEndpointAuthMethodsSupported []string `json:"token_endpoint_auth_methods_supported"`
SubjectTypesSupported []string `json:"subject_types_supported"`
IDTokenSigningAlgValuesSupported []string `json:"id_token_signing_alg_values_supported"`
CodeChallengeMethodsSupported []string `json:"code_challenge_methods_supported"`
}

// JSONWebKeySet JSON web key set
Expand Down
8 changes: 8 additions & 0 deletions pkg/service/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package service

import "github.com/ncarlier/readflow/pkg/job"

// AddJob add job to the service
func (reg *Registry) AddJob(job job.Job) {
reg.scheduler.Add(job)
}
13 changes: 13 additions & 0 deletions pkg/service/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ncarlier/readflow/pkg/event"
"github.com/ncarlier/readflow/pkg/event/dispatcher"
"github.com/ncarlier/readflow/pkg/helper"
"github.com/ncarlier/readflow/pkg/job"
"github.com/ncarlier/readflow/pkg/model"
ratelimiter "github.com/ncarlier/readflow/pkg/rate-limiter"
"github.com/ncarlier/readflow/pkg/sanitizer"
Expand Down Expand Up @@ -38,6 +39,7 @@ type Registry struct {
scriptEngine *scripting.ScriptEngine
sanitizer *sanitizer.Sanitizer
events *event.Manager
scheduler *job.Scheduler
dispatcher dispatcher.Dispatcher
secretsEngineProvider secret.EngineProvider
}
Expand Down Expand Up @@ -68,6 +70,9 @@ func Configure(conf config.Config, database db.DB, downloadCache cache.Cache) er
if err != nil {
return err
}
scheduler := job.NewScheduler(
db.NewCleanupDatabaseJob(database),
)

instance = &Registry{
conf: conf,
Expand All @@ -82,12 +87,20 @@ func Configure(conf config.Config, database db.DB, downloadCache cache.Cache) er
scriptEngine: scripting.NewScriptEngine(128),
dispatcher: dispatcher,
events: event.NewEventManager(),
scheduler: scheduler,
secretsEngineProvider: secretsEngineProvider,
}
instance.registerEventHandlers()
return instance.initProperties()
}

// Shutdown service internals jobs
func Shutdown() {
if instance != nil {
instance.scheduler.Shutdown()
}
}

// Lookup returns the global service registry
func Lookup() *Registry {
if instance != nil {
Expand Down

0 comments on commit f1b3e82

Please sign in to comment.