Skip to content

Commit

Permalink
receive: pass endpoints in hashring config as object
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
mhoffm-aiven committed May 25, 2023
1 parent 3f305a6 commit 5fb2e31
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 164 deletions.
29 changes: 21 additions & 8 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,35 @@ const (
RouterIngestor ReceiverMode = "RouterIngestor"
)

type AZAwareEndpoint struct {
type Endpoint struct {
Address string `json:"address"`
AZ string `json:"az"`
}

func (e *Endpoint) UnmarshalJSON(data []byte) error {
// First try to unmarshal as a string.
err := json.Unmarshal(data, &e.Address)
if err == nil {
return nil
}

// If that fails, try to unmarshal as an endpoint object.
type endpointAlias Endpoint
var configEndpoint endpointAlias
err = json.Unmarshal(data, &configEndpoint)
if err == nil {
e.Address = configEndpoint.Address
e.AZ = configEndpoint.AZ
}
return err
}

// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels map[string]string `json:"external_labels,omitempty"`
}
Expand Down Expand Up @@ -248,12 +266,7 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) {
cw.lastSuccessTimeGauge.SetToCurrentTime()

for _, c := range config {
switch v := c.Endpoints.(type) {
case []string:
cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(v)))
case []AZAwareEndpoint:
cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(v)))
}
cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Endpoints)))
cw.hashringTenantsGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Tenants)))
}

Expand Down
15 changes: 14 additions & 1 deletion pkg/receive/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestValidateConfig(t *testing.T) {
name: "valid config",
cfg: []HashringConfig{
{
Endpoints: []string{"node1"},
Endpoints: []Endpoint{{Address: "node1"}},
},
},
err: nil, // means it's valid.
Expand Down Expand Up @@ -71,3 +71,16 @@ func TestValidateConfig(t *testing.T) {
})
}
}

func TestUnmarshalEndpointSlice(t *testing.T) {
t.Run("Endpoints as string slice", func(t *testing.T) {
var endpoints []Endpoint
testutil.Ok(t, json.Unmarshal([]byte(`["node-1"]`), &endpoints))
testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1"}})
})
t.Run("Endpoints as endpoints slice", func(t *testing.T) {
var endpoints []Endpoint
testutil.Ok(t, json.Unmarshal([]byte(`[{"address": "node-1", "az": "az-1"}]`), &endpoints))
testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1", AZ: "az-1"}})
})
}
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
h.peers = peers
addr := ag.newAddr()
h.options.Endpoint = addr
cfg[0].Endpoints = append(cfg[0].Endpoints.([]string), h.options.Endpoint)
cfg[0].Endpoints = append(cfg[0].Endpoints, Endpoint{Address: h.options.Endpoint})
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
// Use hashmod as default.
Expand Down
63 changes: 24 additions & 39 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
// simpleHashring represents a group of nodes handling write requests by hashmoding individual series.
type simpleHashring []string

func newSimpleHashring(endpoints []Endpoint) (Hashring, error) {
addresses := make([]string, len(endpoints))
for i := range endpoints {
if endpoints[i].AZ != "" {
return nil, errors.New("Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.")
}
addresses[i] = endpoints[i].Address
}
return simpleHashring(addresses), nil
}

// Get returns a target to handle the given tenant and time series.
func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return s.GetN(tenant, ts, 0)
Expand Down Expand Up @@ -106,32 +117,23 @@ func (p sections) Sort() { sort.Sort(p) }

// ketamaHashring represents a group of nodes handling write requests with consistent hashing.
type ketamaHashring struct {
endpoints []AZAwareEndpoint
endpoints []Endpoint
sections sections
numEndpoints uint64
}

func newKetamaHashring(endpoints interface{}, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
azEndpoints := []AZAwareEndpoint{}
func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
numSections := len(endpoints) * sectionsPerNode

switch v := endpoints.(type) {
case []string:
for _, endpoint := range v {
azEndpoints = append(azEndpoints, AZAwareEndpoint{Address: endpoint, AZ: ""})
}
case []AZAwareEndpoint:
azEndpoints = v
}
numSections := len(azEndpoints) * sectionsPerNode

if len(azEndpoints) < int(replicationFactor) {
if len(endpoints) < int(replicationFactor) {
return nil, errors.New("ketama: amount of endpoints needs to be larger than replication factor")

}
availabilityZones := make(map[string]struct{})
hash := xxhash.New()
availabilityZones := make(map[string]struct{})
ringSections := make(sections, 0, numSections)
for endpointIndex, endpoint := range azEndpoints {
for endpointIndex, endpoint := range endpoints {
availabilityZones[endpoint.AZ] = struct{}{}
for i := 1; i <= sectionsPerNode; i++ {
_, _ = hash.Write([]byte(endpoint.Address + ":" + strconv.Itoa(i)))
n := &section{
Expand All @@ -149,17 +151,14 @@ func newKetamaHashring(endpoints interface{}, sectionsPerNode int, replicationFa
calculateSectionReplicas(ringSections, replicationFactor, availabilityZones)

return &ketamaHashring{
endpoints: azEndpoints,
endpoints: endpoints,
sections: ringSections,
numEndpoints: uint64(len(azEndpoints)),
numEndpoints: uint64(len(endpoints)),
}, nil
}

func getMinAz(m map[string]int64) int64 {
var minValue int64

minValue = math.MaxInt64

minValue := int64(math.MaxInt64)
for _, value := range m {
if value < minValue {
minValue = value
Expand Down Expand Up @@ -270,16 +269,6 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return "", errors.New("no matching hashring to handle tenant")
}

func validateAZHashring(endpoints interface{}, algorithm HashringAlgorithm) error {
switch endpoints.(type) {
case []string:
if algorithm == AlgorithmHashmod {
return errors.New("Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.")
}
}
return nil
}

// newMultiHashring creates a multi-tenant hashring for a given slice of
// groups.
// Which hashring to use for a tenant is determined
Expand All @@ -296,10 +285,6 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
if h.Algorithm != "" {
activeAlgorithm = h.Algorithm
}
err = validateAZHashring(h.Endpoints, activeAlgorithm)
if err != nil {
return nil, err
}
hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
if err != nil {
return nil, err
Expand All @@ -317,17 +302,17 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
return m, nil
}

func newHashring(algorithm HashringAlgorithm, endpoints interface{}, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints.([]string)), nil
return newSimpleHashring(endpoints)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor)
default:
l := log.NewNopLogger()
level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.",
"hashring", hashring,
"tenants", tenants)
return simpleHashring(endpoints.([]string)), nil
return newSimpleHashring(endpoints)
}
}
Loading

0 comments on commit 5fb2e31

Please sign in to comment.