From 4faa2db2509ee3760f5e0bd4050d8447d3007723 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 7 Dec 2020 03:56:48 -0500 Subject: [PATCH] [dbnode] Add TChannel channel configuration (#2989) --- src/dbnode/integration/client.go | 10 ++++++ src/dbnode/integration/setup.go | 5 +++ .../server/tchannelthrift/node/options.go | 33 +++++++++++++++++++ .../server/tchannelthrift/node/server.go | 2 +- .../server/tchannelthrift/node/service.go | 20 ++++++++--- .../tchannelthrift/node/service_test.go | 2 +- src/dbnode/server/options.go | 1 + src/dbnode/server/server.go | 3 ++ 8 files changed, 70 insertions(+), 6 deletions(-) diff --git a/src/dbnode/integration/client.go b/src/dbnode/integration/client.go index 841a2d772b..6e491f743f 100644 --- a/src/dbnode/integration/client.go +++ b/src/dbnode/integration/client.go @@ -67,6 +67,16 @@ func NewTChannelClient(name, address string) (*TestTChannelClient, error) { }, nil } +// Address returns the address. +func (client *TestTChannelClient) Address() string { + return client.address +} + +// Channel returns the TChannel channel. +func (client *TestTChannelClient) Channel() *tchannel.Channel { + return client.channel +} + // TChannelClientWrite writes a datapoint using a tchannel client. func (client *TestTChannelClient) TChannelClientWrite( timeout time.Duration, req *rpc.WriteRequest, diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 6d043d5010..9caa48661d 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -149,6 +149,7 @@ type TestSetup interface { Scope() tally.TestScope M3DBClient() client.Client M3DBVerificationAdminClient() client.AdminClient + TChannelClient() *TestTChannelClient Namespaces() []namespace.Metadata TopologyInitializer() topology.Initializer SetTopologyInitializer(topology.Initializer) @@ -789,6 +790,10 @@ func (ts *testSetup) StopServer() error { return nil } +func (ts *testSetup) TChannelClient() *TestTChannelClient { + return ts.tchannelClient +} + func (ts *testSetup) WriteBatch(namespace ident.ID, seriesList generate.SeriesBlock) error { if ts.opts.UseTChannelClientForWriting() { return ts.tchannelClient.TChannelClientWriteBatch( diff --git a/src/dbnode/network/server/tchannelthrift/node/options.go b/src/dbnode/network/server/tchannelthrift/node/options.go index a00f953e20..956c5b2ce7 100644 --- a/src/dbnode/network/server/tchannelthrift/node/options.go +++ b/src/dbnode/network/server/tchannelthrift/node/options.go @@ -28,6 +28,21 @@ import ( "github.com/uber/tchannel-go/thrift" ) +// NewTChanChannelFn creates a tchan channel. +type NewTChanChannelFn func( + service Service, + channelName string, + opts *tchannel.ChannelOptions, +) (*tchannel.Channel, error) + +func defaultTChanChannelFn( + service Service, + channelName string, + opts *tchannel.ChannelOptions, +) (*tchannel.Channel, error) { + return tchannel.NewChannel(channelName, opts) +} + // NewTChanNodeServerFn creates a tchan node server. type NewTChanNodeServerFn func( service Service, @@ -49,6 +64,12 @@ type Options interface { // ChannelOptions returns the tchan channel options. ChannelOptions() *tchannel.ChannelOptions + // SetTChanChannelFn sets a tchan node channel registration. + SetTChanChannelFn(value NewTChanChannelFn) Options + + // TChanChannelFn returns a tchan node channel registration. + TChanChannelFn() NewTChanChannelFn + // SetTChanNodeServerFn sets a tchan node server builder. SetTChanNodeServerFn(value NewTChanNodeServerFn) Options @@ -65,6 +86,7 @@ type Options interface { type options struct { channelOptions *tchannel.ChannelOptions instrumentOpts instrument.Options + tchanChannelFn NewTChanChannelFn tchanNodeServerFn NewTChanNodeServerFn } @@ -72,6 +94,7 @@ type options struct { func NewOptions(chanOpts *tchannel.ChannelOptions) Options { return &options{ channelOptions: chanOpts, + tchanChannelFn: defaultTChanChannelFn, tchanNodeServerFn: defaultTChanNodeServerFn, } } @@ -85,6 +108,16 @@ func (o *options) ChannelOptions() *tchannel.ChannelOptions { return o.channelOptions } +func (o *options) SetTChanChannelFn(value NewTChanChannelFn) Options { + opts := *o + opts.tchanChannelFn = value + return &opts +} + +func (o *options) TChanChannelFn() NewTChanChannelFn { + return o.tchanChannelFn +} + func (o *options) SetTChanNodeServerFn(value NewTChanNodeServerFn) Options { opts := *o opts.tchanNodeServerFn = value diff --git a/src/dbnode/network/server/tchannelthrift/node/server.go b/src/dbnode/network/server/tchannelthrift/node/server.go index 91f560fa92..cf985cd79f 100644 --- a/src/dbnode/network/server/tchannelthrift/node/server.go +++ b/src/dbnode/network/server/tchannelthrift/node/server.go @@ -59,7 +59,7 @@ func (s *server) ListenAndServe() (ns.Close, error) { immutableOpts := *chanOpts opts = &immutableOpts } - channel, err := tchannel.NewChannel(channel.ChannelName, opts) + channel, err := s.opts.TChanChannelFn()(s.service, channel.ChannelName, opts) if err != nil { return nil, err } diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 9efe6b78cd..2250b1ba5a 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -253,12 +253,15 @@ type Service interface { // Only safe to be called one time once the service has started. SetDatabase(db storage.Database) error + // Database returns the current database. + Database() (storage.Database, error) + // SetMetadata sets a metadata key to the given value. SetMetadata(key, value string) - // GetMetadata returns the metadata for the given key and a bool indicating + // Metadata returns the metadata for the given key and a bool indicating // if it is present. - GetMetadata(key string) (string, bool) + Metadata(key string) (string, bool) } // NewService creates a new node TChannel Thrift service @@ -349,7 +352,7 @@ func (s *service) SetMetadata(key, value string) { s.state.health = newHealth } -func (s *service) GetMetadata(key string) (string, bool) { +func (s *service) Metadata(key string) (string, bool) { s.state.RLock() md, found := s.state.health.Metadata[key] s.state.RUnlock() @@ -2197,11 +2200,20 @@ func (s *service) SetDatabase(db storage.Database) error { if s.state.db != nil { return errDatabaseHasAlreadyBeenSet } - s.state.db = db return nil } +func (s *service) Database() (storage.Database, error) { + s.state.RLock() + defer s.state.RUnlock() + + if s.state.db == nil { + return nil, errDatabaseIsNotInitializedYet + } + return s.state.db, nil +} + func (s *service) startWriteRPCWithDB() (storage.Database, error) { if s.state.maxOutstandingWriteRPCs == 0 { // No limitations on number of outstanding requests. diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 0f376001b1..2b8faf0219 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -377,7 +377,7 @@ func TestServiceSetMetadata(t *testing.T) { wg.Add(1) md := md go func() { - meta, ok := service.GetMetadata(md) + meta, ok := service.Metadata(md) assert.True(t, ok) assert.Equal(t, meta, md) wg.Done() diff --git a/src/dbnode/server/options.go b/src/dbnode/server/options.go index de17ff5f76..1736a55dc8 100644 --- a/src/dbnode/server/options.go +++ b/src/dbnode/server/options.go @@ -26,5 +26,6 @@ import ( // StorageOptions are options to apply to the database storage options. type StorageOptions struct { + TChanChannelFn node.NewTChanChannelFn TChanNodeServerFn node.NewTChanNodeServerFn } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 72422ee912..52df3307c7 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -715,6 +715,9 @@ func Run(runOpts RunOptions) { } tchanOpts := ttnode.NewOptions(tchannelOpts). SetInstrumentOptions(opts.InstrumentOptions()) + if fn := runOpts.StorageOptions.TChanChannelFn; fn != nil { + tchanOpts = tchanOpts.SetTChanChannelFn(fn) + } if fn := runOpts.StorageOptions.TChanNodeServerFn; fn != nil { tchanOpts = tchanOpts.SetTChanNodeServerFn(fn) }