receive: make az aware ketama hashring
Signed-off-by: Alexander Rickardsson <[email protected]>
alxric committed May 19, 2023
1 parent 5289449 commit 7bc9fdf
Showing 4 changed files with 217 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#6369](https://github.com/thanos-io/thanos/pull/6369) Receive: add AZ-aware replication support for the Ketama algorithm.
- [#6185](https://github.com/thanos-io/thanos/pull/6185) Tracing: support configuring `service_name` when tracing in OTLP.
- [#6192](https://github.com/thanos-io/thanos/pull/6192) Store: add flag `bucket-web-label` to select the label to use as timeline title in web UI.
- [#6167](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock-skewed samples from polluting the TSDB head and blocking valid incoming samples.
21 changes: 21 additions & 0 deletions docs/components/receive.md
@@ -95,6 +95,27 @@ The example content of `hashring.json`:

With such a configuration, every receiver listens for remote write on `<ip>:10908/api/v1/receive` and forwards requests to the correct node in the hashring as needed for tenancy and replication; a minimal client-side sketch follows.
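The following sketch is illustrative only (the receiver address, tenant and labels are made up; it assumes the `gogo/protobuf` and `golang/snappy` libraries plus Thanos' `prompb`/`labelpb` packages). It builds the snappy-compressed protobuf `WriteRequest` that the remote-write endpoint expects:

```go
package main

import (
    "bytes"
    "net/http"
    "time"

    "github.com/gogo/protobuf/proto"
    "github.com/golang/snappy"
    "github.com/prometheus/prometheus/model/labels"

    "github.com/thanos-io/thanos/pkg/store/labelpb"
    "github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

func main() {
    // One series, one sample: the payload of a remote-write push.
    wr := &prompb.WriteRequest{Timeseries: []prompb.TimeSeries{{
        Labels:  labelpb.ZLabelsFromPromLabels(labels.FromStrings("__name__", "up")),
        Samples: []prompb.Sample{{Value: 1, Timestamp: time.Now().UnixMilli()}},
    }}}
    raw, err := proto.Marshal(wr)
    if err != nil {
        panic(err)
    }
    // Remote write requires snappy compression and the protobuf content type.
    req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:10908/api/v1/receive",
        bytes.NewReader(snappy.Encode(nil, raw)))
    if err != nil {
        panic(err)
    }
    req.Header.Set("Content-Encoding", "snappy")
    req.Header.Set("Content-Type", "application/x-protobuf")
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    resp.Body.Close()
}
```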

### AZ-aware Ketama hashring (experimental)

To ensure that replicas are spread evenly over nodes in different availability zones (AZs), you can indicate the AZ as part of each endpoint. For example, for a 6-node cluster spread over three availability zones A, B and C, you could use the following `hashring.json`:

```json
[
    {
        "endpoints": [
            "127.0.0.1:10907,A",
            "127.0.0.1:11907,B",
            "127.0.0.1:12907,C",
            "127.0.0.1:13907,A",
            "127.0.0.1:14907,B",
            "127.0.0.1:16907,C"
        ]
    }
]
```

AZ-aware replication is only supported for the Ketama algorithm (selected with `--receive.hashrings-algorithm=ketama`); a hypothetical usage sketch follows.
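For illustration, here is a rough, hypothetical sketch against the package internals added in this change (`newKetamaHashring`, `SectionsPerNode` and `GetN`, as they appear in the diff below). With one node per AZ and a replication factor of 3, each replica of a series is routed to a different zone:

```go
package receive

import (
    "fmt"

    "github.com/prometheus/prometheus/model/labels"

    "github.com/thanos-io/thanos/pkg/store/labelpb"
    "github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

// azSpreadExample is a sketch, not shipped code: it asks the AZ-aware
// hashring for all three replicas of one series and prints their endpoints.
func azSpreadExample() {
    hr, err := newKetamaHashring(
        []string{"127.0.0.1:10907,A", "127.0.0.1:11907,B", "127.0.0.1:12907,C"},
        SectionsPerNode, 3,
    )
    if err != nil {
        panic(err)
    }
    ts := &prompb.TimeSeries{
        Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
    }
    for n := uint64(0); n < 3; n++ {
        endpoint, _ := hr.GetN("default-tenant", ts, n)
        fmt.Println(endpoint) // one endpoint from each of A, B and C
    }
}
```

Endpoints without an AZ suffix keep working: they are mapped to the empty AZ, so plain and AZ-aware configurations share the same code path.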

## Limits & gates (experimental)

Thanos Receive has some limits and gates that can be configured to control resource usage. Here's the difference between limits and gates:
66 changes: 58 additions & 8 deletions pkg/receive/hashring.go
@@ -6,8 +6,10 @@ package receive
import (
"context"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"

"github.com/cespare/xxhash"
@@ -91,6 +93,7 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
}

type section struct {
az string
endpointIndex uint64
hash uint64
replicas []uint64
@@ -103,27 +106,51 @@ func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash }
func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sections) Sort() { sort.Sort(p) }

type ketamaEndpoint struct {
endpoint string
az string
}

// ketamaHashring represents a group of nodes handling write requests with consistent hashing.
type ketamaHashring struct {
endpoints []string
endpoints []ketamaEndpoint
sections sections
numEndpoints uint64
}

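// getAZInfo splits an endpoint of the form "<address>,<az>" into its address
// and availability zone. An endpoint without a comma yields an empty AZ, so
// existing non-AZ hashring files keep working unchanged.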
func getAZInfo(endpoint string) (string, string) {
s := strings.Split(endpoint, ",")
ep := s[0]
var az string
if len(s) > 1 {
az = s[1]
}
return ep, az
}

func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
numSections := len(endpoints) * sectionsPerNode

if len(endpoints) < int(replicationFactor) {
return nil, errors.New("ketama: number of endpoints needs to be at least the replication factor")
}

availabilityZones := make(map[string]struct{})
ketamaEndpoints := []ketamaEndpoint{}
for _, endpoint := range endpoints {
ep, az := getAZInfo(endpoint)
availabilityZones[az] = struct{}{}
ketamaEndpoints = append(ketamaEndpoints, ketamaEndpoint{endpoint: ep, az: az})
}
hash := xxhash.New()
ringSections := make(sections, 0, numSections)
for endpointIndex, endpoint := range endpoints {
for endpointIndex, endpoint := range ketamaEndpoints {
for i := 1; i <= sectionsPerNode; i++ {
_, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i)))
_, _ = hash.Write([]byte(endpoint.endpoint + ":" + strconv.Itoa(i)))
n := &section{
az: endpoint.az,
endpointIndex: uint64(endpointIndex),
hash: hash.Sum64(),
replicas: make([]uint64, 0, replicationFactor),
@@ -134,28 +161,51 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto
}
}
sort.Sort(ringSections)
calculateSectionReplicas(ringSections, replicationFactor)
calculateSectionReplicas(ringSections, replicationFactor, availabilityZones)

return &ketamaHashring{
endpoints: endpoints,
endpoints: ketamaEndpoints,
sections: ringSections,
numEndpoints: uint64(len(endpoints)),
}, nil
}

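// getMinAz returns the smallest replica count held by any availability zone
// in the given spread map.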
func getMinAz(m map[string]int64) int64 {
minValue := int64(math.MaxInt64)
for _, value := range m {
if value < minValue {
minValue = value
}
}
return minValue
}

// calculateSectionReplicas pre-calculates replicas for each section,
// ensuring that replicas for each ring section are owned by different endpoints
// and, when AZs are configured, spread as evenly as possible across availability zones.
func calculateSectionReplicas(ringSections sections, replicationFactor uint64) {
func calculateSectionReplicas(ringSections sections, replicationFactor uint64, availabilityZones map[string]struct{}) {
for i, s := range ringSections {
replicas := make(map[uint64]struct{})
azSpread := make(map[string]int64)
for az := range availabilityZones {
// Make sure each AZ is initially represented in the spread map.
azSpread[az] = 0
}
j := i - 1
for uint64(len(replicas)) < replicationFactor {
j = (j + 1) % len(ringSections)
rep := ringSections[j]
if _, ok := replicas[rep.endpointIndex]; ok {
continue
}
if len(azSpread) > 1 && azSpread[rep.az] > 0 && azSpread[rep.az] > getMinAz(azSpread) {
// We want to ensure even AZ spread before we add more replicas within the same AZ
continue
}
replicas[rep.endpointIndex] = struct{}{}
azSpread[rep.az]++
s.replicas = append(s.replicas, rep.endpointIndex)
}
}
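// A hypothetical walk-through of the loop above: with endpoints a(AZ-1),
// b(AZ-2), c(AZ-1), d(AZ-2) and a replication factor of 2, a section owned by
// a cannot take c as its second replica, since AZ-1 would then hold two
// replicas while AZ-2 holds none; the walk skips c and settles on b or d.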
@@ -183,7 +233,7 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
}

endpointIndex := c.sections[i].replicas[n]
return c.endpoints[endpointIndex], nil
return c.endpoints[endpointIndex].endpoint, nil
}

// multiHashring represents a set of hashrings.
137 changes: 137 additions & 0 deletions pkg/receive/hashring_test.go
@@ -5,8 +5,11 @@ package receive

import (
"fmt"
"math"
"strings"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/model/labels"
@@ -323,6 +326,140 @@ func TestKetamaHashringReplicationConsistency(t *testing.T) {
}
}

func TestKetamaHashringEvenAZSpread(t *testing.T) {
tenant := "default-tenant"
ts := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}

for _, tt := range []struct {
nodes []string
replicas uint64
}{
{
nodes: []string{"a,1", "b,2", "c,1", "d,2"},
replicas: 1,
},
{
nodes: []string{"a", "b", "c", "d"},
replicas: 1,
},
{
nodes: []string{"a,1", "b,2", "c,1", "d,2"},
replicas: 2,
},
{
nodes: []string{"a,1", "b,2", "c,3", "d,1", "e,2", "f,3"},
replicas: 3,
},
{
nodes: []string{"a", "b", "c", "d", "e", "f"},
replicas: 3,
},
{
nodes: []string{"a,1", "b,2", "c,3", "d,1", "e,2", "f,3", "g,4", "h,4", "i,4", "j,5", "k,5", "l,5"},
replicas: 10,
},
} {
t.Run("", func(t *testing.T) {
hashRing, err := newKetamaHashring(tt.nodes, SectionsPerNode, tt.replicas)
testutil.Ok(t, err)

availableAzs := make(map[string]int64)
for _, n := range tt.nodes {
_, az := getAZInfo(n)

availableAzs[az] = 0
}

azSpread := make(map[string]int64)
for i := 0; i < int(tt.replicas); i++ {
r, err := hashRing.GetN(tenant, ts, uint64(i))
testutil.Ok(t, err)

for _, n := range tt.nodes {
_, az := getAZInfo(n)
if !strings.HasPrefix(n, r) {
continue
}
azSpread[az]++
}
}

expectedAzSpreadLength := int(tt.replicas)
if int(tt.replicas) > len(availableAzs) {
expectedAzSpreadLength = len(availableAzs)
}
testutil.Equals(t, len(azSpread), expectedAzSpreadLength)

minAz := getMinAz(azSpread)
for _, writeToAz := range azSpread {
testutil.Assert(t, math.Abs(float64(writeToAz-minAz)) <= 1.0)
}
})
}
}

func TestKetamaHashringEvenNodeSpread(t *testing.T) {
tenant := "default-tenant"

for _, tt := range []struct {
nodes []string
replicas uint64
numSeries uint64
}{
{
nodes: []string{"a,1", "b,2", "c,1", "d,2"},
replicas: 2,
numSeries: 1000,
},
{
nodes: []string{"a", "b", "c", "d"},
replicas: 2,
numSeries: 1000,
},
{
nodes: []string{"a,1", "b,2", "c,3", "d,1", "e,2", "f,3"},
replicas: 3,
numSeries: 10000,
},
{
nodes: []string{"a,1", "b,2", "c,3", "d,1", "e,2", "f,3", "h,1", "i,2", "j,3"},
replicas: 2,
numSeries: 10000,
},
{
nodes: []string{"a,1", "b,2", "c,3", "d,1", "e,2", "f,3", "h,1", "i,2", "j,3"},
replicas: 9,
numSeries: 10000,
},
} {
t.Run("", func(t *testing.T) {
hashRing, err := newKetamaHashring(tt.nodes, SectionsPerNode, tt.replicas)
testutil.Ok(t, err)
optimalSpread := int(tt.numSeries*tt.replicas) / len(tt.nodes)
nodeSpread := make(map[string]int)
for i := 0; i < int(tt.numSeries); i++ {
ts := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", fmt.Sprintf("%d", i))),
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}
for j := 0; j < int(tt.replicas); j++ {
r, err := hashRing.GetN(tenant, ts, uint64(j))
testutil.Ok(t, err)

nodeSpread[r]++
}
}
for _, node := range nodeSpread {
diff := math.Abs(float64(node) - float64(optimalSpread))
testutil.Assert(t, diff/float64(optimalSpread) < 0.1)
}
})
}
}

func makeSeries() []prompb.TimeSeries {
numSeries := 10000
series := make([]prompb.TimeSeries, numSeries)