From 3f344aa659479b881b9e3185e8ec5b060fcf7345 Mon Sep 17 00:00:00 2001 From: Alexandr Samylkin Date: Tue, 21 Feb 2017 00:20:52 -0800 Subject: [PATCH] Store controller instead of di graph in module --- modules/rpc/thrift.go | 10 +---- modules/rpc/thrift_test.go | 40 +++++++++++++++++- modules/rpc/yarpc.go | 84 ++++++++++++++++++++------------------ modules/rpc/yarpc_test.go | 16 ++++++++ 4 files changed, 100 insertions(+), 50 deletions(-) diff --git a/modules/rpc/thrift.go b/modules/rpc/thrift.go index 98fcf7fa1d..b07d4ffae8 100644 --- a/modules/rpc/thrift.go +++ b/modules/rpc/thrift.go @@ -21,15 +21,12 @@ package rpc import ( - "context" "sync" "go.uber.org/fx/modules" "go.uber.org/fx/service" - "go.uber.org/fx/ulog" "github.com/pkg/errors" - "go.uber.org/yarpc" "go.uber.org/yarpc/api/transport" ) @@ -67,13 +64,8 @@ func newYARPCThriftModule( reg := func(mod *YARPCModule) { _setupMu.Lock() defer _setupMu.Unlock() - var dispatcher *yarpc.Dispatcher - if err := mod.di.Resolve(&dispatcher); err != nil { - ulog.Logger(context.Background()).Error("can't resolve dispatcher", "error", err) - return - } - dispatcher.Register(registrants) + mod.controller.dispatcher.Register(registrants) } return newYARPCModule(mi, reg, options...) diff --git a/modules/rpc/thrift_test.go b/modules/rpc/thrift_test.go index a47b7a862e..e493901924 100644 --- a/modules/rpc/thrift_test.go +++ b/modules/rpc/thrift_test.go @@ -80,8 +80,43 @@ modules: // Dispatcher must be resolved in the default graph var dispatcher *yarpc.Dispatcher assert.NoError(t, dig.Resolve(&dispatcher)) - assert.NotEmpty(t, dispatcher) + assert.Equal(t, 2, len(dispatcher.Inbounds())) +} + +func TestThriftModuleSeparateGraph_OK(t *testing.T) { + t.Parallel() + di := dig.New() + snowflake := ThriftModule(okCreate, modules.WithRoles("rescue"), withGraph(di)) + cfg := []byte(` +modules: + rpc: + inbounds: + - tchannel: + port: 0 + - http: + port: 0 +`) + + mci := service.ModuleCreateInfo{ + Name: "RPC", + Host: testHost{ + Host: service.NopHost(), + config: config.NewYAMLProviderFromBytes(cfg), + }, + Items: make(map[string]interface{}), + } + + special, err := snowflake(mci) + require.NoError(t, err) + assert.NotEmpty(t, special) + + testInitRunModule(t, special[0], mci) + + // Dispatcher must be resolved in the default graph + var dispatcher *yarpc.Dispatcher + assert.NoError(t, di.Resolve(&dispatcher)) + assert.Equal(t, 2, len(dispatcher.Inbounds())) } func TestThriftModule_BadOptions(t *testing.T) { @@ -113,7 +148,8 @@ func testInitRunModule(t *testing.T, mod service.Module, mci service.ModuleCreat func mch() service.ModuleCreateInfo { return service.ModuleCreateInfo{ - Host: service.NopHost(), + Host: service.NopHost(), + Items: make(map[string]interface{}), } } diff --git a/modules/rpc/yarpc.go b/modules/rpc/yarpc.go index d49e3f95ef..85534c2866 100644 --- a/modules/rpc/yarpc.go +++ b/modules/rpc/yarpc.go @@ -24,9 +24,9 @@ import ( "context" "errors" "fmt" + "strconv" "sync" - "go.uber.org/fx/dig" "go.uber.org/fx/modules" "go.uber.org/fx/modules/rpc/internal/stats" "go.uber.org/fx/service" @@ -43,14 +43,17 @@ import ( // YARPCModule is an implementation of a core RPC module using YARPC. // All the YARPC modules share the same dispatcher and middleware. // Dispatcher will start when any created module calls Start(). +// The YARPC team advised dispatcher to be a 'singleton' to control +// the lifecycle of all of the in/out bound traffic, so we will +// register it in a dig.Graph provided with options/default graph. type YARPCModule struct { modules.ModuleBase - register registerServiceFunc - config yarpcConfig - log ulog.Log - stateMu sync.RWMutex - isRunning bool - di dig.Graph + register registerServiceFunc + config yarpcConfig + log ulog.Log + stateMu sync.RWMutex + isRunning bool + controller *dispatcherController } var ( @@ -82,12 +85,29 @@ type yarpcConfig struct { } // Inbound is a union that configures how to configure a single inbound. -// TODO(alsam) write a formatter to print ports instead of addresses. type Inbound struct { TChannel *Address HTTP *Address } +func (i *Inbound) String() string { + if i == nil { + return "" + } + + http := "none" + if i.HTTP != nil { + http = strconv.Itoa(i.HTTP.Port) + } + + tchannel := "none" + if i.TChannel != nil { + tchannel = strconv.Itoa(i.TChannel.Port) + } + + return fmt.Sprintf("Inbound:{HTTP: %s; TChannel: %s}", http, tchannel) +} + // Address is a struct that have a required port for tchannel/http transports. // TODO(alsam) make it optional type Address struct { @@ -97,8 +117,6 @@ type Address struct { // Stores a collection of all modules configs with a shared dispatcher // that are safe to call from multiple go routines. All the configs must // share the same AdvertiseName and represent a single service. -// The YARPC team advised it to be a 'singleton' to control -// the lifecycle of all of the in/out bound traffic. type dispatcherController struct { // sync configs sync.RWMutex @@ -110,7 +128,7 @@ type dispatcherController struct { startError error configs []*yarpcConfig - dispatcher *yarpc.Dispatcher + dispatcher yarpc.Dispatcher } // Adds the config to the controller @@ -154,12 +172,14 @@ func (c *dispatcherController) Start(host service.Host) error { _dispatcherMu.Lock() defer _dispatcherMu.Unlock() - if c.dispatcher, err = _dispatcherFn(host, cfg); err != nil { + var d *yarpc.Dispatcher + if d, err = _dispatcherFn(host, cfg); err != nil { c.startError = err return } - c.startError = _starterFn(c.dispatcher) + c.dispatcher = *d + c.startError = _starterFn(&c.dispatcher) }) return c.startError @@ -248,21 +268,25 @@ func newYARPCModule( module.config.inboundMiddleware = inboundMiddlewareFromCreateInfo(mi) module.config.onewayInboundMiddleware = onewayInboundMiddlewareFromCreateInfo(mi) - module.di = graphFromCreateInfo(mi) - var controller *dispatcherController + di := graphFromCreateInfo(mi) // Try to resolve a controller first // TODO(alsam) use dig options when available. - if err := module.di.Resolve(&controller); err != nil { + if err := di.Resolve(&module.controller); err != nil { // Try to register it then - controller = &dispatcherController{} - if errCr := module.di.Register(controller); errCr != nil { + module.controller = &dispatcherController{} + if errCr := di.Register(module.controller); errCr != nil { return nil, errs.Wrap(errCr, "can't register a dispatcher controller") } + + // Register dispatcher + if err := di.Register(&module.controller.dispatcher); err != nil { + return nil, errs.Wrap(err, "unable to register the dispatcher") + } } - controller.addConfig(module.config) + module.controller.addConfig(module.config) module.log.Info("Module successfuly created", "inbounds", module.config.Inbounds) @@ -306,25 +330,12 @@ func (m *YARPCModule) Start(readyCh chan<- struct{}) <-chan error { m.stateMu.Lock() defer m.stateMu.Unlock() - // Resolve the controller - var controller *dispatcherController - if err := m.di.Resolve(&controller); err != nil { - ret <- errs.Wrap(err, "unable to resolve dispatcher controller") - return ret - } - // TODO(alsam) allow services to advertise with a name separate from the host name. - if err := controller.Start(m.Host()); err != nil { + if err := m.controller.Start(m.Host()); err != nil { ret <- errs.Wrap(err, "unable to start dispatcher") return ret } - // Register dispatcher - if err := m.di.Register(controller.dispatcher); err != nil { - ret <- errs.Wrap(err, "unable to register the dispatcher") - return ret - } - m.register(m) m.log.Info("Module started") @@ -343,13 +354,8 @@ func (m *YARPCModule) Stop() error { m.stateMu.Lock() defer m.stateMu.Unlock() - var controller *dispatcherController - if err := m.di.Resolve(&controller); err != nil { - return errs.Wrap(err, "unable to resolve dispatcher controller") - } - m.isRunning = false - return controller.Stop() + return m.controller.Stop() } // IsRunning returns whether a module is running diff --git a/modules/rpc/yarpc_test.go b/modules/rpc/yarpc_test.go index 75c3243bdf..4c13495dc9 100644 --- a/modules/rpc/yarpc_test.go +++ b/modules/rpc/yarpc_test.go @@ -25,6 +25,7 @@ import ( "go.uber.org/fx/service" + "fmt" "github.com/stretchr/testify/assert" "go.uber.org/yarpc/api/transport" "go.uber.org/yarpc/transport/http" @@ -68,3 +69,18 @@ func TestMergeOfEmptyConfigCollectionReturnsError(t *testing.T) { assert.EqualError(t, err, "unable to merge empty configs") assert.EqualError(t, c.Start(service.NopHost()), err.Error()) } + +func TestInboundPrint(t *testing.T) { + t.Parallel() + var i *Inbound + assert.Equal(t, "", fmt.Sprint(i)) + + i = &Inbound{} + assert.Equal(t, "Inbound:{HTTP: none; TChannel: none}", fmt.Sprint(i)) + i.HTTP = &Address{8080} + assert.Equal(t, "Inbound:{HTTP: 8080; TChannel: none}", fmt.Sprint(i)) + i.TChannel = &Address{9876} + assert.Equal(t, "Inbound:{HTTP: 8080; TChannel: 9876}", fmt.Sprint(i)) + i.HTTP = nil + assert.Equal(t, "Inbound:{HTTP: none; TChannel: 9876}", fmt.Sprint(i)) +}