Skip to content

Commit

Permalink
Merge pull request #42 from nyaruka/simulation
Browse files Browse the repository at this point in the history
v1 of simulation endpoint
  • Loading branch information
nicpottier authored Oct 24, 2018
2 parents 71f6c5d + f36d77f commit b8c97af
Show file tree
Hide file tree
Showing 14 changed files with 757 additions and 44 deletions.
3 changes: 2 additions & 1 deletion cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/sirupsen/logrus"

_ "github.com/nyaruka/mailroom/campaigns"
Expand All @@ -21,7 +22,7 @@ import (
var version = "Dev"

func main() {
config := mailroom.NewMailroomConfig()
config := config.Mailroom
loader := ezconf.NewLoader(
config,
"mailroom", "Mailroom - flow event handler for RapidPro",
Expand Down
23 changes: 18 additions & 5 deletions config.go → config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package mailroom
package config

// MailroomConfig is our top level configuration object
type MailroomConfig struct {
var Mailroom *Config

func init() {
Mailroom = NewMailroomConfig()
}

// Config is our top level configuration object
type Config struct {
SentryDSN string `help:"the DSN used for logging errors to Sentry"`
DB string `help:"URL describing how to connect to the RapidPro database"`
DBPoolSize int `help:"the size of our db pool"`
Expand All @@ -17,11 +23,15 @@ type MailroomConfig struct {
LibratoToken string `help:"the token that will be used to authenticate to Librato"`

AttachmentDomain string `help:"the domain that will be used for relative attachment"`

AuthToken string `help:"the token clients will need to authenticate web requests"`
Address string `help:"the address to bind our web server to"`
Port int `help:"the port to bind our web server to"`
}

// NewMailroomConfig returns a new default configuration object
func NewMailroomConfig() *MailroomConfig {
return &MailroomConfig{
func NewMailroomConfig() *Config {
return &Config{
DB: "postgres://temba@localhost/temba?sslmode=disable",
DBPoolSize: 8,
Redis: "redis://localhost:6379/0",
Expand All @@ -30,5 +40,8 @@ func NewMailroomConfig() *MailroomConfig {
LogLevel: "error",
Version: "Dev",
SMTPServer: "",

Address: "localhost",
Port: 8090,
}
}
4 changes: 2 additions & 2 deletions hooks/email_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"

"github.com/go-mail/mail"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/pkg/errors"

"github.com/gomodule/redigo/redis"
Expand All @@ -33,7 +33,7 @@ const (
// Apply sends all our emails
func (h *SendEmails) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
// get our smtp server config
config := org.Org().ConfigValue(configSMTPServer, mailroom.Config.SMTPServer)
config := org.Org().ConfigValue(configSMTPServer, config.Mailroom.SMTPServer)

// no config? noop
if config == "" {
Expand Down
6 changes: 0 additions & 6 deletions hooks/email_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,12 @@ package hooks
import (
"testing"

"github.com/nyaruka/mailroom"

"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/actions"
"github.com/nyaruka/mailroom/testsuite"
)

func TestEmailCreated(t *testing.T) {
// create a default config for our test
mailroom.Config = mailroom.NewMailroomConfig()
defer func() { mailroom.Config = nil }()

// configure mailtrap for our org
db := testsuite.DB()
db.MustExec(`UPDATE orgs_org SET config = '{"SMTP_SERVER": "smtp://24f335c64dbc28:[email protected]:2525/[email protected]"}' WHERE id = 1;`)
Expand Down
7 changes: 3 additions & 4 deletions hooks/msg_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package hooks
import (
"testing"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/testsuite"

"github.com/nyaruka/goflow/flows"
Expand All @@ -13,9 +13,8 @@ import (
func TestMsgCreated(t *testing.T) {
testsuite.Reset()

mailroom.Config = mailroom.NewMailroomConfig()
mailroom.Config.AttachmentDomain = "foo.bar.com"
defer func() { mailroom.Config = nil }()
config.Mailroom.AttachmentDomain = "foo.bar.com"
defer func() { config.Mailroom.AttachmentDomain = "" }()

// add a URN for cathy so we can test all urn sends
testsuite.DB().MustExec(
Expand Down
21 changes: 14 additions & 7 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/librato"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/queue"
"github.com/nyaruka/mailroom/web"
"github.com/sirupsen/logrus"
)

Expand All @@ -40,16 +42,12 @@ func AddTaskFunction(taskType string, taskFunc TaskFunction) {
taskFunctions[taskType] = taskFunc
}

// TODO: better handling of global config
// Config our global mailroom config
var Config *MailroomConfig

const BatchQueue = "batch"
const HandlerQueue = "handler"

// Mailroom is a service for handling RapidPro events
type Mailroom struct {
Config *MailroomConfig
Config *config.Config
DB *sqlx.DB
RP *redis.Pool
Quit chan bool
Expand All @@ -59,11 +57,12 @@ type Mailroom struct {

batchForeman *Foreman
handlerForeman *Foreman

webserver *web.Server
}

// NewMailroom creates and returns a new mailroom instance
func NewMailroom(config *MailroomConfig) *Mailroom {
Config = config
func NewMailroom(config *config.Config) *Mailroom {
mr := &Mailroom{
Config: config,
Quit: make(chan bool),
Expand Down Expand Up @@ -174,6 +173,10 @@ func (mr *Mailroom) Start() error {
mr.batchForeman.Start()
mr.handlerForeman.Start()

// start our web server
mr.webserver = web.NewServer(mr.CTX, mr.DB, mr.RP, mr.Config, mr.WaitGroup)
mr.webserver.Start()

logrus.Info("mailroom started")

// wait for any signals such as QUIT for dumping stack
Expand All @@ -190,6 +193,10 @@ func (mr *Mailroom) Stop() error {
librato.Stop()
close(mr.Quit)
mr.Cancel()

// stop our web server
mr.webserver.Stop()

mr.WaitGroup.Wait()
logrus.Info("mailroom stopped")
return nil
Expand Down
12 changes: 12 additions & 0 deletions models/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func FlushCache() {

// GetOrgAssets creates or gets org assets for the passed in org
func GetOrgAssets(ctx context.Context, db *sqlx.DB, orgID OrgID) (*OrgAssets, error) {
if db == nil {
return nil, errors.Errorf("nil db, cannot load org")
}

// do we have a recent cache?
key := fmt.Sprintf("%d", orgID)
var cached *OrgAssets
Expand Down Expand Up @@ -234,6 +238,10 @@ func (a *OrgAssets) Flow(flowUUID assets.FlowUUID) (assets.Flow, error) {
return nil, errors.Wrapf(err, "error loading flow: %s", flowUUID)
}

if dbFlow == nil {
return nil, errors.Errorf("no flow with uuid: %s", flowUUID)
}

a.flowCacheLock.Lock()
a.flowByID[dbFlow.ID()] = dbFlow
a.flowByUUID[dbFlow.UUID()] = dbFlow
Expand All @@ -256,6 +264,10 @@ func (a *OrgAssets) FlowByID(flowID FlowID) (*Flow, error) {
return nil, errors.Wrapf(err, "error loading flow: %d", flowID)
}

if dbFlow == nil {
return nil, errors.Errorf("no flow with id: %d", flowID)
}

a.flowCacheLock.Lock()
a.flowByID[dbFlow.ID()] = dbFlow
a.flowByUUID[dbFlow.UUID()] = dbFlow
Expand Down
46 changes: 31 additions & 15 deletions models/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,34 @@ func (f *Flow) Name() string { return f.f.Name }
// Definition returns the definition for this flow
func (f *Flow) Definition() json.RawMessage { return f.f.Definition }

// SetDefinition sets our definition from the passed in new definition format
func (f *Flow) SetDefinition(definition json.RawMessage) {
f.f.Definition = definition
}

// SetLegacyDefinition sets our definition from the passed in legacy definition
func (f *Flow) SetLegacyDefinition(legacyDefinition json.RawMessage) error {
// load it in from our json
legacyFlow, err := legacy.ReadLegacyFlow(legacyDefinition)
if err != nil {
return errors.Wrapf(err, "error reading flow into legacy format: %s", legacyDefinition)
}

// migrate forwards returning our final flow definition
newFlow, err := legacyFlow.Migrate(false, false)
if err != nil {
return errors.Wrapf(err, "error migrating flow: %s", legacyDefinition)
}

// write this flow back out in our new format
f.f.Definition, err = json.Marshal(newFlow)
if err != nil {
return errors.Wrapf(err, "error mashalling migrated flow definition: %s", legacyDefinition)
}

return nil
}

// IsArchived returns whether this flow is archived
func (f *Flow) IsArchived() bool { return f.f.IsArchived }

Expand Down Expand Up @@ -75,22 +103,10 @@ func loadFlow(ctx context.Context, db *sqlx.DB, sql string, arg interface{}) (*F
return nil, errors.Wrapf(err, "error reading flow definition by: %s", arg)
}

// load it in from our json
legacyFlow, err := legacy.ReadLegacyFlow([]byte(flow.f.Definition))
if err != nil {
return nil, errors.Wrapf(err, "error reading flow into legacy format: %s", arg)
}

// migrate forwards returning our final flow definition
newFlow, err := legacyFlow.Migrate(false, false)
if err != nil {
return nil, errors.Wrapf(err, "error migrating flow: %s", arg)
}

// write this flow back out in our new format
flow.f.Definition, err = json.Marshal(newFlow)
// our definition is really a legacy definition, set it from that
err = flow.SetLegacyDefinition(flow.f.Definition)
if err != nil {
return nil, errors.Wrapf(err, "error mashalling migrated flow definition: %s", arg)
return nil, errors.Wrapf(err, "error setting flow definition from legacy")
}

return flow, nil
Expand Down
6 changes: 3 additions & 3 deletions models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/gsm7"
"github.com/pkg/errors"
null "gopkg.in/guregu/null.v3"
Expand Down Expand Up @@ -183,9 +183,9 @@ func NewOutgoingMsg(orgID OrgID, channel *Channel, contactID flows.ContactID, ou
url := a.URL()
if !strings.HasPrefix(url, "http") {
if strings.HasPrefix(url, "/") {
url = fmt.Sprintf("https://%s%s", mailroom.Config.AttachmentDomain, url)
url = fmt.Sprintf("https://%s%s", config.Mailroom.AttachmentDomain, url)
} else {
url = fmt.Sprintf("https://%s/%s", mailroom.Config.AttachmentDomain, url)
url = fmt.Sprintf("https://%s/%s", config.Mailroom.AttachmentDomain, url)
}
}
m.Attachments = append(m.Attachments, fmt.Sprintf("%s:%s", a.ContentType(), url))
Expand Down
2 changes: 1 addition & 1 deletion models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func NewSession(org *OrgAssets, s flows.Session) (*Session, error) {
// create our session object
session := &Session{
Status: sessionStatus,
Responded: false, // TODO: populate once we are running real flows
Responded: false,
Output: string(output),
ContactID: s.Contact().ID(),
OrgID: org.OrgID(),
Expand Down
Loading

0 comments on commit b8c97af

Please sign in to comment.