Skip to content
This repository has been archived by the owner on Dec 14, 2021. It is now read-only.

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
htdvisser committed Mar 14, 2017
2 parents adcc68e + f9200c4 commit a6a96eb
Show file tree
Hide file tree
Showing 28 changed files with 550 additions and 377 deletions.
8 changes: 7 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
language: go

go_import_path: github.com/TheThingsNetwork/ttn

sudo: required

services:
- docker

go:
- 1.7
- 1.8

cache:
directories:
- vendor

install:
- make deps
Expand Down
49 changes: 49 additions & 0 deletions api/auth/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package auth

import (
"github.com/TheThingsNetwork/ttn/api"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

const tokenKey = "token"

// TokenCredentials RPC Credentials
type TokenCredentials struct {
token string
tokenFunc func(id string) string
}

// WithStaticToken injects a static token on each request
func WithStaticToken(token string) *TokenCredentials {
return &TokenCredentials{token: token}
}

// WithTokenFunc returns TokenCredentials that execute the tokenFunc on each request
func WithTokenFunc(tokenFunc func(id string) string) *TokenCredentials {
return &TokenCredentials{tokenFunc: tokenFunc}
}

// RequireTransportSecurity implements credentials.PerRPCCredentials
func (c *TokenCredentials) RequireTransportSecurity() bool { return true }

// GetRequestMetadata implements credentials.PerRPCCredentials
func (c *TokenCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
token, _ := api.TokenFromContext(ctx)
if token != "" {
return map[string]string{tokenKey: token}, nil
}
if c.tokenFunc != nil {
id, _ := api.IDFromContext(ctx)
return map[string]string{tokenKey: c.tokenFunc(id)}, nil
}
if c.token != "" {
return map[string]string{tokenKey: c.token}, nil
}
return map[string]string{tokenKey: ""}, nil
}

// DialOption returns a DialOption for the TokenCredentials
func (c *TokenCredentials) DialOption() grpc.DialOption {
return grpc.WithPerRPCCredentials(c)
}
48 changes: 36 additions & 12 deletions api/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,30 +437,50 @@ func (c *Client) NewBrokerStreams(id string, token string) GenericStream {
}
}

chUplink := make(chan *broker.DeduplicatedUplinkMessage, c.config.BufferSize)
chDownlink := make(chan *broker.DownlinkMessage, c.config.BufferSize)

defer func() {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.uplink, server.name)
delete(s.downlink, server.name)
close(chUplink)
close(chDownlink)
}()

// Uplink stream
uplink, err := cli.BrokerUplink(ctx)
if err != nil {
log.WithError(err).Warn("Could not set up BrokerUplink stream")
} else {
go monitor("BrokerUplink", uplink)
s.mu.Lock()
s.uplink[server.name] = chUplink
s.mu.Unlock()
go func() {
monitor("BrokerUplink", uplink)
s.mu.Lock()
defer s.mu.Unlock()
delete(s.uplink, server.name)
}()
}

// Downlink stream
downlink, err := cli.BrokerDownlink(ctx)
if err != nil {
log.WithError(err).Warn("Could not set up BrokerDownlink stream")
} else {
go monitor("BrokerDownlink", downlink)
s.mu.Lock()
s.downlink[server.name] = chDownlink
s.mu.Unlock()
go func() {
monitor("BrokerDownlink", downlink)
s.mu.Lock()
defer s.mu.Unlock()
delete(s.downlink, server.name)
}()
}

chUplink := make(chan *broker.DeduplicatedUplinkMessage, c.config.BufferSize)
chDownlink := make(chan *broker.DownlinkMessage, c.config.BufferSize)

s.mu.Lock()
s.uplink[server.name] = chUplink
s.downlink[server.name] = chDownlink
s.mu.Unlock()

log.Debug("Start handling Broker streams")
defer log.Debug("Done handling Broker streams")
for {
Expand All @@ -470,12 +490,16 @@ func (c *Client) NewBrokerStreams(id string, token string) GenericStream {
case msg := <-chUplink:
if err := uplink.Send(msg); err != nil {
log.WithError(err).Warn("Could not send UplinkMessage to monitor")
return
if err == restartstream.ErrStreamClosed {
return
}
}
case msg := <-chDownlink:
if err := downlink.Send(msg); err != nil {
log.WithError(err).Warn("Could not send DownlinkMessage to monitor")
return
if err == restartstream.ErrStreamClosed {
return
}
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions api/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func TestMonitor(t *testing.T) {

cli.AddServer("tls-without-tls", lis.Addr().String())

testLogger.Print(t)

cli.AddServer("test", lis.Addr().String())
time.Sleep(waitTime)
defer func() {
Expand All @@ -48,6 +50,8 @@ func TestMonitor(t *testing.T) {
s.Stop()
}()

testLogger.Print(t)

gtw := cli.NewGatewayStreams("test", "token")
time.Sleep(waitTime)
for i := 0; i < 20; i++ {
Expand All @@ -64,6 +68,8 @@ func TestMonitor(t *testing.T) {
a.So(server.metrics.downlinkMessages, ShouldEqual, 20)
a.So(server.metrics.gatewayStatuses, ShouldEqual, 20)

testLogger.Print(t)

brk := cli.NewBrokerStreams("test", "token")
time.Sleep(waitTime)
brk.Send(&broker.DeduplicatedUplinkMessage{})
Expand All @@ -75,6 +81,8 @@ func TestMonitor(t *testing.T) {
a.So(server.metrics.brokerUplinkMessages, ShouldEqual, 1)
a.So(server.metrics.brokerDownlinkMessages, ShouldEqual, 1)

testLogger.Print(t)

cli.AddConn("test2", cli.serverConns[1].conn)

brk = cli.NewBrokerStreams("test", "token")
Expand Down
Loading

0 comments on commit a6a96eb

Please sign in to comment.