-
Notifications
You must be signed in to change notification settings - Fork 69
/
test_sasl_auth.js
168 lines (153 loc) · 3.86 KB
/
test_sasl_auth.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/*
This is a k6 test script that imports the xk6-kafka extension and
tests Kafka with 200 JSON messages per iteration. It
also uses SASL authentication.
*/
import { check } from "k6";
import {
Writer,
Reader,
Connection,
SchemaRegistry,
SCHEMA_TYPE_JSON,
SASL_PLAIN,
TLS_1_2,
} from "k6/x/kafka"; // import kafka extension
// Scenario definition for this example: one VU at a constant rate for 10s.
// This is used for testing purposes. For real-world use, you should use your own options:
// https://k6.io/docs/using-k6/k6-options/
const saslAuthScenario = {
  executor: "constant-vus",
  vus: 1,
  duration: "10s",
  gracefulStop: "1s",
};

/** k6 options object exported for the runner; registers the single `sasl_auth` scenario. */
export const options = {
  scenarios: { sasl_auth: saslAuthScenario },
};
// Kafka endpoint and topic used by the writer, reader and connection below.
const topic = "xk6_kafka_json_topic";
const brokers = ["localhost:9092"];

// Settings passed to createTopic.
const numPartitions = 1;
const replicationFactor = 1;

// Where the reader starts.
// partition and groupId are mutually exclusive
const partition = 0;
const offset = 0;

// SASL config is optional
const saslConfig = {
  username: "client",
  password: "client-secret",
  // Possible values for the algorithm are:
  //   NONE (default)
  //   SASL_PLAIN
  //   SASL_SCRAM_SHA256
  //   SASL_SCRAM_SHA512
  //   SASL_SSL (must enable TLS)
  //   SASL_AWS_IAM (configurable via env or AWS IAM config files - no username/password needed)
  algorithm: SASL_PLAIN,
};

// TLS config is optional
const tlsConfig = {
  // Enable/disable TLS (default: false)
  enableTls: false,
  // Skip TLS verification if the certificate is invalid or self-signed (default: false)
  insecureSkipTlsVerify: false,
  // Possible values:
  //   TLS_1_0
  //   TLS_1_1
  //   TLS_1_2 (default)
  //   TLS_1_3
  minVersion: TLS_1_2,
  // Only needed if you have a custom or self-signed certificate and keys
  // clientCertPem: "/path/to/your/client.pem",
  // clientKeyPem: "/path/to/your/client-key.pem",
  // serverCaPem: "/path/to/your/ca.pem",
};
// Producer for `topic`, authenticated with the SASL/TLS settings above.
const writer = new Writer({
  brokers,
  topic,
  sasl: saslConfig,
  tls: tlsConfig,
});

// Consumer reading `topic` from a fixed partition and offset (no groupId).
const reader = new Reader({
  brokers,
  topic,
  partition,
  offset,
  sasl: saslConfig,
  tls: tlsConfig,
});

// Connection to the first broker, used for creating, listing and deleting topics.
const connection = new Connection({
  address: brokers[0],
  sasl: saslConfig,
  tls: tlsConfig,
});

// Serializer/deserializer for message keys and values; constructed without configuration.
const schemaRegistry = new SchemaRegistry();
// Create the topic and log the broker's topic list only when __VU is 0.
// NOTE(review): per the k6 docs __VU is 0 outside regular VU iterations
// (init/setup/teardown) - confirm this is the intended "run once" guard.
if (__VU === 0) {
  connection.createTopic({
    topic,
    numPartitions,
    replicationFactor,
  });
  // listTopics() is declared without parameters; the SASL and TLS settings
  // were already supplied when `connection` was constructed.
  console.log("Existing topics: ", connection.listTopics());
}
/**
 * Builds one Kafka message whose key and value are both JSON-serialized
 * through `schemaRegistry`.
 *
 * @param {string} correlationId - Stored under `correlationId` in the message key.
 * @returns {{key: *, value: *}} Message object suitable for `writer.produce`.
 */
function buildJsonMessage(correlationId) {
  return {
    key: schemaRegistry.serialize({
      data: { correlationId },
      schemaType: SCHEMA_TYPE_JSON,
    }),
    value: schemaRegistry.serialize({
      data: { name: "xk6-kafka" },
      schemaType: SCHEMA_TYPE_JSON,
    }),
  };
}

/**
 * k6 iteration body: produces 100 batches of two JSON messages (200 messages
 * in total), then consumes 10 messages and checks the first one.
 */
export default function () {
  for (let index = 0; index < 100; index++) {
    writer.produce({
      messages: [
        buildJsonMessage(`test-id-abc-${index}`),
        buildJsonMessage(`test-id-def-${index}`),
      ],
    });
  }

  // Read 10 messages only
  const messages = reader.consume({ limit: 10 });

  check(messages, {
    "10 messages returned": (msgs) => msgs.length === 10,
    // The length guards make an empty result fail the check instead of
    // throwing a TypeError on msgs[0].
    "key is correct": (msgs) =>
      msgs.length > 0 &&
      schemaRegistry
        .deserialize({ data: msgs[0].key, schemaType: SCHEMA_TYPE_JSON })
        .correlationId.startsWith("test-id-"),
    "value is correct": (msgs) =>
      msgs.length > 0 &&
      schemaRegistry.deserialize({
        data: msgs[0].value,
        schemaType: SCHEMA_TYPE_JSON,
      }).name === "xk6-kafka",
  });
}
/**
 * k6 teardown hook: deletes the test topic and closes the Kafka clients.
 *
 * @param {*} data - Value handed over by k6; unused here.
 */
export function teardown(data) {
  try {
    if (__VU === 0) {
      // Delete the topic
      connection.deleteTopic(topic);
    }
  } finally {
    // Close the clients even if deleting the topic throws, so they are
    // not leaked; the original error still propagates afterwards.
    writer.close();
    reader.close();
    connection.close();
  }
}