[aggregator] Remove msgpack support #2894

Merged: 6 commits, Nov 15, 2020
@@ -26,28 +26,7 @@ rawtcp:
maxBackoff: 1s
forever: true
jitter: true
readBufferSize: 1440
msgpackIterator:
ignoreHigherVersion: false
readerBufferSize: 1440
largeFloatsSize: 1024
largeFloatsPool:
buckets:
- count: 1024
capacity: 2048
- count: 512
capacity: 4096
- count: 256
capacity: 8192
- count: 128
capacity: 16384
- count: 64
capacity: 32768
- count: 32
capacity: 65536
watermark:
low: 0.001
high: 0.002
readBufferSize: 65536
protobufIterator:
initBufferSize: 1440
maxMessageSize: 50000000 # max message size is 50MB
3 changes: 1 addition & 2 deletions src/aggregator/client/writer_test.go
@@ -33,7 +33,6 @@ import (
"time"

"github.com/m3db/m3/src/metrics/encoding"
"github.com/m3db/m3/src/metrics/encoding/migration"
"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/metric"
@@ -974,7 +973,7 @@ func testWriterConcurrentWriteStress(
)
for i := 0; i < len(results); i++ {
buf := bytes.NewBuffer(results[i])
iter := migration.NewUnaggregatedIterator(buf, nil, protobuf.NewUnaggregatedOptions())
iter := protobuf.NewUnaggregatedIterator(buf, protobuf.NewUnaggregatedOptions())
for iter.Next() {
msgResult := iter.Current()
switch msgResult.Type {
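With the migration iterator removed, the stress test decodes its results through the protobuf iterator alone. A minimal decode sketch in that style is below; the Next/Current loop comes from the diff above, while Err, Close, and the message-type constant names are assumptions about this package's iterator contract rather than lines from this PR.

```go
package example

import (
	"bytes"
	"fmt"

	"github.com/m3db/m3/src/metrics/encoding"
	"github.com/m3db/m3/src/metrics/encoding/protobuf"
)

// decodeAll walks every unaggregated message in payload using the
// protobuf-only iterator that replaces migration.NewUnaggregatedIterator.
func decodeAll(payload []byte) (counters, timers, gauges int, err error) {
	buf := bytes.NewBuffer(payload)
	iter := protobuf.NewUnaggregatedIterator(buf, protobuf.NewUnaggregatedOptions())
	defer iter.Close() // Close/Err assumed from the package's iterator contract.

	for iter.Next() {
		msg := iter.Current()
		switch msg.Type {
		// The *WithMetadatasType constants are assumed names for the
		// untimed message kinds carried by the union.
		case encoding.CounterWithMetadatasType:
			counters++
		case encoding.BatchTimerWithMetadatasType:
			timers++
		case encoding.GaugeWithMetadatasType:
			gauges++
		default:
			return counters, timers, gauges, fmt.Errorf("unexpected message type %v", msg.Type)
		}
	}
	return counters, timers, gauges, iter.Err()
}
```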
23 changes: 1 addition & 22 deletions src/aggregator/config/m3aggregator.yml
@@ -28,28 +28,7 @@ rawtcp:
maxBackoff: 1s
forever: true
jitter: true
readBufferSize: 1440
msgpackIterator:
ignoreHigherVersion: false
readerBufferSize: 1440
largeFloatsSize: 1024
largeFloatsPool:
buckets:
- count: 1024
capacity: 2048
- count: 512
capacity: 4096
- count: 256
capacity: 8192
- count: 128
capacity: 16384
- count: 64
capacity: 32768
- count: 32
capacity: 65536
watermark:
low: 0.001
high: 0.002
readBufferSize: 65536
protobufIterator:
initBufferSize: 1440
maxMessageSize: 50000000 # max message size is 50MB
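Both bundled config files drop the whole msgpackIterator block (reader buffer, large-floats pool buckets, and watermarks) and raise the rawtcp readBufferSize from 1440 to 65536, leaving only the protobufIterator settings. A minimal sketch of parsing the remaining fragment is below; the struct and field names are hypothetical stand-ins for illustration, not the aggregator's actual config types.

```go
package example

import (
	"log"

	"gopkg.in/yaml.v2"
)

// rawTCPConfig is a hypothetical stand-in for the aggregator's rawtcp server
// configuration once the msgpackIterator section is removed.
type rawTCPConfig struct {
	ReadBufferSize   int                    `yaml:"readBufferSize"`
	ProtobufIterator protobufIteratorConfig `yaml:"protobufIterator"`
}

type protobufIteratorConfig struct {
	InitBufferSize int `yaml:"initBufferSize"`
	MaxMessageSize int `yaml:"maxMessageSize"`
}

func parseRawTCP() rawTCPConfig {
	// The fragment that remains in m3aggregator.yml after this change.
	raw := `
readBufferSize: 65536
protobufIterator:
  initBufferSize: 1440
  maxMessageSize: 50000000
`
	var cfg rawTCPConfig
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		log.Fatal(err)
	}
	return cfg // cfg.ProtobufIterator.MaxMessageSize == 50000000 (50MB)
}
```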
70 changes: 0 additions & 70 deletions src/aggregator/integration/client.go
@@ -26,7 +26,6 @@ import (
"time"

"github.com/m3db/m3/src/metrics/encoding"
"github.com/m3db/m3/src/metrics/encoding/msgpack"
"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/metric"
@@ -40,7 +39,6 @@ type client struct {
address string
batchSize int
connectTimeout time.Duration
msgpackEncoder msgpack.UnaggregatedEncoder
protobufEncoder protobuf.UnaggregatedEncoder
conn net.Conn
}
@@ -54,7 +52,6 @@ func newClient(
address: address,
batchSize: batchSize,
connectTimeout: connectTimeout,
msgpackEncoder: msgpack.NewUnaggregatedEncoder(msgpack.NewPooledBufferedEncoder(nil)),
protobufEncoder: protobuf.NewUnaggregatedEncoder(protobuf.NewUnaggregatedOptions()),
}
}
@@ -77,55 +74,6 @@ func (c *client) testConnection() bool {
return true
}

func (c *client) writeUntimedMetricWithPoliciesList(
mu unaggregated.MetricUnion,
pl policy.PoliciesList,
) error {
encoder := c.msgpackEncoder.Encoder()
sizeBefore := encoder.Buffer().Len()
var err error
switch mu.Type {
case metric.CounterType:
err = c.msgpackEncoder.EncodeCounterWithPoliciesList(unaggregated.CounterWithPoliciesList{
Counter: mu.Counter(),
PoliciesList: pl,
})
case metric.TimerType:
err = c.msgpackEncoder.EncodeBatchTimerWithPoliciesList(unaggregated.BatchTimerWithPoliciesList{
BatchTimer: mu.BatchTimer(),
PoliciesList: pl,
})
case metric.GaugeType:
err = c.msgpackEncoder.EncodeGaugeWithPoliciesList(unaggregated.GaugeWithPoliciesList{
Gauge: mu.Gauge(),
PoliciesList: pl,
})
default:
err = fmt.Errorf("unrecognized metric type %v", mu.Type)
}
if err != nil {
encoder.Buffer().Truncate(sizeBefore)
c.msgpackEncoder.Reset(encoder)
return err
}
sizeAfter := encoder.Buffer().Len()
// If the buffer size is not big enough, do nothing.
if sizeAfter < c.batchSize {
return nil
}
// Otherwise we get a new buffer and copy the bytes exceeding the max
// flush size to it, swap the new buffer with the old one, and flush out
// the old buffer.
encoder2 := msgpack.NewPooledBufferedEncoder(nil)
data := encoder.Bytes()
encoder2.Buffer().Write(data[sizeBefore:sizeAfter])
c.msgpackEncoder.Reset(encoder2)
encoder.Buffer().Truncate(sizeBefore)
_, err = c.conn.Write(encoder.Bytes())
encoder.Close()
return err
}

func (c *client) writeUntimedMetricWithMetadatas(
mu unaggregated.MetricUnion,
sm metadata.StagedMetadatas,
@@ -227,24 +175,6 @@ func (c *client) writeUnaggregatedMessage(
}

func (c *client) flush() error {
if err := c.flushMsgpackEncoder(); err != nil {
return err
}
return c.flushProtobufEncoder()
}

func (c *client) flushMsgpackEncoder() error {
encoder := c.msgpackEncoder.Encoder()
if len(encoder.Bytes()) == 0 {
return nil
}
c.msgpackEncoder.Reset(msgpack.NewPooledBufferedEncoder(nil))
_, err := c.conn.Write(encoder.Bytes())
encoder.Close()
return err
}

func (c *client) flushProtobufEncoder() error {
encoder := c.protobufEncoder
if encoder.Len() == 0 {
return nil
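With the msgpack encoder gone from the integration client, flush() reduces to flushing the single protobuf.UnaggregatedEncoder. A minimal sketch of that remaining path is below; Relinquish and the Bytes/Close methods on the returned buffer are assumptions about the encoder's buffer-ownership API, not code taken from this PR.

```go
package example

import (
	"net"

	"github.com/m3db/m3/src/metrics/encoding/protobuf"
)

// flushProtobuf sketches the only flush path the integration client needs
// once the msgpack encoder is gone: skip empty buffers, take ownership of the
// encoded bytes, write them to the connection, then release the buffer.
func flushProtobuf(conn net.Conn, encoder protobuf.UnaggregatedEncoder) error {
	if encoder.Len() == 0 {
		return nil // nothing buffered since the last flush
	}
	// Relinquish/Bytes/Close are assumed from the encoder's buffer-ownership
	// pattern; the encoder is ready to buffer the next batch after this call.
	buf := encoder.Relinquish()
	defer buf.Close()
	_, err := conn.Write(buf.Bytes())
	return err
}
```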
36 changes: 1 addition & 35 deletions src/aggregator/integration/custom_aggregations_test.go
@@ -36,36 +36,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestCustomAggregationWithPoliciesList(t *testing.T) {
metadataFns := [4]metadataFn{
func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testPoliciesList,
}
},
func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testPoliciesListWithCustomAggregation1,
}
},
func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testPoliciesListWithCustomAggregation2,
}
},
func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testPoliciesList,
}
},
}
testCustomAggregations(t, metadataFns)
}

func TestCustomAggregationWithStagedMetadatas(t *testing.T) {
metadataFns := [4]metadataFn{
func(int) metadataUnion {
@@ -212,11 +182,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) {
for _, data := range dataset {
setNowFn(data.timestamp)
for _, mm := range data.metricWithMetadatas {
if mm.metadata.mType == policiesListType {
require.NoError(t, client.writeUntimedMetricWithPoliciesList(mm.metric.untimed, mm.metadata.policiesList))
} else {
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.flush())

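This and the other integration tests below now drive every write through writeUntimedMetricWithMetadatas, so each test's metadataFn only ever yields staged metadatas. A simplified sketch of that shape is below; the types are pared-down hypothetical stand-ins for the suite's real metadataUnion and fixtures, which this PR's visible hunks do not reproduce.

```go
package example

import "github.com/m3db/m3/src/metrics/metadata"

// Pared-down stand-ins for the integration suite's helper types; only the
// staged-metadata fields are reproduced here.
type metadataUnion struct {
	stagedMetadatas metadata.StagedMetadatas
}

type metadataFn func(idx int) metadataUnion

// stagedMetadataFns shows the shape the surviving staged-metadata tests keep:
// four functions, each returning a (possibly different) staged-metadatas
// fixture, with no policies-list branch left to exercise.
func stagedMetadataFns(fixtures [4]metadata.StagedMetadatas) [4]metadataFn {
	var fns [4]metadataFn
	for i := range fixtures {
		sm := fixtures[i] // capture the per-index fixture for the closure
		fns[i] = func(int) metadataUnion {
			return metadataUnion{stagedMetadatas: sm}
		}
	}
	return fns
}
```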
6 changes: 1 addition & 5 deletions src/aggregator/integration/integration_data.go
@@ -781,8 +781,7 @@ func (mu metricUnion) ID() metricid.RawID {
type metadataType int

const (
policiesListType metadataType = iota
stagedMetadatasType
stagedMetadatasType metadataType = iota
[Collaborator review comment] nit: just sanity checking that these values aren't used externally - otherwise we should _ this first one to maintain the existing iota values

forwardMetadataType
timedMetadataType
passthroughMetadataType
@@ -792,7 +791,6 @@ type metadataFn func(idx int) metadataUnion

type metadataUnion struct {
mType metadataType
policiesList policy.PoliciesList
stagedMetadatas metadata.StagedMetadatas
forwardMetadata metadata.ForwardMetadata
timedMetadata metadata.TimedMetadata
@@ -804,8 +802,6 @@ func (mu metadataUnion) expectedAggregationKeys(
defaultStoragePolicies []policy.StoragePolicy,
) (aggregationKeys, error) {
switch mu.mType {
case policiesListType:
return computeExpectedAggregationKeysFromPoliciesList(now, mu.policiesList, defaultStoragePolicies)
case stagedMetadatasType:
return computeExpectedAggregationKeysFromStagedMetadatas(now, mu.stagedMetadatas, defaultStoragePolicies)
case forwardMetadataType:
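The review comment above asks whether the first constant should instead become a blank identifier so the remaining metadataType values keep their old numbers. A minimal sketch of that suggestion follows; it only matters if the numeric values are persisted or compared outside this package, which is exactly what the reviewer is sanity-checking.

```go
package example

// metadataType mirrors the integration test constant block. Keeping a blank
// identifier in the first iota slot preserves the numeric values the
// constants had before policiesListType was deleted.
type metadataType int

const (
	_                       metadataType = iota // was policiesListType (0)
	stagedMetadatasType                         // stays 1
	forwardMetadataType                         // stays 2
	timedMetadataType                           // stays 3
	passthroughMetadataType                     // stays 4
)
```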
22 changes: 1 addition & 21 deletions src/aggregator/integration/metadata_change_test.go
@@ -35,22 +35,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestMetadataChangeWithPoliciesList(t *testing.T) {
oldMetadataFn := func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testPoliciesList,
}
}
newMetadataFn := func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testUpdatedPoliciesList,
}
}
testMetadataChange(t, oldMetadataFn, newMetadataFn)
}

func TestMetadataChangeWithStagedMetadatas(t *testing.T) {
oldMetadataFn := func(int) metadataUnion {
return metadataUnion{
@@ -156,11 +140,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) {
for _, data := range dataset {
setNowFn(data.timestamp)
for _, mm := range data.metricWithMetadatas {
if mm.metadata.mType == policiesListType {
require.NoError(t, client.writeUntimedMetricWithPoliciesList(mm.metric.untimed, mm.metadata.policiesList))
} else {
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.flush())

16 changes: 1 addition & 15 deletions src/aggregator/integration/multi_client_one_type_test.go
@@ -35,16 +35,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestMultiClientOneTypeWithPoliciesList(t *testing.T) {
metadataFn := func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testPoliciesList,
}
}
testMultiClientOneType(t, metadataFn)
}

func TestMultiClientOneTypeWithStagedMetadatas(t *testing.T) {
metadataFn := func(int) metadataUnion {
return metadataUnion{
@@ -137,11 +127,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) {
for _, mm := range data.metricWithMetadatas {
// Randomly pick one client to write the metric.
client := clients[rand.Int63n(int64(numClients))]
if mm.metadata.mType == policiesListType {
require.NoError(t, client.writeUntimedMetricWithPoliciesList(mm.metric.untimed, mm.metadata.policiesList))
} else {
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
for _, client := range clients {
require.NoError(t, client.flush())
16 changes: 1 addition & 15 deletions src/aggregator/integration/one_client_multi_type_untimed_test.go
@@ -33,16 +33,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestOneClientMultiTypeUntimedMetricsWithPoliciesList(t *testing.T) {
metadataFn := func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testPoliciesList,
}
}
testOneClientMultiType(t, metadataFn)
}

func TestOneClientMultiTypeUntimedMetricsWithStagedMetadatas(t *testing.T) {
metadataFn := func(int) metadataUnion {
return metadataUnion{
@@ -127,11 +117,7 @@ func testOneClientMultiType(t *testing.T, metadataFn metadataFn) {
for _, data := range dataset {
setNowFn(data.timestamp)
for _, mm := range data.metricWithMetadatas {
if mm.metadata.mType == policiesListType {
require.NoError(t, client.writeUntimedMetricWithPoliciesList(mm.metric.untimed, mm.metadata.policiesList))
} else {
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.flush())

16 changes: 1 addition & 15 deletions src/aggregator/integration/same_id_multi_type_test.go
@@ -34,16 +34,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestSameIDMultiTypeWithPoliciesList(t *testing.T) {
metadataFn := func(int) metadataUnion {
return metadataUnion{
mType: policiesListType,
policiesList: testPoliciesList,
}
}
testSameIDMultiType(t, metadataFn)
}

func TestSameIDMultiTypeWithStagedMetadatas(t *testing.T) {
metadataFn := func(int) metadataUnion {
return metadataUnion{
@@ -151,11 +141,7 @@ func testSameIDMultiType(t *testing.T, metadataFn metadataFn) {
for _, data := range dataset {
setNowFn(data.timestamp)
for _, mm := range data.metricWithMetadatas {
if mm.metadata.mType == policiesListType {
require.NoError(t, client.writeUntimedMetricWithPoliciesList(mm.metric.untimed, mm.metadata.policiesList))
} else {
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
require.NoError(t, client.flush())
