Skip to content

Commit

Permalink
Move etcd RouteLoader into separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
spy16 committed Feb 1, 2019
1 parent 079db27 commit 875c0a4
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 86 deletions.
24 changes: 24 additions & 0 deletions internal/etcd/aclkey.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package etcd

import (
"fmt"

"github.com/gojektech/weaver/internal/server"
)

const (
// ACLKeyFormat - Format for a ACL's key in a KV Store
ACLKeyFormat = "/%s/acls/%s/acl"
)

// ACLKey - Points to a stored ACL
type ACLKey string

// GenACLKey - Generate an ACL Key given etcd's node key
func GenACLKey(key string) ACLKey {
return ACLKey(fmt.Sprintf("%s/acl", key))
}

func GenKey(acl *server.ACL, pfx string) ACLKey {
return ACLKey(fmt.Sprintf(ACLKeyFormat, pfx, acl.ID))
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package etcd

import (
"context"
Expand All @@ -8,25 +8,20 @@ import (

etcd "github.com/coreos/etcd/client"
"github.com/gojektech/weaver/internal/config"
"github.com/gojektech/weaver/internal/server"
"github.com/gojektech/weaver/pkg/logger"
"github.com/pkg/errors"
)

const (
// ACLKeyFormat - Format for a ACL's key in a KV Store
ACLKeyFormat = "/%s/acls/%s/acl"
)

// ACLKey - Points to a stored ACL
type ACLKey string

// GenACLKey - Generate an ACL Key given etcd's node key
func GenACLKey(key string) ACLKey {
return ACLKey(fmt.Sprintf("%s/acl", key))
}

func GenKey(acl *ACL, pfx string) ACLKey {
return ACLKey(fmt.Sprintf(ACLKeyFormat, pfx, acl.ID))
func NewRouteLoader() (*ETCDRouteLoader, error) {
etcdClient, err := config.NewETCDClient()
if err != nil {
return nil, err
}
return &ETCDRouteLoader{
etcdClient: etcdClient,
namespace: config.ETCDKeyPrefix(),
}, nil
}

// ETCDRouteLoader - To store and modify proxy configuration
Expand All @@ -36,7 +31,7 @@ type ETCDRouteLoader struct {
}

// PutACL - Upserts a given ACL
func (routeLoader *ETCDRouteLoader) PutACL(acl *ACL) (ACLKey, error) {
func (routeLoader *ETCDRouteLoader) PutACL(acl *server.ACL) (ACLKey, error) {
key := GenKey(acl, routeLoader.namespace)
val, err := json.Marshal(acl)
if err != nil {
Expand All @@ -50,17 +45,17 @@ func (routeLoader *ETCDRouteLoader) PutACL(acl *ACL) (ACLKey, error) {
}

// GetACL - Fetches an ACL given an ACLKey
func (routeLoader *ETCDRouteLoader) GetACL(key ACLKey) (*ACL, error) {
func (routeLoader *ETCDRouteLoader) GetACL(key ACLKey) (*server.ACL, error) {
res, err := etcd.NewKeysAPI(routeLoader.etcdClient).Get(context.Background(), string(key), nil)
if err != nil {
return nil, fmt.Errorf("fail to GET %s with %s", key, err.Error())
}
acl := &ACL{}
acl := &server.ACL{}
if err := json.Unmarshal([]byte(res.Node.Value), acl); err != nil {
return nil, err
}

acl.Endpoint, err = NewEndpoint(acl.EndpointConfig)
acl.Endpoint, err = server.NewEndpoint(acl.EndpointConfig)
if err != nil {
return nil, errors.Wrapf(err, "failed to create a new Endpoint for key: %s", key)
}
Expand All @@ -77,26 +72,7 @@ func (routeLoader *ETCDRouteLoader) DelACL(key ACLKey) error {
return nil
}

// NewETCDRouteLoader - Creates a new ETCDRouteLoader (routeLoader)
func NewETCDRouteLoader() (*ETCDRouteLoader, error) {
etcdClient, err := config.NewETCDClient()
if err != nil {
return nil, err
}
return &ETCDRouteLoader{
etcdClient: etcdClient,
namespace: config.ETCDKeyPrefix(),
}, nil
}

func initEtcd(routeLoader *ETCDRouteLoader) (etcd.KeysAPI, string) {
key := fmt.Sprintf("/%s/acls/", routeLoader.namespace)
etc := etcd.NewKeysAPI(routeLoader.etcdClient)

return etc, key
}

func (routeLoader *ETCDRouteLoader) WatchRoutes(ctx context.Context, upsertRouteFunc UpsertRouteFunc, deleteRouteFunc DeleteRouteFunc) {
func (routeLoader *ETCDRouteLoader) WatchRoutes(ctx context.Context, upsertRouteFunc server.UpsertRouteFunc, deleteRouteFunc server.DeleteRouteFunc) {
etc, key := initEtcd(routeLoader)
watcher := etc.Watcher(key, &etcd.WatcherOptions{Recursive: true})

Expand Down Expand Up @@ -127,7 +103,7 @@ func (routeLoader *ETCDRouteLoader) WatchRoutes(ctx context.Context, upsertRoute
continue
}
case "delete":
acl := &ACL{}
acl := &server.ACL{}
err := acl.GenACL(res.PrevNode.Value)
if err != nil {
logger.Errorf("error in unmarshalling %s: %v", res.PrevNode.Value, err)
Expand All @@ -144,7 +120,7 @@ func (routeLoader *ETCDRouteLoader) WatchRoutes(ctx context.Context, upsertRoute
}
}

func (routeLoader *ETCDRouteLoader) BootstrapRoutes(ctx context.Context, upsertRouteFunc UpsertRouteFunc) error {
func (routeLoader *ETCDRouteLoader) BootstrapRoutes(ctx context.Context, upsertRouteFunc server.UpsertRouteFunc) error {
// TODO: Consider error scenarios and return an error when it makes sense
etc, key := initEtcd(routeLoader)
logger.Infof("bootstrapping router using etcd on %s", key)
Expand Down Expand Up @@ -177,3 +153,10 @@ func (routeLoader *ETCDRouteLoader) BootstrapRoutes(ctx context.Context, upsertR

return nil
}

func initEtcd(routeLoader *ETCDRouteLoader) (etcd.KeysAPI, string) {
key := fmt.Sprintf("/%s/acls/", routeLoader.namespace)
etc := etcd.NewKeysAPI(routeLoader.etcdClient)

return etc, key
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package server
package etcd

import (
"context"
"encoding/json"
"errors"
"github.com/gojektech/weaver/internal/server"
"reflect"
"testing"
"time"
Expand All @@ -29,7 +30,7 @@ func (es *ETCDRouteLoaderSuite) SetupTest() {

var err error

es.ng, err = NewETCDRouteLoader()
es.ng, err = NewRouteLoader()
assert.NoError(es.T(), err)
}

Expand All @@ -42,10 +43,10 @@ func TestETCDRouteLoaderSuite(tst *testing.T) {
}

func (es *ETCDRouteLoaderSuite) TestPutACL() {
aclPut := &ACL{
aclPut := &server.ACL{
ID: "svc-01",
Criterion: "Method(`GET`) && Path(`/ping`)",
EndpointConfig: &EndpointConfig{
EndpointConfig: &server.EndpointConfig{
ShardFunc: "lookup",
Matcher: "path",
ShardExpr: "*",
Expand All @@ -72,10 +73,10 @@ func (es *ETCDRouteLoaderSuite) TestPutACL() {
}

func (es *ETCDRouteLoaderSuite) TestBootstrapRoutes() {
aclPut := &ACL{
aclPut := &server.ACL{
ID: "svc-01",
Criterion: "Method(`GET`) && Path(`/ping`)",
EndpointConfig: &EndpointConfig{
EndpointConfig: &server.EndpointConfig{
ShardFunc: "lookup",
Matcher: "path",
ShardExpr: "*",
Expand All @@ -85,18 +86,18 @@ func (es *ETCDRouteLoaderSuite) TestBootstrapRoutes() {
key, err := es.ng.PutACL(aclPut)
assert.NoError(es.T(), err, "failed to PUT %s", aclPut)

aclsChan := make(chan *ACL, 1)
aclsChan := make(chan *server.ACL, 1)
es.ng.BootstrapRoutes(context.Background(), genRouteProcessorMock(aclsChan))

deepEqual(es.T(), aclPut, <-aclsChan)
assert.Nil(es.T(), es.ng.DelACL(key), "fail to DELETE %+v", aclPut)
}

func (es *ETCDRouteLoaderSuite) TestBootstrapRoutesSucceedWhenARouteUpsertFails() {
aclPut := &ACL{
aclPut := &server.ACL{
ID: "svc-01",
Criterion: "Method(`GET`) && Path(`/ping`)",
EndpointConfig: &EndpointConfig{
EndpointConfig: &server.EndpointConfig{
ShardFunc: "lookup",
Matcher: "path",
ShardExpr: "*",
Expand Down Expand Up @@ -142,7 +143,7 @@ func (es *ETCDRouteLoaderSuite) TestBootstrapRoutesSucceedWhenARouteHasInvalidDa
func (es *ETCDRouteLoaderSuite) TestWatchRoutesUpsertRoutesWhenRoutesSet() {
newACL := newTestACL("path")

aclsUpserted := make(chan *ACL, 1)
aclsUpserted := make(chan *server.ACL, 1)

watchCtx, cancelWatch := context.WithCancel(context.Background())
defer cancelWatch()
Expand All @@ -162,7 +163,7 @@ func (es *ETCDRouteLoaderSuite) TestWatchRoutesUpsertRoutesWhenRoutesUpdated() {
updatedACL := newTestACL("header")

_, err := es.ng.PutACL(newACL)
aclsUpserted := make(chan *ACL, 1)
aclsUpserted := make(chan *server.ACL, 1)
watchCtx, cancelWatch := context.WithCancel(context.Background())
defer cancelWatch()

Expand All @@ -182,7 +183,7 @@ func (es *ETCDRouteLoaderSuite) TestWatchRoutesDeleteRouteWhenARouteIsDeleted()
key, err := es.ng.PutACL(newACL)
require.NoError(es.T(), err, "fail to PUT ACL %+v", newACL)

aclsDeleted := make(chan *ACL, 1)
aclsDeleted := make(chan *server.ACL, 1)

watchCtx, cancelWatch := context.WithCancel(context.Background())
defer cancelWatch()
Expand All @@ -196,11 +197,11 @@ func (es *ETCDRouteLoaderSuite) TestWatchRoutesDeleteRouteWhenARouteIsDeleted()
deepEqual(es.T(), newACL, <-aclsDeleted)
}

func newTestACL(matcher string) *ACL {
return &ACL{
func newTestACL(matcher string) *server.ACL {
return &server.ACL{
ID: "svc-01",
Criterion: "Method(`GET`) && Path(`/ping`)",
EndpointConfig: &EndpointConfig{
EndpointConfig: &server.EndpointConfig{
ShardFunc: "lookup",
Matcher: matcher,
ShardExpr: "*",
Expand All @@ -219,14 +220,14 @@ func newTestACL(matcher string) *ACL {
}
}

func genRouteProcessorMock(c chan *ACL) func(*ACL) error {
return func(acl *ACL) error {
func genRouteProcessorMock(c chan *server.ACL) func(*server.ACL) error {
return func(acl *server.ACL) error {
c <- acl
return nil
}
}

func deepEqual(t *testing.T, expected *ACL, actual *ACL) {
func deepEqual(t *testing.T, expected *server.ACL, actual *server.ACL) {
assert.Equal(t, expected.ID, actual.ID)
assert.Equal(t, expected.Criterion, actual.Criterion)
assertEqualJSON(t, expected.EndpointConfig.ShardConfig, actual.EndpointConfig.ShardConfig)
Expand All @@ -247,10 +248,10 @@ func assertEqualJSON(t *testing.T, json1, json2 json.RawMessage) {
assert.True(t, reflect.DeepEqual(jsonVal1, jsonVal2))
}

func failingUpsertRouteFunc(acl *ACL) error {
func failingUpsertRouteFunc(acl *server.ACL) error {
return errors.New("error")
}

func successUpsertRouteFunc(acl *ACL) error {
func successUpsertRouteFunc(acl *server.ACL) error {
return nil
}
21 changes: 5 additions & 16 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,23 @@ import (
var server *Weaver

type Weaver struct {
httpServer *http.Server
cancelRouteSync context.CancelFunc
httpServer *http.Server
}

func ShutdownServer(ctx context.Context) {
server.cancelRouteSync()
server.httpServer.Shutdown(ctx)
}

func StartServer() {
routeSyncCtx, cancelRouteSync := context.WithCancel(context.Background())
routeLoader, err := NewETCDRouteLoader()
if err != nil {
log.Printf("StartServer: failed to initialise etcd route loader: %s", err)
cancelRouteSync()
}

func StartServer(ctx context.Context, routeLoader RouteLoader) {
proxyRouter := NewRouter(routeLoader)
err = proxyRouter.BootstrapRoutes(context.Background())
err := proxyRouter.BootstrapRoutes(context.Background())
if err != nil {
log.Printf("StartServer: failed to initialise proxy router: %s", err)
cancelRouteSync()
}

log.Printf("StartServer: bootstraped routes from etcd")

go proxyRouter.WatchRouteUpdates(routeSyncCtx)
go proxyRouter.WatchRouteUpdates(ctx)

proxy := middleware.Recover(wrapNewRelicHandler(&proxy{
router: proxyRouter,
Expand All @@ -56,8 +46,7 @@ func StartServer() {
httpServer.SetKeepAlivesEnabled(keepAliveEnabled)

server = &Weaver{
httpServer: httpServer,
cancelRouteSync: cancelRouteSync,
httpServer: httpServer,
}

log.Printf("StartServer: starting weaver on %s", server.httpServer.Addr)
Expand Down
21 changes: 13 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

raven "github.com/getsentry/raven-go"
"github.com/gojektech/weaver/internal/config"
"github.com/gojektech/weaver/internal/etcd"
"github.com/gojektech/weaver/internal/server"
"github.com/gojektech/weaver/pkg/instrumentation"
"github.com/gojektech/weaver/pkg/logger"
cli "gopkg.in/urfave/cli.v1"
"log"
"os"
"os/signal"
"syscall"
"time"
)

func main() {
Expand Down Expand Up @@ -51,12 +51,17 @@ func startWeaver(_ *cli.Context) error {
instrumentation.InitNewRelic()
defer instrumentation.ShutdownNewRelic()

go server.StartServer()
routeLoader, err := etcd.NewRouteLoader()
if err != nil {
log.Printf("StartServer: failed to initialise etcd route loader: %s", err)
}

ctx, cancel := context.WithTimeout(context.Background(), (1 * time.Second))
go server.StartServer(ctx, routeLoader)

sig := <-sigC
log.Printf("Received %d, shutting down", sig)

ctx, cancel := context.WithTimeout(context.Background(), (1 * time.Second))
defer cancel()
server.ShutdownServer(ctx)

Expand Down

0 comments on commit 875c0a4

Please sign in to comment.