Skip to content

Commit

Permalink
[m3msg] Remove unnecessary ConsumeHandler interface
Browse files Browse the repository at this point in the history
The ConsumerHandler is redundant and unnecessary. Use the
MessageProcessor interface for the aggregator so it's consistent with
the coordinator.

This will make it easier to add some base metrics to both consumers in
the future.
  • Loading branch information
ryanhall07 committed Nov 10, 2021
1 parent e37f346 commit b968d34
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 123 deletions.
65 changes: 25 additions & 40 deletions src/aggregator/server/m3msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
package m3msg

import (
"errors"
"fmt"
"io"

"github.com/m3db/m3/src/aggregator/aggregator"
"github.com/m3db/m3/src/metrics/encoding"
Expand All @@ -35,11 +33,6 @@ import (
"go.uber.org/zap"
)

type server struct {
aggregator aggregator.Aggregator
logger *zap.Logger
}

// NewServer creates a new M3Msg server.
func NewServer(
address string,
Expand All @@ -50,48 +43,38 @@ func NewServer(
return nil, err
}

s := &server{
handler := consumer.NewMessageHandler(&messageProcessor{
aggregator: aggregator,
logger: opts.InstrumentOptions().Logger(),
}

handler := consumer.NewConsumerHandler(s.Consume, opts.ConsumerOptions())
}, opts.ConsumerOptions())
return xserver.NewServer(address, handler, opts.ServerOptions()), nil
}

func (s *server) Consume(c consumer.Consumer) {
var (
pb = &metricpb.MetricWithMetadatas{}
union = &encoding.UnaggregatedMessageUnion{}
)
for {
msg, err := c.Message()
if err != nil {
if !errors.Is(err, io.EOF) {
s.logger.Error("could not read message", zap.Error(err))
}
break
}
type messageProcessor struct {
pb metricpb.MetricWithMetadatas
union encoding.UnaggregatedMessageUnion
aggregator aggregator.Aggregator
logger *zap.Logger
}

// Reset and reuse the protobuf message for unpacking.
protobuf.ReuseMetricWithMetadatasProto(pb)
if err = s.handleMessage(pb, union, msg); err != nil {
s.logger.Error("could not process message",
zap.Error(err),
zap.Uint64("shard", msg.ShardID()),
zap.String("proto", pb.String()))
}
func (m *messageProcessor) Process(msg consumer.Message) {
if err := m.handleMessage(&m.pb, &m.union, msg); err != nil {
m.logger.Error("could not process message",
zap.Error(err),
zap.Uint64("shard", msg.ShardID()),
zap.String("proto", m.pb.String()))
}
c.Close()
}

func (s *server) handleMessage(
func (m *messageProcessor) handleMessage(
pb *metricpb.MetricWithMetadatas,
union *encoding.UnaggregatedMessageUnion,
msg consumer.Message,
) error {
defer msg.Ack()

// Reset and reuse the protobuf message for unpacking.
protobuf.ReuseMetricWithMetadatasProto(&m.pb)
// Unmarshal the message.
if err := pb.Unmarshal(msg.Bytes()); err != nil {
return err
Expand All @@ -104,46 +87,48 @@ func (s *server) handleMessage(
return err
}
u := union.CounterWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS:
err := union.BatchTimerWithMetadatas.FromProto(pb.BatchTimerWithMetadatas)
if err != nil {
return err
}
u := union.BatchTimerWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS:
err := union.GaugeWithMetadatas.FromProto(pb.GaugeWithMetadatas)
if err != nil {
return err
}
u := union.GaugeWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA:
err := union.ForwardedMetricWithMetadata.FromProto(pb.ForwardedMetricWithMetadata)
if err != nil {
return err
}
return s.aggregator.AddForwarded(
return m.aggregator.AddForwarded(
union.ForwardedMetricWithMetadata.ForwardedMetric,
union.ForwardedMetricWithMetadata.ForwardMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA:
err := union.TimedMetricWithMetadata.FromProto(pb.TimedMetricWithMetadata)
if err != nil {
return err
}
return s.aggregator.AddTimed(
return m.aggregator.AddTimed(
union.TimedMetricWithMetadata.Metric,
union.TimedMetricWithMetadata.TimedMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS:
err := union.TimedMetricWithMetadatas.FromProto(pb.TimedMetricWithMetadatas)
if err != nil {
return err
}
return s.aggregator.AddTimedWithStagedMetadatas(
return m.aggregator.AddTimedWithStagedMetadatas(
union.TimedMetricWithMetadatas.Metric,
union.TimedMetricWithMetadatas.StagedMetadatas)
default:
return fmt.Errorf("unrecognized message type: %v", pb.Type)
}
}

func (m *messageProcessor) Close() {}
27 changes: 0 additions & 27 deletions src/msg/consumer/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,6 @@ import (
"go.uber.org/zap"
)

type consumerHandler struct {
opts Options
mPool *messagePool
consumeFn ConsumeFn
m metrics
}

// NewConsumerHandler creates a new server handler with consumerFn.
func NewConsumerHandler(consumeFn ConsumeFn, opts Options) server.Handler {
mPool := newMessagePool(opts.MessagePoolOptions())
mPool.Init()
return &consumerHandler{
consumeFn: consumeFn,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
}
}

func (h *consumerHandler) Handle(conn net.Conn) {
c := newConsumer(conn, h.mPool, h.opts, h.m)
c.Init()
h.consumeFn(c)
}

func (h *consumerHandler) Close() {}

type messageHandler struct {
opts Options
mPool *messagePool
Expand Down
57 changes: 1 addition & 56 deletions src/msg/consumer/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,59 +86,4 @@ func TestServerWithMessageFn(t *testing.T) {

p.EXPECT().Close()
s.Close()
}

func TestServerWithConsumeFn(t *testing.T) {
defer leaktest.Check(t)()

var (
count = 0
bytes []byte
closed bool
wg sync.WaitGroup
)
consumeFn := func(c Consumer) {
for {
count++
m, err := c.Message()
if err != nil {
break
}
bytes = m.Bytes()
m.Ack()
wg.Done()
}
c.Close()
closed = true
}

l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

// Set a large ack buffer size to make sure the background go routine
// can flush it.
opts := testOptions().SetAckBufferSize(100)
s := server.NewServer("a", NewConsumerHandler(consumeFn, opts), server.NewOptions())
require.NoError(t, err)
s.Serve(l)

conn, err := net.Dial("tcp", l.Addr().String())
require.NoError(t, err)

wg.Add(1)
err = produce(conn, &testMsg1)
require.NoError(t, err)

wg.Wait()
require.Equal(t, testMsg1.Value, bytes)

var ack msgpb.Ack
testDecoder := proto.NewDecoder(conn, opts.DecoderOptions(), 10)
err = testDecoder.Decode(&ack)
require.NoError(t, err)
require.Equal(t, 1, len(ack.Metadata))
require.Equal(t, testMsg1.Metadata, ack.Metadata[0])

s.Close()
require.True(t, closed)
}
}

0 comments on commit b968d34

Please sign in to comment.