Skip to content

Commit

Permalink
receive: make az aware ketama hashring (thanos-io#6369)
Browse files Browse the repository at this point in the history
* receive: make az aware ketama hashring

Signed-off-by: Alexander Rickardsson <[email protected]>

* receive: pass endpoints in hashring config as object

Signed-off-by: Michael Hoffmann <[email protected]>

* receive: add some tests for consistent hashing in presence of AZs

Signed-off-by: Michael Hoffmann <[email protected]>

* receive,docs: add migration note for az aware hashring

Signed-off-by: Michael Hoffmann <[email protected]>

---------

Signed-off-by: Alexander Rickardsson <[email protected]>
Signed-off-by: Michael Hoffmann <[email protected]>
Co-authored-by: Michael Hoffmann <[email protected]>
  • Loading branch information
2 people authored and HC Zhu committed Jun 27, 2023
1 parent 7918322 commit f480f53
Show file tree
Hide file tree
Showing 10 changed files with 498 additions and 116 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#6369](https://github.com/thanos-io/thanos/pull/6369) Receive: add az-aware replication support for Ketama algorithm
- [#6185](https://github.com/thanos-io/thanos/pull/6185) Tracing: tracing in OTLP support configuring service_name.
- [#6192](https://github.com/thanos-io/thanos/pull/6192) Store: add flag `bucket-web-label` to select the label to use as timeline title in web UI
- [#6167](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock skewed samples to pollute TSDB head and block all valid incoming samples.
Expand Down
41 changes: 41 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,47 @@ The example content of `hashring.json`:

With such configuration any receive listens for remote write on `<ip>10908/api/v1/receive` and will forward to correct one in hashring if needed for tenancy and replication.

### AZ-aware Ketama hashring (experimental)

In order to ensure even spread for replication over nodes in different availability-zones, you can choose to include az definition in your hashring config. If we for example have a 6 node cluster, spread over 3 different availability zones; A, B and C, we could use the following example `hashring.json`:

```json
[
{
"endpoints": [
{
"address": "127.0.0.1:10907",
"az": "A"
},
{
"address": "127.0.0.1:11907",
"az": "B"
},
{
"address": "127.0.0.1:12907",
"az": "C"
},
{
"address": "127.0.0.1:13907",
"az": "A"
},
{
"address": "127.0.0.1:14907",
"az": "B"
},
{
"address": "127.0.0.1:15907",
"az": "C"
}
]
}
]
```

This is only supported for the Ketama algorithm.

**NOTE:** This feature is made available from v0.32 onwards. Receive can still operate with `endpoints` set to an array of IP strings in ketama mode. But to use AZ-aware hashring, you would need to migrate your existing hashring (and surrounding automation) to the new JSON structure mentioned above.

## Limits & gates (experimental)

Thanos Receive has some limits and gates that can be configured to control resource usage. Here's the difference between limits and gates:
Expand Down
25 changes: 24 additions & 1 deletion pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,35 @@ const (
RouterIngestor ReceiverMode = "RouterIngestor"
)

type Endpoint struct {
Address string `json:"address"`
AZ string `json:"az"`
}

func (e *Endpoint) UnmarshalJSON(data []byte) error {
// First try to unmarshal as a string.
err := json.Unmarshal(data, &e.Address)
if err == nil {
return nil
}

// If that fails, try to unmarshal as an endpoint object.
type endpointAlias Endpoint
var configEndpoint endpointAlias
err = json.Unmarshal(data, &configEndpoint)
if err == nil {
e.Address = configEndpoint.Address
e.AZ = configEndpoint.AZ
}
return err
}

// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels map[string]string `json:"external_labels,omitempty"`
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/receive/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestValidateConfig(t *testing.T) {
name: "valid config",
cfg: []HashringConfig{
{
Endpoints: []string{"node1"},
Endpoints: []Endpoint{{Address: "node1"}},
},
},
err: nil, // means it's valid.
Expand Down Expand Up @@ -71,3 +71,16 @@ func TestValidateConfig(t *testing.T) {
})
}
}

func TestUnmarshalEndpointSlice(t *testing.T) {
t.Run("Endpoints as string slice", func(t *testing.T) {
var endpoints []Endpoint
testutil.Ok(t, json.Unmarshal([]byte(`["node-1"]`), &endpoints))
testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1"}})
})
t.Run("Endpoints as endpoints slice", func(t *testing.T) {
var endpoints []Endpoint
testutil.Ok(t, json.Unmarshal([]byte(`[{"address": "node-1", "az": "az-1"}]`), &endpoints))
testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1", AZ: "az-1"}})
})
}
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
h.peers = peers
addr := ag.newAddr()
h.options.Endpoint = addr
cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
cfg[0].Endpoints = append(cfg[0].Endpoints, Endpoint{Address: h.options.Endpoint})
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
// Use hashmod as default.
Expand Down
61 changes: 48 additions & 13 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"fmt"
"math"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -75,6 +76,17 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
// simpleHashring represents a group of nodes handling write requests by hashmoding individual series.
type simpleHashring []string

func newSimpleHashring(endpoints []Endpoint) (Hashring, error) {
addresses := make([]string, len(endpoints))
for i := range endpoints {
if endpoints[i].AZ != "" {
return nil, errors.New("Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.")
}
addresses[i] = endpoints[i].Address
}
return simpleHashring(addresses), nil
}

// Get returns a target to handle the given tenant and time series.
func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return s.GetN(tenant, ts, 0)
Expand All @@ -90,6 +102,7 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
}

type section struct {
az string
endpointIndex uint64
hash uint64
replicas []uint64
Expand All @@ -104,25 +117,27 @@ func (p sections) Sort() { sort.Sort(p) }

// ketamaHashring represents a group of nodes handling write requests with consistent hashing.
type ketamaHashring struct {
endpoints []string
endpoints []Endpoint
sections sections
numEndpoints uint64
}

func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
numSections := len(endpoints) * sectionsPerNode

if len(endpoints) < int(replicationFactor) {
return nil, errors.New("ketama: amount of endpoints needs to be larger than replication factor")

}

hash := xxhash.New()
availabilityZones := make(map[string]struct{})
ringSections := make(sections, 0, numSections)
for endpointIndex, endpoint := range endpoints {
availabilityZones[endpoint.AZ] = struct{}{}
for i := 1; i <= sectionsPerNode; i++ {
_, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i)))
_, _ = hash.Write([]byte(endpoint.Address + ":" + strconv.Itoa(i)))
n := &section{
az: endpoint.AZ,
endpointIndex: uint64(endpointIndex),
hash: hash.Sum64(),
replicas: make([]uint64, 0, replicationFactor),
Expand All @@ -133,7 +148,7 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto
}
}
sort.Sort(ringSections)
calculateSectionReplicas(ringSections, replicationFactor)
calculateSectionReplicas(ringSections, replicationFactor, availabilityZones)

return &ketamaHashring{
endpoints: endpoints,
Expand All @@ -142,19 +157,39 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto
}, nil
}

func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 {
minValue := int64(math.MaxInt64)
for _, value := range azSpread {
if value < minValue {
minValue = value
}
}
return minValue
}

// calculateSectionReplicas pre-calculates replicas for each section,
// ensuring that replicas for each ring section are owned by different endpoints.
func calculateSectionReplicas(ringSections sections, replicationFactor uint64) {
func calculateSectionReplicas(ringSections sections, replicationFactor uint64, availabilityZones map[string]struct{}) {
for i, s := range ringSections {
replicas := make(map[uint64]struct{})
azSpread := make(map[string]int64)
for az := range availabilityZones {
// This is to make sure each az is initially represented
azSpread[az] = 0
}
j := i - 1
for uint64(len(replicas)) < replicationFactor {
j = (j + 1) % len(ringSections)
rep := ringSections[j]
if _, ok := replicas[rep.endpointIndex]; ok {
continue
}
if len(azSpread) > 1 && azSpread[rep.az] > 0 && azSpread[rep.az] > sizeOfLeastOccupiedAZ(azSpread) {
// We want to ensure even AZ spread before we add more replicas within the same AZ
continue
}
replicas[rep.endpointIndex] = struct{}{}
azSpread[rep.az]++
s.replicas = append(s.replicas, rep.endpointIndex)
}
}
Expand Down Expand Up @@ -182,7 +217,7 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
}

endpointIndex := c.sections[i].replicas[n]
return c.endpoints[endpointIndex], nil
return c.endpoints[endpointIndex].Address, nil
}

// multiHashring represents a set of hashrings.
Expand Down Expand Up @@ -246,11 +281,11 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
for _, h := range cfg {
var hashring Hashring
var err error
activeAlgorithm := algorithm
if h.Algorithm != "" {
hashring, err = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
} else {
hashring, err = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
activeAlgorithm = h.Algorithm
}
hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
if err != nil {
return nil, err
}
Expand All @@ -267,17 +302,17 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
return m, nil
}

func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints), nil
return newSimpleHashring(endpoints)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor)
default:
l := log.NewNopLogger()
level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.",
"hashring", hashring,
"tenants", tenants)
return simpleHashring(endpoints), nil
return newSimpleHashring(endpoints)
}
}
Loading

0 comments on commit f480f53

Please sign in to comment.