forked from mostafa/xk6-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
topic_test.go
120 lines (102 loc) · 3.46 KB
/
topic_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package kafka
import (
"testing"
"github.com/grafana/sobek"
kafkago "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestGetKafkaControllerConnection tests whether a connection can be established to a kafka broker.
func TestGetKafkaControllerConnection(t *testing.T) {
test := getTestModuleInstance(t)
assert.NotPanics(t, func() {
connection := test.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})
assert.NotNil(t, connection)
connection.Close()
})
}
// TestGetKafkaControllerConnectionFails tests whether a connection can be established
// to a kafka broker and fails if the given broker is not reachable.
func TestGetKafkaControllerConnectionFails(t *testing.T) {
test := getTestModuleInstance(t)
assert.Panics(t, func() {
connection := test.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9094",
})
assert.Nil(t, connection)
})
}
// TestTopics tests various functions to create, delete and list topics.
func TestTopics(t *testing.T) {
test := getTestModuleInstance(t)
require.NoError(t, test.moveToVUCode())
assert.NotPanics(t, func() {
topic := "test-topics"
connection := test.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})
test.module.Kafka.createTopic(connection, &kafkago.TopicConfig{
Topic: topic,
})
topics := test.module.Kafka.listTopics(connection)
assert.Contains(t, topics, topic)
test.module.Kafka.deleteTopic(connection, topic)
topics = test.module.Kafka.listTopics(connection)
assert.NotContains(t, topics, topic)
connection.Close()
})
}
// TestConnectionClass tests the connection class that is exported to JS.
func TestConnectionClass(t *testing.T) {
test := getTestModuleInstance(t)
require.NoError(t, test.moveToVUCode())
assert.NotPanics(t, func() {
// Create a connection
connection := test.module.Kafka.connectionClass(sobek.ConstructorCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"url": "localhost:9092",
},
),
},
})
assert.NotNil(t, connection)
// Create a topic
createTopic := connection.Get("createTopic").Export().(func(sobek.FunctionCall) sobek.Value)
assert.NotNil(t, createTopic)
result := createTopic(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"topic": "test-connection-class",
},
),
},
}).Export()
assert.Nil(t, result)
// List all topics
listTopics := connection.Get("listTopics").Export().(func(sobek.FunctionCall) sobek.Value)
assert.NotNil(t, listTopics)
allTopics := listTopics(sobek.FunctionCall{}).Export().([]string)
assert.Contains(t, allTopics, "test-connection-class")
// Delete the topic
deleteTopic := connection.Get("deleteTopic").Export().(func(sobek.FunctionCall) sobek.Value)
assert.NotNil(t, deleteTopic)
result = deleteTopic(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue("test-connection-class"),
},
}).Export()
assert.Nil(t, result)
allTopics = listTopics(sobek.FunctionCall{}).Export().([]string)
assert.NotContains(t, allTopics, "test-connection-class")
// Close the connection
close := connection.Get("close").Export().(func(sobek.FunctionCall) sobek.Value)
assert.NotNil(t, close)
result = close(sobek.FunctionCall{}).Export()
assert.Nil(t, result)
})
}