[aggregator] Add passthrough functionality in m3aggregator using rawtcp server #2235

Merged · 5 commits · Apr 15, 2020
Changes from 3 commits
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
@@ -320,6 +320,8 @@ aggregator:
flushInterval: 1s
writeBufferSize: 16384
readBufferSize: 256
passthrough:
enabled: true
forwarding:
maxConstDelay: 1m # Need to add some buffer window, since timed metrics by default are delayed by 1min.
entryTTL: 1h
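For context: a minimal sketch of the configuration types this new YAML block might unmarshal into on the rawtcp server side. The type and field names here are illustrative assumptions, not the actual m3aggregator configuration structs.

// Hypothetical sketch only; the real m3aggregator config types may differ.
type passthroughConfiguration struct {
	// Enabled turns the passthrough path on for the rawtcp server;
	// omitting the block leaves passthrough disabled.
	Enabled bool `yaml:"enabled"`
}

type rawTCPServerConfiguration struct {
	FlushInterval   time.Duration             `yaml:"flushInterval"`
	WriteBufferSize int                       `yaml:"writeBufferSize"`
	ReadBufferSize  int                       `yaml:"readBufferSize"`
	Passthrough     *passthroughConfiguration `yaml:"passthrough"`
}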
140 changes: 114 additions & 26 deletions src/aggregator/aggregator/aggregator.go
@@ -30,6 +30,7 @@ import (
"time"

"github.com/m3db/m3/src/aggregator/aggregator/handler"
"github.com/m3db/m3/src/aggregator/aggregator/handler/writer"
"github.com/m3db/m3/src/aggregator/client"
"github.com/m3db/m3/src/aggregator/sharding"
"github.com/m3db/m3/src/cluster/placement"
@@ -39,6 +40,7 @@ import (
"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/x/clock"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
@@ -77,6 +79,9 @@ type Aggregator interface {
// AddForwarded adds a forwarded metric with metadata.
AddForwarded(metric aggregated.ForwardedMetric, metadata metadata.ForwardMetadata) error

// AddPassthrough adds a passthrough metric with storage policy.
AddPassthrough(metric aggregated.Metric, storagePolicy policy.StoragePolicy) error

// Resign stops the aggregator from participating in leader election and resigns
// from ongoing campaign if any.
Resign() error
@@ -103,6 +108,7 @@ type aggregator struct {
electionManager ElectionManager
flushManager FlushManager
flushHandler handler.Handler
passthroughWriter writer.Writer
adminClient client.AdminClient
resignTimeout time.Duration

@@ -137,6 +143,7 @@ func NewAggregator(opts Options) Aggregator {
electionManager: opts.ElectionManager(),
flushManager: opts.FlushManager(),
flushHandler: opts.FlushHandler(),
passthroughWriter: opts.PassthroughWriter(),
adminClient: opts.AdminClient(),
resignTimeout: opts.ResignTimeout(),
doneCh: make(chan struct{}),
@@ -257,6 +264,43 @@ func (agg *aggregator) AddForwarded(
return nil
}

func (agg *aggregator) AddPassthrough(
metric aggregated.Metric,
storagePolicy policy.StoragePolicy,
) error {
callStart := agg.nowFn()
agg.metrics.passthrough.Inc(1)

if agg.electionManager.ElectionState() == FollowerState {
agg.metrics.addPassthrough.ReportFollowerNoop()
return nil
}

pw, err := agg.passWriter()
if err != nil {
agg.metrics.addPassthrough.ReportError(err)
return err
}

mp := aggregated.ChunkedMetricWithStoragePolicy{
ChunkedMetric: aggregated.ChunkedMetric{
ChunkedID: id.ChunkedID{
Data: []byte(metric.ID),
},
TimeNanos: metric.TimeNanos,
Value: metric.Value,
},
StoragePolicy: storagePolicy,
}

if err := pw.Write(mp); err != nil {
agg.metrics.addPassthrough.ReportError(err)
return err
}
agg.metrics.addPassthrough.ReportSuccess(agg.nowFn().Sub(callStart))
return nil
}
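To illustrate the intended call path: on decoding a passthrough metric, the rawtcp server would hand it to the aggregator together with its storage policy. A hedged sketch; the handler name and wiring below are assumptions, not the PR's server code.

// Hypothetical sketch of the rawtcp server side of the passthrough path.
func onPassthroughMetric(
	agg Aggregator, // the interface extended above
	metric aggregated.Metric,
	sp policy.StoragePolicy,
) error {
	// Leaders write the datapoint straight through at the given storage
	// policy; followers count a no-op so nothing is emitted twice.
	return agg.AddPassthrough(metric, sp)
}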

func (agg *aggregator) Resign() error {
ctx, cancel := context.WithTimeout(context.Background(), agg.resignTimeout)
defer cancel()
@@ -284,13 +328,29 @@ func (agg *aggregator) Close() error {
agg.closeShardSetWithLock()
}
agg.flushHandler.Close()
agg.passthroughWriter.Close()
if agg.adminClient != nil {
agg.adminClient.Close()
}
agg.state = aggregatorClosed
return nil
}

func (agg *aggregator) passWriter() (writer.Writer, error) {
agg.RLock()
defer agg.RUnlock()

if agg.state != aggregatorOpen {
return nil, errAggregatorNotOpenOrClosed
}

if agg.electionManager.ElectionState() == FollowerState {
return writer.NewBlackholeWriter(), nil
}

return agg.passthroughWriter, nil
}
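Note the design choice here: a follower gets a blackhole writer rather than an error, so an election transition that races an in-flight AddPassthrough degrades to silently dropped writes instead of client-visible failures. A minimal sketch of what such a writer plausibly looks like; the actual writer.NewBlackholeWriter implementation may differ.

// Hypothetical sketch: a blackhole writer accepts and discards everything.
type blackholeWriter struct{}

func (blackholeWriter) Write(mp aggregated.ChunkedMetricWithStoragePolicy) error { return nil }
func (blackholeWriter) Flush() error                                             { return nil }
func (blackholeWriter) Close() error                                             { return nil }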

func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
agg.RLock()
shard, err := agg.shardForWithLock(id, noUpdateShards)
@@ -760,6 +820,29 @@ func (m *aggregatorAddTimedMetrics) ReportError(err error) {
}
}

type aggregatorAddPassthroughMetrics struct {
aggregatorAddMetricMetrics
followerNoop tally.Counter
}

func newAggregatorAddPassthroughMetrics(
scope tally.Scope,
samplingRate float64,
) aggregatorAddPassthroughMetrics {
return aggregatorAddPassthroughMetrics{
aggregatorAddMetricMetrics: newAggregatorAddMetricMetrics(scope, samplingRate),
followerNoop: scope.Counter("follower-noop"),
}
}

func (m *aggregatorAddPassthroughMetrics) ReportError(err error) {
m.aggregatorAddMetricMetrics.ReportError(err)
}

func (m *aggregatorAddPassthroughMetrics) ReportFollowerNoop() {
m.followerNoop.Inc(1)
}
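A small usage sketch of the new counter, assuming uber-go/tally's test scope and an illustrative scope prefix:

// Sketch: exercising the follower-noop counter against a tally test scope.
scope := tally.NewTestScope("aggregator", nil)
m := newAggregatorAddPassthroughMetrics(scope.SubScope("addPassthrough"), 1.0)
m.ReportFollowerNoop()
// scope.Snapshot().Counters() should now include a counter named
// "aggregator.addPassthrough.follower-noop" with value 1.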

type latencyBucketKey struct {
resolution time.Duration
numForwardedTimes int
@@ -930,19 +1013,21 @@ func newAggregatorShardSetIDMetrics(scope tally.Scope) aggregatorShardSetIDMetrics {
}

type aggregatorMetrics struct {
counters tally.Counter
timers tally.Counter
timerBatches tally.Counter
gauges tally.Counter
forwarded tally.Counter
timed tally.Counter
addUntimed aggregatorAddUntimedMetrics
addTimed aggregatorAddTimedMetrics
addForwarded aggregatorAddForwardedMetrics
placement aggregatorPlacementMetrics
shards aggregatorShardsMetrics
shardSetID aggregatorShardSetIDMetrics
tick aggregatorTickMetrics
counters tally.Counter
timers tally.Counter
timerBatches tally.Counter
gauges tally.Counter
forwarded tally.Counter
timed tally.Counter
passthrough tally.Counter
addUntimed aggregatorAddUntimedMetrics
addTimed aggregatorAddTimedMetrics
addForwarded aggregatorAddForwardedMetrics
addPassthrough aggregatorAddPassthroughMetrics
placement aggregatorPlacementMetrics
shards aggregatorShardsMetrics
shardSetID aggregatorShardSetIDMetrics
tick aggregatorTickMetrics
}

func newAggregatorMetrics(
@@ -953,24 +1038,27 @@ func newAggregatorMetrics(
addUntimedScope := scope.SubScope("addUntimed")
addTimedScope := scope.SubScope("addTimed")
addForwardedScope := scope.SubScope("addForwarded")
addPassthroughScope := scope.SubScope("addPassthrough")
placementScope := scope.SubScope("placement")
shardsScope := scope.SubScope("shards")
shardSetIDScope := scope.SubScope("shard-set-id")
tickScope := scope.SubScope("tick")
return aggregatorMetrics{
counters: scope.Counter("counters"),
timers: scope.Counter("timers"),
timerBatches: scope.Counter("timer-batches"),
gauges: scope.Counter("gauges"),
forwarded: scope.Counter("forwarded"),
timed: scope.Counter("timed"),
addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate),
addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate),
addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn),
placement: newAggregatorPlacementMetrics(placementScope),
shards: newAggregatorShardsMetrics(shardsScope),
shardSetID: newAggregatorShardSetIDMetrics(shardSetIDScope),
tick: newAggregatorTickMetrics(tickScope),
counters: scope.Counter("counters"),
timers: scope.Counter("timers"),
timerBatches: scope.Counter("timer-batches"),
gauges: scope.Counter("gauges"),
forwarded: scope.Counter("forwarded"),
timed: scope.Counter("timed"),
passthrough: scope.Counter("passthrough"),
addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate),
addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate),
addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn),
addPassthrough: newAggregatorAddPassthroughMetrics(addPassthroughScope, samplingRate),
placement: newAggregatorPlacementMetrics(placementScope),
shards: newAggregatorShardsMetrics(shardsScope),
shardSetID: newAggregatorShardSetIDMetrics(shardSetIDScope),
tick: newAggregatorTickMetrics(tickScope),
}
}

33 changes: 33 additions & 0 deletions src/aggregator/aggregator/aggregator_test.go
@@ -77,6 +77,12 @@ var (
TimeNanos: 12345,
Values: []float64{76109, 23891},
}
testPassthroughMetric = aggregated.Metric{
Type: metric.CounterType,
ID: []byte("testPassthrough"),
TimeNanos: 12345,
Value: 1000,
}
testInvalidMetric = unaggregated.MetricUnion{
Type: metric.UnknownType,
ID: []byte("testInvalid"),
@@ -126,6 +132,7 @@ var (
SourceID: 1234,
NumForwardedTimes: 3,
}
testPassthroughStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour)
)

func TestAggregatorOpenAlreadyOpen(t *testing.T) {
@@ -664,6 +671,25 @@ func TestAggregatorResignSuccess(t *testing.T) {
require.NoError(t, agg.Resign())
}

func TestAggregatorAddPassthroughNotOpen(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

agg, _ := testAggregator(t, ctrl)
err := agg.AddPassthrough(testPassthroughMetric, testPassthroughStoragePolicy)
require.Equal(t, errAggregatorNotOpenOrClosed, err)
}

func TestAggregatorAddPassthroughSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

agg, _ := testAggregator(t, ctrl)
require.NoError(t, agg.Open())
err := agg.AddPassthrough(testPassthroughMetric, testPassthroughStoragePolicy)
require.NoError(t, err)
}

func TestAggregatorStatus(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -1100,6 +1126,7 @@ func testOptions(ctrl *gomock.Controller) Options {
electionMgr.EXPECT().Reset().Return(nil).AnyTimes()
electionMgr.EXPECT().Open(gomock.Any()).Return(nil).AnyTimes()
electionMgr.EXPECT().Close().Return(nil).AnyTimes()
electionMgr.EXPECT().ElectionState().Return(LeaderState).AnyTimes()

flushManager := NewMockFlushManager(ctrl)
flushManager.EXPECT().Reset().Return(nil).AnyTimes()
@@ -1117,6 +1144,11 @@ func testOptions(ctrl *gomock.Controller) Options {
h.EXPECT().NewWriter(gomock.Any()).Return(w, nil).AnyTimes()
h.EXPECT().Close().AnyTimes()

pw := writer.NewMockWriter(ctrl)
pw.EXPECT().Write(gomock.Any()).Return(nil).AnyTimes()
pw.EXPECT().Flush().Return(nil).AnyTimes()
pw.EXPECT().Close().Return(nil).AnyTimes()

cl := client.NewMockAdminClient(ctrl)
cl.EXPECT().Flush().Return(nil).AnyTimes()
cl.EXPECT().Close().AnyTimes()
@@ -1133,6 +1165,7 @@ func testOptions(ctrl *gomock.Controller) Options {
SetElectionManager(electionMgr).
SetFlushManager(flushManager).
SetFlushHandler(h).
SetPassthroughWriter(pw).
SetAdminClient(cl).
SetMaxAllowedForwardingDelayFn(infiniteAllowedDelayFn).
SetBufferForFutureTimedMetric(math.MaxInt64).
52 changes: 40 additions & 12 deletions src/aggregator/aggregator/capture/aggregator.go
@@ -39,13 +39,14 @@ import (
type aggregator struct {
sync.RWMutex

numMetricsAdded int
countersWithMetadatas []unaggregated.CounterWithMetadatas
batchTimersWithMetadatas []unaggregated.BatchTimerWithMetadatas
gaugesWithMetadatas []unaggregated.GaugeWithMetadatas
forwardedMetricsWithMetadata []aggregated.ForwardedMetricWithMetadata
timedMetricsWithMetadata []aggregated.TimedMetricWithMetadata
timedMetricsWithMetadatas []aggregated.TimedMetricWithMetadatas
numMetricsAdded int
countersWithMetadatas []unaggregated.CounterWithMetadatas
batchTimersWithMetadatas []unaggregated.BatchTimerWithMetadatas
gaugesWithMetadatas []unaggregated.GaugeWithMetadatas
forwardedMetricsWithMetadata []aggregated.ForwardedMetricWithMetadata
timedMetricsWithMetadata []aggregated.TimedMetricWithMetadata
timedMetricsWithMetadatas []aggregated.TimedMetricWithMetadatas
passthroughMetricsWithMetadata []aggregated.PassthroughMetricWithMetadata
}

// NewAggregator creates a new capturing aggregator.
@@ -152,6 +153,26 @@ func (agg *aggregator) AddForwarded(
return nil
}

func (agg *aggregator) AddPassthrough(
metric aggregated.Metric,
storagePolicy policy.StoragePolicy,
) error {
// Clone the metric and storage policy to ensure they cannot be mutated externally.
metric = cloneTimedMetric(metric)
storagePolicy = cloneStoragePolicy(storagePolicy)

agg.Lock()
defer agg.Unlock()

pm := aggregated.PassthroughMetricWithMetadata{
Metric: metric,
StoragePolicy: storagePolicy,
}
agg.passthroughMetricsWithMetadata = append(agg.passthroughMetricsWithMetadata, pm)
agg.numMetricsAdded++
return nil
}

func (agg *aggregator) Resign() error { return nil }
func (agg *aggregator) Status() aggr.RuntimeStatus { return aggr.RuntimeStatus{} }
func (agg *aggregator) Close() error { return nil }
@@ -167,17 +188,19 @@ func (agg *aggregator) Snapshot() SnapshotResult {
agg.Lock()

result := SnapshotResult{
CountersWithMetadatas: agg.countersWithMetadatas,
BatchTimersWithMetadatas: agg.batchTimersWithMetadatas,
GaugesWithMetadatas: agg.gaugesWithMetadatas,
ForwardedMetricsWithMetadata: agg.forwardedMetricsWithMetadata,
TimedMetricWithMetadata: agg.timedMetricsWithMetadata,
CountersWithMetadatas: agg.countersWithMetadatas,
BatchTimersWithMetadatas: agg.batchTimersWithMetadatas,
GaugesWithMetadatas: agg.gaugesWithMetadatas,
ForwardedMetricsWithMetadata: agg.forwardedMetricsWithMetadata,
TimedMetricWithMetadata: agg.timedMetricsWithMetadata,
PassthroughMetricWithMetadata: agg.passthroughMetricsWithMetadata,
}
agg.countersWithMetadatas = nil
agg.batchTimersWithMetadatas = nil
agg.gaugesWithMetadatas = nil
agg.forwardedMetricsWithMetadata = nil
agg.timedMetricsWithMetadata = nil
agg.passthroughMetricsWithMetadata = nil
agg.numMetricsAdded = 0

agg.Unlock()
@@ -263,3 +286,8 @@ func cloneTimedMetadata(meta metadata.TimedMetadata) metadata.TimedMetadata {
cloned := meta
return cloned
}

func cloneStoragePolicy(sp policy.StoragePolicy) policy.StoragePolicy {
cloned := sp
return cloned
}
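Because the capture aggregator records rather than aggregates, tests can assert on exactly which passthrough metrics arrived. A hedged usage sketch, assuming capture.NewAggregator takes no arguments:

// Sketch: asserting captured passthrough metrics in a test.
agg := capture.NewAggregator()
m := aggregated.Metric{ID: []byte("foo"), TimeNanos: 12345, Value: 42}
sp := policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour)
require.NoError(t, agg.AddPassthrough(m, sp))
res := agg.Snapshot()
require.Equal(t, 1, len(res.PassthroughMetricWithMetadata))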