Skip to content

Commit

Permalink
[proto] Allow zero-alloc reuse of AggregatedMetric protobuf payloads (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Oct 30, 2020
1 parent e159616 commit 9681fcd
Show file tree
Hide file tree
Showing 28 changed files with 412 additions and 386 deletions.
5 changes: 2 additions & 3 deletions src/aggregator/aggregator/handler/writer/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,9 @@ func TestProtobufWriterWrite(t *testing.T) {
actualData := make(map[uint32][]decodeData)
writer.p.(*producer.MockProducer).EXPECT().Produce(gomock.Any()).Do(func(m producer.Message) error {
d := protobuf.NewAggregatedDecoder(nil)
d.Decode(m.Bytes())
require.NoError(t, d.Decode(m.Bytes()))
s := m.Shard()
sp, err := d.StoragePolicy()
require.NoError(t, err)
sp := d.StoragePolicy()
actualData[s] = append(actualData[s], decodeData{
MetricWithStoragePolicy: aggregated.MetricWithStoragePolicy{
Metric: aggregated.Metric{
Expand Down
13 changes: 1 addition & 12 deletions src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type handlerMetrics struct {
metricAccepted tally.Counter
droppedMetricBlackholePolicy tally.Counter
droppedMetricDecodeError tally.Counter
droppedMetricDecodeMalformed tally.Counter
}

func newHandlerMetrics(scope tally.Scope) handlerMetrics {
Expand All @@ -61,9 +60,6 @@ func newHandlerMetrics(scope tally.Scope) handlerMetrics {
droppedMetricBlackholePolicy: messageScope.Tagged(map[string]string{
"reason": "blackhole-policy",
}).Counter("dropped"),
droppedMetricDecodeMalformed: messageScope.Tagged(map[string]string{
"reason": "decode-malformed",
}).Counter("dropped"),
}
}

Expand Down Expand Up @@ -111,18 +107,11 @@ func (h *pbHandler) Process(msg consumer.Message) {
h.m.droppedMetricDecodeError.Inc(1)
return
}
sp, err := dec.StoragePolicy()
if err != nil {
h.logger.Error("invalid storage policy", zap.Error(err))
h.m.droppedMetricDecodeMalformed.Inc(1)
return
}

h.m.metricAccepted.Inc(1)

h.wg.Add(1)
r := NewProtobufCallback(msg, dec, h.wg)

sp := dec.StoragePolicy()
// If storage policy is blackholed, ack the message immediately and don't
// bother passing down the write path.
for _, blackholeSp := range h.blackholePolicies {
Expand Down
8 changes: 4 additions & 4 deletions src/collector/integration/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func defaultMappingRulesConfig() []*rulepb.MappingRule {
Filter: "mtagName1:mtagValue1",
StoragePolicies: []*policypb.StoragePolicy{
&policypb.StoragePolicy{
Resolution: &policypb.Resolution{
Resolution: policypb.Resolution{
WindowSize: int64(10 * time.Second),
Precision: int64(time.Second),
},
Retention: &policypb.Retention{
Retention: policypb.Retention{
Period: int64(24 * time.Hour),
},
},
Expand Down Expand Up @@ -142,11 +142,11 @@ func defaultRollupRulesConfig() []*rulepb.RollupRule {
},
StoragePolicies: []*policypb.StoragePolicy{
&policypb.StoragePolicy{
Resolution: &policypb.Resolution{
Resolution: policypb.Resolution{
WindowSize: int64(time.Minute),
Precision: int64(time.Minute),
},
Retention: &policypb.Retention{
Retention: policypb.Retention{
Period: int64(48 * time.Hour),
},
},
Expand Down
13 changes: 9 additions & 4 deletions src/metrics/encoding/protobuf/aggregated_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type AggregatedDecoder struct {
pool AggregatedDecoderPool
pb metricpb.AggregatedMetric
sp policy.StoragePolicy
}

// NewAggregatedDecoder creates an aggregated decoder.
Expand All @@ -40,7 +41,10 @@ func NewAggregatedDecoder(p AggregatedDecoderPool) *AggregatedDecoder {

// Decode decodes the aggregated metric from the given bytes.
func (d *AggregatedDecoder) Decode(b []byte) error {
return d.pb.Unmarshal(b)
if err := d.pb.Unmarshal(b); err != nil {
return err
}
return d.sp.FromProto(d.pb.Metric.StoragePolicy)
}

// ID returns the decoded id.
Expand All @@ -59,8 +63,8 @@ func (d AggregatedDecoder) Value() float64 {
}

// StoragePolicy returns the decoded storage policy.
func (d AggregatedDecoder) StoragePolicy() (policy.StoragePolicy, error) {
return policy.NewStoragePolicyFromProto(&d.pb.Metric.StoragePolicy)
func (d AggregatedDecoder) StoragePolicy() policy.StoragePolicy {
return d.sp
}

// EncodeNanos returns the decoded encodeNanos.
Expand All @@ -70,7 +74,8 @@ func (d AggregatedDecoder) EncodeNanos() int64 {

// Close closes the decoder.
func (d *AggregatedDecoder) Close() {
resetAggregatedMetricProto(&d.pb)
d.sp = policy.StoragePolicy{}
ReuseAggregatedMetricProto(&d.pb)
if d.pool != nil {
d.pool.Put(d)
}
Expand Down
49 changes: 49 additions & 0 deletions src/metrics/encoding/protobuf/aggregated_decoder_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package protobuf

import (
"runtime"
"testing"

"github.com/m3db/m3/src/metrics/policy"
)

func BenchmarkDecodeStoragePolicy(b *testing.B) {
var (
enc = NewAggregatedEncoder(nil)
dec = NewAggregatedDecoder(nil)
sp policy.StoragePolicy
)
if err := enc.Encode(testAggregatedMetric1, 2000); err != nil {
b.Fatal(err)
}

buf := enc.Buffer().Bytes()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_ = dec.Decode(buf)
sp = dec.StoragePolicy()
dec.Close()
}
runtime.KeepAlive(sp)
}
2 changes: 1 addition & 1 deletion src/metrics/encoding/protobuf/aggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (enc *aggregatedEncoder) Encode(
m aggregated.MetricWithStoragePolicy,
encodedAtNanos int64,
) error {
resetAggregatedMetricProto(&enc.pb)
ReuseAggregatedMetricProto(&enc.pb)
if err := m.ToProto(&enc.pb.Metric); err != nil {
return err
}
Expand Down
12 changes: 4 additions & 8 deletions src/metrics/encoding/protobuf/aggregated_roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ func TestAggregatedEncoderDecoder_RoundTrip(t *testing.T) {
require.NoError(t, enc.Encode(testAggregatedMetric1, 2000))
require.NoError(t, dec.Decode(enc.Buffer().Bytes()))
require.Equal(t, int64(2000), dec.EncodeNanos())
sp, err := dec.StoragePolicy()
require.NoError(t, err)
sp := dec.StoragePolicy()
require.Equal(t, testAggregatedMetric1.StoragePolicy, sp)
require.Equal(t, string(testAggregatedMetric1.ID), string(dec.ID()))
require.Equal(t, testAggregatedMetric1.TimeNanos, dec.TimeNanos())
Expand All @@ -77,8 +76,7 @@ func TestAggregatedEncoderDecoder_WithBytesPool(t *testing.T) {
require.NoError(t, enc.Encode(testAggregatedMetric1, 2000))
require.NoError(t, dec.Decode(enc.Buffer().Bytes()))
require.Equal(t, int64(2000), dec.EncodeNanos())
sp, err := dec.StoragePolicy()
require.NoError(t, err)
sp := dec.StoragePolicy()
require.Equal(t, testAggregatedMetric1.StoragePolicy, sp)
require.Equal(t, string(testAggregatedMetric1.ID), string(dec.ID()))
require.Equal(t, testAggregatedMetric1.TimeNanos, dec.TimeNanos())
Expand All @@ -91,8 +89,7 @@ func TestAggregatedEncoderDecoder_ResetProtobuf(t *testing.T) {
require.NoError(t, enc.Encode(testAggregatedMetric1, 2000))
require.NoError(t, dec.Decode(enc.Buffer().Bytes()))
require.Equal(t, int64(2000), dec.EncodeNanos())
sp, err := dec.StoragePolicy()
require.NoError(t, err)
sp := dec.StoragePolicy()
require.Equal(t, testAggregatedMetric1.StoragePolicy, sp)
require.Equal(t, string(testAggregatedMetric1.ID), string(dec.ID()))
require.Equal(t, testAggregatedMetric1.TimeNanos, dec.TimeNanos())
Expand All @@ -105,8 +102,7 @@ func TestAggregatedEncoderDecoder_ResetProtobuf(t *testing.T) {
require.NoError(t, enc.Encode(testAggregatedMetric2, 3000))
require.NoError(t, dec.Decode(enc.Buffer().Bytes()))
require.Equal(t, int64(3000), dec.EncodeNanos())
sp, err = dec.StoragePolicy()
require.NoError(t, err)
sp = dec.StoragePolicy()
require.Equal(t, testAggregatedMetric2.StoragePolicy, sp)
require.Equal(t, string(testAggregatedMetric2.ID), string(dec.ID()))
require.Equal(t, testAggregatedMetric2.TimeNanos, dec.TimeNanos())
Expand Down
18 changes: 10 additions & 8 deletions src/metrics/encoding/protobuf/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ import (
"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
)

func resetAggregatedMetricProto(pb *metricpb.AggregatedMetric) {
if pb == nil {
return
}
resetTimedMetricWithStoragePolicyProto(&pb.Metric)
pb.EncodeNanos = 0
}

// ReuseMetricWithMetadatasProto allows for zero-alloc reuse of
// *metricpb.MetricWithMetadatas by deep resetting the internal slices
// and when using gogoprotobuf's unmarshal function will reuse the slices
Expand Down Expand Up @@ -60,6 +52,16 @@ func ReuseMetricWithMetadatasProto(pb *metricpb.MetricWithMetadatas) {
resetTimedMetricWithStoragePolicyProto(pb.TimedMetricWithStoragePolicy)
}

// ReuseAggregatedMetricProto allows for zero-alloc reuse of
// *metricpb.AggregatedMetric
func ReuseAggregatedMetricProto(pb *metricpb.AggregatedMetric) {
if pb == nil {
return
}
resetTimedMetricWithStoragePolicyProto(&pb.Metric)
pb.EncodeNanos = 0
}

func resetCounterWithMetadatasProto(pb *metricpb.CounterWithMetadatas) {
if pb == nil {
return
Expand Down
8 changes: 4 additions & 4 deletions src/metrics/encoding/protobuf/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,23 @@ func TestReuseMetricWithMetadatasProtoNilProto(t *testing.T) {
require.NotPanics(t, func() { ReuseMetricWithMetadatasProto(nil) })
}

func TestResetAggregatedMetricProto(t *testing.T) {
func TestReuseAggregatedMetricProto(t *testing.T) {
input := &metricpb.AggregatedMetric{
Metric: metricpb.TimedMetricWithStoragePolicy{
TimedMetric: testTimedMetricBeforeResetProto,
StoragePolicy: policypb.StoragePolicy{
Resolution: &policypb.Resolution{
Resolution: policypb.Resolution{
WindowSize: 10 * time.Second.Nanoseconds(),
Precision: time.Second.Nanoseconds(),
},
Retention: &policypb.Retention{
Retention: policypb.Retention{
Period: (6 * time.Hour).Nanoseconds(),
},
},
},
EncodeNanos: 1234,
}
resetAggregatedMetricProto(input)
ReuseAggregatedMetricProto(input)
require.Equal(t, metricpb.AggregatedMetric{
Metric: metricpb.TimedMetricWithStoragePolicy{
TimedMetric: metricpb.TimedMetric{Id: []byte{}},
Expand Down
Loading

0 comments on commit 9681fcd

Please sign in to comment.