Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed May 10, 2017
0 parents commit d946e3a
Show file tree
Hide file tree
Showing 436 changed files with 122,153 additions and 0 deletions.
26 changes: 26 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
*~

# Folders
_obj
_test
.vscode

# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out

*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*

_testmain.go

*.exe
*.test
*.prof
15 changes: 15 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
language: go
go:
- 1.8
services:
- redis-server
addons:
postgresql: '9.3'
before_script:
- psql -U postgres -c "CREATE USER courier WITH PASSWORD 'courier';"
- psql -U postgres -c "ALTER ROLE courier WITH SUPERUSER;"
- psql -U postgres -c "CREATE DATABASE courier_test;"
- psql -U postgres -d courier_test -f schema.sql
script:
- go test $(go list ./... | grep -v /vendor/) -cover
- go test $(go list ./... | grep -v /vendor/) -cover -bench=. -benchmem
661 changes: 661 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Courier

This is a private repo so need to configure git to use token authentication for HTTPS requests.
Create a github auth token and set it as an environment variable called `GITHUB_TOKEN`. Then run:

```
git config --global url."https://${GITHUB_TOKEN}:[email protected]/".insteadOf "https://github.com/"
```

Install Courier in your workspace with:

```
go get github.com/nyaruka/courier
```

Build Courier with:

```
go install github.com/nyaruka/courier/cmd/...
```

This will create a new executable in $GOPATH/bin called `courier`.

To run the tests you need to create the test database:

```
$ createdb courier_test
$ createuser -P -E courier
$ psql -d courier_test -f schema.sql
$ psql -d courier_test -c "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO courier;"
$ psql -d courier_test -c "GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO courier;"
```

To run all of the tests including benchmarks:

```
go test $(go list ./... | grep -v /vendor/) -cover -bench=.
```
202 changes: 202 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package courier

import (
"database/sql"
"encoding/json"
"errors"
"log"
"strings"
"sync"
"time"

uuid "github.com/satori/go.uuid"
)

const (
ConfigAuthToken = "auth_token"
)

type ChannelType string

type ChannelUUID struct {
uuid.UUID
}

var NilChannelUUID = ChannelUUID{uuid.Nil}

func NewChannelUUID(u string) (ChannelUUID, error) {
channelUUID, err := uuid.FromString(strings.ToLower(u))
if err != nil {
return NilChannelUUID, err
}
return ChannelUUID{channelUUID}, nil
}

type Channel interface {
UUID() ChannelUUID
ChannelType() ChannelType
Address() string
Country() string
GetConfig(string) string
}

// ChannelFromUUID will look up the channel with the passed in UUID and channel type.
// It will return an error if the channel does not exist or is not active.
//
// This will use a 3 tier caching strategy:
// 1) Process level cache, we will first check a local cache, which is expired
// every 5 seconds
// 2) Redis level cache, we will consult Redis for the latest Channel definition, caching
// it locally if found
// 3) Postgres Lookup, we will lookup the value in our database, caching the result
// both locally and in Redis
func ChannelFromUUID(s *server, channelType ChannelType, uuidStr string) (Channel, error) {
channelUUID, err := NewChannelUUID(uuidStr)
if err != nil {
return nil, err
}

// look for the channel locally
channel, localErr := getLocalChannel(channelType, channelUUID)

// found it? return it
if localErr == nil {
return channel, nil
}

// look in our database instead
dbErr := loadChannelFromDB(s, channel, channelType, channelUUID)

// if it wasn't found in the DB, clear our cache and return that it wasn't found
if dbErr == ErrChannelNotFound {
clearLocalChannel(channelUUID)
return channel, dbErr
}

// if we had some other db error, return it if our cached channel was only just expired
if dbErr != nil && localErr == ErrChannelExpired {
return channel, nil
}

// no cached channel, oh well, we fail
if dbErr != nil {
return nil, dbErr
}

// we found it in the db, cache it locally
cacheLocalChannel(channel)
return channel, nil
}

const lookupChannelFromUUIDSQL = `SELECT uuid, channel_type, address, country, config
FROM channels_channel
WHERE channel_type = $1 AND uuid = $2 AND is_active = true`

// ChannelForUUID attempts to look up the channel with the passed in UUID, returning it
func loadChannelFromDB(s *server, channel *channel, channelType ChannelType, uuid ChannelUUID) error {
// select just the fields we need
err := s.db.Get(channel, lookupChannelFromUUIDSQL, channelType, uuid)

// parse our config
channel.parseConfig()

// we didn't find a match
if err == sql.ErrNoRows {
return ErrChannelNotFound
}

// other error
if err != nil {
return err
}

// found it, return it
return nil
}

var cacheMutex sync.RWMutex
var channelCache = make(map[ChannelUUID]*channel)

var ErrChannelExpired = errors.New("channel expired")
var ErrChannelNotFound = errors.New("channel not found")
var ErrChannelWrongType = errors.New("channel type wrong")

// getLocalChannel returns a Channel object for the passed in type and UUID.
func getLocalChannel(channelType ChannelType, uuid ChannelUUID) (*channel, error) {
// first see if the channel exists in our local cache
cacheMutex.RLock()
channel, found := channelCache[uuid]
cacheMutex.RUnlock()

if found {
// if it was found but the type is wrong, that's an error
if channel.ChannelType() != channelType {
return newChannel(channelType, uuid), ErrChannelWrongType
}

// if we've expired, clear our cache and return it
if channel.expiration.Before(time.Now()) {
return channel, ErrChannelExpired
}

return channel, nil
}

return newChannel(channelType, uuid), ErrChannelNotFound
}

func cacheLocalChannel(channel *channel) {
// set our expiration
channel.expiration = time.Now().Add(localTTL * time.Second)

// first write to our local cache
cacheMutex.Lock()
channelCache[channel.UUID()] = channel
cacheMutex.Unlock()
}

func clearLocalChannel(uuid ChannelUUID) {
cacheMutex.Lock()
delete(channelCache, uuid)
cacheMutex.Unlock()
}

const redisTTL = 3600 * 24
const localTTL = 60

//-----------------------------------------------------------------------------
// Channel implementation
//-----------------------------------------------------------------------------

type channel struct {
UUID_ ChannelUUID `db:"uuid" json:"uuid"`
ChannelType_ ChannelType `db:"channel_type" json:"channel_type"`
Address_ string `db:"address" json:"address"`
Country_ string `db:"country" json:"country"`
Config_ string `db:"config" json:"config"`

expiration time.Time
config map[string]string
}

func (c *channel) UUID() ChannelUUID { return c.UUID_ }
func (c *channel) ChannelType() ChannelType { return c.ChannelType_ }
func (c *channel) Address() string { return c.Address_ }
func (c *channel) Country() string { return c.Country_ }
func (c *channel) GetConfig(key string) string { return c.config[key] }

func (c *channel) parseConfig() {
c.config = make(map[string]string)

if c.Config_ != "" {
err := json.Unmarshal([]byte(c.Config_), &c.config)
if err != nil {
log.Printf("ERROR parsing channel config '%s': %s", c.Config_, err)
}
}
}

func newChannel(channelType ChannelType, uuid ChannelUUID) *channel {
config := make(map[string]string)
return &channel{ChannelType_: channelType, UUID_: uuid, config: config}
}
42 changes: 42 additions & 0 deletions cmd/courier/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"log"
"os"
"os/signal"
"syscall"

"github.com/koding/multiconfig"
_ "github.com/lib/pq"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"

// load channel handler packages
_ "github.com/nyaruka/courier/handlers/africastalking"
_ "github.com/nyaruka/courier/handlers/blackmyna"
_ "github.com/nyaruka/courier/handlers/kannel"
_ "github.com/nyaruka/courier/handlers/telegram"
_ "github.com/nyaruka/courier/handlers/twilio"
)

func main() {
m := multiconfig.NewWithPath("courier.toml")
config := &config.Courier{}

err := m.Load(config)
if err != nil {
log.Fatalf("Error loading configuration: %s", err)
}

server := courier.NewServer(config)
err = server.Start()
if err != nil {
log.Fatalf("Error starting server: %s", err)
}

ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
log.Println(<-ch)

server.Stop()
}
20 changes: 20 additions & 0 deletions config/courier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package config

// Courier is our top level configuration object
type Courier struct {
Base_URL string `default:"https://localhost:8080"`
Port int `default:"8080"`
DB string `default:"postgres://courier@localhost/courier?sslmode=disable"`
Redis string `default:"redis://localhost:6379/0"`
Spool_Dir string `default:"/var/spool/courier"`

S3_Region string `default:"us-east-1"`
S3_Media_Bucket string `default:"courier-media"`
S3_Media_Prefix string `default:"/media/"`

AWS_Access_Key_ID string `default:"missing_aws_access_key_id"`
AWS_Secret_Access_Key string `default:"missing_aws_secret_access_key"`

Include_Channels []string
Exclude_Channels []string
}
37 changes: 37 additions & 0 deletions courier.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#-------------------------------------------------------------------------------------
# Courier settings can be set in four different wants, in order of preference:
#
# 1) Courier defaults
# 2) Config file options as defined below
# 3) Env variable, which are prefixed with COURIER_, ex: export COURIER_PORT=8000
# 4) Command line options, ex: courier -port 8000
#
#-------------------------------------------------------------------------------------

# The externally accessible base URL of the server
base_url = "https://localhost:8080"

# What port courier will run on
port = 8080

# Our database connection string, right now only Postgres is supported
db = "postgres://courier@localhost/courier?sslmode=disable"

# Our redis connection string, path is our database
redis = "redis://localhost:6379/0"

# Our spool directory for storing messages when Redis is down, this must exist and be writeable
spool_dir = "/var/spool/courier"

# Our AWS access credentials
aws_access_key_id = "missing_aws_access_key_id"
aws_secret_access_key = "missing_aws_secret_access_key"

# The region our AWS bucket is in
s3_region = "us-east-2"

# The S3 bucket we will write our media files to
s3_media_bucket = "courier-test"

# prefix to our filenames for media (files will be named after the msg uuid)
s3_media_prefix = "media"
Loading

0 comments on commit d946e3a

Please sign in to comment.