Skip to content

Commit

Permalink
Address PR comments - part 5
Browse files Browse the repository at this point in the history
  • Loading branch information
ctrlaltluc committed Jul 27, 2023
1 parent 4538862 commit cae78f5
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 11 deletions.
11 changes: 0 additions & 11 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,6 @@ func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string {
return brokerAzMap
}

func getBrokerRack(readOnlyConfig string) string {
if readOnlyConfig == "" {
return ""
}
match := kafkaConfigBrokerRackRegex.FindStringSubmatch(readOnlyConfig)
if len(match) == 2 {
return match[1]
}
return ""
}

func getCreatedPvcForBroker(
ctx context.Context,
c client.Reader,
Expand Down
82 changes: 82 additions & 0 deletions pkg/resources/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,3 +1526,85 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
})
}
}

func TestGetBrokerAzMap(t *testing.T) {
t.Parallel()
testCases := []struct {
testName string
kafkaCluster v1beta1.KafkaCluster
expectedAzMap map[int32]string
}{
{
testName: "Brokers have different AZs if no broker rack value is set",
kafkaCluster: v1beta1.KafkaCluster{
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: ""},
{Id: 201, ReadOnlyConfig: ""},
{Id: 301, ReadOnlyConfig: ""},
},
},
},
expectedAzMap: map[int32]string{101: "101", 201: "201", 301: "301"},
},
{
testName: "Brokers have different AZs if one broker has no broker rack value set",
kafkaCluster: v1beta1.KafkaCluster{
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 102, ReadOnlyConfig: ""},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 202, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
{Id: 302, ReadOnlyConfig: "broker.rack=az3"},
},
},
},
expectedAzMap: map[int32]string{101: "101", 102: "102", 201: "201", 202: "202", 301: "301", 302: "302"},
},
{
testName: "Brokers have different AZs if read only configs is a corrupted string for one broker",
kafkaCluster: v1beta1.KafkaCluster{
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack;az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
},
},
expectedAzMap: map[int32]string{101: "101", 201: "201", 301: "301"},
},
{
testName: "Brokers have correct AZs if read only configs is valid for all brokers",
kafkaCluster: v1beta1.KafkaCluster{
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az-1"},
{Id: 102, ReadOnlyConfig: "broker.rack=az-1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az-2"},
{Id: 202, ReadOnlyConfig: "broker.rack=az-2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az-3"},
{Id: 302, ReadOnlyConfig: "broker.rack=az-3"},
},
},
},
expectedAzMap: map[int32]string{
101: "az-1",
102: "az-1",
201: "az-2",
202: "az-2",
301: "az-3",
302: "az-3",
},
},
}

for _, test := range testCases {
t.Run(test.testName, func(t *testing.T) {
azMap := getBrokerAzMap(&test.kafkaCluster)
assert.Equal(t, test.expectedAzMap, azMap)
})
}
}

0 comments on commit cae78f5

Please sign in to comment.