From ade433f4733398ba1826ab6f0eb1e5fee5d4b755 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Fri, 26 May 2023 13:47:17 +0200 Subject: [PATCH] receive: add some tests for consistent hashing in presence of AZs Signed-off-by: Michael Hoffmann --- pkg/receive/hashring.go | 6 ++-- pkg/receive/hashring_test.go | 53 ++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index c8e2ccf2c04..18925cc4cc2 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -157,9 +157,9 @@ func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFac }, nil } -func getMinAz(m map[string]int64) int64 { +func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 { minValue := int64(math.MaxInt64) - for _, value := range m { + for _, value := range azSpread { if value < minValue { minValue = value } @@ -184,7 +184,7 @@ func calculateSectionReplicas(ringSections sections, replicationFactor uint64, a if _, ok := replicas[rep.endpointIndex]; ok { continue } - if len(azSpread) > 1 && azSpread[rep.az] > 0 && azSpread[rep.az] > getMinAz(azSpread) { + if len(azSpread) > 1 && azSpread[rep.az] > 0 && azSpread[rep.az] > sizeOfLeastOccupiedAZ(azSpread) { // We want to ensure even AZ spread before we add more replicas within the same AZ continue } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 68ab3a85019..4f1b549d38f 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -325,6 +325,59 @@ func TestKetamaHashringReplicationConsistency(t *testing.T) { } } } + +func TestKetamaHashringReplicationConsistencyWithAZs(t *testing.T) { + for _, tt := range []struct { + initialRing []Endpoint + resizedRing []Endpoint + replicas uint64 + }{ + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "a", AZ: "1"}, {Address: "d", AZ: "2"}, {Address: "e", AZ: "4"}}, + replicas: 3, + }, + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "1"}, {Address: "e", AZ: "2"}, {Address: "f", AZ: "3"}}, + replicas: 3, + }, + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "4"}, {Address: "e", AZ: "5"}, {Address: "f", AZ: "6"}}, + replicas: 3, + }, + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "4"}, {Address: "e", AZ: "5"}, {Address: "f", AZ: "6"}}, + replicas: 2, + }, + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "c", AZ: "2"}, {Address: "f", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "1"}, {Address: "c", AZ: "2"}, {Address: "d", AZ: "2"}, {Address: "f", AZ: "3"}}, + replicas: 2, + }, + } { + t.Run("", func(t *testing.T) { + series := makeSeries() + + initialAssignments, err := assignReplicatedSeries(series, tt.initialRing, tt.replicas) + require.NoError(t, err) + + reassignments, err := assignReplicatedSeries(series, tt.resizedRing, tt.replicas) + require.NoError(t, err) + + // Assert that the initial nodes have no new keys after increasing the ring size + for _, node := range tt.initialRing { + for _, ts := range reassignments[node.Address] { + foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts) + require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) + } + } + }) + } +} + func TestKetamaHashringEvenAZSpread(t *testing.T) { tenant := "default-tenant" ts := &prompb.TimeSeries{