Skip to content

Commit

Permalink
Further simplify cron starting
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Mar 16, 2022
1 parent 7907575 commit b7ffe0e
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 60 deletions.
7 changes: 1 addition & 6 deletions core/tasks/analytics/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,17 @@ package analytics

import (
"context"
"sync"
"time"

"github.com/nyaruka/librato"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/sirupsen/logrus"
)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "stats", time.Second*60, true, reportAnalytics, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("analytics", time.Second*60, true, reportAnalytics)
}

var (
Expand Down
7 changes: 1 addition & 6 deletions core/tasks/campaigns/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package campaigns
import (
"context"
"fmt"
"sync"
"time"

"github.com/gomodule/redigo/redis"
Expand All @@ -13,7 +12,6 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/redisx"

"github.com/pkg/errors"
Expand All @@ -27,10 +25,7 @@ const (
var campaignsMarker = redisx.NewIntervalSet("campaign_event", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "campaign_event", time.Second*60, false, QueueEventFires, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("campaign_event", time.Second*60, false, QueueEventFires)
}

// QueueEventFires looks for all due campaign event fires and queues them to be started
Expand Down
9 changes: 2 additions & 7 deletions core/tasks/expirations/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package expirations
import (
"context"
"fmt"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks/handler"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/redisx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -24,11 +22,8 @@ const (
var expirationsMarker = redisx.NewIntervalSet("run_expirations", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "run_expirations", time.Minute, false, HandleWaitExpirations, time.Minute*5, quit)
cron.Start(rt, wg, "expire_ivr_calls", time.Minute, false, ExpireVoiceSessions, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("run_expirations", time.Minute, false, HandleWaitExpirations)
mailroom.RegisterCron("expire_ivr_calls", time.Minute, false, ExpireVoiceSessions)
}

// HandleWaitExpirations handles waiting messaging sessions whose waits have expired, resuming those that can be resumed,
Expand Down
8 changes: 1 addition & 7 deletions core/tasks/handler/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/redisx"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var retriedMsgs = redisx.NewIntervalSet("retried_msgs", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "retry_msgs", time.Minute*5, false, RetryPendingMsgs, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("retry_msgs", time.Minute*5, false, RetryPendingMsgs)
}

// RetryPendingMsgs looks for any pending msgs older than five minutes and queues them to be handled again
Expand Down
7 changes: 1 addition & 6 deletions core/tasks/incidents/end_incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,19 @@ package incidents
import (
"context"
"fmt"
"sync"
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "end_incidents", time.Minute*3, false, EndIncidents, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("end_incidents", time.Minute*3, false, EndIncidents)
}

// EndIncidents checks open incidents and end any that no longer apply
Expand Down
7 changes: 1 addition & 6 deletions core/tasks/ivr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,18 @@ package ivr

import (
"context"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "retry_ivr_calls", time.Minute, false, RetryCalls, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("retry_ivr_calls", time.Minute, false, RetryCalls)
}

// RetryCalls looks for calls that need to be retried and retries them
Expand Down
7 changes: 1 addition & 6 deletions core/tasks/msgs/retries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,18 @@ package msgs

import (
"context"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "retry_errored_messages", time.Second*60, false, RetryErroredMessages, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("retry_errored_messages", time.Second*60, false, RetryErroredMessages)
}

func RetryErroredMessages(ctx context.Context, rt *runtime.Runtime) error {
Expand Down
7 changes: 1 addition & 6 deletions core/tasks/schedules/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,18 @@ package schedules

import (
"context"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "fire_schedules", time.Minute*1, false, checkSchedules, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("fire_schedules", time.Minute*1, false, checkSchedules)
}

// checkSchedules looks up any expired schedules and fires them, setting the next fire as needed
Expand Down
8 changes: 1 addition & 7 deletions core/tasks/timeouts/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,21 @@ package timeouts
import (
"context"
"fmt"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks/handler"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/redisx"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var marker = redisx.NewIntervalSet("session_timeouts", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, "sessions_timeouts", time.Second*60, false, timeoutSessions, time.Minute*5, quit)
return nil
})
mailroom.RegisterCron("sessions_timeouts", time.Second*60, false, timeoutSessions)
}

// timeoutRuns looks for any runs that have timed out and schedules for them to continue
Expand Down
14 changes: 11 additions & 3 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/nyaruka/gocommon/storage"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/mailroom/web"
"github.com/pkg/errors"

Expand All @@ -21,15 +22,22 @@ import (
)

// InitFunction is a function that will be called when mailroom starts
type InitFunction func(runtime *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error
type InitFunction func(*runtime.Runtime, *sync.WaitGroup, chan bool) error

var initFunctions = make([]InitFunction, 0)

// AddInitFunction adds an init function that will be called on startup
func AddInitFunction(initFunc InitFunction) {
func addInitFunction(initFunc InitFunction) {
initFunctions = append(initFunctions, initFunc)
}

// RegisterCron registers a new cron function to run every interval
func RegisterCron(name string, interval time.Duration, allInstances bool, fn cron.Function) {
addInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(rt, wg, name, interval, allInstances, fn, time.Minute*5, quit)
return nil
})
}

// TaskFunction is the function that will be called for a type of task
type TaskFunction func(ctx context.Context, rt *runtime.Runtime, task *queue.Task) error

Expand Down

0 comments on commit b7ffe0e

Please sign in to comment.