Skip to content

Commit

Permalink
Merge pull request #608 from nyaruka/timeout_fix
Browse files Browse the repository at this point in the history
Clear session timeouts when wait rejects timeout resume
  • Loading branch information
rowanseymour authored Mar 29, 2022
2 parents 736b5c7 + 460cb9f commit 83c16d9
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 25 deletions.
2 changes: 1 addition & 1 deletion core/handlers/msg_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func handlePreMsgCreated(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx,
}

// everybody else gets their timeout cleared, will be set by courier
scene.Session().ClearTimeoutOn()
scene.Session().ClearWaitTimeout(ctx, nil)

return nil
}
Expand Down
14 changes: 13 additions & 1 deletion core/models/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ func (s *Session) WaitStartedOn() *time.Time { return s.s.WaitStartedOn
func (s *Session) WaitTimeoutOn() *time.Time { return s.s.WaitTimeoutOn }
func (s *Session) WaitExpiresOn() *time.Time { return s.s.WaitExpiresOn }
func (s *Session) WaitResumeOnExpire() bool { return s.s.WaitResumeOnExpire }
func (s *Session) ClearTimeoutOn() { s.s.WaitTimeoutOn = nil }
func (s *Session) CurrentFlowID() FlowID { return s.s.CurrentFlowID }
func (s *Session) ConnectionID() *ConnectionID { return s.s.ConnectionID }
func (s *Session) IncomingMsgID() MsgID { return s.incomingMsgID }
Expand Down Expand Up @@ -467,6 +466,19 @@ func (s *Session) Update(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx,
return nil
}

// ClearWaitTimeout clears the timeout on the wait on this session and is used if the engine tells us
// that the flow no longer has a timeout on that wait. It can be called without updating the session
// in the database which is used when handling msg_created events before session is updated anyway.
func (s *Session) ClearWaitTimeout(ctx context.Context, db *sqlx.DB) error {
s.s.WaitTimeoutOn = nil

if db != nil {
_, err := db.ExecContext(ctx, `UPDATE flows_flowsession SET timeout_on = NULL WHERE id = $1`, s.ID())
return errors.Wrap(err, "error clearing wait timeout")
}
return nil
}

// MarshalJSON is our custom marshaller so that our inner struct get output
func (s *Session) MarshalJSON() ([]byte, error) {
return json.Marshal(s.s)
Expand Down
28 changes: 28 additions & 0 deletions core/models/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,34 @@ func TestGetSessionWaitExpiresOn(t *testing.T) {
assert.Nil(t, s2Actual)
}

func TestClearWaitTimeout(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

defer testsuite.Reset(testsuite.ResetData)

oa := testdata.Org1.Load(rt)

_, cathy := testdata.Cathy.Load(db, oa)

expiresOn := time.Now().Add(time.Hour)
timeoutOn := time.Now().Add(time.Minute)
testdata.InsertWaitingSession(db, testdata.Org1, testdata.Cathy, models.FlowTypeMessaging, testdata.Favorites, models.NilConnectionID, time.Now(), expiresOn, true, &timeoutOn)

session, err := models.FindWaitingSessionForContact(ctx, db, nil, oa, models.FlowTypeMessaging, cathy)
require.NoError(t, err)

// can be called without db connection to clear without updating db
session.ClearWaitTimeout(ctx, nil)
assert.Nil(t, session.WaitTimeoutOn())
assert.NotNil(t, session.WaitExpiresOn()) // unaffected

// and called with one to clear in the database as well
session.ClearWaitTimeout(ctx, db)
assert.Nil(t, session.WaitTimeoutOn())

assertdb.Query(t, db, `SELECT timeout_on FROM flows_flowsession WHERE id = $1`, session.ID()).Returns(nil)
}

func insertSessionAndRun(db *sqlx.DB, contact *testdata.Contact, sessionType models.FlowType, status models.SessionStatus, flow *testdata.Flow, connID models.ConnectionID) (models.SessionID, models.FlowRunID) {
// create session and add a run with same status
sessionID := testdata.InsertFlowSession(db, testdata.Org1, contact, sessionType, status, flow, connID)
Expand Down
53 changes: 34 additions & 19 deletions core/tasks/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,9 @@ func TestTimedEvents(t *testing.T) {

defer testsuite.Reset(testsuite.ResetAll)

// start to start our favorites flow
// create some keyword triggers
testdata.InsertKeywordTrigger(db, testdata.Org1, testdata.Favorites, "start", models.MatchOnly, nil, nil)
testdata.InsertKeywordTrigger(db, testdata.Org1, testdata.PickANumber, "pick", models.MatchOnly, nil, nil)

tcs := []struct {
EventType string
Expand Down Expand Up @@ -575,6 +576,15 @@ func TestTimedEvents(t *testing.T) {

// 9: start our favorite flow again
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel.ID, testdata.Org1.ID},

// 10: timeout on the color question
{handler.TimeoutEventType, testdata.Cathy, "", "Sorry you can't participate right now, I'll try again later.", testdata.TwitterChannel.ID, testdata.Org1.ID},

// 11: start the pick a number flow
{handler.MsgEventType, testdata.Cathy, "pick", "Pick a number between 1-10.", testdata.TwitterChannel.ID, testdata.Org1.ID},

// 12: try to resume with timeout even tho flow doesn't have one set
{handler.TimeoutEventType, testdata.Cathy, "", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
}

last := time.Now()
Expand All @@ -585,25 +595,21 @@ func TestTimedEvents(t *testing.T) {
time.Sleep(50 * time.Millisecond)

var task *queue.Task
if tc.EventType == handler.MsgEventType {
event := &handler.MsgEvent{
ContactID: tc.Contact.ID,
OrgID: tc.OrgID,
ChannelID: tc.ChannelID,
MsgID: flows.MsgID(1),
MsgUUID: flows.MsgUUID(uuids.New()),
URN: tc.Contact.URN,
URNID: tc.Contact.URNID,
Text: tc.Message,
}

eventJSON, err := json.Marshal(event)
assert.NoError(t, err)

if tc.EventType == handler.MsgEventType {
task = &queue.Task{
Type: tc.EventType,
OrgID: int(tc.OrgID),
Task: eventJSON,
Task: jsonx.MustMarshal(&handler.MsgEvent{
ContactID: tc.Contact.ID,
OrgID: tc.OrgID,
ChannelID: tc.ChannelID,
MsgID: flows.MsgID(1),
MsgUUID: flows.MsgUUID(uuids.New()),
URN: tc.Contact.URN,
URNID: tc.Contact.URNID,
Text: tc.Message,
}),
}
} else if tc.EventType == handler.ExpirationEventType {
var expiration time.Time
Expand All @@ -618,6 +624,14 @@ func TestTimedEvents(t *testing.T) {
}

task = handler.NewExpirationTask(tc.OrgID, tc.Contact.ID, sessionID, expiration)

} else if tc.EventType == handler.TimeoutEventType {
timeoutOn := time.Now().Round(time.Millisecond) // so that there's no difference between this and what we read from the db

// usually courier will set timeout_on after sending the last message
db.MustExec(`UPDATE flows_flowsession SET timeout_on = $2 WHERE id = $1`, sessionID, timeoutOn)

task = handler.NewTimeoutTask(tc.OrgID, tc.Contact.ID, sessionID, timeoutOn)
}

err := handler.QueueHandleTask(rc, tc.Contact.ID, task)
Expand All @@ -643,9 +657,10 @@ func TestTimedEvents(t *testing.T) {
last = time.Now()
}

// should only have a single waiting session/run per contact
assertdb.Query(t, db, `SELECT count(*) from flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(1)
assertdb.Query(t, db, `SELECT count(*) from flows_flowrun WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(1)
// should only have a single waiting session/run with no timeout
assertdb.Query(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(1)
assertdb.Query(t, db, `SELECT timeout_on FROM flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(nil)
assertdb.Query(t, db, `SELECT count(*) FROM flows_flowrun WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(1)

// test the case of a run and session no longer being the most recent but somehow still active, expiration should still work
r, err := db.QueryContext(ctx, `SELECT id, session_id from flows_flowrun WHERE contact_id = $1 and status = 'I' order by created_on asc limit 1`, testdata.Cathy.ID)
Expand Down
11 changes: 10 additions & 1 deletion core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/engine"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/goflow/flows/resumes"
"github.com/nyaruka/goflow/flows/triggers"
Expand Down Expand Up @@ -273,7 +274,15 @@ func handleTimedEvent(ctx context.Context, rt *runtime.Runtime, eventType string

_, err = runner.ResumeFlow(ctx, rt, oa, session, modelContact, resume, nil)
if err != nil {
return errors.Wrapf(err, "error resuming flow for timeout")
// if we errored, and it's the wait rejecting the timeout event, it's because it no longer exists on the flow, so clear it
// on the session
var eerr *engine.Error
if errors.As(err, &eerr) && eerr.Code() == engine.ErrorResumeRejectedByWait && resume.Type() == resumes.TypeWaitTimeout {
log.WithField("session_id", session.ID()).Info("clearing session timeout which is no longer set in flow")
return errors.Wrap(session.ClearWaitTimeout(ctx, rt.DB), "error clearing session timeout")
}

return errors.Wrap(err, "error resuming flow for timeout")
}

log.WithField("elapsed", time.Since(start)).Info("handled timed event")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/lib/pq v1.10.4
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.17.1
github.com/nyaruka/goflow v0.153.0
github.com/nyaruka/goflow v0.154.0
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDY
github.com/nyaruka/gocommon v1.5.3/go.mod h1:2ZeBZF9yt20IaAJ4aC1ujojAsFhJBk2IuDvSl7KuQDw=
github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8=
github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0=
github.com/nyaruka/goflow v0.153.0 h1:ZphPN0WCod77uvBMCLOxjl9fibaHdTkcWVP3lltIgbc=
github.com/nyaruka/goflow v0.153.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/goflow v0.154.0 h1:tcUVs+sDFyjWdLvyk1kf2SOkQwSGInMbzuG+trE7ZNc=
github.com/nyaruka/goflow v0.154.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down

0 comments on commit 83c16d9

Please sign in to comment.