Skip to content

Commit

Permalink
[aggregator] m3aggregator with pass-through functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Siyu Yang authored and prateek committed Feb 5, 2020
1 parent 6db9eee commit c439889
Show file tree
Hide file tree
Showing 23 changed files with 1,363 additions and 52 deletions.
137 changes: 111 additions & 26 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/m3db/m3/src/aggregator/aggregator/handler"
"github.com/m3db/m3/src/aggregator/aggregator/handler/writer"
"github.com/m3db/m3/src/aggregator/client"
"github.com/m3db/m3/src/aggregator/sharding"
"github.com/m3db/m3/src/cluster/placement"
Expand Down Expand Up @@ -58,6 +59,7 @@ var (
errInvalidMetricType = errors.New("invalid metric type")
errActivePlacementChanged = errors.New("active placement has changed")
errShardNotOwned = errors.New("aggregator shard is not owned")
errPassThroughWriterNotDefined = errors.New("passthrough writer is not defined")
)

// Aggregator aggregates different types of metrics.
Expand All @@ -74,6 +76,9 @@ type Aggregator interface {
// AddForwarded adds a forwarded metric with metadata.
AddForwarded(metric aggregated.ForwardedMetric, metadata metadata.ForwardMetadata) error

// AddPassThrough adds a pass-through metric with metadata.
AddPassThrough(metric aggregated.Metric, metadata metadata.TimedMetadata) error

// Resign stops the aggregator from participating in leader election and resigns
// from ongoing campaign if any.
Resign() error
Expand All @@ -100,6 +105,7 @@ type aggregator struct {
electionManager ElectionManager
flushManager FlushManager
flushHandler handler.Handler
passThroughWriter writer.Writer
adminClient client.AdminClient
resignTimeout time.Duration

Expand Down Expand Up @@ -134,6 +140,7 @@ func NewAggregator(opts Options) Aggregator {
electionManager: opts.ElectionManager(),
flushManager: opts.FlushManager(),
flushHandler: opts.FlushHandler(),
passThroughWriter: opts.PassThroughWriter(),
adminClient: opts.AdminClient(),
resignTimeout: opts.ResignTimeout(),
doneCh: make(chan struct{}),
Expand Down Expand Up @@ -235,6 +242,59 @@ func (agg *aggregator) AddForwarded(
return nil
}

// passWriter returns the configured pass-through writer, failing when the
// aggregator is not open or when no pass-through writer was set up.
func (agg *aggregator) passWriter() (writer.Writer, error) {
	agg.RLock()
	defer agg.RUnlock()

	switch {
	case agg.state != aggregatorOpen:
		return nil, errAggregatorNotOpenOrClosed
	case agg.passThroughWriter == nil:
		return nil, errPassThroughWriterNotDefined
	default:
		return agg.passThroughWriter, nil
	}
}

// AddPassThrough writes a metric with its storage policy straight to the
// pass-through writer, bypassing aggregation, and records call metrics.
func (agg *aggregator) AddPassThrough(
	metric aggregated.Metric,
	metadata metadata.TimedMetadata,
) error {
	start := agg.nowFn()
	agg.metrics.passThrough.Inc(1)

	w, err := agg.passWriter()
	if err != nil {
		agg.metrics.addPassThrough.ReportError(err)
		return err
	}

	// Prefix/suffix are deliberately left as their nil zero values: the
	// names of incoming carbon metrics are passed through unmodified.
	chunked := aggregated.ChunkedMetricWithStoragePolicy{
		ChunkedMetric: aggregated.ChunkedMetric{
			ChunkedID: id.ChunkedID{
				Data: metric.ID,
			},
			TimeNanos: metric.TimeNanos,
			Value:     metric.Value,
		},
		StoragePolicy: metadata.StoragePolicy,
	}

	if err := w.Write(chunked); err != nil {
		agg.metrics.addPassThrough.ReportError(err)
		return err
	}

	agg.metrics.addPassThrough.ReportSuccess(agg.nowFn().Sub(start))
	return nil
}

func (agg *aggregator) Resign() error {
ctx, cancel := context.WithTimeout(context.Background(), agg.resignTimeout)
defer cancel()
Expand Down Expand Up @@ -262,6 +322,9 @@ func (agg *aggregator) Close() error {
agg.closeShardSetWithLock()
}
agg.flushHandler.Close()
if agg.passThroughWriter != nil {
agg.passThroughWriter.Close()
}
if agg.adminClient != nil {
agg.adminClient.Close()
}
Expand Down Expand Up @@ -802,6 +865,23 @@ func (m *aggregatorAddForwardedMetrics) ReportForwardingLatency(
histogram.RecordDuration(duration)
}

// aggregatorAddPassThroughMetrics tracks the outcome of AddPassThrough
// calls; it reuses the generic per-call metrics via embedding.
type aggregatorAddPassThroughMetrics struct {
	aggregatorAddMetricMetrics
}

// newAggregatorAddPassThroughMetrics builds the metrics bundle used to
// record AddPassThrough call outcomes under the given scope.
func newAggregatorAddPassThroughMetrics(
	scope tally.Scope,
	samplingRate float64,
) aggregatorAddPassThroughMetrics {
	base := newAggregatorAddMetricMetrics(scope, samplingRate)
	return aggregatorAddPassThroughMetrics{aggregatorAddMetricMetrics: base}
}

// ReportError records a failed AddPassThrough call by delegating to the
// embedded aggregatorAddMetricMetrics.
// NOTE(review): embedding already promotes ReportError, so this wrapper
// looks redundant — confirm the embedded receiver kind before removing it.
func (m *aggregatorAddPassThroughMetrics) ReportError(err error) {
	m.aggregatorAddMetricMetrics.ReportError(err)
}

type tickMetricsForMetricCategory struct {
scope tally.Scope
activeEntries tally.Gauge
Expand Down Expand Up @@ -908,19 +988,21 @@ func newAggregatorShardSetIDMetrics(scope tally.Scope) aggregatorShardSetIDMetri
}

// aggregatorMetrics bundles the metrics emitted by the aggregator itself:
// per-type ingestion counters plus scoped sub-metrics for each Add* path
// and for placement, shard, shard-set-id and tick bookkeeping.
//
// Fix: the pasted diff concatenated the pre-change field list with the
// post-change one, declaring every field twice; only the post-change set
// (which includes passThrough/addPassThrough) is kept so the type compiles.
type aggregatorMetrics struct {
	counters       tally.Counter
	timers         tally.Counter
	timerBatches   tally.Counter
	gauges         tally.Counter
	forwarded      tally.Counter
	timed          tally.Counter
	passThrough    tally.Counter
	addUntimed     aggregatorAddUntimedMetrics
	addTimed       aggregatorAddTimedMetrics
	addForwarded   aggregatorAddForwardedMetrics
	addPassThrough aggregatorAddPassThroughMetrics
	placement      aggregatorPlacementMetrics
	shards         aggregatorShardsMetrics
	shardSetID     aggregatorShardSetIDMetrics
	tick           aggregatorTickMetrics
}

func newAggregatorMetrics(
Expand All @@ -931,24 +1013,27 @@ func newAggregatorMetrics(
addUntimedScope := scope.SubScope("addUntimed")
addTimedScope := scope.SubScope("addTimed")
addForwardedScope := scope.SubScope("addForwarded")
addPassThroughScope := scope.SubScope("addPassThrough")
placementScope := scope.SubScope("placement")
shardsScope := scope.SubScope("shards")
shardSetIDScope := scope.SubScope("shard-set-id")
tickScope := scope.SubScope("tick")
return aggregatorMetrics{
counters: scope.Counter("counters"),
timers: scope.Counter("timers"),
timerBatches: scope.Counter("timer-batches"),
gauges: scope.Counter("gauges"),
forwarded: scope.Counter("forwarded"),
timed: scope.Counter("timed"),
addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate),
addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate),
addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn),
placement: newAggregatorPlacementMetrics(placementScope),
shards: newAggregatorShardsMetrics(shardsScope),
shardSetID: newAggregatorShardSetIDMetrics(shardSetIDScope),
tick: newAggregatorTickMetrics(tickScope),
counters: scope.Counter("counters"),
timers: scope.Counter("timers"),
timerBatches: scope.Counter("timer-batches"),
gauges: scope.Counter("gauges"),
forwarded: scope.Counter("forwarded"),
timed: scope.Counter("timed"),
passThrough: scope.Counter("passthrough"),
addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate),
addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate),
addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn),
addPassThrough: newAggregatorAddPassThroughMetrics(addPassThroughScope, samplingRate),
placement: newAggregatorPlacementMetrics(placementScope),
shards: newAggregatorShardsMetrics(shardsScope),
shardSetID: newAggregatorShardSetIDMetrics(shardSetIDScope),
tick: newAggregatorTickMetrics(tickScope),
}
}

Expand Down
142 changes: 140 additions & 2 deletions src/aggregator/aggregator/aggregator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c439889

Please sign in to comment.