Skip to content

Commit

Permalink
Update to latest goflow which requires mapping groups and flows to id…
Browse files Browse the repository at this point in the history
…s for ES queries
  • Loading branch information
rowanseymour committed Apr 4, 2022
1 parent e34f917 commit 7823156
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 44 deletions.
9 changes: 6 additions & 3 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ func (c *Contact) UpdatePreferredURN(ctx context.Context, db Queryer, oa *OrgAss
// FlowContact converts our mailroom contact into a flow contact for use in the engine
func (c *Contact) FlowContact(oa *OrgAssets) (*flows.Contact, error) {
// convert our groups to a list of references
groups := make([]*assets.GroupReference, len(c.groups))
for i, g := range c.groups {
groups[i] = assets.NewGroupReference(g.UUID(), g.Name())
groups := make([]*assets.GroupReference, 0, len(c.groups))
for _, g := range c.groups {
// exclude the db-trigger based status groups for now
if g.Type() == GroupTypeManual || g.Type() == GroupTypeSmart {
groups = append(groups, assets.NewGroupReference(g.UUID(), g.Name()))
}
}

// convert our tickets to flow tickets
Expand Down
32 changes: 24 additions & 8 deletions core/models/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/sirupsen/logrus"
)

// GroupID is our type for group ids
type GroupID int

// GroupStatus is the current status of the passed in group
type GroupStatus string

Expand All @@ -24,16 +27,23 @@ const (
GroupStatusReady = GroupStatus("R")
)

// GroupID is our type for group ids
type GroupID int
// GroupType is the the type of a group
type GroupType string

const (
GroupTypeManual = GroupType("M")
GroupTypeSmart = GroupType("Q")
)

// Group is our mailroom type for contact groups
type Group struct {
g struct {
ID GroupID `json:"id"`
UUID assets.GroupUUID `json:"uuid"`
Name string `json:"name"`
Query string `json:"query"`
ID GroupID `json:"id"`
UUID assets.GroupUUID `json:"uuid"`
Name string `json:"name"`
Query string `json:"query"`
Status GroupStatus `json:"status"`
Type GroupType `json:"group_type"`
}
}

Expand All @@ -49,6 +59,12 @@ func (g *Group) Name() string { return g.g.Name }
// Query returns the query string (if any) for this group
func (g *Group) Query() string { return g.g.Query }

// Status returns the status of this group
func (g *Group) Status() GroupStatus { return g.g.Status }

// Type returns the type of this group
func (g *Group) Type() GroupType { return g.g.Type }

// LoadGroups loads the groups for the passed in org
func LoadGroups(ctx context.Context, db Queryer, orgID OrgID) ([]assets.Group, error) {
start := time.Now()
Expand Down Expand Up @@ -77,9 +93,9 @@ func LoadGroups(ctx context.Context, db Queryer, orgID OrgID) ([]assets.Group, e

const selectGroupsSQL = `
SELECT ROW_TO_JSON(r) FROM (
SELECT id, uuid, name, query
SELECT id, uuid, name, query, status, group_type
FROM contacts_contactgroup
WHERE org_id = $1 AND group_type IN ('M', 'Q') AND is_active = TRUE
WHERE org_id = $1 AND is_active = TRUE
ORDER BY name ASC
) r;`

Expand Down
25 changes: 19 additions & 6 deletions core/models/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,36 @@ import (
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/contactql"
"github.com/nyaruka/goflow/contactql/es"
"github.com/nyaruka/goflow/flows"

"github.com/olivere/elastic/v7"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type AssetMapper struct{}

func (m *AssetMapper) Flow(f assets.Flow) int64 {
return int64(f.(*Flow).ID())
}

func (m *AssetMapper) Group(g assets.Group) int64 {
return int64(g.(*flows.Group).Asset().(*Group).ID())
}

var assetMapper = &AssetMapper{}

// BuildElasticQuery turns the passed in contact ql query into an elastic query
func BuildElasticQuery(oa *OrgAssets, group assets.GroupUUID, status ContactStatus, excludeIDs []ContactID, query *contactql.ContactQuery) elastic.Query {
func BuildElasticQuery(oa *OrgAssets, group *Group, status ContactStatus, excludeIDs []ContactID, query *contactql.ContactQuery) elastic.Query {
// filter by org and active contacts
eq := elastic.NewBoolQuery().Must(
elastic.NewTermQuery("org_id", oa.OrgID()),
elastic.NewTermQuery("is_active", true),
)

// our group if present
if group != "" {
eq = eq.Must(elastic.NewTermQuery("groups", group))
if group != nil {
eq = eq.Must(elastic.NewTermQuery("group_ids", group.ID()))
}

// our status is present
Expand All @@ -45,15 +58,15 @@ func BuildElasticQuery(oa *OrgAssets, group assets.GroupUUID, status ContactStat

// and by our query if present
if query != nil {
q := es.ToElasticQuery(oa.Env(), query)
q := es.ToElasticQuery(oa.Env(), assetMapper, query)
eq = eq.Must(q)
}

return eq
}

// GetContactIDsForQueryPage returns a page of contact ids for the given query and sort
func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *OrgAssets, group assets.GroupUUID, excludeIDs []ContactID, query string, sort string, offset int, pageSize int) (*contactql.ContactQuery, []ContactID, int64, error) {
func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *OrgAssets, group *Group, excludeIDs []ContactID, query string, sort string, offset int, pageSize int) (*contactql.ContactQuery, []ContactID, int64, error) {
env := oa.Env()
start := time.Now()
var parsed *contactql.ContactQuery
Expand Down Expand Up @@ -126,7 +139,7 @@ func GetContactIDsForQuery(ctx context.Context, client *elastic.Client, oa *OrgA
}

routing := strconv.FormatInt(int64(oa.OrgID()), 10)
eq := BuildElasticQuery(oa, "", ContactStatusActive, nil, parsed)
eq := BuildElasticQuery(oa, nil, ContactStatusActive, nil, parsed)
ids := make([]ContactID, 0, 100)

// if limit provided that can be done with regular search, do that
Expand Down
15 changes: 8 additions & 7 deletions core/models/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"testing"

"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/test"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/testsuite"
Expand Down Expand Up @@ -32,7 +31,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
require.NoError(t, err)

tcs := []struct {
Group assets.GroupUUID
Group *testdata.Group
ExcludeIDs []models.ContactID
Query string
Sort string
Expand All @@ -43,7 +42,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
ExpectedError string
}{
{
Group: testdata.AllContactsGroup.UUID,
Group: testdata.ActiveGroup,
Query: "george",
ExpectedESRequest: `{
"_source": false,
Expand All @@ -63,7 +62,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
},
{
"term": {
"groups": "d1ee73f0-bdb5-47ce-99dd-0c95d4ebf008"
"group_ids": 1
}
},
{
Expand Down Expand Up @@ -117,7 +116,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
ExpectedTotal: 1,
},
{
Group: testdata.BlockedContactsGroup.UUID,
Group: testdata.BlockedGroup,
ExcludeIDs: []models.ContactID{testdata.Bob.ID, testdata.Cathy.ID},
Query: "age > 32",
Sort: "-age",
Expand All @@ -139,7 +138,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
},
{
"term": {
"groups": "9295ebab-5c2d-4eb1-86f9-7c15ed2f3219"
"group_ids": 2
}
},
{
Expand Down Expand Up @@ -237,7 +236,9 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
for i, tc := range tcs {
es.NextResponse = tc.MockedESResponse

_, ids, total, err := models.GetContactIDsForQueryPage(ctx, client, oa, tc.Group, tc.ExcludeIDs, tc.Query, tc.Sort, 0, 50)
group := oa.GroupByID(tc.Group.ID)

_, ids, total, err := models.GetContactIDsForQueryPage(ctx, client, oa, group, tc.ExcludeIDs, tc.Query, tc.Sort, 0, 50)

if tc.ExpectedError != "" {
assert.EqualError(t, err, tc.ExpectedError)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/lib/pq v1.10.4
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.17.1
github.com/nyaruka/goflow v0.154.0
github.com/nyaruka/goflow v0.155.0
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDY
github.com/nyaruka/gocommon v1.5.3/go.mod h1:2ZeBZF9yt20IaAJ4aC1ujojAsFhJBk2IuDvSl7KuQDw=
github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8=
github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0=
github.com/nyaruka/goflow v0.154.0 h1:tcUVs+sDFyjWdLvyk1kf2SOkQwSGInMbzuG+trE7ZNc=
github.com/nyaruka/goflow v0.154.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/goflow v0.155.0 h1:et7QeHEFG7WKJrR78Q2/5PkOp+3JuneFui6B7+gFibQ=
github.com/nyaruka/goflow v0.155.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
6 changes: 4 additions & 2 deletions testsuite/testdata/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ var GenderField = &Field{6, "3a5891e4-756e-4dc9-8e12-b7a766168824"}
var AgeField = &Field{7, "903f51da-2717-47c7-a0d3-f2f32877013d"}
var JoinedField = &Field{8, "d83aae24-4bbf-49d0-ab85-6bfd201eac6d"}

var AllContactsGroup = &Group{1, "d1ee73f0-bdb5-47ce-99dd-0c95d4ebf008"}
var BlockedContactsGroup = &Group{2, "9295ebab-5c2d-4eb1-86f9-7c15ed2f3219"}
var ActiveGroup = &Group{1, "b97f69f7-5edf-45c7-9fda-d37066eae91d"}
var BlockedGroup = &Group{2, "14f6ea01-456b-4417-b0b8-35e942f549f1"}
var StoppedGroup = &Group{3, "d1ee73f0-bdb5-47ce-99dd-0c95d4ebf008"}
var ArchivedGroup = &Group{4, "9295ebab-5c2d-4eb1-86f9-7c15ed2f3219"}
var DoctorsGroup = &Group{10000, "c153e265-f7c9-4539-9dbc-9b358714b638"}
var TestersGroup = &Group{10001, "5e9d8fab-5e7e-4f51-b533-261af5dea70d"}

Expand Down
25 changes: 21 additions & 4 deletions web/contact/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ func init() {
//
// {
// "org_id": 1,
// "group_id": 234,
// "group_uuid": "985a83fe-2e9f-478d-a3ec-fa602d5e7ddd",
// "query": "age > 10",
// "sort": "-age"
// }
//
type searchRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
GroupUUID assets.GroupUUID `json:"group_uuid" validate:"required"`
GroupID models.GroupID `json:"group_id"`
GroupUUID assets.GroupUUID `json:"group_uuid"`
ExcludeIDs []models.ContactID `json:"exclude_ids"`
Query string `json:"query"`
PageSize int `json:"page_size"`
Expand Down Expand Up @@ -78,9 +80,15 @@ func handleSearch(ctx context.Context, rt *runtime.Runtime, r *http.Request) (in
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

var group *models.Group
if request.GroupID != 0 {
group = oa.GroupByID(request.GroupID)
} else if request.GroupUUID != "" {
group = oa.GroupByUUID(request.GroupUUID)
}

// perform our search
parsed, hits, total, err := models.GetContactIDsForQueryPage(ctx, rt.ES, oa,
request.GroupUUID, request.ExcludeIDs, request.Query, request.Sort, request.Offset, request.PageSize)
parsed, hits, total, err := models.GetContactIDsForQueryPage(ctx, rt.ES, oa, group, request.ExcludeIDs, request.Query, request.Sort, request.Offset, request.PageSize)

if err != nil {
isQueryError, qerr := contactql.IsQueryError(err)
Expand Down Expand Up @@ -117,13 +125,15 @@ func handleSearch(ctx context.Context, rt *runtime.Runtime, r *http.Request) (in
// {
// "org_id": 1,
// "query": "age > 10",
// "group_id": 234,
// "group_uuid": "123123-123-123-"
// }
//
type parseRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
Query string `json:"query" validate:"required"`
ParseOnly bool `json:"parse_only"`
GroupID models.GroupID `json:"group_id"`
GroupUUID assets.GroupUUID `json:"group_uuid"`
}

Expand Down Expand Up @@ -158,6 +168,13 @@ func handleParseQuery(ctx context.Context, rt *runtime.Runtime, r *http.Request)
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

var group *models.Group
if request.GroupID != 0 {
group = oa.GroupByID(request.GroupID)
} else if request.GroupUUID != "" {
group = oa.GroupByUUID(request.GroupUUID)
}

env := oa.Env()
var resolver contactql.Resolver
if !request.ParseOnly {
Expand All @@ -179,7 +196,7 @@ func handleParseQuery(ctx context.Context, rt *runtime.Runtime, r *http.Request)

var elasticSource interface{}
if !request.ParseOnly {
eq := models.BuildElasticQuery(oa, request.GroupUUID, models.NilContactStatus, nil, parsed)
eq := models.BuildElasticQuery(oa, group, models.NilContactStatus, nil, parsed)
elasticSource, err = eq.Source()
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "error getting elastic source")
Expand Down
14 changes: 7 additions & 7 deletions web/contact/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,21 @@ func TestSearch(t *testing.T) {
{
method: "POST",
url: "/mr/contact/search",
body: fmt.Sprintf(`{"org_id": 1, "query": "birthday = tomorrow", "group_uuid": "%s"}`, testdata.AllContactsGroup.UUID),
body: fmt.Sprintf(`{"org_id": 1, "query": "birthday = tomorrow", "group_uuid": "%s"}`, testdata.ActiveGroup.UUID),
expectedStatus: 400,
expectedError: "can't resolve 'birthday' to attribute, scheme or field",
},
{
method: "POST",
url: "/mr/contact/search",
body: fmt.Sprintf(`{"org_id": 1, "query": "age > tomorrow", "group_uuid": "%s"}`, testdata.AllContactsGroup.UUID),
body: fmt.Sprintf(`{"org_id": 1, "query": "age > tomorrow", "group_uuid": "%s"}`, testdata.ActiveGroup.UUID),
expectedStatus: 400,
expectedError: "can't convert 'tomorrow' to a number",
},
{
method: "POST",
url: "/mr/contact/search",
body: fmt.Sprintf(`{"org_id": 1, "query": "Cathy", "group_uuid": "%s"}`, testdata.AllContactsGroup.UUID),
body: fmt.Sprintf(`{"org_id": 1, "query": "Cathy", "group_uuid": "%s"}`, testdata.ActiveGroup.UUID),
esResponse: singleESResponse,
expectedStatus: 200,
expectedHits: []models.ContactID{testdata.Cathy.ID},
Expand All @@ -126,7 +126,7 @@ func TestSearch(t *testing.T) {
{
method: "POST",
url: "/mr/contact/search",
body: fmt.Sprintf(`{"org_id": 1, "query": "Cathy", "group_uuid": "%s", "exclude_ids": [%d, %d]}`, testdata.AllContactsGroup.UUID, testdata.Bob.ID, testdata.George.ID),
body: fmt.Sprintf(`{"org_id": 1, "query": "Cathy", "group_uuid": "%s", "exclude_ids": [%d, %d]}`, testdata.ActiveGroup.UUID, testdata.Bob.ID, testdata.George.ID),
esResponse: singleESResponse,
expectedStatus: 200,
expectedHits: []models.ContactID{testdata.Cathy.ID},
Expand All @@ -153,7 +153,7 @@ func TestSearch(t *testing.T) {
},
{
"term": {
"groups": "d1ee73f0-bdb5-47ce-99dd-0c95d4ebf008"
"group_ids": 1
}
},
{
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestSearch(t *testing.T) {
{
method: "POST",
url: "/mr/contact/search",
body: fmt.Sprintf(`{"org_id": 1, "query": "AGE = 10 and gender = M", "group_uuid": "%s"}`, testdata.AllContactsGroup.UUID),
body: fmt.Sprintf(`{"org_id": 1, "query": "AGE = 10 and gender = M", "group_uuid": "%s"}`, testdata.ActiveGroup.UUID),
esResponse: singleESResponse,
expectedStatus: 200,
expectedHits: []models.ContactID{testdata.Cathy.ID},
Expand All @@ -204,7 +204,7 @@ func TestSearch(t *testing.T) {
{
method: "POST",
url: "/mr/contact/search",
body: fmt.Sprintf(`{"org_id": 1, "query": "", "group_uuid": "%s"}`, testdata.AllContactsGroup.UUID),
body: fmt.Sprintf(`{"org_id": 1, "query": "", "group_uuid": "%s"}`, testdata.ActiveGroup.UUID),
esResponse: singleESResponse,
expectedStatus: 200,
expectedHits: []models.ContactID{testdata.Cathy.ID},
Expand Down
Loading

0 comments on commit 7823156

Please sign in to comment.