Skip to content

Commit

Permalink
Store controller instead of di graph in module
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandr Samylkin committed Feb 21, 2017
1 parent a402a70 commit 3f344aa
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 50 deletions.
10 changes: 1 addition & 9 deletions modules/rpc/thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@
package rpc

import (
"context"
"sync"

"go.uber.org/fx/modules"
"go.uber.org/fx/service"
"go.uber.org/fx/ulog"

"github.com/pkg/errors"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
)

Expand Down Expand Up @@ -67,13 +64,8 @@ func newYARPCThriftModule(
reg := func(mod *YARPCModule) {
_setupMu.Lock()
defer _setupMu.Unlock()
var dispatcher *yarpc.Dispatcher
if err := mod.di.Resolve(&dispatcher); err != nil {
ulog.Logger(context.Background()).Error("can't resolve dispatcher", "error", err)
return
}

dispatcher.Register(registrants)
mod.controller.dispatcher.Register(registrants)
}

return newYARPCModule(mi, reg, options...)
Expand Down
40 changes: 38 additions & 2 deletions modules/rpc/thrift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,43 @@ modules:
// Dispatcher must be resolved in the default graph
var dispatcher *yarpc.Dispatcher
assert.NoError(t, dig.Resolve(&dispatcher))
assert.NotEmpty(t, dispatcher)
assert.Equal(t, 2, len(dispatcher.Inbounds()))
}

func TestThriftModuleSeparateGraph_OK(t *testing.T) {
t.Parallel()
di := dig.New()
snowflake := ThriftModule(okCreate, modules.WithRoles("rescue"), withGraph(di))

cfg := []byte(`
modules:
rpc:
inbounds:
- tchannel:
port: 0
- http:
port: 0
`)

mci := service.ModuleCreateInfo{
Name: "RPC",
Host: testHost{
Host: service.NopHost(),
config: config.NewYAMLProviderFromBytes(cfg),
},
Items: make(map[string]interface{}),
}

special, err := snowflake(mci)
require.NoError(t, err)
assert.NotEmpty(t, special)

testInitRunModule(t, special[0], mci)

// Dispatcher must be resolved in the default graph
var dispatcher *yarpc.Dispatcher
assert.NoError(t, di.Resolve(&dispatcher))
assert.Equal(t, 2, len(dispatcher.Inbounds()))
}

func TestThriftModule_BadOptions(t *testing.T) {
Expand Down Expand Up @@ -113,7 +148,8 @@ func testInitRunModule(t *testing.T, mod service.Module, mci service.ModuleCreat

func mch() service.ModuleCreateInfo {
return service.ModuleCreateInfo{
Host: service.NopHost(),
Host: service.NopHost(),
Items: make(map[string]interface{}),
}
}

Expand Down
84 changes: 45 additions & 39 deletions modules/rpc/yarpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"

"go.uber.org/fx/dig"
"go.uber.org/fx/modules"
"go.uber.org/fx/modules/rpc/internal/stats"
"go.uber.org/fx/service"
Expand All @@ -43,14 +43,17 @@ import (
// YARPCModule is an implementation of a core RPC module using YARPC.
// All the YARPC modules share the same dispatcher and middleware.
// Dispatcher will start when any created module calls Start().
// The YARPC team advised dispatcher to be a 'singleton' to control
// the lifecycle of all of the in/out bound traffic, so we will
// register it in a dig.Graph provided with options/default graph.
type YARPCModule struct {
modules.ModuleBase
register registerServiceFunc
config yarpcConfig
log ulog.Log
stateMu sync.RWMutex
isRunning bool
di dig.Graph
register registerServiceFunc
config yarpcConfig
log ulog.Log
stateMu sync.RWMutex
isRunning bool
controller *dispatcherController
}

var (
Expand Down Expand Up @@ -82,12 +85,29 @@ type yarpcConfig struct {
}

// Inbound is a union that configures how to configure a single inbound.
// TODO(alsam) write a formatter to print ports instead of addresses.
type Inbound struct {
TChannel *Address
HTTP *Address
}

func (i *Inbound) String() string {
if i == nil {
return ""
}

http := "none"
if i.HTTP != nil {
http = strconv.Itoa(i.HTTP.Port)
}

tchannel := "none"
if i.TChannel != nil {
tchannel = strconv.Itoa(i.TChannel.Port)
}

return fmt.Sprintf("Inbound:{HTTP: %s; TChannel: %s}", http, tchannel)
}

// Address is a struct that have a required port for tchannel/http transports.
// TODO(alsam) make it optional
type Address struct {
Expand All @@ -97,8 +117,6 @@ type Address struct {
// Stores a collection of all modules configs with a shared dispatcher
// that are safe to call from multiple go routines. All the configs must
// share the same AdvertiseName and represent a single service.
// The YARPC team advised it to be a 'singleton' to control
// the lifecycle of all of the in/out bound traffic.
type dispatcherController struct {
// sync configs
sync.RWMutex
Expand All @@ -110,7 +128,7 @@ type dispatcherController struct {
startError error

configs []*yarpcConfig
dispatcher *yarpc.Dispatcher
dispatcher yarpc.Dispatcher
}

// Adds the config to the controller
Expand Down Expand Up @@ -154,12 +172,14 @@ func (c *dispatcherController) Start(host service.Host) error {

_dispatcherMu.Lock()
defer _dispatcherMu.Unlock()
if c.dispatcher, err = _dispatcherFn(host, cfg); err != nil {
var d *yarpc.Dispatcher
if d, err = _dispatcherFn(host, cfg); err != nil {
c.startError = err
return
}

c.startError = _starterFn(c.dispatcher)
c.dispatcher = *d
c.startError = _starterFn(&c.dispatcher)
})

return c.startError
Expand Down Expand Up @@ -248,21 +268,25 @@ func newYARPCModule(
module.config.inboundMiddleware = inboundMiddlewareFromCreateInfo(mi)
module.config.onewayInboundMiddleware = onewayInboundMiddlewareFromCreateInfo(mi)

module.di = graphFromCreateInfo(mi)
var controller *dispatcherController
di := graphFromCreateInfo(mi)

// Try to resolve a controller first
// TODO(alsam) use dig options when available.
if err := module.di.Resolve(&controller); err != nil {
if err := di.Resolve(&module.controller); err != nil {

// Try to register it then
controller = &dispatcherController{}
if errCr := module.di.Register(controller); errCr != nil {
module.controller = &dispatcherController{}
if errCr := di.Register(module.controller); errCr != nil {
return nil, errs.Wrap(errCr, "can't register a dispatcher controller")
}

// Register dispatcher
if err := di.Register(&module.controller.dispatcher); err != nil {
return nil, errs.Wrap(err, "unable to register the dispatcher")
}
}

controller.addConfig(module.config)
module.controller.addConfig(module.config)

module.log.Info("Module successfuly created", "inbounds", module.config.Inbounds)

Expand Down Expand Up @@ -306,25 +330,12 @@ func (m *YARPCModule) Start(readyCh chan<- struct{}) <-chan error {
m.stateMu.Lock()
defer m.stateMu.Unlock()

// Resolve the controller
var controller *dispatcherController
if err := m.di.Resolve(&controller); err != nil {
ret <- errs.Wrap(err, "unable to resolve dispatcher controller")
return ret
}

// TODO(alsam) allow services to advertise with a name separate from the host name.
if err := controller.Start(m.Host()); err != nil {
if err := m.controller.Start(m.Host()); err != nil {
ret <- errs.Wrap(err, "unable to start dispatcher")
return ret
}

// Register dispatcher
if err := m.di.Register(controller.dispatcher); err != nil {
ret <- errs.Wrap(err, "unable to register the dispatcher")
return ret
}

m.register(m)
m.log.Info("Module started")

Expand All @@ -343,13 +354,8 @@ func (m *YARPCModule) Stop() error {
m.stateMu.Lock()
defer m.stateMu.Unlock()

var controller *dispatcherController
if err := m.di.Resolve(&controller); err != nil {
return errs.Wrap(err, "unable to resolve dispatcher controller")
}

m.isRunning = false
return controller.Stop()
return m.controller.Stop()
}

// IsRunning returns whether a module is running
Expand Down
16 changes: 16 additions & 0 deletions modules/rpc/yarpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"go.uber.org/fx/service"

"fmt"
"github.com/stretchr/testify/assert"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/http"
Expand Down Expand Up @@ -68,3 +69,18 @@ func TestMergeOfEmptyConfigCollectionReturnsError(t *testing.T) {
assert.EqualError(t, err, "unable to merge empty configs")
assert.EqualError(t, c.Start(service.NopHost()), err.Error())
}

func TestInboundPrint(t *testing.T) {
t.Parallel()
var i *Inbound
assert.Equal(t, "", fmt.Sprint(i))

i = &Inbound{}
assert.Equal(t, "Inbound:{HTTP: none; TChannel: none}", fmt.Sprint(i))
i.HTTP = &Address{8080}
assert.Equal(t, "Inbound:{HTTP: 8080; TChannel: none}", fmt.Sprint(i))
i.TChannel = &Address{9876}
assert.Equal(t, "Inbound:{HTTP: 8080; TChannel: 9876}", fmt.Sprint(i))
i.HTTP = nil
assert.Equal(t, "Inbound:{HTTP: none; TChannel: 9876}", fmt.Sprint(i))
}

0 comments on commit 3f344aa

Please sign in to comment.