Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] Add M3Msg client and server for M3Aggregator #2171

Merged
merged 17 commits into from
Apr 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 246 additions & 5 deletions src/aggregator/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ import (
"github.com/m3db/m3/src/aggregator/sharding"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/msg/producer"
"github.com/m3db/m3/src/x/clock"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
Expand Down Expand Up @@ -129,6 +133,7 @@ func newClientMetrics(scope tally.Scope, sampleRate float64) clientMetrics {
type client struct {
sync.RWMutex

aggregatorClientType AggregatorClientType
opts Options
nowFn clock.NowFn
shardCutoverWarmupDuration time.Duration
Expand All @@ -137,11 +142,22 @@ type client struct {
shardFn sharding.ShardFn
placementWatcher placement.StagedPlacementWatcher
state clientState
m3msg m3msgClient
metrics clientMetrics
}

type m3msgClient struct {
messagePool *messagePool
producer producer.Producer
numShards uint32
}

// NewClient creates a new client.
func NewClient(opts Options) Client {
func NewClient(opts Options) (Client, error) {
if err := opts.Validate(); err != nil {
return nil, err
}

var (
instrumentOpts = opts.InstrumentOptions()
writerMgrScope = instrumentOpts.MetricsScope().SubScope("writer-manager")
Expand All @@ -165,16 +181,41 @@ func NewClient(opts Options) Client {
placementWatcherOpts := opts.StagedPlacementWatcherOptions().SetActiveStagedPlacementOptions(activeStagedPlacementOpts)
placementWatcher := placement.NewStagedPlacementWatcher(placementWatcherOpts)

return &client{
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
c := &client{
aggregatorClientType: opts.AggregatorClientType(),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
shardCutoverWarmupDuration: opts.ShardCutoverWarmupDuration(),
shardCutoffLingerDuration: opts.ShardCutoffLingerDuration(),
writerMgr: writerMgr,
shardFn: opts.ShardFn(),
placementWatcher: placementWatcher,
metrics: newClientMetrics(instrumentOpts.MetricsScope(), instrumentOpts.MetricsSamplingRate()),
}
switch c.aggregatorClientType {
case LegacyAggregatorClient:
// Nothing more to do.
case M3MsgAggregatorClient:
m3msgOpts := opts.M3MsgOptions()
if err := m3msgOpts.Validate(); err != nil {
return nil, err
}

producer := m3msgOpts.Producer()
if err := producer.Init(); err != nil {
return nil, err
}

c.m3msg = m3msgClient{
messagePool: newMessagePool(c.opts.EncoderOptions()),
producer: producer,
numShards: producer.NumShards(),
}
default:
return nil, fmt.Errorf("unrecognized client type: %v", c.aggregatorClientType)
}

return c, nil
}

func (c *client) Init() error {
Expand Down Expand Up @@ -293,12 +334,28 @@ func (c *client) Close() error {
if c.state != clientInitialized {
return errClientIsUninitializedOrClosed
}

if c.aggregatorClientType == M3MsgAggregatorClient {
c.m3msg.producer.Close(producer.WaitForConsumption)
}

c.state = clientClosed
c.placementWatcher.Unwatch() // nolint: errcheck
return c.writerMgr.Close()
}

func (c *client) write(metricID id.RawID, timeNanos int64, payload payloadUnion) error {
switch c.aggregatorClientType {
case LegacyAggregatorClient:
return c.writeLegacy(metricID, timeNanos, payload)
case M3MsgAggregatorClient:
return c.writeM3Msg(metricID, timeNanos, payload)
default:
return fmt.Errorf("unrecognized client type: %v", c.aggregatorClientType)
}
}

func (c *client) writeLegacy(metricID id.RawID, timeNanos int64, payload payloadUnion) error {
c.RLock()
if c.state != clientInitialized {
c.RUnlock()
Expand Down Expand Up @@ -338,12 +395,30 @@ func (c *client) write(metricID id.RawID, timeNanos int64, payload payloadUnion)
multiErr = multiErr.Add(err)
}
}

onPlacementDoneFn()
onStagedPlacementDoneFn()
c.RUnlock()
return multiErr.FinalError()
}

func (c *client) writeM3Msg(metricID id.RawID, timeNanos int64, payload payloadUnion) error {
shard := c.shardFn(metricID, c.m3msg.numShards)

msg := c.m3msg.messagePool.Get()
if err := msg.Encode(shard, payload); err != nil {
msg.Finalize(producer.Dropped)
return err
}

if err := c.m3msg.producer.Produce(msg); err != nil {
msg.Finalize(producer.Dropped)
return err
}

return nil
}

func (c *client) shouldWriteForShard(nowNanos int64, shard shard.Shard) bool {
writeEarliestNanos, writeLatestNanos := c.writeTimeRangeFor(shard)
return nowNanos >= writeEarliestNanos && nowNanos <= writeLatestNanos
Expand All @@ -364,4 +439,170 @@ func (c *client) writeTimeRangeFor(shard shard.Shard) (int64, int64) {
return earliestNanos, latestNanos
}

func (c *client) nowNanos() int64 { return c.nowFn().UnixNano() }
func (c *client) nowNanos() int64 {
return c.nowFn().UnixNano()
}

type messagePool struct {
pool sync.Pool
}

func newMessagePool(
encoderOpts protobuf.UnaggregatedOptions,
) *messagePool {
p := &messagePool{}
p.pool.New = func() interface{} {
return newMessage(encoderOpts, p)
}
return p
}

func (m *messagePool) Get() *message {
return m.pool.Get().(*message)
}

func (m *messagePool) Put(msg *message) {
m.pool.Put(msg)
}

// Ensure message implements m3msg producer message interface.
var _ producer.Message = (*message)(nil)

type message struct {
pool *messagePool
shard uint32
encoder protobuf.UnaggregatedEncoder
metric metricpb.MetricWithMetadatas

cm metricpb.CounterWithMetadatas
bm metricpb.BatchTimerWithMetadatas
gm metricpb.GaugeWithMetadatas
fm metricpb.ForwardedMetricWithMetadata
tm metricpb.TimedMetricWithMetadata

buf []byte
}

func newMessage(
encoderOpts protobuf.UnaggregatedOptions,
pool *messagePool,
) *message {
return &message{
pool: pool,
encoder: protobuf.NewUnaggregatedEncoder(encoderOpts),
}
}

func (m *message) Encode(
shard uint32,
payload payloadUnion,
) error {
m.shard = shard

var pb metricpb.MetricWithMetadatas
switch payload.payloadType {
case untimedType:
switch payload.untimed.metric.Type {
case metric.CounterType:
value := unaggregated.CounterWithMetadatas{
Counter: payload.untimed.metric.Counter(),
StagedMetadatas: payload.untimed.metadatas,
}
if err := value.ToProto(&m.cm); err != nil {
return err
}

pb = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_COUNTER_WITH_METADATAS,
CounterWithMetadatas: &m.cm,
}
case metric.TimerType:
value := unaggregated.BatchTimerWithMetadatas{
BatchTimer: payload.untimed.metric.BatchTimer(),
StagedMetadatas: payload.untimed.metadatas,
}
if err := value.ToProto(&m.bm); err != nil {
return err
}

pb = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS,
BatchTimerWithMetadatas: &m.bm,
}
case metric.GaugeType:
value := unaggregated.GaugeWithMetadatas{
Gauge: payload.untimed.metric.Gauge(),
StagedMetadatas: payload.untimed.metadatas,
}
if err := value.ToProto(&m.gm); err != nil {
return err
}

pb = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS,
GaugeWithMetadatas: &m.gm,
}
default:
return fmt.Errorf("unrecognized metric type: %v",
payload.untimed.metric.Type)
}
case forwardedType:
value := aggregated.ForwardedMetricWithMetadata{
ForwardedMetric: payload.forwarded.metric,
ForwardMetadata: payload.forwarded.metadata,
}
if err := value.ToProto(&m.fm); err != nil {
return err
}

pb = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &m.fm,
}
case timedType:
value := aggregated.TimedMetricWithMetadata{
Metric: payload.timed.metric,
TimedMetadata: payload.timed.metadata,
}
if err := value.ToProto(&m.tm); err != nil {
return err
}

pb = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA,
TimedMetricWithMetadata: &m.tm,
}
default:
return fmt.Errorf("unrecognized payload type: %v",
payload.payloadType)
}

size := pb.Size()
if size > cap(m.buf) {
const growthFactor = 1.5
m.buf = make([]byte, int(growthFactor*float64(size)))
robskillington marked this conversation as resolved.
Show resolved Hide resolved
}

// Resize buffer to exactly how long we need for marshalling.
m.buf = m.buf[:size]

_, err := pb.MarshalTo(m.buf)
return err
}

func (m *message) Shard() uint32 {
return m.shard
}

func (m *message) Bytes() []byte {
return m.buf
}

func (m *message) Size() int {
return len(m.buf)
}

func (m *message) Finalize(reason producer.FinalizeReason) {
// Return to pool.
robskillington marked this conversation as resolved.
Show resolved Hide resolved
m.pool.Put(m)
}
Loading