Skip to content

Commit

Permalink
[dbnode] Add TChannel channel configuration (#2989)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed Dec 7, 2020
1 parent 3175f2f commit 4faa2db
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 6 deletions.
10 changes: 10 additions & 0 deletions src/dbnode/integration/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ func NewTChannelClient(name, address string) (*TestTChannelClient, error) {
}, nil
}

// Address returns the address.
func (client *TestTChannelClient) Address() string {
return client.address
}

// Channel returns the TChannel channel.
func (client *TestTChannelClient) Channel() *tchannel.Channel {
return client.channel
}

// TChannelClientWrite writes a datapoint using a tchannel client.
func (client *TestTChannelClient) TChannelClientWrite(
timeout time.Duration, req *rpc.WriteRequest,
Expand Down
5 changes: 5 additions & 0 deletions src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type TestSetup interface {
Scope() tally.TestScope
M3DBClient() client.Client
M3DBVerificationAdminClient() client.AdminClient
TChannelClient() *TestTChannelClient
Namespaces() []namespace.Metadata
TopologyInitializer() topology.Initializer
SetTopologyInitializer(topology.Initializer)
Expand Down Expand Up @@ -789,6 +790,10 @@ func (ts *testSetup) StopServer() error {
return nil
}

func (ts *testSetup) TChannelClient() *TestTChannelClient {
return ts.tchannelClient
}

func (ts *testSetup) WriteBatch(namespace ident.ID, seriesList generate.SeriesBlock) error {
if ts.opts.UseTChannelClientForWriting() {
return ts.tchannelClient.TChannelClientWriteBatch(
Expand Down
33 changes: 33 additions & 0 deletions src/dbnode/network/server/tchannelthrift/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ import (
"github.com/uber/tchannel-go/thrift"
)

// NewTChanChannelFn creates a tchan channel.
type NewTChanChannelFn func(
service Service,
channelName string,
opts *tchannel.ChannelOptions,
) (*tchannel.Channel, error)

func defaultTChanChannelFn(
service Service,
channelName string,
opts *tchannel.ChannelOptions,
) (*tchannel.Channel, error) {
return tchannel.NewChannel(channelName, opts)
}

// NewTChanNodeServerFn creates a tchan node server.
type NewTChanNodeServerFn func(
service Service,
Expand All @@ -49,6 +64,12 @@ type Options interface {
// ChannelOptions returns the tchan channel options.
ChannelOptions() *tchannel.ChannelOptions

// SetTChanChannelFn sets a tchan node channel registration.
SetTChanChannelFn(value NewTChanChannelFn) Options

// TChanChannelFn returns a tchan node channel registration.
TChanChannelFn() NewTChanChannelFn

// SetTChanNodeServerFn sets a tchan node server builder.
SetTChanNodeServerFn(value NewTChanNodeServerFn) Options

Expand All @@ -65,13 +86,15 @@ type Options interface {
type options struct {
channelOptions *tchannel.ChannelOptions
instrumentOpts instrument.Options
tchanChannelFn NewTChanChannelFn
tchanNodeServerFn NewTChanNodeServerFn
}

// NewOptions creates a new options.
func NewOptions(chanOpts *tchannel.ChannelOptions) Options {
return &options{
channelOptions: chanOpts,
tchanChannelFn: defaultTChanChannelFn,
tchanNodeServerFn: defaultTChanNodeServerFn,
}
}
Expand All @@ -85,6 +108,16 @@ func (o *options) ChannelOptions() *tchannel.ChannelOptions {
return o.channelOptions
}

func (o *options) SetTChanChannelFn(value NewTChanChannelFn) Options {
opts := *o
opts.tchanChannelFn = value
return &opts
}

func (o *options) TChanChannelFn() NewTChanChannelFn {
return o.tchanChannelFn
}

func (o *options) SetTChanNodeServerFn(value NewTChanNodeServerFn) Options {
opts := *o
opts.tchanNodeServerFn = value
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/network/server/tchannelthrift/node/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *server) ListenAndServe() (ns.Close, error) {
immutableOpts := *chanOpts
opts = &immutableOpts
}
channel, err := tchannel.NewChannel(channel.ChannelName, opts)
channel, err := s.opts.TChanChannelFn()(s.service, channel.ChannelName, opts)
if err != nil {
return nil, err
}
Expand Down
20 changes: 16 additions & 4 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,15 @@ type Service interface {
// Only safe to be called one time once the service has started.
SetDatabase(db storage.Database) error

// Database returns the current database.
Database() (storage.Database, error)

// SetMetadata sets a metadata key to the given value.
SetMetadata(key, value string)

// GetMetadata returns the metadata for the given key and a bool indicating
// Metadata returns the metadata for the given key and a bool indicating
// if it is present.
GetMetadata(key string) (string, bool)
Metadata(key string) (string, bool)
}

// NewService creates a new node TChannel Thrift service
Expand Down Expand Up @@ -349,7 +352,7 @@ func (s *service) SetMetadata(key, value string) {
s.state.health = newHealth
}

func (s *service) GetMetadata(key string) (string, bool) {
func (s *service) Metadata(key string) (string, bool) {
s.state.RLock()
md, found := s.state.health.Metadata[key]
s.state.RUnlock()
Expand Down Expand Up @@ -2197,11 +2200,20 @@ func (s *service) SetDatabase(db storage.Database) error {
if s.state.db != nil {
return errDatabaseHasAlreadyBeenSet
}

s.state.db = db
return nil
}

func (s *service) Database() (storage.Database, error) {
s.state.RLock()
defer s.state.RUnlock()

if s.state.db == nil {
return nil, errDatabaseIsNotInitializedYet
}
return s.state.db, nil
}

func (s *service) startWriteRPCWithDB() (storage.Database, error) {
if s.state.maxOutstandingWriteRPCs == 0 {
// No limitations on number of outstanding requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func TestServiceSetMetadata(t *testing.T) {
wg.Add(1)
md := md
go func() {
meta, ok := service.GetMetadata(md)
meta, ok := service.Metadata(md)
assert.True(t, ok)
assert.Equal(t, meta, md)
wg.Done()
Expand Down
1 change: 1 addition & 0 deletions src/dbnode/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ import (

// StorageOptions are options to apply to the database storage options.
type StorageOptions struct {
TChanChannelFn node.NewTChanChannelFn
TChanNodeServerFn node.NewTChanNodeServerFn
}
3 changes: 3 additions & 0 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,9 @@ func Run(runOpts RunOptions) {
}
tchanOpts := ttnode.NewOptions(tchannelOpts).
SetInstrumentOptions(opts.InstrumentOptions())
if fn := runOpts.StorageOptions.TChanChannelFn; fn != nil {
tchanOpts = tchanOpts.SetTChanChannelFn(fn)
}
if fn := runOpts.StorageOptions.TChanNodeServerFn; fn != nil {
tchanOpts = tchanOpts.SetTChanNodeServerFn(fn)
}
Expand Down

0 comments on commit 4faa2db

Please sign in to comment.