Skip to content

Commit

Permalink
receive: add some tests for consistent hashing in presence of AZs
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
mhoffm-aiven committed May 26, 2023
1 parent 346500d commit ade433f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFac
}, nil
}

func getMinAz(m map[string]int64) int64 {
func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 {
minValue := int64(math.MaxInt64)
for _, value := range m {
for _, value := range azSpread {
if value < minValue {
minValue = value
}
Expand All @@ -184,7 +184,7 @@ func calculateSectionReplicas(ringSections sections, replicationFactor uint64, a
if _, ok := replicas[rep.endpointIndex]; ok {
continue
}
if len(azSpread) > 1 && azSpread[rep.az] > 0 && azSpread[rep.az] > getMinAz(azSpread) {
if len(azSpread) > 1 && azSpread[rep.az] > 0 && azSpread[rep.az] > sizeOfLeastOccupiedAZ(azSpread) {
// We want to ensure even AZ spread before we add more replicas within the same AZ
continue
}
Expand Down
53 changes: 53 additions & 0 deletions pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,59 @@ func TestKetamaHashringReplicationConsistency(t *testing.T) {
}
}
}

func TestKetamaHashringReplicationConsistencyWithAZs(t *testing.T) {
for _, tt := range []struct {
initialRing []Endpoint
resizedRing []Endpoint
replicas uint64
}{
{
initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}},
resizedRing: []Endpoint{{Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "a", AZ: "1"}, {Address: "d", AZ: "2"}, {Address: "e", AZ: "4"}},
replicas: 3,
},
{
initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}},
resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "1"}, {Address: "e", AZ: "2"}, {Address: "f", AZ: "3"}},
replicas: 3,
},
{
initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}},
resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "4"}, {Address: "e", AZ: "5"}, {Address: "f", AZ: "6"}},
replicas: 3,
},
{
initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}},
resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "4"}, {Address: "e", AZ: "5"}, {Address: "f", AZ: "6"}},
replicas: 2,
},
{
initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "c", AZ: "2"}, {Address: "f", AZ: "3"}},
resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "1"}, {Address: "c", AZ: "2"}, {Address: "d", AZ: "2"}, {Address: "f", AZ: "3"}},
replicas: 2,
},
} {
t.Run("", func(t *testing.T) {
series := makeSeries()

initialAssignments, err := assignReplicatedSeries(series, tt.initialRing, tt.replicas)
require.NoError(t, err)

reassignments, err := assignReplicatedSeries(series, tt.resizedRing, tt.replicas)
require.NoError(t, err)

// Assert that the initial nodes have no new keys after increasing the ring size
for _, node := range tt.initialRing {
for _, ts := range reassignments[node.Address] {
foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts)
require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node)
}
}
})
}
}

func TestKetamaHashringEvenAZSpread(t *testing.T) {
tenant := "default-tenant"
ts := &prompb.TimeSeries{
Expand Down

0 comments on commit ade433f

Please sign in to comment.