Skip to content

Commit

Permalink
more tests, updated temba.dump, add testsuite package
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Sep 19, 2018
1 parent 6ba5c07 commit ec9925d
Show file tree
Hide file tree
Showing 14 changed files with 251 additions and 151 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*.so
*.dylib
mailroom
dist

# Test binary, build with `go test -c`
*.test
Expand Down
61 changes: 41 additions & 20 deletions campaigns/campaigns_test.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,59 @@
package campaigns

import (
"fmt"
"os"
"os/exec"
"testing"
"time"

"github.com/jmoiron/sqlx"
"github.com/nyaruka/mailroom/marker"
"github.com/nyaruka/mailroom/queue"
"github.com/nyaruka/mailroom/testsuite"
"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
resetDB()
testsuite.Reset()
os.Exit(m.Run())
}

func mustExec(command string, args ...string) {
cmd := exec.Command(command, args...)
output, err := cmd.CombinedOutput()
if err != nil {
panic(fmt.Sprintf("error restoring database: %s: %s", err, string(output)))
}
}
func TestCampaigns(t *testing.T) {
ctx := testsuite.CTX()
rp := testsuite.RP()
rc := testsuite.RC()
defer rc.Close()

func resetDB() {
db := sqlx.MustOpen("postgres", "postgres://temba@localhost/temba?sslmode=disable")
db.MustExec("drop owned by temba cascade")
mustExec("pg_restore", "-d", "temba", "../temba.dump")
}
err := marker.ClearTasks(rc, campaignsLock)
assert.NoError(t, err)

// let's create a campaign event fire for one of our contacts (for now this is totally hacked, they aren't in the group and
// their relative to date isn't relative, but this still tests execution)
db := testsuite.DB()
db.MustExec(`UPDATE flows_flow SET flow_server_enabled=TRUE WHERE id = 31;`)
db.MustExec(`INSERT INTO campaigns_eventfire(scheduled, contact_id, event_id) VALUES (NOW(), 42, 2), (NOW(), 43, 2);`)
time.Sleep(10 * time.Millisecond)

func getDB() *sqlx.DB {
db := sqlx.MustOpen("postgres", "postgres://temba@localhost/temba?sslmode=disable")
return db
// schedule our campaign to be started
err = fireCampaignEvents(ctx, db, rp, campaignsLock, "lock")
assert.NoError(t, err)

// then actually work on the event
task, err := queue.PopNextTask(rc, eventQueue)
assert.NoError(t, err)
assert.NotNil(t, task)

// work on that task
err = fireEventFires(ctx, db, rp, task)
assert.NoError(t, err)

// should now have a flow run for that contact and flow
assertCount(t, db, `SELECT COUNT(*) from flows_flowrun WHERE contact_id = 42 AND flow_id = 31;`, 1)
assertCount(t, db, `SELECT COUNT(*) from flows_flowrun WHERE contact_id = 43 AND flow_id = 31;`, 1)
}
func TestCampaigns(t *testing.T) {
// create a campaign and event

func assertCount(t *testing.T, db *sqlx.DB, query string, count int) {
var c int
err := db.Get(&c, query)
assert.NoError(t, err)
assert.Equal(t, count, c)
}
35 changes: 23 additions & 12 deletions campaigns/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,31 @@ func fireCampaignEvents(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockNa
return nil
}

err = queue.AddTask(rc, eventQueue, campaignEventFireType, fmt.Sprintf("%d", task.OrgID), task, queue.DefaultPriority)
if err != nil {
return errors.Annotate(err, "error queuing task")
}
// TODO: should have a max group size for these (IE, 100 events at a time)
fireIDs := task.FireIDs
for len(fireIDs) > 0 {
batchSize := 100
if batchSize > len(fireIDs) {
batchSize = len(fireIDs)
}
task.FireIDs = fireIDs[:batchSize]
fireIDs = fireIDs[batchSize:]

// mark each of these fires as queued
for _, id := range task.FireIDs {
err = marker.AddTask(rc, campaignsLock, fmt.Sprintf("%d", id))
err = queue.AddTask(rc, eventQueue, campaignEventFireType, fmt.Sprintf("%d", task.OrgID), task, queue.DefaultPriority)
if err != nil {
return errors.Annotate(err, "error marking event as queued")
return errors.Annotate(err, "error queuing task")
}

// mark each of these fires as queued
for _, id := range task.FireIDs {
err = marker.AddTask(rc, campaignsLock, fmt.Sprintf("%d", id))
if err != nil {
return errors.Annotate(err, "error marking event as queued")
}
}
log.WithField("task", fmt.Sprintf("%vvv", task)).WithField("fire_count", len(task.FireIDs)).Debug("added event fire task")
queued += len(task.FireIDs)
}
log.WithField("task", fmt.Sprintf("%vvv", task)).WithField("fire_count", len(task.FireIDs)).Debug("added event fire task")
queued += len(task.FireIDs)

return nil
}
Expand Down Expand Up @@ -170,12 +181,12 @@ FROM
campaigns_campaign c,
flows_flow f
WHERE
ef.fired IS NULL AND ef.scheduled < NOW() AND
ef.fired IS NULL AND ef.scheduled <= NOW() AND
ce.id = ef.event_id AND
f.id = ce.flow_id AND f.is_system = TRUE AND f.flow_server_enabled = TRUE AND
ce.campaign_id = c.id
ORDER BY
scheduled ASC,
DATE_TRUNC('minute', scheduled) ASC,
ef.event_id ASC
LIMIT
25000;
Expand Down
21 changes: 10 additions & 11 deletions campaigns/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ func init() {
mailroom.AddTaskFunction(campaignEventFireType, HandleCampaignEvent)
}

// HandleCampaignEvent is called by mailroom when a campaign event task is ready to be
// processed.
// HandleCampaignEvent is called by mailroom when a campaign event task is ready to be processed.
func HandleCampaignEvent(mr *mailroom.Mailroom, task *queue.Task) error {
ctx, cancel := context.WithTimeout(mr.CTX, time.Minute*5)
defer cancel()

return queueExpiredEventFires(ctx, mr.DB, mr.RedisPool, task)
return fireEventFires(ctx, mr.DB, mr.RedisPool, task)
}

// queueExpiredEventFires handles expired campaign events
// fireEventFires handles expired campaign events
// For each event:
// - loads the event to fire
// - loads the org asset for that event
Expand All @@ -41,7 +40,7 @@ func HandleCampaignEvent(mr *mailroom.Mailroom, task *queue.Task) error {
// - creates the trigger for that event
// - runs the flow that is to be started through our engine
// - saves the flow run and session resulting from our run
func queueExpiredEventFires(ctx context.Context, db *sqlx.DB, rp *redis.Pool, task *queue.Task) error {
func fireEventFires(ctx context.Context, db *sqlx.DB, rp *redis.Pool, task *queue.Task) error {
log := logrus.WithField("comp", "campaign_worker").WithField("task", string(task.Task))

// decode our task body
Expand All @@ -56,9 +55,7 @@ func queueExpiredEventFires(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ta

// grab all the fires for this event
fires, err := loadEventFires(ctx, db, eventTask.FireIDs)

// no fires returned
if len(fires) == 0 {
if err != nil {
// unmark all these fires as fires so they can retry
rc := rp.Get()
for _, id := range eventTask.FireIDs {
Expand All @@ -67,9 +64,11 @@ func queueExpiredEventFires(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ta
rc.Close()

// if we had an error, return that
if err != nil {
return errors.Annotatef(err, "error loading event fire from db: %v", eventTask.FireIDs)
}
return errors.Annotatef(err, "error loading event fire from db: %v", eventTask.FireIDs)
}

// no fires returned
if len(fires) == 0 {
log.Info("events already fired, ignoring")
return nil
}
Expand Down
44 changes: 8 additions & 36 deletions celery/celery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,30 @@ package celery

import (
"encoding/json"
"log"
"testing"
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/mailroom/testsuite"
)

func getPool() *redis.Pool {
redisPool := &redis.Pool{
Wait: true, // makes callers wait for a connection
MaxActive: 5, // only open this many concurrent connections at once
MaxIdle: 2, // only keep up to 2 idle
IdleTimeout: 240 * time.Second, // how long to wait before reaping a connection
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", "localhost:6379")
if err != nil {
return nil, err
}
_, err = conn.Do("SELECT", 0)
return conn, err
},
}
conn := redisPool.Get()
defer conn.Close()

_, err := conn.Do("FLUSHDB")
if err != nil {
log.Fatal(err)
}

return redisPool
}
func TestQueue(t *testing.T) {
pool := getPool()
defer pool.Close()

conn := pool.Get()
defer conn.Close()
testsuite.ResetRP()
rc := testsuite.RC()
defer rc.Close()

// queue to our handler queue
conn.Send("multi")
err := QueueTask(conn, "handler", "handle_event_task", []int64{})
rc.Send("multi")
err := QueueTask(rc, "handler", "handle_event_task", []int64{})
if err != nil {
t.Error(err)
}
_, err = conn.Do("exec")
_, err = rc.Do("exec")
if err != nil {
t.Error(err)
}

// check whether things look right
taskJSON, err := redis.String(conn.Do("LPOP", "handler"))
taskJSON, err := redis.String(rc.Do("LPOP", "handler"))
if err != nil {
t.Errorf("should have value in handler queue: %s", err)
}
Expand Down
18 changes: 4 additions & 14 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,15 @@ import (
"testing"
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/mailroom/testsuite"
"github.com/stretchr/testify/assert"
)

func TestCron(t *testing.T) {
rp := &redis.Pool{
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", "localhost:6379")
if err != nil {
return nil, err
}
_, err = conn.Do("SELECT", 0)
return conn, err
},
}

rc := rp.Get()
testsuite.ResetRP()
rp := testsuite.RP()
rc := testsuite.RC()
defer rc.Close()
rc.Do("del", "test_lock")

mutex := sync.RWMutex{}
fired := 0
Expand Down
10 changes: 10 additions & 0 deletions marker/marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,13 @@ func RemoveTask(rc redis.Conn, taskGroup string, taskID string) error {
}
return nil
}

// ClearTasks removes all tasks for the passed in group (mostly useful in unit testing)
func ClearTasks(rc redis.Conn, taskGroup string) error {
todayKey := fmt.Sprintf(keyPattern, taskGroup, time.Now().UTC().Format("2006_01_02"))
yesterdayKey := fmt.Sprintf(keyPattern, taskGroup, time.Now().Add(time.Hour*-24).UTC().Format("2006_01_02"))
rc.Send("del", todayKey)
rc.Send("del", yesterdayKey)
_, err := rc.Do("")
return err
}
6 changes: 3 additions & 3 deletions marker/marker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package marker
import (
"testing"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/mailroom/testsuite"
"github.com/stretchr/testify/assert"
)

Expand All @@ -25,8 +25,8 @@ func TestMarker(t *testing.T) {
{"1", "1", "absent"},
}

rc, err := redis.Dial("tcp", "localhost:6379")
assert.NoError(t, err)
testsuite.ResetRP()
rc := testsuite.RC()

for i, tc := range tcs {
if tc.Action == "absent" {
Expand Down
Loading

0 comments on commit ec9925d

Please sign in to comment.