From 5e4e5bf7fd0bc2c198dad44e9aabb06547dba22a Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 19 Nov 2021 11:33:43 -0800 Subject: [PATCH 1/4] [msg] Do not Close singleton MessageProcessors when closing connections This fixes a regression that was introduced in https://github.com/m3db/m3/pull/3918 The shared singleton MessageProcessor was being closed when a Connection was being closed. Now a Pool interface is introduced. A MessageProcessor is fetched when a Connection is created and returned when a Connection is closed. It is up to the pool impl to decide if it should Close the MessageProcessor on each return or only when the pool is closed. --- src/aggregator/server/m3msg/server.go | 2 +- src/msg/consumer/consumer.go | 7 ++-- src/msg/consumer/handlers.go | 26 ++++++------ src/msg/consumer/handlers_test.go | 28 ++++++++----- src/msg/consumer/types.go | 57 +++++++++++++++++++++++---- 5 files changed, 86 insertions(+), 34 deletions(-) diff --git a/src/aggregator/server/m3msg/server.go b/src/aggregator/server/m3msg/server.go index bf98e4850e..73456b3206 100644 --- a/src/aggregator/server/m3msg/server.go +++ b/src/aggregator/server/m3msg/server.go @@ -50,7 +50,7 @@ func NewServer( logger: opts.InstrumentOptions().Logger(), } } - handler := consumer.NewMessageHandler(newMessageProcessor, opts.ConsumerOptions()) + handler := consumer.NewMessageHandler(consumer.NewMessageProcessorPool(newMessageProcessor), opts.ConsumerOptions()) return xserver.NewServer(address, handler, opts.ServerOptions()), nil } diff --git a/src/msg/consumer/consumer.go b/src/msg/consumer/consumer.go index b27dbc8b1b..dc0efcb0a3 100644 --- a/src/msg/consumer/consumer.go +++ b/src/msg/consumer/consumer.go @@ -69,7 +69,7 @@ func (l *listener) Accept() (Consumer, error) { return nil, err } - return newConsumer(conn, l.msgPool, l.opts, l.m, NewNoOpMessageProcessor), nil + return newConsumer(conn, l.msgPool, l.opts, l.m, NewNoOpMessageProcessor()), nil } type metrics struct { @@ -123,7 +123,7 @@ func newConsumer( mPool *messagePool, opts Options, m metrics, - newMessageProcessorFn NewMessageProcessorFn, + mp MessageProcessor, ) *consumer { var ( wOpts = xio.ResettableWriterOptions{ @@ -146,7 +146,7 @@ func newConsumer( closed: false, doneCh: make(chan struct{}), m: m, - messageProcessor: newMessageProcessorFn(), + messageProcessor: mp, } } @@ -262,7 +262,6 @@ func (c *consumer) Close() { close(c.doneCh) c.wg.Wait() c.conn.Close() - c.messageProcessor.Close() } type message struct { diff --git a/src/msg/consumer/handlers.go b/src/msg/consumer/handlers.go index 84b0f4ad6e..babc137ee3 100644 --- a/src/msg/consumer/handlers.go +++ b/src/msg/consumer/handlers.go @@ -31,26 +31,27 @@ import ( ) type messageHandler struct { - opts Options - mPool *messagePool - newMessageProcessorFn NewMessageProcessorFn - m metrics + opts Options + mPool *messagePool + mpPool MessageProcessorPool + m metrics } // NewMessageHandler creates a new server handler with messageFn. -func NewMessageHandler(newMessageProcessorFn NewMessageProcessorFn, opts Options) server.Handler { +func NewMessageHandler(mpPool MessageProcessorPool, opts Options) server.Handler { mPool := newMessagePool(opts.MessagePoolOptions()) mPool.Init() return &messageHandler{ - newMessageProcessorFn: newMessageProcessorFn, - opts: opts, - mPool: mPool, - m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()), + mpPool: mpPool, + opts: opts, + mPool: mPool, + m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()), } } func (h *messageHandler) Handle(conn net.Conn) { - c := newConsumer(conn, h.mPool, h.opts, h.m, h.newMessageProcessorFn) + mp := h.mpPool.Get() + c := newConsumer(conn, h.mPool, h.opts, h.m, mp) c.Init() var ( msgErr error @@ -68,7 +69,10 @@ func (h *messageHandler) Handle(conn net.Conn) { if msgErr != nil && msgErr != io.EOF { h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer") } + h.mpPool.Put(mp) c.Close() } -func (h *messageHandler) Close() {} +func (h *messageHandler) Close() { + h.mpPool.Close() +} diff --git a/src/msg/consumer/handlers_test.go b/src/msg/consumer/handlers_test.go index 1c7f9d99c5..7e4c1341a2 100644 --- a/src/msg/consumer/handlers_test.go +++ b/src/msg/consumer/handlers_test.go @@ -35,7 +35,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestServerWithMessageFn(t *testing.T) { +func TestServerWithSingletonMessageProcessor(t *testing.T) { defer leaktest.Check(t)() var ( @@ -53,7 +53,7 @@ func TestServerWithMessageFn(t *testing.T) { m.Ack() wg.Done() }, - ).Times(2) + ).Times(3) // Set a large ack buffer size to make sure the background go routine // can flush it. opts := testOptions().SetAckBufferSize(100) @@ -63,27 +63,35 @@ func TestServerWithMessageFn(t *testing.T) { s := server.NewServer("a", NewMessageHandler(SingletonMessageProcessor(p), opts), server.NewOptions()) s.Serve(l) - conn, err := net.Dial("tcp", l.Addr().String()) + conn1, err := net.Dial("tcp", l.Addr().String()) + require.NoError(t, err) + conn2, err := net.Dial("tcp", l.Addr().String()) require.NoError(t, err) - wg.Add(1) - err = produce(conn, &testMsg1) + wg.Add(3) + err = produce(conn1, &testMsg1) require.NoError(t, err) - wg.Add(1) - err = produce(conn, &testMsg2) + err = produce(conn1, &testMsg2) + require.NoError(t, err) + err = produce(conn2, &testMsg2) require.NoError(t, err) wg.Wait() require.Equal(t, string(testMsg1.Value), data[0]) require.Equal(t, string(testMsg2.Value), data[1]) + require.Equal(t, string(testMsg2.Value), data[2]) var ack msgpb.Ack - testDecoder := proto.NewDecoder(conn, opts.DecoderOptions(), 10) + testDecoder := proto.NewDecoder(conn1, opts.DecoderOptions(), 10) + err = testDecoder.Decode(&ack) + require.NoError(t, err) + testDecoder = proto.NewDecoder(conn2, opts.DecoderOptions(), 10) err = testDecoder.Decode(&ack) require.NoError(t, err) - require.Equal(t, 2, len(ack.Metadata)) + require.Equal(t, 3, len(ack.Metadata)) require.Equal(t, testMsg1.Metadata, ack.Metadata[0]) require.Equal(t, testMsg2.Metadata, ack.Metadata[1]) + require.Equal(t, testMsg2.Metadata, ack.Metadata[2]) p.EXPECT().Close() s.Close() @@ -126,7 +134,7 @@ func TestServerMessageDifferentConnections(t *testing.T) { return mp2 } - s := server.NewServer("a", NewMessageHandler(newMessageProcessor, opts), server.NewOptions()) + s := server.NewServer("a", NewMessageHandler(NewMessageProcessorPool(newMessageProcessor), opts), server.NewOptions()) require.NoError(t, err) require.NoError(t, s.Serve(l)) diff --git a/src/msg/consumer/types.go b/src/msg/consumer/types.go index cf556eb678..bdd76fe52b 100644 --- a/src/msg/consumer/types.go +++ b/src/msg/consumer/types.go @@ -132,17 +132,58 @@ type MessageProcessor interface { Close() } -// NewMessageProcessorFn creates a new MessageProcessor scoped to a single connection. Messages are processed serially -// in a connection. -type NewMessageProcessorFn func() MessageProcessor +// MessageProcessorPool returns MessageProcessors. +type MessageProcessorPool interface { + // Get returns a MessageProcessor. + Get() MessageProcessor + // Put returns the MessageProcessor. + Put(mp MessageProcessor) + // Close the pool. + Close() +} + +// SingletonMessageProcessor returns a MessageProcessorPool that shares the same MessageProcessor for all users. The +// MessageProcessor is closed when the pool is closed. +func SingletonMessageProcessor(mp MessageProcessor) MessageProcessorPool { + return &singletonMessageProcessorPool{mp: mp} +} + +type singletonMessageProcessorPool struct { + mp MessageProcessor +} + +func (s singletonMessageProcessorPool) Get() MessageProcessor { + return s.mp +} + +func (s singletonMessageProcessorPool) Put(MessageProcessor) { + // mp is shared by all users, nothing to do. +} -// SingletonMessageProcessor uses the same MessageProcessor for all connections. -func SingletonMessageProcessor(p MessageProcessor) NewMessageProcessorFn { - return func() MessageProcessor { - return p - } +func (s singletonMessageProcessorPool) Close() { + s.mp.Close() } +// NewMessageProcessorPool returns a MessageProcessorPool that creates a new MessageProcessor for every call to Get +// and closes the MessageProcessor for every call to Put. +func NewMessageProcessorPool(fn func() MessageProcessor) MessageProcessorPool { + return &messageProcessorPool{fn: fn} +} + +type messageProcessorPool struct { + fn func() MessageProcessor +} + +func (m messageProcessorPool) Get() MessageProcessor { + return m.fn() +} + +func (m messageProcessorPool) Put(mp MessageProcessor) { + mp.Close() +} + +func (m messageProcessorPool) Close() {} + // NewNoOpMessageProcessor creates a new MessageProcessor that does nothing. func NewNoOpMessageProcessor() MessageProcessor { return &noOpMessageProcessor{} From acb8e21300fe72c21f59bc2d50a2c32a9109e44a Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 19 Nov 2021 13:17:45 -0800 Subject: [PATCH 2/4] fix racy test --- src/msg/consumer/handlers_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/msg/consumer/handlers_test.go b/src/msg/consumer/handlers_test.go index 7e4c1341a2..ec4a67798c 100644 --- a/src/msg/consumer/handlers_test.go +++ b/src/msg/consumer/handlers_test.go @@ -22,6 +22,7 @@ package consumer import ( "net" + "sort" "sync" "testing" @@ -41,6 +42,7 @@ func TestServerWithSingletonMessageProcessor(t *testing.T) { var ( data []string wg sync.WaitGroup + mu sync.Mutex ) ctrl := gomock.NewController(t) @@ -49,7 +51,9 @@ func TestServerWithSingletonMessageProcessor(t *testing.T) { p := NewMockMessageProcessor(ctrl) p.EXPECT().Process(gomock.Any()).Do( func(m Message) { + mu.Lock() data = append(data, string(m.Bytes())) + mu.Unlock() m.Ack() wg.Done() }, @@ -61,7 +65,8 @@ func TestServerWithSingletonMessageProcessor(t *testing.T) { require.NoError(t, err) s := server.NewServer("a", NewMessageHandler(SingletonMessageProcessor(p), opts), server.NewOptions()) - s.Serve(l) + defer s.Close() + require.NoError(t, s.Serve(l)) conn1, err := net.Dial("tcp", l.Addr().String()) require.NoError(t, err) @@ -77,24 +82,25 @@ func TestServerWithSingletonMessageProcessor(t *testing.T) { require.NoError(t, err) wg.Wait() - require.Equal(t, string(testMsg1.Value), data[0]) + sort.Strings(data) + require.Equal(t, string(testMsg2.Value), data[0]) require.Equal(t, string(testMsg2.Value), data[1]) - require.Equal(t, string(testMsg2.Value), data[2]) + require.Equal(t, string(testMsg1.Value), data[2]) var ack msgpb.Ack testDecoder := proto.NewDecoder(conn1, opts.DecoderOptions(), 10) err = testDecoder.Decode(&ack) - require.NoError(t, err) testDecoder = proto.NewDecoder(conn2, opts.DecoderOptions(), 10) err = testDecoder.Decode(&ack) require.NoError(t, err) require.Equal(t, 3, len(ack.Metadata)) + sort.Slice(ack.Metadata, func(i, j int) bool { + return ack.Metadata[i].Id < ack.Metadata[j].Id + }) require.Equal(t, testMsg1.Metadata, ack.Metadata[0]) require.Equal(t, testMsg2.Metadata, ack.Metadata[1]) require.Equal(t, testMsg2.Metadata, ack.Metadata[2]) - p.EXPECT().Close() - s.Close() } func TestServerMessageDifferentConnections(t *testing.T) { From 17b7a0c9cf87c6a01b943c632d78ba89204500ca Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 19 Nov 2021 14:08:16 -0800 Subject: [PATCH 3/4] review comments --- src/aggregator/server/m3msg/server.go | 2 +- src/msg/consumer/handlers.go | 22 +++++----- src/msg/consumer/handlers_test.go | 5 ++- src/msg/consumer/types.go | 61 ++++++++++++++------------- 4 files changed, 46 insertions(+), 44 deletions(-) diff --git a/src/aggregator/server/m3msg/server.go b/src/aggregator/server/m3msg/server.go index 73456b3206..467ff6e5c8 100644 --- a/src/aggregator/server/m3msg/server.go +++ b/src/aggregator/server/m3msg/server.go @@ -50,7 +50,7 @@ func NewServer( logger: opts.InstrumentOptions().Logger(), } } - handler := consumer.NewMessageHandler(consumer.NewMessageProcessorPool(newMessageProcessor), opts.ConsumerOptions()) + handler := consumer.NewMessageHandler(consumer.NewMessageProcessorFactory(newMessageProcessor), opts.ConsumerOptions()) return xserver.NewServer(address, handler, opts.ServerOptions()), nil } diff --git a/src/msg/consumer/handlers.go b/src/msg/consumer/handlers.go index babc137ee3..6658048d0f 100644 --- a/src/msg/consumer/handlers.go +++ b/src/msg/consumer/handlers.go @@ -32,25 +32,25 @@ import ( type messageHandler struct { opts Options - mPool *messagePool - mpPool MessageProcessorPool - m metrics + mPool *messagePool + mpFactory MessageProcessorFactory + m metrics } // NewMessageHandler creates a new server handler with messageFn. -func NewMessageHandler(mpPool MessageProcessorPool, opts Options) server.Handler { +func NewMessageHandler(mpFactory MessageProcessorFactory, opts Options) server.Handler { mPool := newMessagePool(opts.MessagePoolOptions()) mPool.Init() return &messageHandler{ - mpPool: mpPool, - opts: opts, - mPool: mPool, - m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()), + mpFactory: mpFactory, + opts: opts, + mPool: mPool, + m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()), } } func (h *messageHandler) Handle(conn net.Conn) { - mp := h.mpPool.Get() + mp := h.mpFactory.Create() c := newConsumer(conn, h.mPool, h.opts, h.m, mp) c.Init() var ( @@ -69,10 +69,10 @@ func (h *messageHandler) Handle(conn net.Conn) { if msgErr != nil && msgErr != io.EOF { h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer") } - h.mpPool.Put(mp) + mp.Close() c.Close() } func (h *messageHandler) Close() { - h.mpPool.Close() + h.mpFactory.Close() } diff --git a/src/msg/consumer/handlers_test.go b/src/msg/consumer/handlers_test.go index ec4a67798c..21e58097ad 100644 --- a/src/msg/consumer/handlers_test.go +++ b/src/msg/consumer/handlers_test.go @@ -42,7 +42,7 @@ func TestServerWithSingletonMessageProcessor(t *testing.T) { var ( data []string wg sync.WaitGroup - mu sync.Mutex + mu sync.Mutex ) ctrl := gomock.NewController(t) @@ -90,6 +90,7 @@ func TestServerWithSingletonMessageProcessor(t *testing.T) { var ack msgpb.Ack testDecoder := proto.NewDecoder(conn1, opts.DecoderOptions(), 10) err = testDecoder.Decode(&ack) + require.NoError(t, err) testDecoder = proto.NewDecoder(conn2, opts.DecoderOptions(), 10) err = testDecoder.Decode(&ack) require.NoError(t, err) @@ -140,7 +141,7 @@ func TestServerMessageDifferentConnections(t *testing.T) { return mp2 } - s := server.NewServer("a", NewMessageHandler(NewMessageProcessorPool(newMessageProcessor), opts), server.NewOptions()) + s := server.NewServer("a", NewMessageHandler(NewMessageProcessorFactory(newMessageProcessor), opts), server.NewOptions()) require.NoError(t, err) require.NoError(t, s.Serve(l)) diff --git a/src/msg/consumer/types.go b/src/msg/consumer/types.go index bdd76fe52b..5f01b84af1 100644 --- a/src/msg/consumer/types.go +++ b/src/msg/consumer/types.go @@ -132,57 +132,58 @@ type MessageProcessor interface { Close() } -// MessageProcessorPool returns MessageProcessors. -type MessageProcessorPool interface { - // Get returns a MessageProcessor. - Get() MessageProcessor - // Put returns the MessageProcessor. - Put(mp MessageProcessor) - // Close the pool. +// MessageProcessorFactory creates MessageProcessors. +type MessageProcessorFactory interface { + // Create returns a MessageProcessor. + Create() MessageProcessor + // Close the factory. Close() } -// SingletonMessageProcessor returns a MessageProcessorPool that shares the same MessageProcessor for all users. The -// MessageProcessor is closed when the pool is closed. -func SingletonMessageProcessor(mp MessageProcessor) MessageProcessorPool { - return &singletonMessageProcessorPool{mp: mp} +// SingletonMessageProcessor returns a MessageProcessorFactory that shares the same MessageProcessor for all users. The +// MessageProcessor is closed when the factory is closed. +func SingletonMessageProcessor(mp MessageProcessor) MessageProcessorFactory { + return &singletonMessageProcessorFactory{mp: mp, noClose: &noCloseMessageProcessor{mp: mp}} } -type singletonMessageProcessorPool struct { - mp MessageProcessor +type singletonMessageProcessorFactory struct { + mp MessageProcessor + noClose MessageProcessor } -func (s singletonMessageProcessorPool) Get() MessageProcessor { - return s.mp +func (s singletonMessageProcessorFactory) Create() MessageProcessor { + return s.noClose } -func (s singletonMessageProcessorPool) Put(MessageProcessor) { - // mp is shared by all users, nothing to do. +func (s singletonMessageProcessorFactory) Close() { + s.mp.Close() } -func (s singletonMessageProcessorPool) Close() { - s.mp.Close() +type noCloseMessageProcessor struct { + mp MessageProcessor } -// NewMessageProcessorPool returns a MessageProcessorPool that creates a new MessageProcessor for every call to Get -// and closes the MessageProcessor for every call to Put. -func NewMessageProcessorPool(fn func() MessageProcessor) MessageProcessorPool { - return &messageProcessorPool{fn: fn} +func (n noCloseMessageProcessor) Process(m Message) { + n.mp.Process(m) } -type messageProcessorPool struct { - fn func() MessageProcessor +func (n noCloseMessageProcessor) Close() {} + +// NewMessageProcessorFactory returns a MessageProcessorFactory that creates a new MessageProcessor for every call to +// Create. +func NewMessageProcessorFactory(fn func() MessageProcessor) MessageProcessorFactory { + return &messageProcessorFactory{fn: fn} } -func (m messageProcessorPool) Get() MessageProcessor { - return m.fn() +type messageProcessorFactory struct { + fn func() MessageProcessor } -func (m messageProcessorPool) Put(mp MessageProcessor) { - mp.Close() +func (m messageProcessorFactory) Create() MessageProcessor { + return m.fn() } -func (m messageProcessorPool) Close() {} +func (m messageProcessorFactory) Close() {} // NewNoOpMessageProcessor creates a new MessageProcessor that does nothing. func NewNoOpMessageProcessor() MessageProcessor { From 827f024c815d817328dd7fa12a5076af55aaf9b6 Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 19 Nov 2021 14:12:23 -0800 Subject: [PATCH 4/4] lint --- src/msg/consumer/handlers.go | 2 +- src/msg/consumer/handlers_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/msg/consumer/handlers.go b/src/msg/consumer/handlers.go index 6658048d0f..845efc1e00 100644 --- a/src/msg/consumer/handlers.go +++ b/src/msg/consumer/handlers.go @@ -31,7 +31,7 @@ import ( ) type messageHandler struct { - opts Options + opts Options mPool *messagePool mpFactory MessageProcessorFactory m metrics diff --git a/src/msg/consumer/handlers_test.go b/src/msg/consumer/handlers_test.go index 21e58097ad..454de168a9 100644 --- a/src/msg/consumer/handlers_test.go +++ b/src/msg/consumer/handlers_test.go @@ -141,7 +141,8 @@ func TestServerMessageDifferentConnections(t *testing.T) { return mp2 } - s := server.NewServer("a", NewMessageHandler(NewMessageProcessorFactory(newMessageProcessor), opts), server.NewOptions()) + s := server.NewServer("a", + NewMessageHandler(NewMessageProcessorFactory(newMessageProcessor), opts), server.NewOptions()) require.NoError(t, err) require.NoError(t, s.Serve(l))