From ae47575db1ad14989d56523fb15967764002672f Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Thu, 25 May 2023 11:52:08 +0200 Subject: [PATCH] receive: pass endpoints in hashring config as object Signed-off-by: Michael Hoffmann --- pkg/receive/config.go | 29 +++++-- pkg/receive/config_test.go | 15 +++- pkg/receive/handler_test.go | 2 +- pkg/receive/hashring.go | 63 ++++++--------- pkg/receive/hashring_test.go | 150 +++++++++++++++-------------------- pkg/receive/receive_test.go | 58 +++++++------- 6 files changed, 152 insertions(+), 165 deletions(-) diff --git a/pkg/receive/config.go b/pkg/receive/config.go index 7d4f980677d..2772e1fd989 100644 --- a/pkg/receive/config.go +++ b/pkg/receive/config.go @@ -37,17 +37,35 @@ const ( RouterIngestor ReceiverMode = "RouterIngestor" ) -type AZAwareEndpoint struct { +type Endpoint struct { Address string `json:"address"` AZ string `json:"az"` } +func (e *Endpoint) UnmarshalJSON(data []byte) error { + // First try to unmarshal as a string. + err := json.Unmarshal(data, &e.Address) + if err == nil { + return nil + } + + // If that fails, try to unmarshal as an endpoint object. + type endpointAlias Endpoint + var configEndpoint endpointAlias + err = json.Unmarshal(data, &configEndpoint) + if err == nil { + e.Address = configEndpoint.Address + e.AZ = configEndpoint.AZ + } + return err +} + // HashringConfig represents the configuration for a hashring // a receive node knows about. type HashringConfig struct { Hashring string `json:"hashring,omitempty"` Tenants []string `json:"tenants,omitempty"` - Endpoints []string `json:"endpoints"` + Endpoints []Endpoint `json:"endpoints"` Algorithm HashringAlgorithm `json:"algorithm,omitempty"` ExternalLabels map[string]string `json:"external_labels,omitempty"` } @@ -248,12 +266,7 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) { cw.lastSuccessTimeGauge.SetToCurrentTime() for _, c := range config { - switch v := c.Endpoints.(type) { - case []string: - cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(v))) - case []AZAwareEndpoint: - cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(v))) - } + cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Endpoints))) cw.hashringTenantsGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Tenants))) } diff --git a/pkg/receive/config_test.go b/pkg/receive/config_test.go index 47146fb424f..c1230f327e6 100644 --- a/pkg/receive/config_test.go +++ b/pkg/receive/config_test.go @@ -38,7 +38,7 @@ func TestValidateConfig(t *testing.T) { name: "valid config", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, }, }, err: nil, // means it's valid. @@ -71,3 +71,16 @@ func TestValidateConfig(t *testing.T) { }) } } + +func TestUnmarshalEndpointSlice(t *testing.T) { + t.Run("Endpoints as string slice", func(t *testing.T) { + var endpoints []Endpoint + testutil.Ok(t, json.Unmarshal([]byte(`["node-1"]`), &endpoints)) + testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1"}}) + }) + t.Run("Endpoints as endpoints slice", func(t *testing.T) { + var endpoints []Endpoint + testutil.Ok(t, json.Unmarshal([]byte(`[{"address": "node-1", "az": "az-1"}]`), &endpoints)) + testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1", AZ: "az-1"}}) + }) +} diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index cbf75874028..ae4ec9b9850 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -195,7 +195,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin h.peers = peers addr := ag.newAddr() h.options.Endpoint = addr - cfg[0].Endpoints = append(cfg[0].Endpoints.([]string), h.options.Endpoint) + cfg[0].Endpoints = append(cfg[0].Endpoints, Endpoint{Address: h.options.Endpoint}) peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } // Use hashmod as default. diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index c7e1cfa038b..c8e2ccf2c04 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -76,6 +76,17 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri // simpleHashring represents a group of nodes handling write requests by hashmoding individual series. type simpleHashring []string +func newSimpleHashring(endpoints []Endpoint) (Hashring, error) { + addresses := make([]string, len(endpoints)) + for i := range endpoints { + if endpoints[i].AZ != "" { + return nil, errors.New("Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.") + } + addresses[i] = endpoints[i].Address + } + return simpleHashring(addresses), nil +} + // Get returns a target to handle the given tenant and time series. func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { return s.GetN(tenant, ts, 0) @@ -106,32 +117,23 @@ func (p sections) Sort() { sort.Sort(p) } // ketamaHashring represents a group of nodes handling write requests with consistent hashing. type ketamaHashring struct { - endpoints []AZAwareEndpoint + endpoints []Endpoint sections sections numEndpoints uint64 } -func newKetamaHashring(endpoints interface{}, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { - azEndpoints := []AZAwareEndpoint{} +func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { + numSections := len(endpoints) * sectionsPerNode - switch v := endpoints.(type) { - case []string: - for _, endpoint := range v { - azEndpoints = append(azEndpoints, AZAwareEndpoint{Address: endpoint, AZ: ""}) - } - case []AZAwareEndpoint: - azEndpoints = v - } - numSections := len(azEndpoints) * sectionsPerNode - - if len(azEndpoints) < int(replicationFactor) { + if len(endpoints) < int(replicationFactor) { return nil, errors.New("ketama: amount of endpoints needs to be larger than replication factor") } - availabilityZones := make(map[string]struct{}) hash := xxhash.New() + availabilityZones := make(map[string]struct{}) ringSections := make(sections, 0, numSections) - for endpointIndex, endpoint := range azEndpoints { + for endpointIndex, endpoint := range endpoints { + availabilityZones[endpoint.AZ] = struct{}{} for i := 1; i <= sectionsPerNode; i++ { _, _ = hash.Write([]byte(endpoint.Address + ":" + strconv.Itoa(i))) n := §ion{ @@ -149,17 +151,14 @@ func newKetamaHashring(endpoints interface{}, sectionsPerNode int, replicationFa calculateSectionReplicas(ringSections, replicationFactor, availabilityZones) return &ketamaHashring{ - endpoints: azEndpoints, + endpoints: endpoints, sections: ringSections, - numEndpoints: uint64(len(azEndpoints)), + numEndpoints: uint64(len(endpoints)), }, nil } func getMinAz(m map[string]int64) int64 { - var minValue int64 - - minValue = math.MaxInt64 - + minValue := int64(math.MaxInt64) for _, value := range m { if value < minValue { minValue = value @@ -270,16 +269,6 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st return "", errors.New("no matching hashring to handle tenant") } -func validateAZHashring(endpoints interface{}, algorithm HashringAlgorithm) error { - switch endpoints.(type) { - case []string: - if algorithm == AlgorithmHashmod { - return errors.New("Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.") - } - } - return nil -} - // newMultiHashring creates a multi-tenant hashring for a given slice of // groups. // Which hashring to use for a tenant is determined @@ -296,10 +285,6 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg if h.Algorithm != "" { activeAlgorithm = h.Algorithm } - err = validateAZHashring(h.Endpoints, activeAlgorithm) - if err != nil { - return nil, err - } hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) if err != nil { return nil, err @@ -317,10 +302,10 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg return m, nil } -func newHashring(algorithm HashringAlgorithm, endpoints interface{}, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) { +func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) { switch algorithm { case AlgorithmHashmod: - return simpleHashring(endpoints.([]string)), nil + return newSimpleHashring(endpoints) case AlgorithmKetama: return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor) default: @@ -328,6 +313,6 @@ func newHashring(algorithm HashringAlgorithm, endpoints interface{}, replication level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.", "hashring", hashring, "tenants", tenants) - return simpleHashring(endpoints.([]string)), nil + return newSimpleHashring(endpoints) } } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 59420c16422..68ab3a85019 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -47,7 +47,7 @@ func TestHashringGet(t *testing.T) { name: "simple", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, }, }, nodes: map[string]struct{}{"node1": {}}, @@ -56,11 +56,11 @@ func TestHashringGet(t *testing.T) { name: "specific", cfg: []HashringConfig{ { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant2"}, }, { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, }, }, nodes: map[string]struct{}{"node2": {}}, @@ -70,15 +70,15 @@ func TestHashringGet(t *testing.T) { name: "many tenants", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant2"}, }, { - Endpoints: []string{"node3"}, + Endpoints: []Endpoint{{Address: "node3"}}, Tenants: []string{"tenant3"}, }, }, @@ -89,15 +89,15 @@ func TestHashringGet(t *testing.T) { name: "many tenants error", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant2"}, }, { - Endpoints: []string{"node3"}, + Endpoints: []Endpoint{{Address: "node3"}}, Tenants: []string{"tenant3"}, }, }, @@ -107,11 +107,11 @@ func TestHashringGet(t *testing.T) { name: "many nodes", cfg: []HashringConfig{ { - Endpoints: []string{"node1", "node2", "node3"}, + Endpoints: []Endpoint{{Address: "node1"}, {Address: "node2"}, {Address: "node3"}}, Tenants: []string{"tenant1"}, }, { - Endpoints: []string{"node4", "node5", "node6"}, + Endpoints: []Endpoint{{Address: "node4"}, {Address: "node5"}, {Address: "node6"}}, }, }, nodes: map[string]struct{}{ @@ -125,11 +125,11 @@ func TestHashringGet(t *testing.T) { name: "many nodes default", cfg: []HashringConfig{ { - Endpoints: []string{"node1", "node2", "node3"}, + Endpoints: []Endpoint{{Address: "node1"}, {Address: "node2"}, {Address: "node3"}}, Tenants: []string{"tenant1"}, }, { - Endpoints: []string{"node4", "node5", "node6"}, + Endpoints: []Endpoint{{Address: "node4"}, {Address: "node5"}, {Address: "node6"}}, }, }, nodes: map[string]struct{}{ @@ -170,53 +170,53 @@ func TestKetamaHashringGet(t *testing.T) { } tests := []struct { name string - nodes []string + endpoints []Endpoint expectedNode string ts *prompb.TimeSeries n uint64 }{ { name: "base case", - nodes: []string{"node-1", "node-2", "node-3"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: baseTS, expectedNode: "node-2", }, { name: "base case with replication", - nodes: []string{"node-1", "node-2", "node-3"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: baseTS, n: 1, expectedNode: "node-1", }, { name: "base case with replication", - nodes: []string{"node-1", "node-2", "node-3"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: baseTS, n: 2, expectedNode: "node-3", }, { name: "base case with replication and reordered nodes", - nodes: []string{"node-1", "node-3", "node-2"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-3"}, {Address: "node-2"}}, ts: baseTS, n: 2, expectedNode: "node-3", }, { name: "base case with new node at beginning of ring", - nodes: []string{"node-0", "node-1", "node-2", "node-3"}, + endpoints: []Endpoint{{Address: "node-0"}, {Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: baseTS, expectedNode: "node-2", }, { name: "base case with new node at end of ring", - nodes: []string{"node-1", "node-2", "node-3", "node-4"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}, {Address: "node-4"}}, ts: baseTS, expectedNode: "node-2", }, { - name: "base case with different timeseries", - nodes: []string{"node-1", "node-2", "node-3"}, + name: "base case with different timeseries", + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: &prompb.TimeSeries{ Labels: []labelpb.ZLabel{ { @@ -231,7 +231,7 @@ func TestKetamaHashringGet(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - hashRing, err := newKetamaHashring(test.nodes, 10, test.n+1) + hashRing, err := newKetamaHashring(test.endpoints, 10, test.n+1) require.NoError(t, err) result, err := hashRing.GetN("tenant", test.ts, test.n) @@ -242,18 +242,18 @@ func TestKetamaHashringGet(t *testing.T) { } func TestKetamaHashringBadConfigIsRejected(t *testing.T) { - _, err := newKetamaHashring([]string{"node-1"}, 1, 2) + _, err := newKetamaHashring([]Endpoint{{Address: "node-1"}}, 1, 2) require.Error(t, err) } func TestKetamaHashringConsistency(t *testing.T) { series := makeSeries() - ringA := []string{"node-1", "node-2", "node-3"} + ringA := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} a1, err := assignSeries(series, ringA) require.NoError(t, err) - ringB := []string{"node-1", "node-2", "node-3"} + ringB := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} a2, err := assignSeries(series, ringB) require.NoError(t, err) @@ -269,18 +269,18 @@ func TestKetamaHashringConsistency(t *testing.T) { func TestKetamaHashringIncreaseAtEnd(t *testing.T) { series := makeSeries() - initialRing := []string{"node-1", "node-2", "node-3"} + initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} initialAssignments, err := assignSeries(series, initialRing) require.NoError(t, err) - resizedRing := []string{"node-1", "node-2", "node-3", "node-4", "node-5"} + resizedRing := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}, {Address: "node-4"}, {Address: "node-5"}} reassignments, err := assignSeries(series, resizedRing) require.NoError(t, err) // Assert that the initial nodes have no new keys after increasing the ring size for _, node := range initialRing { - for _, ts := range reassignments[node] { - foundInInitialAssignment := findSeries(initialAssignments, node, ts) + for _, ts := range reassignments[node.Address] { + foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts) require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) } } @@ -289,18 +289,18 @@ func TestKetamaHashringIncreaseAtEnd(t *testing.T) { func TestKetamaHashringIncreaseInMiddle(t *testing.T) { series := makeSeries() - initialRing := []string{"node-1", "node-3"} + initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-3"}} initialAssignments, err := assignSeries(series, initialRing) require.NoError(t, err) - resizedRing := []string{"node-1", "node-2", "node-3"} + resizedRing := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} reassignments, err := assignSeries(series, resizedRing) require.NoError(t, err) // Assert that the initial nodes have no new keys after increasing the ring size for _, node := range initialRing { - for _, ts := range reassignments[node] { - foundInInitialAssignment := findSeries(initialAssignments, node, ts) + for _, ts := range reassignments[node.Address] { + foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts) require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) } } @@ -309,23 +309,22 @@ func TestKetamaHashringIncreaseInMiddle(t *testing.T) { func TestKetamaHashringReplicationConsistency(t *testing.T) { series := makeSeries() - initialRing := []string{"node-1", "node-4", "node-5"} + initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-4"}, {Address: "node-5"}} initialAssignments, err := assignReplicatedSeries(series, initialRing, 2) require.NoError(t, err) - resizedRing := []string{"node-4", "node-3", "node-1", "node-2", "node-5"} + resizedRing := []Endpoint{{Address: "node-4"}, {Address: "node-3"}, {Address: "node-1"}, {Address: "node-2"}, {Address: "node-5"}} reassignments, err := assignReplicatedSeries(series, resizedRing, 2) require.NoError(t, err) // Assert that the initial nodes have no new keys after increasing the ring size for _, node := range initialRing { - for _, ts := range reassignments[node] { - foundInInitialAssignment := findSeries(initialAssignments, node, ts) + for _, ts := range reassignments[node.Address] { + foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts) require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) } } } - func TestKetamaHashringEvenAZSpread(t *testing.T) { tenant := "default-tenant" ts := &prompb.TimeSeries{ @@ -334,11 +333,11 @@ func TestKetamaHashringEvenAZSpread(t *testing.T) { } for _, tt := range []struct { - nodes interface{} + nodes []Endpoint replicas uint64 }{ { - nodes: []AZAwareEndpoint{ + nodes: []Endpoint{ {Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "1"}, @@ -347,11 +346,11 @@ func TestKetamaHashringEvenAZSpread(t *testing.T) { replicas: 1, }, { - nodes: []string{"a", "b", "c", "d"}, + nodes: []Endpoint{{Address: "a"}, {Address: "b"}, {Address: "c"}, {Address: "d"}}, replicas: 1, }, { - nodes: []AZAwareEndpoint{ + nodes: []Endpoint{ {Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "1"}, @@ -360,7 +359,7 @@ func TestKetamaHashringEvenAZSpread(t *testing.T) { replicas: 2, }, { - nodes: []AZAwareEndpoint{ + nodes: []Endpoint{ {Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, @@ -368,14 +367,14 @@ func TestKetamaHashringEvenAZSpread(t *testing.T) { {Address: "e", AZ: "2"}, {Address: "f", AZ: "3"}, }, - replicas: 4, + replicas: 3, }, { - nodes: []string{"a", "b", "c", "d", "e", "f"}, + nodes: []Endpoint{{Address: "a"}, {Address: "b"}, {Address: "c"}, {Address: "d"}, {Address: "e"}, {Address: "f"}, {Address: "g"}}, replicas: 3, }, { - nodes: []AZAwareEndpoint{ + nodes: []Endpoint{ {Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, @@ -397,13 +396,8 @@ func TestKetamaHashringEvenAZSpread(t *testing.T) { testutil.Ok(t, err) availableAzs := make(map[string]int64) - switch v := tt.nodes.(type) { - case []string: - availableAzs[""] = 0 - case []AZAwareEndpoint: - for _, endpoint := range v { - availableAzs[endpoint.AZ] = 0 - } + for _, endpoint := range tt.nodes { + availableAzs[endpoint.AZ] = 0 } azSpread := make(map[string]int64) @@ -411,22 +405,11 @@ func TestKetamaHashringEvenAZSpread(t *testing.T) { r, err := hashRing.GetN(tenant, ts, uint64(i)) testutil.Ok(t, err) - switch v := tt.nodes.(type) { - case []string: - for _, n := range v { - az := "" - if !strings.HasPrefix(n, r) { - continue - } - azSpread[az]++ - } - case []AZAwareEndpoint: - for _, n := range v { - if !strings.HasPrefix(n.Address, r) { - continue - } - azSpread[n.AZ]++ + for _, n := range tt.nodes { + if !strings.HasPrefix(n.Address, r) { + continue } + azSpread[n.AZ]++ } } @@ -449,12 +432,12 @@ func TestKetamaHashringEvenNodeSpread(t *testing.T) { tenant := "default-tenant" for _, tt := range []struct { - nodes interface{} + nodes []Endpoint replicas uint64 numSeries uint64 }{ { - nodes: []AZAwareEndpoint{ + nodes: []Endpoint{ {Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "1"}, @@ -464,12 +447,12 @@ func TestKetamaHashringEvenNodeSpread(t *testing.T) { numSeries: 1000, }, { - nodes: []string{"a", "b", "c", "d"}, + nodes: []Endpoint{{Address: "a"}, {Address: "b"}, {Address: "c"}, {Address: "d"}}, replicas: 2, numSeries: 1000, }, { - nodes: []AZAwareEndpoint{ + nodes: []Endpoint{ {Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, @@ -481,7 +464,7 @@ func TestKetamaHashringEvenNodeSpread(t *testing.T) { numSeries: 10000, }, { - nodes: []AZAwareEndpoint{ + nodes: []Endpoint{ {Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, @@ -496,7 +479,7 @@ func TestKetamaHashringEvenNodeSpread(t *testing.T) { numSeries: 10000, }, { - nodes: []AZAwareEndpoint{ + nodes: []Endpoint{ {Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, @@ -514,13 +497,7 @@ func TestKetamaHashringEvenNodeSpread(t *testing.T) { t.Run("", func(t *testing.T) { hashRing, err := newKetamaHashring(tt.nodes, SectionsPerNode, tt.replicas) testutil.Ok(t, err) - var optimalSpread int - switch v := tt.nodes.(type) { - case []string: - optimalSpread = int(tt.numSeries*tt.replicas) / len(v) - case []AZAwareEndpoint: - optimalSpread = int(tt.numSeries*tt.replicas) / len(v) - } + optimalSpread := int(tt.numSeries*tt.replicas) / len(tt.nodes) nodeSpread := make(map[string]int) for i := 0; i < int(tt.numSeries); i++ { ts := &prompb.TimeSeries{ @@ -550,14 +527,13 @@ func TestInvalidAZHashringCfg(t *testing.T) { expectedError string }{ { - cfg: []HashringConfig{{Endpoints: []string{"a,1", "b,2", "c,1", "d,2"}}}, + cfg: []HashringConfig{{Endpoints: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}}}}, replicas: 2, - algorithm: AlgorithmHashmod, expectedError: "Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.", }, } { t.Run("", func(t *testing.T) { - _, err := newMultiHashring(tt.algorithm, tt.replicas, tt.cfg) + _, err := NewMultiHashring(tt.algorithm, tt.replicas, tt.cfg) require.EqualError(t, err, tt.expectedError) }) } @@ -591,11 +567,11 @@ func findSeries(initialAssignments map[string][]prompb.TimeSeries, node string, return false } -func assignSeries(series []prompb.TimeSeries, nodes []string) (map[string][]prompb.TimeSeries, error) { +func assignSeries(series []prompb.TimeSeries, nodes []Endpoint) (map[string][]prompb.TimeSeries, error) { return assignReplicatedSeries(series, nodes, 0) } -func assignReplicatedSeries(series []prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]prompb.TimeSeries, error) { +func assignReplicatedSeries(series []prompb.TimeSeries, nodes []Endpoint, replicas uint64) (map[string][]prompb.TimeSeries, error) { hashRing, err := newKetamaHashring(nodes, SectionsPerNode, replicas) if err != nil { return nil, err diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 7c2fae78b7f..55b59d182a2 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -38,7 +38,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "One tenant - No labels", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, }, }, @@ -50,7 +50,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "One tenant - One label", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -65,7 +65,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "One tenant - Multiple labels", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -83,7 +83,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple tenants - No labels", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, }, }, @@ -97,7 +97,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple tenants - One label", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -114,7 +114,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple tenants - Multiple labels", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name3": "value3", @@ -136,7 +136,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple hashrings - No repeated tenants", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -145,7 +145,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name6": "value6", @@ -173,7 +173,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple hashrings - One repeated tenant", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name3": "value3", @@ -182,7 +182,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant1"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -245,7 +245,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { initialConfig := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -265,7 +265,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { changedConfig := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3", "tenant4", "tenant5"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -354,7 +354,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { initialConfig := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -363,7 +363,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name6": "value6", @@ -396,7 +396,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { name: "Adding labels", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -406,7 +406,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -435,7 +435,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { name: "Deleting some labels", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -443,7 +443,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -470,11 +470,11 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { name: "Deleting all labels", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, }, }, @@ -491,7 +491,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { name: "Changing values of some labels", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value3", @@ -500,7 +500,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name4": "value6", @@ -584,7 +584,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { initialConfig := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name3": "value3", @@ -593,7 +593,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant1"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -624,7 +624,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { name: "Adding labels in first hashring that tenant appears", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -634,7 +634,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant1"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -660,7 +660,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { name: "Adding labels in second hashring that tenant appears", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -669,7 +669,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant1"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -752,7 +752,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { cfg := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, ExternalLabels: map[string]string{ "replica": "0",