Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1 of simulation endpoint #42

Merged
merged 3 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/sirupsen/logrus"

_ "github.com/nyaruka/mailroom/campaigns"
Expand All @@ -21,7 +22,7 @@ import (
var version = "Dev"

func main() {
config := mailroom.NewMailroomConfig()
config := config.Mailroom
loader := ezconf.NewLoader(
config,
"mailroom", "Mailroom - flow event handler for RapidPro",
Expand Down
23 changes: 18 additions & 5 deletions config.go → config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package mailroom
package config

// MailroomConfig is our top level configuration object
type MailroomConfig struct {
var Mailroom *Config

func init() {
Mailroom = NewMailroomConfig()
}

// Config is our top level configuration object
type Config struct {
SentryDSN string `help:"the DSN used for logging errors to Sentry"`
DB string `help:"URL describing how to connect to the RapidPro database"`
DBPoolSize int `help:"the size of our db pool"`
Expand All @@ -17,11 +23,15 @@ type MailroomConfig struct {
LibratoToken string `help:"the token that will be used to authenticate to Librato"`

AttachmentDomain string `help:"the domain that will be used for relative attachment"`

AuthToken string `help:"the token clients will need to authenticate web requests"`
Address string `help:"the address to bind our web server to"`
Port int `help:"the port to bind our web server to"`
}

// NewMailroomConfig returns a new default configuration object
func NewMailroomConfig() *MailroomConfig {
return &MailroomConfig{
func NewMailroomConfig() *Config {
return &Config{
DB: "postgres://temba@localhost/temba?sslmode=disable",
DBPoolSize: 8,
Redis: "redis://localhost:6379/0",
Expand All @@ -30,5 +40,8 @@ func NewMailroomConfig() *MailroomConfig {
LogLevel: "error",
Version: "Dev",
SMTPServer: "",

Address: "localhost",
Port: 8090,
}
}
4 changes: 2 additions & 2 deletions hooks/email_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"

"github.com/go-mail/mail"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/pkg/errors"

"github.com/gomodule/redigo/redis"
Expand All @@ -33,7 +33,7 @@ const (
// Apply sends all our emails
func (h *SendEmails) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
// get our smtp server config
config := org.Org().ConfigValue(configSMTPServer, mailroom.Config.SMTPServer)
config := org.Org().ConfigValue(configSMTPServer, config.Mailroom.SMTPServer)

// no config? noop
if config == "" {
Expand Down
6 changes: 0 additions & 6 deletions hooks/email_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,12 @@ package hooks
import (
"testing"

"github.com/nyaruka/mailroom"

"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/actions"
"github.com/nyaruka/mailroom/testsuite"
)

func TestEmailCreated(t *testing.T) {
// create a default config for our test
mailroom.Config = mailroom.NewMailroomConfig()
defer func() { mailroom.Config = nil }()

// configure mailtrap for our org
db := testsuite.DB()
db.MustExec(`UPDATE orgs_org SET config = '{"SMTP_SERVER": "smtp://24f335c64dbc28:[email protected]:2525/[email protected]"}' WHERE id = 1;`)
Expand Down
7 changes: 3 additions & 4 deletions hooks/msg_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package hooks
import (
"testing"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/testsuite"

"github.com/nyaruka/goflow/flows"
Expand All @@ -13,9 +13,8 @@ import (
func TestMsgCreated(t *testing.T) {
testsuite.Reset()

mailroom.Config = mailroom.NewMailroomConfig()
mailroom.Config.AttachmentDomain = "foo.bar.com"
defer func() { mailroom.Config = nil }()
config.Mailroom.AttachmentDomain = "foo.bar.com"
defer func() { config.Mailroom.AttachmentDomain = "" }()

// add a URN for cathy so we can test all urn sends
testsuite.DB().MustExec(
Expand Down
21 changes: 14 additions & 7 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/librato"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/queue"
"github.com/nyaruka/mailroom/web"
"github.com/sirupsen/logrus"
)

Expand All @@ -40,16 +42,12 @@ func AddTaskFunction(taskType string, taskFunc TaskFunction) {
taskFunctions[taskType] = taskFunc
}

// TODO: better handling of global config
// Config our global mailroom config
var Config *MailroomConfig

const BatchQueue = "batch"
const HandlerQueue = "handler"

// Mailroom is a service for handling RapidPro events
type Mailroom struct {
Config *MailroomConfig
Config *config.Config
DB *sqlx.DB
RP *redis.Pool
Quit chan bool
Expand All @@ -59,11 +57,12 @@ type Mailroom struct {

batchForeman *Foreman
handlerForeman *Foreman

webserver *web.Server
}

// NewMailroom creates and returns a new mailroom instance
func NewMailroom(config *MailroomConfig) *Mailroom {
Config = config
func NewMailroom(config *config.Config) *Mailroom {
mr := &Mailroom{
Config: config,
Quit: make(chan bool),
Expand Down Expand Up @@ -174,6 +173,10 @@ func (mr *Mailroom) Start() error {
mr.batchForeman.Start()
mr.handlerForeman.Start()

// start our web server
mr.webserver = web.NewServer(mr.CTX, mr.DB, mr.RP, mr.Config, mr.WaitGroup)
mr.webserver.Start()

logrus.Info("mailroom started")

// wait for any signals such as QUIT for dumping stack
Expand All @@ -190,6 +193,10 @@ func (mr *Mailroom) Stop() error {
librato.Stop()
close(mr.Quit)
mr.Cancel()

// stop our web server
mr.webserver.Stop()

mr.WaitGroup.Wait()
logrus.Info("mailroom stopped")
return nil
Expand Down
12 changes: 12 additions & 0 deletions models/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func FlushCache() {

// GetOrgAssets creates or gets org assets for the passed in org
func GetOrgAssets(ctx context.Context, db *sqlx.DB, orgID OrgID) (*OrgAssets, error) {
if db == nil {
return nil, errors.Errorf("nil db, cannot load org")
}

// do we have a recent cache?
key := fmt.Sprintf("%d", orgID)
var cached *OrgAssets
Expand Down Expand Up @@ -234,6 +238,10 @@ func (a *OrgAssets) Flow(flowUUID assets.FlowUUID) (assets.Flow, error) {
return nil, errors.Wrapf(err, "error loading flow: %s", flowUUID)
}

if dbFlow == nil {
return nil, errors.Errorf("no flow with uuid: %s", flowUUID)
}

a.flowCacheLock.Lock()
a.flowByID[dbFlow.ID()] = dbFlow
a.flowByUUID[dbFlow.UUID()] = dbFlow
Expand All @@ -256,6 +264,10 @@ func (a *OrgAssets) FlowByID(flowID FlowID) (*Flow, error) {
return nil, errors.Wrapf(err, "error loading flow: %d", flowID)
}

if dbFlow == nil {
return nil, errors.Errorf("no flow with id: %d", flowID)
}

a.flowCacheLock.Lock()
a.flowByID[dbFlow.ID()] = dbFlow
a.flowByUUID[dbFlow.UUID()] = dbFlow
Expand Down
46 changes: 31 additions & 15 deletions models/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,34 @@ func (f *Flow) Name() string { return f.f.Name }
// Definition returns the definition for this flow
func (f *Flow) Definition() json.RawMessage { return f.f.Definition }

// SetDefinition sets our definition from the passed in new definition format
func (f *Flow) SetDefinition(definition json.RawMessage) {
f.f.Definition = definition
}

// SetLegacyDefinition sets our definition from the passed in legacy definition
func (f *Flow) SetLegacyDefinition(legacyDefinition json.RawMessage) error {
// load it in from our json
legacyFlow, err := legacy.ReadLegacyFlow(legacyDefinition)
if err != nil {
return errors.Wrapf(err, "error reading flow into legacy format: %s", legacyDefinition)
}

// migrate forwards returning our final flow definition
newFlow, err := legacyFlow.Migrate(false, false)
if err != nil {
return errors.Wrapf(err, "error migrating flow: %s", legacyDefinition)
}

// write this flow back out in our new format
f.f.Definition, err = json.Marshal(newFlow)
if err != nil {
return errors.Wrapf(err, "error mashalling migrated flow definition: %s", legacyDefinition)
}

return nil
}

// IsArchived returns whether this flow is archived
func (f *Flow) IsArchived() bool { return f.f.IsArchived }

Expand Down Expand Up @@ -75,22 +103,10 @@ func loadFlow(ctx context.Context, db *sqlx.DB, sql string, arg interface{}) (*F
return nil, errors.Wrapf(err, "error reading flow definition by: %s", arg)
}

// load it in from our json
legacyFlow, err := legacy.ReadLegacyFlow([]byte(flow.f.Definition))
if err != nil {
return nil, errors.Wrapf(err, "error reading flow into legacy format: %s", arg)
}

// migrate forwards returning our final flow definition
newFlow, err := legacyFlow.Migrate(false, false)
if err != nil {
return nil, errors.Wrapf(err, "error migrating flow: %s", arg)
}

// write this flow back out in our new format
flow.f.Definition, err = json.Marshal(newFlow)
// our definition is really a legacy definition, set it from that
err = flow.SetLegacyDefinition(flow.f.Definition)
if err != nil {
return nil, errors.Wrapf(err, "error mashalling migrated flow definition: %s", arg)
return nil, errors.Wrapf(err, "error setting flow definition from legacy")
}

return flow, nil
Expand Down
6 changes: 3 additions & 3 deletions models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/gsm7"
"github.com/pkg/errors"
null "gopkg.in/guregu/null.v3"
Expand Down Expand Up @@ -183,9 +183,9 @@ func NewOutgoingMsg(orgID OrgID, channel *Channel, contactID flows.ContactID, ou
url := a.URL()
if !strings.HasPrefix(url, "http") {
if strings.HasPrefix(url, "/") {
url = fmt.Sprintf("https://%s%s", mailroom.Config.AttachmentDomain, url)
url = fmt.Sprintf("https://%s%s", config.Mailroom.AttachmentDomain, url)
} else {
url = fmt.Sprintf("https://%s/%s", mailroom.Config.AttachmentDomain, url)
url = fmt.Sprintf("https://%s/%s", config.Mailroom.AttachmentDomain, url)
}
}
m.Attachments = append(m.Attachments, fmt.Sprintf("%s:%s", a.ContentType(), url))
Expand Down
2 changes: 1 addition & 1 deletion models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func NewSession(org *OrgAssets, s flows.Session) (*Session, error) {
// create our session object
session := &Session{
Status: sessionStatus,
Responded: false, // TODO: populate once we are running real flows
Responded: false,
Output: string(output),
ContactID: s.Contact().ID(),
OrgID: org.OrgID(),
Expand Down
Loading