Skip to content

Commit

Permalink
Raise error when the reader cannot consume anymore (#287)
Browse files Browse the repository at this point in the history
* Raise an error to be caught by JS when the reader cannot consume anymore due to errors
* Add try/catch to test_json.js for demo
* Fix tests to reflect changes
* Bump golang.org/x/net from 0.22.0 to 0.23.0

Bumps [golang.org/x/net](https://github.com/golang/net) from 0.22.0 to 0.23.0.
- [Commits](golang/net@v0.22.0...v0.23.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <[email protected]>

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
mostafa and dependabot[bot] authored Apr 29, 2024
1 parent 0fdd2e1 commit f07390e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .golanci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ linters:
- exhaustruct
- gocognit
- gochecknoinits
- gocyclo
- gocyclo
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,15 +336,15 @@ func (k *Kafka) consume(

err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil)
logger.WithField("error", err).Info(err)
return messages
common.Throw(k.vu.Runtime(), err)
}

if err != nil {
k.reportReaderStats(reader.Stats())

err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", nil)
logger.WithField("error", err).Error(err)
return messages
common.Throw(k.vu.Runtime(), err)
}

var messageTime string
Expand Down
4 changes: 2 additions & 2 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestConsumerMaxWaitExceeded(t *testing.T) {
require.NoError(t, test.moveToVUCode())

// Consume a message in the VU function.
assert.NotPanics(t, func() {
assert.Panics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
})
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestConsumerContextCancelled(t *testing.T) {
test.cancelContext()

// Consume a message in the VU function.
assert.NotPanics(t, func() {
assert.Panics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
})
Expand Down
60 changes: 32 additions & 28 deletions scripts/test_json.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,36 +114,40 @@ export default function () {
writer.produce({ messages: messages });
}

// Read 10 messages only
let messages = reader.consume({ limit: 10 });
try {
// Read 10 messages only
let messages = reader.consume({ limit: 10 });

check(messages, {
"10 messages are received": (messages) => messages.length == 10,
});
check(messages, {
"10 messages are received": (messages) => messages.length == 10,
});

check(messages[0], {
"Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic,
"Key contains key/value and is JSON": (msg) =>
schemaRegistry
.deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON })
.correlationId.startsWith("test-id-"),
"Value contains key/value and is JSON": (msg) =>
typeof schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_JSON,
}) == "object" &&
schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_JSON,
}).name == "xk6-kafka",
"Header equals {'mykey': 'myvalue'}": (msg) =>
"mykey" in msg.headers &&
String.fromCharCode(...msg.headers["mykey"]) == "myvalue",
"Time is past": (msg) => new Date(msg["time"]) < new Date(),
"Partition is zero": (msg) => msg["partition"] == 0,
"Offset is gte zero": (msg) => msg["offset"] >= 0,
"High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
});
check(messages[0], {
"Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic,
"Key contains key/value and is JSON": (msg) =>
schemaRegistry
.deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON })
.correlationId.startsWith("test-id-"),
"Value contains key/value and is JSON": (msg) =>
typeof schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_JSON,
}) == "object" &&
schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_JSON,
}).name == "xk6-kafka",
"Header equals {'mykey': 'myvalue'}": (msg) =>
"mykey" in msg.headers &&
String.fromCharCode(...msg.headers["mykey"]) == "myvalue",
"Time is past": (msg) => new Date(msg["time"]) < new Date(),
"Partition is zero": (msg) => msg["partition"] == 0,
"Offset is gte zero": (msg) => msg["offset"] >= 0,
"High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
});
} catch (error) {
console.error(error);
}
}

export function teardown(data) {
Expand Down

0 comments on commit f07390e

Please sign in to comment.