Skip to content

Commit

Permalink
Mark as sent in redis any message send that does result in status err…
Browse files Browse the repository at this point in the history
…ored
  • Loading branch information
rowanseymour committed Nov 5, 2024
1 parent 0559144 commit 938b060
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
2 changes: 1 addition & 1 deletion backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Backend interface {
ClearMsgSent(context.Context, MsgID) error

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be
// of errors during sending as it will manage the number of active workers per channel. The status parameter can be
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate)

Expand Down
15 changes: 9 additions & 6 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,16 +466,19 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu

dbMsg := msg.(*Msg)

queue.MarkComplete(rc, msgQueueName, dbMsg.workerToken)
if err := queue.MarkComplete(rc, msgQueueName, dbMsg.workerToken); err != nil {
slog.Error("unable to mark queue task complete", "error", err)
}

// mark as sent in redis as well if this was actually wired or sent
if status != nil && (status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusWired) {
// mark as sent in redis as well if message send ended in status that won't be retried
if status.Status() != courier.MsgStatusErrored {
dateKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02"))
rc.Send("sadd", dateKey, msg.ID().String())
rc.Send("expire", dateKey, 60*60*24*2)
rc.Send("SADD", dateKey, msg.ID().String())
rc.Send("EXPIRE", dateKey, 60*60*24*2)

_, err := rc.Do("")
if err != nil {
slog.Error("unable to add new unsent message", "error", err, "sent_msgs_key", dateKey)
slog.Error("unable to mark message sent", "error", err)
}

// if our msg has an associated session and timeout, update that
Expand Down

0 comments on commit 938b060

Please sign in to comment.