Skip to content

Commit

Permalink
[aggregator] Add integration test for aggregator placement changes (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vpranckaitis authored May 6, 2021
1 parent c4bd395 commit 6c4b901
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 186 deletions.
58 changes: 58 additions & 0 deletions src/aggregator/integration/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// +build integration

// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"sync"
"time"

"github.com/m3db/m3/src/x/clock"
)

type testClock struct {
sync.RWMutex

now time.Time
}

func newTestClock(now time.Time) *testClock {
return &testClock{
now: now,
}
}

func (c *testClock) Now() time.Time {
c.RLock()
defer c.RUnlock()
return c.now
}

func (c *testClock) SetNow(now time.Time) {
c.Lock()
defer c.Unlock()
c.now = now
}

func (c *testClock) Options() clock.Options {
return clock.NewOptions().SetNowFn(c.Now)
}
26 changes: 5 additions & 21 deletions src/aggregator/integration/custom_aggregations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ package integration

import (
"sort"
"sync"
"testing"
"time"

"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/x/clock"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -79,21 +76,8 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) {
SetAggregationTypesOptions(aggTypesOpts)

// Clock setup.
var lock sync.RWMutex
now := time.Now().Truncate(time.Hour)
getNowFn := func() time.Time {
lock.RLock()
t := now
lock.RUnlock()
return t
}
setNowFn := func(t time.Time) {
lock.Lock()
now = t
lock.Unlock()
}
clockOpts := clock.NewOptions().SetNowFn(getNowFn)
serverOpts = serverOpts.SetClockOptions(clockOpts)
clock := newTestClock(time.Now().Truncate(time.Hour))
serverOpts = serverOpts.SetClockOptions(clock.Options())

// Placement setup.
numShards := 1024
Expand Down Expand Up @@ -124,7 +108,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) {
var (
idPrefix = "foo"
numIDs = 100
start = getNowFn()
start = clock.Now()
t1 = start.Add(2 * time.Second)
t2 = start.Add(4 * time.Second)
t3 = start.Add(6 * time.Second)
Expand Down Expand Up @@ -180,7 +164,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) {
}
for _, dataset := range inputs {
for _, data := range dataset {
setNowFn(data.timestamp)
clock.SetNow(data.timestamp)
for _, mm := range data.metricWithMetadatas {
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
Expand All @@ -194,7 +178,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) {
// Move time forward and wait for ticking to happen. The sleep time
// must be the longer than the lowest resolution across all policies.
finalTime := end.Add(time.Second)
setNowFn(finalTime)
clock.SetNow(finalTime)
time.Sleep(6 * time.Second)

// Stop the server.
Expand Down
1 change: 1 addition & 0 deletions src/aggregator/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package integration contains integration tests for aggregator.
package integration

import (
Expand Down
26 changes: 5 additions & 21 deletions src/aggregator/integration/metadata_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ package integration

import (
"sort"
"sync"
"testing"
"time"

"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/x/clock"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -59,21 +56,8 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) {
serverOpts := newTestServerOptions()

// Clock setup.
var lock sync.RWMutex
now := time.Now().Truncate(time.Hour)
getNowFn := func() time.Time {
lock.RLock()
t := now
lock.RUnlock()
return t
}
setNowFn := func(t time.Time) {
lock.Lock()
now = t
lock.Unlock()
}
clockOpts := clock.NewOptions().SetNowFn(getNowFn)
serverOpts = serverOpts.SetClockOptions(clockOpts)
clock := newTestClock(time.Now().Truncate(time.Hour))
serverOpts = serverOpts.SetClockOptions(clock.Options())

// Placement setup.
numShards := 1024
Expand Down Expand Up @@ -104,7 +88,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) {
var (
idPrefix = "foo"
numIDs = 100
start = getNowFn()
start = clock.Now()
middle = start.Add(4 * time.Second)
end = start.Add(10 * time.Second)
interval = time.Second
Expand Down Expand Up @@ -138,7 +122,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) {
}
for _, dataset := range inputs {
for _, data := range dataset {
setNowFn(data.timestamp)
clock.SetNow(data.timestamp)
for _, mm := range data.metricWithMetadatas {
require.NoError(t, client.writeUntimedMetricWithMetadatas(mm.metric.untimed, mm.metadata.stagedMetadatas))
}
Expand All @@ -152,7 +136,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) {
// Move time forward and wait for ticking to happen. The sleep time
// must be the longer than the lowest resolution across all policies.
finalTime := end.Add(time.Second)
setNowFn(finalTime)
clock.SetNow(finalTime)
time.Sleep(6 * time.Second)

// Stop the server.
Expand Down
26 changes: 5 additions & 21 deletions src/aggregator/integration/multi_client_one_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ package integration

import (
"math/rand"
"sync"
"testing"
"time"

"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/x/clock"

"github.com/stretchr/testify/require"
)

Expand All @@ -53,21 +50,8 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) {
serverOpts := newTestServerOptions()

// Clock setup.
var lock sync.RWMutex
now := time.Now().Truncate(time.Hour)
getNowFn := func() time.Time {
lock.RLock()
t := now
lock.RUnlock()
return t
}
setNowFn := func(t time.Time) {
lock.Lock()
now = t
lock.Unlock()
}
clockOpts := clock.NewOptions().SetNowFn(getNowFn)
serverOpts = serverOpts.SetClockOptions(clockOpts)
clock := newTestClock(time.Now().Truncate(time.Hour))
serverOpts = serverOpts.SetClockOptions(clock.Options())

// Placement setup.
numShards := 1024
Expand Down Expand Up @@ -98,7 +82,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) {
var (
idPrefix = "foo"
numIDs = 100
start = getNowFn()
start = clock.Now()
stop = start.Add(10 * time.Second)
interval = time.Second
numClients = 10
Expand All @@ -123,7 +107,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) {
metadataFn: metadataFn,
})
for _, data := range dataset {
setNowFn(data.timestamp)
clock.SetNow(data.timestamp)
for _, mm := range data.metricWithMetadatas {
// Randomly pick one client to write the metric.
client := clients[rand.Int63n(int64(numClients))]
Expand All @@ -140,7 +124,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) {
// Move time forward and wait for ticking to happen. The sleep time
// must be the longer than the lowest resolution across all policies.
finalTime := stop.Add(time.Second)
setNowFn(finalTime)
clock.SetNow(finalTime)
time.Sleep(4 * time.Second)

// Stop the server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/m3db/m3/src/metrics/pipeline/applied"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/metrics/transformation"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"
Expand All @@ -66,20 +65,7 @@ func testMultiServerForwardingPipeline(t *testing.T, discardNaNAggregatedValues
}

// Clock setup.
var lock sync.RWMutex
now := time.Now().Truncate(time.Hour)
getNowFn := func() time.Time {
lock.RLock()
t := now
lock.RUnlock()
return t
}
setNowFn := func(t time.Time) {
lock.Lock()
now = t
lock.Unlock()
}
clockOpts := clock.NewOptions().SetNowFn(getNowFn)
clock := newTestClock(time.Now().Truncate(time.Hour))

// Placement setup.
var (
Expand Down Expand Up @@ -170,7 +156,7 @@ func testMultiServerForwardingPipeline(t *testing.T, discardNaNAggregatedValues
)
instrumentOpts = instrumentOpts.SetLogger(logger)
serverOpts := newTestServerOptions().
SetClockOptions(clockOpts).
SetClockOptions(clock.Options()).
SetInstrumentOptions(instrumentOpts).
SetElectionCluster(electionCluster).
SetHTTPAddr(mss.httpAddr).
Expand Down Expand Up @@ -233,7 +219,7 @@ func testMultiServerForwardingPipeline(t *testing.T, discardNaNAggregatedValues
var (
idPrefix = "foo"
numIDs = 2
start = getNowFn()
start = clock.Now()
stop = start.Add(12 * time.Second)
interval = time.Second
storagePolicies = policy.StoragePolicies{
Expand Down Expand Up @@ -304,7 +290,7 @@ func testMultiServerForwardingPipeline(t *testing.T, discardNaNAggregatedValues

writingClients := clients[:2]
for _, data := range dataset {
setNowFn(data.timestamp)
clock.SetNow(data.timestamp)

for _, mm := range data.metricWithMetadatas {
for _, c := range writingClients {
Expand All @@ -323,15 +309,15 @@ func testMultiServerForwardingPipeline(t *testing.T, discardNaNAggregatedValues
// at the originating server (where the raw metrics are aggregated).
originatingServerflushTime := stop.Add(2 * storagePolicies[1].Resolution().Window)
for currTime := stop; !currTime.After(originatingServerflushTime); currTime = currTime.Add(time.Second) {
setNowFn(currTime)
clock.SetNow(currTime)
time.Sleep(time.Second)
}

// Move time forward using the larger resolution again and wait for flushing to
// happen at the destination server (where the rollup metrics are aggregated).
destinationServerflushTime := originatingServerflushTime.Add(2 * storagePolicies[1].Resolution().Window)
for currTime := originatingServerflushTime; !currTime.After(destinationServerflushTime); currTime = currTime.Add(time.Second) {
setNowFn(currTime)
clock.SetNow(currTime)
time.Sleep(time.Second)
}

Expand Down
Loading

0 comments on commit 6c4b901

Please sign in to comment.