From 8b39d234570124bd9b461dbfc02b4e67d5116516 Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Wed, 24 Oct 2018 14:23:00 -0500 Subject: [PATCH 1/3] v1 of simulation endpoint --- cmd/mailroom/main.go | 3 +- config.go => config/config.go | 23 ++- hooks/email_created.go | 4 +- hooks/email_created_test.go | 6 - hooks/msg_created_test.go | 7 +- mailroom.go | 21 ++- models/assets.go | 12 ++ models/flows.go | 46 ++++-- models/msgs.go | 6 +- models/runs.go | 2 +- web/handlers.go | 210 +++++++++++++++++++++++++++ web/middleware.go | 56 +++++++ web/server.go | 133 +++++++++++++++++ web/server_test.go | 266 ++++++++++++++++++++++++++++++++++ 14 files changed, 751 insertions(+), 44 deletions(-) rename config.go => config/config.go (74%) create mode 100644 web/handlers.go create mode 100644 web/middleware.go create mode 100644 web/server.go create mode 100644 web/server_test.go diff --git a/cmd/mailroom/main.go b/cmd/mailroom/main.go index 809a78f0d..312abd79d 100644 --- a/cmd/mailroom/main.go +++ b/cmd/mailroom/main.go @@ -9,6 +9,7 @@ import ( _ "github.com/lib/pq" "github.com/nyaruka/ezconf" "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/config" "github.com/sirupsen/logrus" _ "github.com/nyaruka/mailroom/campaigns" @@ -21,7 +22,7 @@ import ( var version = "Dev" func main() { - config := mailroom.NewMailroomConfig() + config := config.Mailroom loader := ezconf.NewLoader( config, "mailroom", "Mailroom - flow event handler for RapidPro", diff --git a/config.go b/config/config.go similarity index 74% rename from config.go rename to config/config.go index 0263b9a63..2fd314528 100644 --- a/config.go +++ b/config/config.go @@ -1,7 +1,13 @@ -package mailroom +package config -// MailroomConfig is our top level configuration object -type MailroomConfig struct { +var Mailroom *Config + +func init() { + Mailroom = NewMailroomConfig() +} + +// Config is our top level configuration object +type Config struct { SentryDSN string `help:"the DSN used for logging errors to Sentry"` DB string `help:"URL describing how to connect to the RapidPro database"` DBPoolSize int `help:"the size of our db pool"` @@ -17,11 +23,15 @@ type MailroomConfig struct { LibratoToken string `help:"the token that will be used to authenticate to Librato"` AttachmentDomain string `help:"the domain that will be used for relative attachment"` + + AuthToken string `help:"the token clients will need to authenticate web requests"` + Address string `help:"the address to bind our web server to"` + Port int `help:"the port to bind our web server to"` } // NewMailroomConfig returns a new default configuration object -func NewMailroomConfig() *MailroomConfig { - return &MailroomConfig{ +func NewMailroomConfig() *Config { + return &Config{ DB: "postgres://temba@localhost/temba?sslmode=disable", DBPoolSize: 8, Redis: "redis://localhost:6379/0", @@ -30,5 +40,8 @@ func NewMailroomConfig() *MailroomConfig { LogLevel: "error", Version: "Dev", SMTPServer: "", + + Address: "localhost", + Port: 8090, } } diff --git a/hooks/email_created.go b/hooks/email_created.go index 1c669f636..1023ccdfa 100644 --- a/hooks/email_created.go +++ b/hooks/email_created.go @@ -6,7 +6,7 @@ import ( "strconv" "github.com/go-mail/mail" - "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/config" "github.com/pkg/errors" "github.com/gomodule/redigo/redis" @@ -33,7 +33,7 @@ const ( // Apply sends all our emails func (h *SendEmails) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { // get our smtp server config - config := org.Org().ConfigValue(configSMTPServer, mailroom.Config.SMTPServer) + config := org.Org().ConfigValue(configSMTPServer, config.Mailroom.SMTPServer) // no config? noop if config == "" { diff --git a/hooks/email_created_test.go b/hooks/email_created_test.go index 25df256df..446c839b7 100644 --- a/hooks/email_created_test.go +++ b/hooks/email_created_test.go @@ -3,18 +3,12 @@ package hooks import ( "testing" - "github.com/nyaruka/mailroom" - "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/flows/actions" "github.com/nyaruka/mailroom/testsuite" ) func TestEmailCreated(t *testing.T) { - // create a default config for our test - mailroom.Config = mailroom.NewMailroomConfig() - defer func() { mailroom.Config = nil }() - // configure mailtrap for our org db := testsuite.DB() db.MustExec(`UPDATE orgs_org SET config = '{"SMTP_SERVER": "smtp://24f335c64dbc28:d7966a553e76f6@smtp.mailtrap.io:2525/?from=mailroom@foo.bar"}' WHERE id = 1;`) diff --git a/hooks/msg_created_test.go b/hooks/msg_created_test.go index e41040abb..9e10a0644 100644 --- a/hooks/msg_created_test.go +++ b/hooks/msg_created_test.go @@ -3,7 +3,7 @@ package hooks import ( "testing" - "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/config" "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/goflow/flows" @@ -13,9 +13,8 @@ import ( func TestMsgCreated(t *testing.T) { testsuite.Reset() - mailroom.Config = mailroom.NewMailroomConfig() - mailroom.Config.AttachmentDomain = "foo.bar.com" - defer func() { mailroom.Config = nil }() + config.Mailroom.AttachmentDomain = "foo.bar.com" + defer func() { config.Mailroom.AttachmentDomain = "" }() // add a URN for cathy so we can test all urn sends testsuite.DB().MustExec( diff --git a/mailroom.go b/mailroom.go index 97a5d66a4..c76131187 100644 --- a/mailroom.go +++ b/mailroom.go @@ -16,7 +16,9 @@ import ( "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/nyaruka/librato" + "github.com/nyaruka/mailroom/config" "github.com/nyaruka/mailroom/queue" + "github.com/nyaruka/mailroom/web" "github.com/sirupsen/logrus" ) @@ -40,16 +42,12 @@ func AddTaskFunction(taskType string, taskFunc TaskFunction) { taskFunctions[taskType] = taskFunc } -// TODO: better handling of global config -// Config our global mailroom config -var Config *MailroomConfig - const BatchQueue = "batch" const HandlerQueue = "handler" // Mailroom is a service for handling RapidPro events type Mailroom struct { - Config *MailroomConfig + Config *config.Config DB *sqlx.DB RP *redis.Pool Quit chan bool @@ -59,11 +57,12 @@ type Mailroom struct { batchForeman *Foreman handlerForeman *Foreman + + webserver *web.Server } // NewMailroom creates and returns a new mailroom instance -func NewMailroom(config *MailroomConfig) *Mailroom { - Config = config +func NewMailroom(config *config.Config) *Mailroom { mr := &Mailroom{ Config: config, Quit: make(chan bool), @@ -174,6 +173,10 @@ func (mr *Mailroom) Start() error { mr.batchForeman.Start() mr.handlerForeman.Start() + // start our web server + mr.webserver = web.NewServer(mr.CTX, mr.DB, mr.RP, mr.Config, mr.WaitGroup) + mr.webserver.Start() + logrus.Info("mailroom started") // wait for any signals such as QUIT for dumping stack @@ -190,6 +193,10 @@ func (mr *Mailroom) Stop() error { librato.Stop() close(mr.Quit) mr.Cancel() + + // stop our web server + mr.webserver.Stop() + mr.WaitGroup.Wait() logrus.Info("mailroom stopped") return nil diff --git a/models/assets.go b/models/assets.go index 155e8e1c0..63f007052 100644 --- a/models/assets.go +++ b/models/assets.go @@ -70,6 +70,10 @@ func FlushCache() { // GetOrgAssets creates or gets org assets for the passed in org func GetOrgAssets(ctx context.Context, db *sqlx.DB, orgID OrgID) (*OrgAssets, error) { + if db == nil { + return nil, errors.Errorf("nil db, cannot load org") + } + // do we have a recent cache? key := fmt.Sprintf("%d", orgID) var cached *OrgAssets @@ -234,6 +238,10 @@ func (a *OrgAssets) Flow(flowUUID assets.FlowUUID) (assets.Flow, error) { return nil, errors.Wrapf(err, "error loading flow: %s", flowUUID) } + if dbFlow == nil { + return nil, errors.Errorf("no flow with uuid: %s", flowUUID) + } + a.flowCacheLock.Lock() a.flowByID[dbFlow.ID()] = dbFlow a.flowByUUID[dbFlow.UUID()] = dbFlow @@ -256,6 +264,10 @@ func (a *OrgAssets) FlowByID(flowID FlowID) (*Flow, error) { return nil, errors.Wrapf(err, "error loading flow: %d", flowID) } + if dbFlow == nil { + return nil, errors.Errorf("no flow with id: %d", flowID) + } + a.flowCacheLock.Lock() a.flowByID[dbFlow.ID()] = dbFlow a.flowByUUID[dbFlow.UUID()] = dbFlow diff --git a/models/flows.go b/models/flows.go index 2ecf20c10..74972d16b 100644 --- a/models/flows.go +++ b/models/flows.go @@ -36,6 +36,34 @@ func (f *Flow) Name() string { return f.f.Name } // Definition returns the definition for this flow func (f *Flow) Definition() json.RawMessage { return f.f.Definition } +// SetDefinition sets our definition from the passed in new definition format +func (f *Flow) SetDefinition(definition json.RawMessage) { + f.f.Definition = definition +} + +// SetLegacyDefinition sets our definition from the passed in legacy definition +func (f *Flow) SetLegacyDefinition(legacyDefinition json.RawMessage) error { + // load it in from our json + legacyFlow, err := legacy.ReadLegacyFlow(legacyDefinition) + if err != nil { + return errors.Wrapf(err, "error reading flow into legacy format: %s", legacyDefinition) + } + + // migrate forwards returning our final flow definition + newFlow, err := legacyFlow.Migrate(false, false) + if err != nil { + return errors.Wrapf(err, "error migrating flow: %s", legacyDefinition) + } + + // write this flow back out in our new format + f.f.Definition, err = json.Marshal(newFlow) + if err != nil { + return errors.Wrapf(err, "error mashalling migrated flow definition: %s", legacyDefinition) + } + + return nil +} + // IsArchived returns whether this flow is archived func (f *Flow) IsArchived() bool { return f.f.IsArchived } @@ -75,22 +103,10 @@ func loadFlow(ctx context.Context, db *sqlx.DB, sql string, arg interface{}) (*F return nil, errors.Wrapf(err, "error reading flow definition by: %s", arg) } - // load it in from our json - legacyFlow, err := legacy.ReadLegacyFlow([]byte(flow.f.Definition)) - if err != nil { - return nil, errors.Wrapf(err, "error reading flow into legacy format: %s", arg) - } - - // migrate forwards returning our final flow definition - newFlow, err := legacyFlow.Migrate(false, false) - if err != nil { - return nil, errors.Wrapf(err, "error migrating flow: %s", arg) - } - - // write this flow back out in our new format - flow.f.Definition, err = json.Marshal(newFlow) + // our definition is really a legacy definition, set it from that + err = flow.SetLegacyDefinition(flow.f.Definition) if err != nil { - return nil, errors.Wrapf(err, "error mashalling migrated flow definition: %s", arg) + return nil, errors.Wrapf(err, "error setting flow definition from legacy") } return flow, nil diff --git a/models/msgs.go b/models/msgs.go index b6a1253ca..32932d8cb 100644 --- a/models/msgs.go +++ b/models/msgs.go @@ -15,7 +15,7 @@ import ( "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/flows" - "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/config" "github.com/nyaruka/mailroom/gsm7" "github.com/pkg/errors" null "gopkg.in/guregu/null.v3" @@ -183,9 +183,9 @@ func NewOutgoingMsg(orgID OrgID, channel *Channel, contactID flows.ContactID, ou url := a.URL() if !strings.HasPrefix(url, "http") { if strings.HasPrefix(url, "/") { - url = fmt.Sprintf("https://%s%s", mailroom.Config.AttachmentDomain, url) + url = fmt.Sprintf("https://%s%s", config.Mailroom.AttachmentDomain, url) } else { - url = fmt.Sprintf("https://%s/%s", mailroom.Config.AttachmentDomain, url) + url = fmt.Sprintf("https://%s/%s", config.Mailroom.AttachmentDomain, url) } } m.Attachments = append(m.Attachments, fmt.Sprintf("%s:%s", a.ContentType(), url)) diff --git a/models/runs.go b/models/runs.go index 8ed7462e0..3af55b744 100644 --- a/models/runs.go +++ b/models/runs.go @@ -171,7 +171,7 @@ func NewSession(org *OrgAssets, s flows.Session) (*Session, error) { // create our session object session := &Session{ Status: sessionStatus, - Responded: false, // TODO: populate once we are running real flows + Responded: false, Output: string(output), ContactID: s.Contact().ID(), OrgID: org.OrgID(), diff --git a/web/handlers.go b/web/handlers.go new file mode 100644 index 000000000..d6fb730ae --- /dev/null +++ b/web/handlers.go @@ -0,0 +1,210 @@ +package web + +import ( + "encoding/json" + "net/http" + + "github.com/nyaruka/goflow/assets" + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/engine" + "github.com/nyaruka/goflow/flows/resumes" + "github.com/nyaruka/goflow/flows/triggers" + "github.com/nyaruka/goflow/utils" + "github.com/nyaruka/mailroom/models" + "github.com/pkg/errors" +) + +const ( + maxRequestBytes int64 = 1048576 +) + +var ( + httpClient = utils.NewHTTPClient("mailroom") +) + +type flowDefinition struct { + UUID assets.FlowUUID `json:"uuid" validate:"required"` + Definition json.RawMessage `json:"definition"` + LegacyDefinition json.RawMessage `json:"legacy_definition"` +} + +type sessionRequest struct { + OrgID models.OrgID `json:"org_id" validate:"required"` + Flows []flowDefinition `json:"flows"` +} + +type sessionResponse struct { + Session flows.Session `json:"session"` + Events []flows.Event `json:"events"` +} + +// Starts a new engine session +// +// { +// "org_id": 1, +// "flows": [{ +// "uuid": uuidv4, +// "definition": "goflow definition", +// "legacy_definition": "legacy definition", +// },.. ], +// "trigger": {...} +// } +// +type startRequest struct { + sessionRequest + LegacyFlow json.RawMessage `json:"legacy_flow"` + Flow json.RawMessage `json:"flow"` + Trigger json.RawMessage `json:"trigger" validate:"required"` +} + +// handles a request to /start +func (s *Server) handleStart(r *http.Request) (interface{}, error) { + request := &startRequest{} + if err := utils.UnmarshalAndValidateWithLimit(r.Body, request, maxRequestBytes); err != nil { + return nil, errors.Wrapf(err, "request failed validation") + } + + // grab our org + org, err := models.GetOrgAssets(s.ctx, s.db, request.OrgID) + if err != nil { + return nil, errors.Wrapf(err, "unable to load org assets") + } + + // for each of our passed in definitions + for _, flow := range request.Flows { + // populate our flow in our org from our request + err = populateFlow(org, flow.UUID, flow.Definition, flow.LegacyDefinition) + if err != nil { + return nil, err + } + } + + // build our session + assets, err := models.GetSessionAssets(org) + if err != nil { + return nil, errors.Wrapf(err, "unable get session assets") + } + + session := engine.NewSession(assets, engine.NewDefaultConfig(), httpClient) + + // read our trigger + trigger, err := triggers.ReadTrigger(session, request.Trigger) + if err != nil { + return nil, errors.Wrapf(err, "unable to read trigger") + } + + // start our flow + newEvents, err := session.Start(trigger) + if err != nil { + return nil, errors.Wrapf(err, "error starting session") + } + + return &sessionResponse{Session: session, Events: newEvents}, nil +} + +// Resumes an existing engine session +// +// { +// "org_id": 1, +// "flows": [{ +// "uuid": uuidv4, +// "definition": "goflow definition", +// "legacy_definition": "legacy definition", +// },.. ], +// "session": {"uuid": "468621a8-32e6-4cd2-afc1-04416f7151f0", "runs": [...], ...}, +// "resume": {...} +// } +// +type resumeRequest struct { + sessionRequest + + Session json.RawMessage `json:"session" validate:"required"` + Resume json.RawMessage `json:"resume" validate:"required"` +} + +func (s *Server) handleResume(r *http.Request) (interface{}, error) { + request := &resumeRequest{} + if err := utils.UnmarshalAndValidateWithLimit(r.Body, request, maxRequestBytes); err != nil { + return nil, err + } + + // grab our org + org, err := models.GetOrgAssets(s.ctx, s.db, request.OrgID) + if err != nil { + return nil, err + } + + // for each of our passed in definitions + for _, flow := range request.Flows { + // populate our flow in our org from our request + err = populateFlow(org, flow.UUID, flow.Definition, flow.LegacyDefinition) + if err != nil { + return nil, err + } + } + + // build our session + assets, err := models.GetSessionAssets(org) + if err != nil { + return nil, err + } + + session, err := engine.ReadSession(assets, engine.NewDefaultConfig(), httpClient, request.Session) + if err != nil { + return nil, err + } + + // read our resume + resume, err := resumes.ReadResume(session, request.Resume) + if err != nil { + return nil, err + } + + // resume our session + newEvents, err := session.Resume(resume) + if err != nil { + return nil, err + } + + return &sessionResponse{Session: session, Events: newEvents}, nil +} + +// populateFlow takes care of setting the definition for the flow with the passed in UUID according to the passed in definitions +func populateFlow(org *models.OrgAssets, uuid assets.FlowUUID, flowDef json.RawMessage, legacyFlowDef json.RawMessage) error { + f, err := org.Flow(uuid) + if err != nil { + return errors.Wrapf(err, "unable to find flow with uuid: %s", uuid) + } + + flow := f.(*models.Flow) + if flowDef != nil { + flow.SetDefinition(flowDef) + return nil + } + + if legacyFlowDef != nil { + err = flow.SetLegacyDefinition(legacyFlowDef) + if err != nil { + return errors.Wrapf(err, "unable to populate flow: %s invalid definition", uuid) + } + return nil + } + + return errors.Errorf("missing definition or legacy_definition for flow: %s", uuid) +} + +func (s *Server) handleIndex(r *http.Request) (interface{}, error) { + response := map[string]string{ + "component": "mailroom", + "version": s.config.Version, + } + return response, nil +} + +func (s *Server) handle404(r *http.Request) (interface{}, error) { + return nil, errors.Errorf("not found: %s", r.URL.String()) +} + +func (s *Server) handle405(r *http.Request) (interface{}, error) { + return nil, errors.Errorf("illegal method: %s", r.Method) +} diff --git a/web/middleware.go b/web/middleware.go new file mode 100644 index 000000000..4b534ccd3 --- /dev/null +++ b/web/middleware.go @@ -0,0 +1,56 @@ +package web + +import ( + "errors" + "fmt" + "net/http" + "runtime/debug" + "strconv" + "time" + + "github.com/go-chi/chi/middleware" + log "github.com/sirupsen/logrus" +) + +func requestLogger(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) + start := time.Now() + + next.ServeHTTP(ww, r) + + scheme := "http" + if r.TLS != nil { + scheme = "https" + } + + elapsed := time.Now().Sub(start).Nanoseconds() + uri := fmt.Sprintf("%s://%s%s", scheme, r.Host, r.RequestURI) + + ww.Header().Set("X-Elapsed-NS", strconv.FormatInt(elapsed, 10)) + + log.WithFields(log.Fields{ + "http_method": r.Method, + "resp_status": ww.Status(), + "resp_time_ms": float64(elapsed) / 1000000.0, + "resp_bytes_length": ww.BytesWritten(), + "uri": uri, + "user_agent": r.UserAgent(), + }).Info("request completed") + }) +} + +// recovers from panics, logs them to sentry and returns an HTTP 500 response +func panicRecovery(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + if rvr := recover(); rvr != nil { + debug.PrintStack() + log.WithError(errors.New(fmt.Sprint(rvr))).Error("recovered from panic in web handling") + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + } + }() + + next.ServeHTTP(w, r) + }) +} diff --git a/web/server.go b/web/server.go new file mode 100644 index 000000000..c5ac52c4b --- /dev/null +++ b/web/server.go @@ -0,0 +1,133 @@ +package web + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/go-chi/chi" + "github.com/go-chi/chi/middleware" + "github.com/nyaruka/mailroom/config" + "github.com/sirupsen/logrus" + + "github.com/gomodule/redigo/redis" + "github.com/jmoiron/sqlx" +) + +func NewServer(ctx context.Context, db *sqlx.DB, rp *redis.Pool, config *config.Config, wg *sync.WaitGroup) *Server { + s := &Server{ + ctx: ctx, + rp: rp, + db: db, + wg: wg, + + config: config, + } + + // set up our middlewares + router := chi.NewRouter() + router.Use(middleware.DefaultCompress) + router.Use(middleware.StripSlashes) + router.Use(middleware.RequestID) + router.Use(middleware.RealIP) + router.Use(panicRecovery) + router.Use(middleware.Timeout(30 * time.Second)) + router.Use(requestLogger) + + // wire up our main pages + router.NotFound(s.wrapJSONHandler(s.handle404)) + router.MethodNotAllowed(s.wrapJSONHandler(s.handle405)) + router.Get("/", s.wrapJSONHandler(s.handleIndex)) + router.Post("/sim/start", s.wrapJSONHandler(s.handleStart)) + router.Post("/sim/resume", s.wrapJSONHandler(s.handleResume)) + + // configure our http server + s.httpServer = &http.Server{ + Addr: fmt.Sprintf("%s:%d", config.Address, config.Port), + Handler: router, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + } + + return s +} + +type JSONHandler func(r *http.Request) (interface{}, error) + +func (s *Server) wrapJSONHandler(handler JSONHandler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + auth := r.Header.Get("authorization") + if s.config.AuthToken != "" && s.config.AuthToken != fmt.Sprintf("Token %s", auth) { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte(`{"error": "missing bearer token"}`)) + return + } + + value, err := handler(r) + if err != nil { + value = map[string]string{ + "error": err.Error(), + } + } + + serialized, serr := json.Marshal(value) + if serr != nil { + logrus.WithError(err).Error("error serializing handler response") + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(`{"error": "error serializing handler response"}`)) + return + } + + if err != nil { + // TODO: this should be more specific + w.WriteHeader(http.StatusBadRequest) + w.Write(serialized) + return + } + + w.WriteHeader(http.StatusOK) + w.Write(serialized) + } +} + +// Start starts our web server, listening for new requests +func (s *Server) Start() { + // start serving HTTP + go func() { + s.wg.Add(1) + defer s.wg.Done() + + err := s.httpServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + logrus.WithFields(logrus.Fields{ + "comp": "server", + "state": "stopping", + "err": err, + }).Error() + } + }() + + logrus.WithField("address", s.config.Address).WithField("port", s.config.Port).Info("server started") +} + +// Stop stops our web server +func (s *Server) Stop() { + // shut down our HTTP server + if err := s.httpServer.Shutdown(context.Background()); err != nil { + logrus.WithField("state", "stopping").WithError(err).Error("error shutting down server") + } +} + +type Server struct { + ctx context.Context + rp *redis.Pool + db *sqlx.DB + wg *sync.WaitGroup + + config *config.Config + + httpServer *http.Server +} diff --git a/web/server_test.go b/web/server_test.go new file mode 100644 index 000000000..51d039803 --- /dev/null +++ b/web/server_test.go @@ -0,0 +1,266 @@ +package web + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "strings" + "sync" + "testing" + + "github.com/nyaruka/mailroom/config" + "github.com/nyaruka/mailroom/testsuite" + "github.com/stretchr/testify/assert" +) + +const ( + startBody = ` + { + "org_id": 1, + "trigger": { + "contact": { + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "fields": {}, + "id": 1234567, + "language": "eng", + "name": "Ben Haggerty", + "timezone": "America/Guayaquil", + "urns": [ + "tel:+12065551212" + ], + "uuid": "ba96bf7f-bc2a-4873-a7c7-254d1927c4e3" + }, + "environment": { + "allowed_languages": [ + "eng", + "fra" + ], + "date_format": "YYYY-MM-DD", + "default_language": "eng", + "time_format": "hh:mm", + "timezone": "America/Los_Angeles" + }, + "flow": { + "name": "Favorites", + "uuid": "51e3c67d-8483-449c-abf7-25e50686f0db" + }, + "triggered_on": "2000-01-01T00:00:00.000000000-00:00", + "type": "manual" + } + }` + + resumeBody = ` + { + "org_id": 1, + "resume": { + "contact": { + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "fields": {}, + "id": 1234567, + "language": "eng", + "name": "Ben Haggerty", + "timezone": "America/Guayaquil", + "urns": [ + "tel:+12065551212" + ], + "uuid": "ba96bf7f-bc2a-4873-a7c7-254d1927c4e3" + }, + "environment": { + "allowed_languages": [ + "eng", + "fra" + ], + "date_format": "YYYY-MM-DD", + "default_language": "eng", + "time_format": "hh:mm", + "timezone": "America/New_York" + }, + "msg": { + "channel": { + "name": "Nexmo", + "uuid": "c534272e-817d-4a78-a70c-f21df34407f8" + }, + "text": "I like blue!", + "urn": "tel:+12065551212", + "uuid": "9bf91c2b-ce58-4cef-aacc-281e03f69ab5" + }, + "resumed_on": "2000-01-01T00:00:00.000000000-00:00", + "type": "msg" + }, + "session": { + "environment": { + "date_format": "YYYY-MM-DD", + "time_format": "tt:mm", + "timezone": "UTC", + "redaction_policy": "none" + }, + "trigger": { + "type": "manual", + "flow": { + "uuid": "51e3c67d-8483-449c-abf7-25e50686f0db", + "name": "Registration" + }, + "contact": { + "uuid": "ba96bf7f-bc2a-4873-a7c7-254d1927c4e3", + "id": 1234567, + "name": "Ben Haggerty", + "language": "eng", + "created_on": "2000-01-01T00:00:00Z", + "urns": [ + "tel:+12065551212" + ] + }, + "triggered_on": "2000-01-01T00:00:00Z" + }, + "contact": { + "uuid": "ba96bf7f-bc2a-4873-a7c7-254d1927c4e3", + "id": 1234567, + "name": "Ben Haggerty", + "language": "eng", + "created_on": "2000-01-01T00:00:00Z", + "urns": [ + "tel:+12065551212" + ], + "groups": [ + { + "uuid": "caae117e-c26c-4625-96d5-ec4a0e7b8cdb", + "name": "Unregistered (Dynamic)" + } + ] + }, + "runs": [ + { + "uuid": "e276a417-a460-4b58-8e1d-a9bc7c9dd508", + "flow": { + "uuid": "51e3c67d-8483-449c-abf7-25e50686f0db", + "name": "Favorites" + }, + "path": [ + { + "uuid": "4799ff0d-c2f8-483b-bfd9-a2286f8bb539", + "node_uuid": "9c8f9c1b-4d67-4deb-94d7-411434c12c82", + "exit_uuid": "ccd78ace-36cc-4528-8401-9d1de3bf1a27", + "arrived_on": "2018-10-24T12:07:27.186692-05:00" + }, + { + "uuid": "2106dc49-9269-4559-85d5-884e1ea334e5", + "node_uuid": "5272947a-b80b-47ff-ad76-182ec9185d31", + "arrived_on": "2018-10-24T12:07:27.186754-05:00" + } + ], + "events": [ + { + "type": "msg_created", + "created_on": "2018-10-24T12:07:27.186751-05:00", + "step_uuid": "4799ff0d-c2f8-483b-bfd9-a2286f8bb539", + "msg": { + "uuid": "17b9b827-0eff-4abe-91dd-77dcda64de2a", + "urn": "tel:+12065551212", + "channel": { + "uuid": "ac4c718a-db3f-4d8a-ae43-321f1a5bd44a", + "name": "Android" + }, + "text": "What is your favorite color?" + } + }, + { + "type": "msg_wait", + "created_on": "2018-10-24T12:07:27.186755-05:00", + "step_uuid": "2106dc49-9269-4559-85d5-884e1ea334e5" + } + ], + "status": "waiting", + "created_on": "2018-10-24T12:07:27.186687-05:00", + "modified_on": "2018-10-24T12:07:27.186756-05:00", + "expires_on": "2018-10-25T00:07:27.186689-05:00", + "exited_on": null + } + ], + "status": "waiting", + "wait": { + "type": "msg" + } + }, + "events": [ + { + "type": "contact_groups_changed", + "created_on": "2018-10-24T12:07:27.186686-05:00", + "groups_added": [ + { + "uuid": "caae117e-c26c-4625-96d5-ec4a0e7b8cdb", + "name": "Unregistered (Dynamic)" + } + ] + }, + { + "type": "msg_created", + "created_on": "2018-10-24T12:07:27.186751-05:00", + "step_uuid": "4799ff0d-c2f8-483b-bfd9-a2286f8bb539", + "msg": { + "uuid": "17b9b827-0eff-4abe-91dd-77dcda64de2a", + "urn": "tel:+12065551212", + "channel": { + "uuid": "ac4c718a-db3f-4d8a-ae43-321f1a5bd44a", + "name": "Android" + }, + "text": "What is your favorite color?" + } + }, + { + "type": "msg_wait", + "created_on": "2018-10-24T12:07:27.186755-05:00", + "step_uuid": "2106dc49-9269-4559-85d5-884e1ea334e5" + } + ] + }` +) + +func TestServer(t *testing.T) { + testsuite.Reset() + ctx := testsuite.CTX() + db := testsuite.DB() + rp := testsuite.RP() + wg := &sync.WaitGroup{} + + server := NewServer(ctx, db, rp, config.Mailroom, wg) + server.Start() + defer server.Stop() + + // TODO: test custom flow definitions + + tcs := []struct { + URL string + Method string + Body string + Status int + Response string + }{ + {"/", "POST", "", 400, "illegal"}, + {"/", "GET", "", 200, "mailroom"}, + {"/sim/start", "GET", "", 400, "illegal"}, + {"/sim/start", "POST", startBody, 200, "What is your favorite color?"}, + {"/sim/resume", "GET", "", 400, "illegal"}, + {"/sim/resume", "POST", resumeBody, 200, "Good choice, I like Blue too! What is your favorite beer?"}, + } + + for i, tc := range tcs { + var body io.Reader + if tc.Body != "" { + body = bytes.NewReader([]byte(tc.Body)) + } + + req, err := http.NewRequest(tc.Method, "http://localhost:8090"+tc.URL, body) + assert.NoError(t, err, "%d: error creating request", i) + + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err, "%d: error marking request", i) + + assert.Equal(t, tc.Status, resp.StatusCode, "%d: unexpected status", i) + + content, err := ioutil.ReadAll(resp.Body) + assert.NoError(t, err, "%d: error reading body", i) + + assert.True(t, strings.Contains(string(content), tc.Response), "%d: did not find string: %s in body: %s", i, tc.Response, string(content)) + } +} From a25f6460ecf85a22376649bdc51624c0d502ae96 Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Wed, 24 Oct 2018 14:28:45 -0500 Subject: [PATCH 2/3] give server time to start --- web/server.go | 1 + web/server_test.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/web/server.go b/web/server.go index c5ac52c4b..35fbc0dae 100644 --- a/web/server.go +++ b/web/server.go @@ -17,6 +17,7 @@ import ( "github.com/jmoiron/sqlx" ) +// NewServer creates a new web server, it will need to be started after being created func NewServer(ctx context.Context, db *sqlx.DB, rp *redis.Pool, config *config.Config, wg *sync.WaitGroup) *Server { s := &Server{ ctx: ctx, diff --git a/web/server_test.go b/web/server_test.go index 51d039803..c9220bd86 100644 --- a/web/server_test.go +++ b/web/server_test.go @@ -8,6 +8,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/nyaruka/mailroom/config" "github.com/nyaruka/mailroom/testsuite" @@ -225,6 +226,10 @@ func TestServer(t *testing.T) { server := NewServer(ctx, db, rp, config.Mailroom, wg) server.Start() + + // give our server time to start + time.Sleep(time.Second) + defer server.Stop() // TODO: test custom flow definitions From f36d77fa81a280731349cb96bf07275854e7129a Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Wed, 24 Oct 2018 14:29:16 -0500 Subject: [PATCH 3/3] fix error message --- web/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/server_test.go b/web/server_test.go index c9220bd86..b2299df46 100644 --- a/web/server_test.go +++ b/web/server_test.go @@ -259,7 +259,7 @@ func TestServer(t *testing.T) { assert.NoError(t, err, "%d: error creating request", i) resp, err := http.DefaultClient.Do(req) - assert.NoError(t, err, "%d: error marking request", i) + assert.NoError(t, err, "%d: error making request", i) assert.Equal(t, tc.Status, resp.StatusCode, "%d: unexpected status", i)