Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Campaigns #23

Merged
merged 3 commits into from
Sep 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*.dylib
mailroom
dist
.vscode

# Test binary, build with `go test -c`
*.test
Expand Down
14 changes: 10 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

204 changes: 204 additions & 0 deletions handlers/campaigns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package handlers

import (
"context"
"time"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/juju/errors"
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/models"
"github.com/sirupsen/logrus"
)

// UpdateCampaignEventsHook is our hook to update any campaign events
type UpdateCampaignEventsHook struct{}

var updateCampaignEventsHook = &UpdateCampaignEventsHook{}

// Apply will update all the campaigns for the passed in sessions, minimizing the number of queries to do so
func (h *UpdateCampaignEventsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, orgID models.OrgID, sessions map[*models.Session][]interface{}) error {
logrus.WithField("sessions", sessions).Debug("getting campaign callback")

// these are all the events we need to delete unfired fires for
deletes := make([]interface{}, 0, 5)

// these are all the new events we need to insert
inserts := make([]interface{}, 0, 5)

for s, es := range sessions {
org := s.Org()
groupAdds := make(map[models.GroupID]bool)
groupRemoves := make(map[models.GroupID]bool)
fieldChanges := make(map[*models.Field]bool)

for _, e := range es {
switch event := e.(type) {

case *GroupAdd:
groupAdds[event.GroupID] = true
delete(groupRemoves, event.GroupID)

case *GroupRemove:
groupRemoves[event.GroupID] = true
delete(groupAdds, event.GroupID)

case *events.ContactFieldChangedEvent:
field := s.Org().FieldByKey(event.Field.Key)
if field == nil {
logrus.WithFields(logrus.Fields{
"field_key": event.Field.Key,
"field_name": event.Field.Name,
"session_id": s.ID,
}).Debug("unable to find field with key, ignoring for campaign updates")
continue
}
fieldChanges[field] = true
}
}

// those events that need deleting
deleteEvents := make(map[models.CampaignEventID]bool, len(groupRemoves)+len(fieldChanges))

// those events we need to add
addEvents := make(map[*models.CampaignEvent]bool, len(groupAdds)+len(fieldChanges))

// for every group that was removed, we need to remove all event fires for them
for g := range groupRemoves {
for _, c := range s.Org().CampaignByGroupID(g) {
for _, e := range c.Events() {
// TODO: filter by field value?
deleteEvents[e.ID()] = true
}
}
}

// for every field that was changed, we need to also remove event fires and recalculate
for f := range fieldChanges {
fieldEvents := s.Org().CampaignEventsByFieldID(f.ID())
for _, e := range fieldEvents {
deleteEvents[e.ID()] = true
addEvents[e] = true
}
}

// ok, create all our deletes
for e := range deleteEvents {
deletes = append(deletes, &FireDelete{
ContactID: s.ContactID,
EventID: e,
})
}

// add in all the events we qualify for in campaigns we are now part of
for g := range groupAdds {
for _, c := range org.CampaignByGroupID(g) {
for _, e := range c.Events() {
addEvents[e] = true
}
}
}

// ok, for all the unique events we now calculate our fire date
tz := s.Org().Env().Timezone()
now := time.Now()
for ce := range addEvents {
// we aren't part of the group, move on
if s.Contact().Groups().FindByUUID(ce.Campaign().GroupUUID()) == nil {
continue
}

// get our value for the event
value := s.Contact().Fields()[ce.RelativeToKey()]

// no value? move on
if value == nil {
continue
}

// get the typed value
typed := value.TypedValue()
start, isTime := typed.(types.XDateTime)

// nil or not a date? move on
if !isTime {
continue
}

logrus.WithField("start", start).Debug("calculating offset")

// calculate our next fire
scheduled, err := ce.ScheduleForTime(tz, now, start.Native())
if err != nil {
return errors.Annotatef(err, "error calculating offset for start: %s and event: %d", start, ce.ID())
}

// no scheduled date? move on
if scheduled == nil {
continue
}

// ok we have a new fire date, add it to our list of fires to insert
inserts = append(inserts, &FireInsert{
ContactID: s.Contact().ID(),
EventID: ce.ID(),
Scheduled: *scheduled,
})
}
}

// first delete all our removed fires
if len(deletes) > 0 {
err := models.BulkInsert(ctx, tx, deleteUnfiredFires, deletes)
if err != nil {
return errors.Annotatef(err, "error deleting unfired event fires")
}
}

// then insert our new ones
if len(inserts) > 0 {
err := models.BulkInsert(ctx, tx, insertFires, inserts)
if err != nil {
return errors.Annotatef(err, "error inserting new event fires")
}
}

return nil
}

const deleteUnfiredFires = `
DELETE FROM
campaigns_eventfire
WHERE
id
IN (
SELECT
c.id
FROM
campaigns_eventfire c,
(VALUES(:contact_id, :event_id)) AS f(contact_id, event_id)
WHERE
c.contact_id = f.contact_id::int AND c.event_id = f.event_id::int AND c.fired IS NULL
);
`

type FireDelete struct {
ContactID flows.ContactID `db:"contact_id"`
EventID models.CampaignEventID `db:"event_id"`
}

const insertFires = `
INSERT INTO
campaigns_eventfire
(contact_id, event_id, scheduled)
VALUES(:contact_id, :event_id, :scheduled)
`

type FireInsert struct {
ContactID flows.ContactID `db:"contact_id"`
EventID models.CampaignEventID `db:"event_id"`
Scheduled time.Time `db:"scheduled"`
}
72 changes: 72 additions & 0 deletions handlers/campaigns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package handlers

import (
"testing"

"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/testsuite"
)

func TestCampaigns(t *testing.T) {
testsuite.Reset()

// joinedUUID := models.FieldUUID("8c1c1256-78d6-4a5b-9f1c-1761d5728251")
joined := assets.NewFieldReference("joined", "Joined")
//doctors := assets.NewGroupReference(assets.GroupUUID("85a5a793-4741-4896-b55e-05af65f3c0fa"), "Doctors")
doctorsID := models.GroupID(33)

// add cathy and bob as doctors
testsuite.DB().MustExec("insert into contacts_contactgroup_contacts(contact_id, contactgroup_id) VALUES($1, $2);", Cathy, doctorsID)
testsuite.DB().MustExec("insert into contacts_contactgroup_contacts(contact_id, contactgroup_id) VALUES($1, $2);", Bob, doctorsID)

// init their values
testsuite.DB().MustExec(
`update contacts_contact set fields = fields - '8c1c1256-78d6-4a5b-9f1c-1761d5728251'
WHERE id = $1`, Cathy)

testsuite.DB().MustExec(
`update contacts_contact set fields = fields ||
'{"8c1c1256-78d6-4a5b-9f1c-1761d5728251": { "text": "2029-09-15T12:00:00+00:00", "datetime": "2029-09-15T12:00:00+00:00" }}'::jsonb
WHERE id = $1`, Bob)

tcs := []EventTestCase{
EventTestCase{
Events: ContactEventMap{
Cathy: []flows.Event{
events.NewContactFieldChangedEvent(joined, &flows.Value{Text: types.NewXText("2029-09-15T12:00:00+00:00")}),
events.NewContactFieldChangedEvent(joined, &flows.Value{}),
},
Bob: []flows.Event{
events.NewContactFieldChangedEvent(joined, &flows.Value{Text: types.NewXText("2029-09-15T12:00:00+00:00")}),
events.NewContactFieldChangedEvent(joined, &flows.Value{Text: types.NewXText("2029-09-15T12:00:00+00:00")}),
},
Evan: []flows.Event{
events.NewContactFieldChangedEvent(joined, &flows.Value{Text: types.NewXText("2029-09-15T12:00:00+00:00")}),
},
},
Assertions: []SQLAssertion{
SQLAssertion{
SQL: `select count(*) FROM campaigns_eventfire WHERE contact_id = $1`,
Args: []interface{}{Cathy},
Count: 0,
},
SQLAssertion{
SQL: `select count(*) FROM campaigns_eventfire WHERE contact_id = $1`,
Args: []interface{}{Bob},
Count: 2,
},
SQLAssertion{
SQL: `select count(*) FROM campaigns_eventfire WHERE contact_id = $1`,
Args: []interface{}{Evan},
Count: 0,
},
},
},
}

RunEventTestCases(t, tcs)
}
Loading