Skip to content

Commit

Permalink
Merge pull request thanos-io#5653 from haanhvu/issue5567
Browse files Browse the repository at this point in the history
Receive: Allow setting hashing algorithm per tenant in hashrings config
  • Loading branch information
yeya24 authored Feb 7, 2023
2 parents 48e82c5 + 7930c75 commit 4bb560e
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6008](https://github.com/thanos-io/thanos/pull/6008) *: Add counter metric `gate_queries_total` to gate.
- [#5926](https://github.com/thanos-io/thanos/pull/5926) Receiver: Add experimental string interning in writer. Can be enabled with a hidden flag `writer.intern`.
- [#5773](https://github.com/thanos-io/thanos/pull/5773) Store: Support disable cache index header file.
- [#5653](https://github.com/thanos-io/thanos/pull/5653) Receive: Allow setting hashing algorithm per tenant in hashrings config

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext).
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext+". Will be overwritten by the tenant-specific algorithm in the hashring config.").
Default(string(receive.AlgorithmHashmod)).
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama))

Expand Down
4 changes: 3 additions & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ Flags:
the hashring configuration.
--receive.hashrings-algorithm=hashmod
The algorithm used when distributing series in
the hashrings. Must be one of hashmod, ketama
the hashrings. Must be one of hashmod, ketama.
Will be overwritten by the tenant-specific
algorithm in the hashring config.
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration. A watcher is initialized
Expand Down
7 changes: 4 additions & 3 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ const (
// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
}

// ConfigWatcher is able to watch a file containing a hashring configuration
Expand Down
37 changes: 25 additions & 12 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (

"github.com/cespare/xxhash"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -236,19 +239,14 @@ func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
cache: make(map[string]Hashring),
}

newHashring := func(endpoints []string) Hashring {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor)
default:
return simpleHashring(endpoints)
}
}

for _, h := range cfg {
m.hashrings = append(m.hashrings, newHashring(h.Endpoints))
var hashring Hashring
if h.Algorithm != "" {
hashring = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
} else {
hashring = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
}
m.hashrings = append(m.hashrings, hashring)
var t map[string]struct{}
if len(h.Tenants) != 0 {
t = make(map[string]struct{})
Expand Down Expand Up @@ -299,3 +297,18 @@ func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, c

return newMultiHashring(algorithm, replicationFactor, config), err
}

func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) Hashring {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor)
default:
l := log.NewNopLogger()
level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.",
"hashring", hashring,
"tenants", tenants)
return simpleHashring(endpoints)
}
}

0 comments on commit 4bb560e

Please sign in to comment.