Skip to content

Commit

Permalink
Merge branch 'master' into chrischinch/quickstarts-complete
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Chinchilla authored Dec 4, 2020
2 parents 28b739a + d86fe20 commit f7b46f4
Show file tree
Hide file tree
Showing 17 changed files with 255 additions and 81 deletions.
2 changes: 1 addition & 1 deletion site/content/overview/media.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Recordings of all past meetups can be found on a [Vimeo M3 Community Meetings fo

- [June 2020 Meetup](https://vimeo.com/440390957).

- [July 2020 Meetup and LinkedIn presentation](https://vimeo.com/440449118).
- [July 2020 Meetup and LinkedIn presentation](https://vimeo.com/440390957).

- [August 2020 Meetup and Walmart presentation](https://vimeo.com/449883279).

Expand Down
7 changes: 3 additions & 4 deletions site/static/about/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<meta property="og:title" content="M3: Open Source Metrics Engine"/>
<meta property="og:site_name" content="M3"/>

<meta property="og:url" content="#"/>
<meta property="og:url" content="/about"/>

<meta property="og:type" content="website"/>
<meta name="twitter:title" content="M3: Open Source Metrics Engine"/>
Expand All @@ -53,9 +53,8 @@

<!-- Image -->

<meta property="og:image" content="../images/page/og-about.jpg"/>
<meta name="twitter:image" content="../images/page/og-about.jpg"/>

<meta property="og:image" content="/images/logo-square.png"/>
<meta name="twitter:image" content="/images/logo-square.png"/>

<!-- Description -->

Expand Down
7 changes: 3 additions & 4 deletions site/static/community/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@
<meta property="og:title" content="M3: Open Source Metrics Engine"/>
<meta property="og:site_name" content="M3"/>

<meta property="og:url" content="#"/>
<meta property="og:url" content="/community"/>

<meta property="og:type" content="website"/>
<meta name="twitter:title" content="M3: Open Source Metrics Engine"/>
<meta name="twitter:site" content="@m3"/>
<meta name="twitter:card" content="summary_large_image"/>

<!-- Image -->

<meta property="og:image" content="../images/page/og-community.jpg"/>
<meta name="twitter:image" content="../images/page/og-community.jpg"/>
<meta property="og:image" content="/images/logo-square.png"/>
<meta name="twitter:image" content="/images/logo-square.png"/>


<!-- Description -->
Expand Down
6 changes: 3 additions & 3 deletions site/static/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<meta property="og:title" content="M3: Open Source Metrics Engine"/>
<meta property="og:site_name" content="M3"/>

<meta property="og:url" content="#"/>
<meta property="og:url" content="/"/>

<meta property="og:type" content="website"/>
<meta name="twitter:title" content="M3: Open Source Metrics Engine"/>
Expand All @@ -53,8 +53,8 @@

<!-- Image -->

<meta property="og:image" content="../images/page/og-home.jpg"/>
<meta name="twitter:image" content="../images/page/og-home.jpg"/>
<meta property="og:image" content="/images/logo-square.png"/>
<meta name="twitter:image" content="/images/logo-square.png"/>


<!-- Description -->
Expand Down
11 changes: 5 additions & 6 deletions src/cluster/placement/staged_placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"errors"
"sort"
"sync"
"sync/atomic"

"go.uber.org/atomic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/x/clock"
Expand All @@ -44,7 +45,7 @@ type activeStagedPlacement struct {
onPlacementsAddedFn OnPlacementsAddedFn
onPlacementsRemovedFn OnPlacementsRemovedFn

expiring int32
expiring atomic.Int32
closed bool
doneFn DoneFn
}
Expand Down Expand Up @@ -98,8 +99,6 @@ func (p *activeStagedPlacement) Close() error {
}

func (p *activeStagedPlacement) Version() int {
p.RLock()
defer p.RUnlock()
return p.version
}

Expand All @@ -115,7 +114,7 @@ func (p *activeStagedPlacement) activePlacementWithLock(timeNanos int64) (Placem
}
placement := p.placements[idx]
// If the placement that's in effect is not the first placment, expire the stale ones.
if idx > 0 && atomic.CompareAndSwapInt32(&p.expiring, 0, 1) {
if idx > 0 && p.expiring.CAS(0, 1) {
go p.expire()
}
return placement, nil
Expand All @@ -126,7 +125,7 @@ func (p *activeStagedPlacement) expire() {
// because this code path is triggered very infrequently.
cleanup := func() {
p.Unlock()
atomic.StoreInt32(&p.expiring, 0)
p.expiring.Store(0)
}
p.Lock()
defer cleanup()
Expand Down
87 changes: 81 additions & 6 deletions src/cluster/placement/staged_placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/cluster/shard"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

var (
Expand Down Expand Up @@ -395,33 +396,107 @@ func TestActiveStagedPlacementExpireAlreadyClosed(t *testing.T) {
p := &activeStagedPlacement{
placements: append([]Placement{}, testActivePlacements...),
nowFn: func() time.Time { return time.Unix(0, 99999) },
expiring: 1,
closed: true,
onPlacementsRemovedFn: func(placements []Placement) {
for _, placement := range placements {
removedInstances = append(removedInstances, placement.Instances())
}
},
}
p.expiring.Store(1)
p.expire()
require.Equal(t, int32(0), p.expiring)
require.Equal(t, int32(0), p.expiring.Load())
require.Nil(t, removedInstances)
}

func TestActiveStagedPlacementVersionWhileExpiring(t *testing.T) {
for i := 0; i < 100; i++ {
// test itself is fast, unless there's a deadlock
testActiveStagedPlacementVersionWhileExpiring(t)
}
}

//nolint:gocyclo
func testActiveStagedPlacementVersionWhileExpiring(t *testing.T) {
var (
doneCh = make(chan struct{})
signalCh = make(chan struct{})
version int
ranCleanup atomic.Bool
)

p := newActiveStagedPlacement(append([]Placement{}, testActivePlacements...), 42, nil)
p.nowFn = func() time.Time {
return time.Unix(0, testActivePlacements[len(testActivePlacements)-1].CutoverNanos()+1)
}
p.onPlacementsRemovedFn = func(_ []Placement) {
ranCleanup.Store(true)
}

go func() {
defer close(doneCh)
for {
version = p.Version()
select {
case signalCh <- struct{}{}:
return
default:
}
}
}()

pl, doneFn, err := p.ActivePlacement()
require.NoError(t, err)
require.NotNil(t, pl)
require.NotNil(t, doneFn)

// active placement is not the first in the list - expiration of past
// placements must be triggered
require.Equal(t, int32(1), p.expiring.Load())

// make sure p.Version() call was attempted at least once
select {
case <-signalCh:
case <-time.After(time.Second):
t.Fatalf("test timed out, deadlock?")
}

// release placement lock to unblock expiration process
doneFn()
select {
case <-doneCh:
case <-time.After(time.Second):
t.Fatalf("test timed out, deadlock?")
}

// there's no good way to determine when expire process has been completed,
// try polling for 100ms
for i := 0; i < 100; i++ {
if ranCleanup.Load() && p.expiring.Load() == int32(0) {
break
}
time.Sleep(1 * time.Millisecond)
}

require.Equal(t, 42, version)
require.True(t, ranCleanup.Load())
require.Equal(t, int32(0), p.expiring.Load())
}

func TestActiveStagedPlacementExpireAlreadyExpired(t *testing.T) {
var removedInstances [][]Instance
p := &activeStagedPlacement{
placements: append([]Placement{}, testActivePlacements...),
nowFn: func() time.Time { return time.Unix(0, 0) },
expiring: 1,
onPlacementsRemovedFn: func(placements []Placement) {
for _, placement := range placements {
removedInstances = append(removedInstances, placement.Instances())
}
},
}
p.expiring.Store(1)
p.expire()
require.Equal(t, int32(0), p.expiring)
require.Equal(t, int32(0), p.expiring.Load())
require.Nil(t, removedInstances)
}

Expand All @@ -430,15 +505,15 @@ func TestActiveStagedPlacementExpireSuccess(t *testing.T) {
p := &activeStagedPlacement{
placements: append([]Placement{}, testActivePlacements...),
nowFn: func() time.Time { return time.Unix(0, 99999) },
expiring: 1,
onPlacementsRemovedFn: func(placements []Placement) {
for _, placement := range placements {
removedInstances = append(removedInstances, placement.Instances())
}
},
}
p.expiring.Store(1)
p.expire()
require.Equal(t, int32(0), p.expiring)
require.Equal(t, int32(0), p.expiring.Load())
require.Equal(t, [][]Instance{testActivePlacements[0].Instances()}, removedInstances)
require.Equal(t, 1, len(p.placements))
validateSnapshot(t, testActivePlacements[1], p.placements[0])
Expand Down
8 changes: 4 additions & 4 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesAggregationType(t *tes
tags: map[string]string{
"__g0__": "nginx_edge",
"__g1__": "health",
"__g2__": "Max",
"__g2__": "upper",
},
values: []expectedValue{{value: 30}},
attributes: &storagemetadata.Attributes{
Expand Down Expand Up @@ -668,7 +668,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesMultipleAggregationTyp
tags: map[string]string{
"__g0__": "nginx_edge",
"__g1__": "health",
"__g2__": "Max",
"__g2__": "upper",
},
values: []expectedValue{{value: 30}},
attributes: &storagemetadata.Attributes{
Expand All @@ -681,7 +681,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesMultipleAggregationTyp
tags: map[string]string{
"__g0__": "nginx_edge",
"__g1__": "health",
"__g2__": "Sum",
"__g2__": "sum",
},
values: []expectedValue{{value: 60}},
attributes: &storagemetadata.Attributes{
Expand Down Expand Up @@ -742,7 +742,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixAndAggre
"__g1__": "counter",
"__g2__": "nginx_edge",
"__g3__": "health",
"__g4__": "Max",
"__g4__": "upper",
},
values: []expectedValue{{value: 30}},
attributes: &storagemetadata.Attributes{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (a *metricsAppender) augmentTags(
var (
count = tags.countPrefix(graphite.Prefix)
name = graphite.TagName(count)
value = types[0].Bytes()
value = types[0].Name()
)
tags.append(name, value)
}
Expand Down
34 changes: 34 additions & 0 deletions src/metrics/aggregation/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,31 @@ var (
}

typeStringMap map[string]Type

typeStringNames = map[Type][]byte{
Last: []byte("last"),
Min: []byte("lower"),
Max: []byte("upper"),
Mean: []byte("mean"),
Median: []byte("median"),
Count: []byte("count"),
Sum: []byte("sum"),
SumSq: []byte("sum_sq"),
Stdev: []byte("stdev"),
P10: []byte("p10"),
P20: []byte("p20"),
P30: []byte("p30"),
P40: []byte("p40"),
P50: []byte("p50"),
P60: []byte("p60"),
P70: []byte("p70"),
P80: []byte("p80"),
P90: []byte("p90"),
P95: []byte("p95"),
P99: []byte("p99"),
P999: []byte("p999"),
P9999: []byte("p9999"),
}
)

// Type defines an aggregation function.
Expand Down Expand Up @@ -232,6 +257,15 @@ func (a *Type) UnmarshalText(data []byte) error {
return nil
}

// Name returns the name of the Type.
func (a Type) Name() []byte {
name, ok := typeStringNames[a]
if ok {
return name
}
return a.Bytes()
}

func validateProtoType(a aggregationpb.AggregationType) error {
_, ok := aggregationpb.AggregationType_name[int32(a)]
if !ok {
Expand Down
26 changes: 1 addition & 25 deletions src/metrics/aggregation/types_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,32 +449,8 @@ func validateQuantiles(t *testing.T, o TypesOptions) {
}

func typeStrings(overrides map[Type][]byte) [][]byte {
defaultTypeStrings := map[Type][]byte{
Last: []byte("last"),
Min: []byte("lower"),
Max: []byte("upper"),
Mean: []byte("mean"),
Median: []byte("median"),
Count: []byte("count"),
Sum: []byte("sum"),
SumSq: []byte("sum_sq"),
Stdev: []byte("stdev"),
P10: []byte("p10"),
P20: []byte("p20"),
P30: []byte("p30"),
P40: []byte("p40"),
P50: []byte("p50"),
P60: []byte("p60"),
P70: []byte("p70"),
P80: []byte("p80"),
P90: []byte("p90"),
P95: []byte("p95"),
P99: []byte("p99"),
P999: []byte("p999"),
P9999: []byte("p9999"),
}
res := make([][]byte, maxTypeID+1)
for t, bstr := range defaultTypeStrings {
for t, bstr := range typeStringNames {
if override, exist := overrides[t]; exist {
res[t.ID()] = override
continue
Expand Down
Loading

0 comments on commit f7b46f4

Please sign in to comment.