Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[m3msg] Remove unnecessary ConsumeHandler interface #3918

Merged
merged 3 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 31 additions & 42 deletions src/aggregator/server/m3msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
package m3msg

import (
"errors"
"fmt"
"io"

"github.com/m3db/m3/src/aggregator/aggregator"
"github.com/m3db/m3/src/metrics/encoding"
Expand All @@ -35,11 +33,6 @@ import (
"go.uber.org/zap"
)

type server struct {
aggregator aggregator.Aggregator
logger *zap.Logger
}

// NewServer creates a new M3Msg server.
func NewServer(
address string,
Expand All @@ -49,49 +42,43 @@ func NewServer(
if err := opts.Validate(); err != nil {
return nil, err
}

s := &server{
aggregator: aggregator,
logger: opts.InstrumentOptions().Logger(),
newMessageProcessor := func() consumer.MessageProcessor {
// construct a new messageProcessor per consumer so the internal protos can be reused across messages on the
// same connection.
return &messageProcessor{
aggregator: aggregator,
logger: opts.InstrumentOptions().Logger(),
}
}

handler := consumer.NewConsumerHandler(s.Consume, opts.ConsumerOptions())
handler := consumer.NewMessageHandler(newMessageProcessor, opts.ConsumerOptions())
return xserver.NewServer(address, handler, opts.ServerOptions()), nil
}

func (s *server) Consume(c consumer.Consumer) {
var (
pb = &metricpb.MetricWithMetadatas{}
union = &encoding.UnaggregatedMessageUnion{}
)
for {
msg, err := c.Message()
if err != nil {
if !errors.Is(err, io.EOF) {
s.logger.Error("could not read message", zap.Error(err))
}
break
}
type messageProcessor struct {
pb metricpb.MetricWithMetadatas
union encoding.UnaggregatedMessageUnion
aggregator aggregator.Aggregator
logger *zap.Logger
}

// Reset and reuse the protobuf message for unpacking.
protobuf.ReuseMetricWithMetadatasProto(pb)
if err = s.handleMessage(pb, union, msg); err != nil {
s.logger.Error("could not process message",
zap.Error(err),
zap.Uint64("shard", msg.ShardID()),
zap.String("proto", pb.String()))
}
func (m *messageProcessor) Process(msg consumer.Message) {
if err := m.handleMessage(&m.pb, &m.union, msg); err != nil {
m.logger.Error("could not process message",
zap.Error(err),
zap.Uint64("shard", msg.ShardID()),
zap.String("proto", m.pb.String()))
}
c.Close()
}

func (s *server) handleMessage(
func (m *messageProcessor) handleMessage(
pb *metricpb.MetricWithMetadatas,
union *encoding.UnaggregatedMessageUnion,
msg consumer.Message,
) error {
defer msg.Ack()

// Reset and reuse the protobuf message for unpacking.
protobuf.ReuseMetricWithMetadatasProto(&m.pb)
// Unmarshal the message.
if err := pb.Unmarshal(msg.Bytes()); err != nil {
return err
Expand All @@ -104,46 +91,48 @@ func (s *server) handleMessage(
return err
}
u := union.CounterWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS:
err := union.BatchTimerWithMetadatas.FromProto(pb.BatchTimerWithMetadatas)
if err != nil {
return err
}
u := union.BatchTimerWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS:
err := union.GaugeWithMetadatas.FromProto(pb.GaugeWithMetadatas)
if err != nil {
return err
}
u := union.GaugeWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA:
err := union.ForwardedMetricWithMetadata.FromProto(pb.ForwardedMetricWithMetadata)
if err != nil {
return err
}
return s.aggregator.AddForwarded(
return m.aggregator.AddForwarded(
union.ForwardedMetricWithMetadata.ForwardedMetric,
union.ForwardedMetricWithMetadata.ForwardMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA:
err := union.TimedMetricWithMetadata.FromProto(pb.TimedMetricWithMetadata)
if err != nil {
return err
}
return s.aggregator.AddTimed(
return m.aggregator.AddTimed(
union.TimedMetricWithMetadata.Metric,
union.TimedMetricWithMetadata.TimedMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS:
err := union.TimedMetricWithMetadatas.FromProto(pb.TimedMetricWithMetadatas)
if err != nil {
return err
}
return s.aggregator.AddTimedWithStagedMetadatas(
return m.aggregator.AddTimedWithStagedMetadatas(
union.TimedMetricWithMetadatas.Metric,
union.TimedMetricWithMetadatas.StagedMetadatas)
default:
return fmt.Errorf("unrecognized message type: %v", pb.Type)
}
}

func (m *messageProcessor) Close() {}
2 changes: 1 addition & 1 deletion src/cmd/services/m3coordinator/server/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c handlerConfiguration) newHandler(
ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts),
BlockholePolicies: c.BlackholePolicies,
})
return consumer.NewMessageHandler(p, cOpts), nil
return consumer.NewMessageHandler(consumer.SingletonMessageProcessor(p), cOpts), nil
}

// NewOptions creates handler options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {

s := server.NewServer(
"a",
consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
consumer.NewMessageHandler(consumer.SingletonMessageProcessor(newProtobufProcessor(hOpts)), opts),
server.NewOptions(),
)
s.Serve(l)
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {

s := server.NewServer(
"a",
consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
consumer.NewMessageHandler(consumer.SingletonMessageProcessor(newProtobufProcessor(hOpts)), opts),
server.NewOptions(),
)
s.Serve(l)
Expand Down
29 changes: 18 additions & 11 deletions src/msg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (l *listener) Accept() (Consumer, error) {
return nil, err
}

return newConsumer(conn, l.msgPool, l.opts, l.m), nil
return newConsumer(conn, l.msgPool, l.opts, l.m, NewNoOpMessageProcessor), nil
}

type metrics struct {
Expand Down Expand Up @@ -97,18 +97,20 @@ type consumer struct {
w xio.ResettableWriter
conn net.Conn

ackPb msgpb.Ack
closed bool
doneCh chan struct{}
wg sync.WaitGroup
m metrics
ackPb msgpb.Ack
closed bool
doneCh chan struct{}
wg sync.WaitGroup
m metrics
messageProcessor MessageProcessor
}

func newConsumer(
conn net.Conn,
mPool *messagePool,
opts Options,
m metrics,
newMessageProcessorFn NewMessageProcessorFn,
) *consumer {
var (
wOpts = xio.ResettableWriterOptions{
Expand All @@ -126,11 +128,12 @@ func newConsumer(
decoder: proto.NewDecoder(
conn, opts.DecoderOptions(), opts.ConnectionReadBufferSize(),
),
w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts),
conn: conn,
closed: false,
doneCh: make(chan struct{}),
m: m,
w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts),
conn: conn,
closed: false,
doneCh: make(chan struct{}),
m: m,
messageProcessor: newMessageProcessorFn(),
}
}

Expand All @@ -141,6 +144,9 @@ func (c *consumer) Init() {
c.wg.Done()
}()
}
func (c *consumer) process(m Message) {
c.messageProcessor.Process(m)
}

func (c *consumer) Message() (Message, error) {
m := c.mPool.Get()
Expand Down Expand Up @@ -230,6 +236,7 @@ func (c *consumer) Close() {
close(c.doneCh)
c.wg.Wait()
c.conn.Close()
c.messageProcessor.Close()
}

type message struct {
Expand Down
58 changes: 21 additions & 37 deletions src/msg/consumer/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,54 +29,38 @@ import (
"go.uber.org/zap"
)

type consumerHandler struct {
opts Options
mPool *messagePool
consumeFn ConsumeFn
m metrics
}

// NewConsumerHandler creates a new server handler with consumerFn.
func NewConsumerHandler(consumeFn ConsumeFn, opts Options) server.Handler {
mPool := newMessagePool(opts.MessagePoolOptions())
mPool.Init()
return &consumerHandler{
consumeFn: consumeFn,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
}
}

func (h *consumerHandler) Handle(conn net.Conn) {
c := newConsumer(conn, h.mPool, h.opts, h.m)
c.Init()
h.consumeFn(c)
type messageHandler struct {
opts Options
mPool *messagePool
newMessageProcessorFn NewMessageProcessorFn
m metrics
}

func (h *consumerHandler) Close() {}
// NewMessageProcessorFn creates a new MessageProcessor scoped to a single connection. Messages are processed serially
// in a connection.
type NewMessageProcessorFn func() MessageProcessor

type messageHandler struct {
opts Options
mPool *messagePool
mp MessageProcessor
m metrics
// SingletonMessageProcessor uses the same MessageProcessor for all connections.
func SingletonMessageProcessor(p MessageProcessor) NewMessageProcessorFn {
return func() MessageProcessor {
return p
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move to types.go

}

// NewMessageHandler creates a new server handler with messageFn.
func NewMessageHandler(mp MessageProcessor, opts Options) server.Handler {
func NewMessageHandler(newMessageProcessorFn NewMessageProcessorFn, opts Options) server.Handler {
mPool := newMessagePool(opts.MessagePoolOptions())
mPool.Init()
return &messageHandler{
mp: mp,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
newMessageProcessorFn: newMessageProcessorFn,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
}
}

func (h *messageHandler) Handle(conn net.Conn) {
c := newConsumer(conn, h.mPool, h.opts, h.m)
c := newConsumer(conn, h.mPool, h.opts, h.m, h.newMessageProcessorFn)
c.Init()
var (
msgErr error
Expand All @@ -87,12 +71,12 @@ func (h *messageHandler) Handle(conn net.Conn) {
if msgErr != nil {
break
}
h.mp.Process(msg)
c.process(msg)
}
if msgErr != nil && msgErr != io.EOF {
h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer")
}
c.Close()
}

func (h *messageHandler) Close() { h.mp.Close() }
func (h *messageHandler) Close() {}
Loading