
Add header support for consumer/producer records #1086

Merged: 17 commits, Mar 22, 2024 (diff shows changes from 11 commits)
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "kafka"
version = "3.10.0"
version = "3.10.1"
authors = ["Ballerina"]
keywords = ["kafka", "event streaming", "network", "messaging"]
repository = "https://github.com/ballerina-platform/module-ballerinax-kafka"
@@ -15,8 +15,8 @@ graalvmCompatible = true
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "kafka-native"
version = "3.10.0"
path = "../native/build/libs/kafka-native-3.10.0.jar"
version = "3.10.1"
path = "../native/build/libs/kafka-native-3.10.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "org.apache.kafka"
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
@@ -3,4 +3,4 @@ id = "kafka-compiler-plugin"
class = "io.ballerina.stdlib.kafka.plugin.KafkaCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.0.jar"
path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.1-SNAPSHOT.jar"
9 changes: 5 additions & 4 deletions ballerina/Dependencies.toml
@@ -23,7 +23,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "cache"
version = "3.7.0"
version = "3.7.1"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "constraint"},
@@ -71,7 +71,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "http"
version = "2.10.0"
version = "2.10.11"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "auth"},
@@ -283,7 +283,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.2.0"
version = "1.2.2"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
@@ -325,6 +325,7 @@ modules = [
org = "ballerina"
name = "time"
version = "2.4.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
@@ -389,7 +389,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "kafka"
version = "3.10.0"
version = "3.10.1"
dependencies = [
{org = "ballerina", name = "constraint"},
{org = "ballerina", name = "crypto"},
16 changes: 11 additions & 5 deletions ballerina/kafka_records.bal
@@ -197,6 +197,7 @@ public type TopicPartition record {|
# + value - Record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + offset - Topic partition position in which the consumed record is stored
+# + headers - Map of headers included with the record
# # Deprecated
# Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord
# instead to support data-binding
@@ -206,6 +207,7 @@ public type ConsumerRecord record {|
byte[] value;
int timestamp;
PartitionOffset offset;
+map<byte[]|byte[][]> headers;
|};

# Type related to anydata consumer record.
@@ -214,11 +216,13 @@ public type ConsumerRecord record {|
# + value - Anydata record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + offset - Topic partition position in which the consumed record is stored
+# + headers - Map of headers included with the record
public type AnydataConsumerRecord record {|
anydata key?;
anydata value;
int timestamp;
PartitionOffset offset;
+map<byte[]|byte[][]> headers;
|};

# Subtype related to `kafka:AnydataConsumerRecord` record.
@@ -250,17 +254,19 @@ public type ProducerRecord record {|

# Details related to the anydata producer record.
#
-# + topic - Topic to which the record will be appended
-# + key - Key that is included in the record
-# + value - Anydata record content
-# + timestamp - Timestamp of the record, in milliseconds since epoch
-# + partition - Partition to which the record should be sent
+# + topic - Topic to which the record will be appended
+# + key - Key that is included in the record
+# + value - Anydata record content
+# + timestamp - Timestamp of the record, in milliseconds since epoch
+# + partition - Partition to which the record should be sent
+# + headers - Map of headers to be included with the record
public type AnydataProducerRecord record {|
string topic;
anydata key?;
anydata value;
int timestamp?;
int partition?;
+map<byte[]|byte[][]> headers?;
|};

# Subtype related to `kafka:AnydataProducerRecord` record.
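For orientation: the new `headers` field on both producer and consumer records is a `map<byte[]|byte[][]>`, where each header key carries either a single `byte[]` value or a `byte[][]` array when the header is repeated. Below is a minimal end-to-end sketch assembled from the record types and tests in this PR; the topic name, group id, and use of `kafka:DEFAULT_URL` are illustrative assumptions, not part of the change:

```ballerina
import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    // A header key maps to one byte[] value, or to byte[][] for repeated headers.
    map<byte[]|byte[][]> headers = {
        "trace-id": "abc-123".toBytes(),
        "retry": ["1".toBytes(), "2".toBytes()]
    };

    // Producer side: `headers` is an optional field of the producer record.
    kafka:Producer producer = check new (kafka:DEFAULT_URL);
    check producer->send({topic: "orders", value: "Hello".toBytes(), headers});
    check producer->close();

    // Consumer side: every consumed record now carries its headers map.
    kafka:ConsumerConfiguration config = {
        topics: ["orders"],
        offsetReset: kafka:OFFSET_RESET_EARLIEST,
        groupId: "orders-group"
    };
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, config);
    kafka:BytesConsumerRecord[] records = check consumer->poll(5);
    foreach kafka:BytesConsumerRecord rec in records {
        io:println(rec.headers.keys()); // e.g. ["trace-id", "retry"]
    }
    check consumer->close();
}
```

Note that, per this diff, `headers` is a required field on consumer records but optional (`headers?`) on producer records.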
2 changes: 1 addition & 1 deletion ballerina/producer.bal
@@ -116,7 +116,7 @@ public client isolated class Producer {
} else if anydataKey !is () {
key = anydataKey.toJsonString().toBytes();
}
-return sendByteArrayValues(self, value, producerRecord.topic, key,
+return sendByteArrayValues(self, value, producerRecord.topic, producerRecord.headers, key,
producerRecord?.partition, producerRecord?.timestamp, self.keySerializerType);
}
}
10 changes: 5 additions & 5 deletions ballerina/producer_utils.bal
@@ -16,28 +16,28 @@

import ballerina/jballerina.java;

-isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, anydata? key, int? partition,
+isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, map<byte[]|byte[][]>? headers, anydata? key, int? partition,
int? timestamp, string keySerializerType) returns Error? {
if key is () {
-return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp);
+return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp, headers);
}
if keySerializerType == SER_BYTE_ARRAY {
if key is byte[] {
-return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp);
+return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp, headers);
}
panic getKeyTypeMismatchError(BYTE_ARRAY);
}
}

//Send byte[] values with different types of keys
isolated function sendByteArrayValuesNilKeys(Producer producer, byte[] value, string topic, int? partition = (),
-int? timestamp = ()) returns Error? =
+int? timestamp = (), map<byte[]|byte[][]>? headers = ()) returns Error? =
[Codecov / codecov/patch check warning on producer_utils.bal line 34: added line was not covered by tests]
@java:Method {
'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
} external;

isolated function sendByteArrayValuesByteArrayKeys(Producer producer, byte[] value, string topic, byte[] key,
-int? partition = (), int? timestamp = ()) returns Error? =
+int? partition = (), int? timestamp = (), map<byte[]|byte[][]>? headers = ()) returns Error? =
[Codecov / codecov/patch check warning on producer_utils.bal line 40: added line was not covered by tests]
@java:Method {
'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
} external;
24 changes: 22 additions & 2 deletions ballerina/tests/consumer_client_tests.bal
@@ -1483,6 +1483,26 @@ function commitOffsetWithPolledOffsetValue() returns error? {
check consumer->close();
}

-function sendMessage(anydata message, string topic, anydata? key = ()) returns error? {
-return producer->send({ topic: topic, value: message, key });
+@test:Config {enable: true}
+function consumerReadHeadersTest() returns error? {
+string topic = "consumer-read-headers-test-topic";
+kafkaTopics.push(topic);
+map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+check sendMessage(TEST_MESSAGE.toBytes(), topic, (), headers);
+ConsumerConfiguration consumerConfiguration = {
+topics: [topic],
+offsetReset: OFFSET_RESET_EARLIEST,
+groupId: "consumer-read-headers-test-group",
+clientId: "test-consumer-61"
+};
+Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
+BytesConsumerRecord[] consumerRecords = check consumer->poll(5);
+test:assertEquals(consumerRecords.length(), 1);
+map<byte[]|byte[][]> receivedHeaders = consumerRecords[0].headers;
+test:assertEquals(receivedHeaders, headers);
+check consumer->close();
+}
+
+function sendMessage(anydata message, string topic, anydata? key = (), map<byte[]|byte[][]>? headers = ()) returns error? {
+return producer->send({ topic: topic, value: message, key, headers });
}
3 changes: 1 addition & 2 deletions ballerina/tests/consumer_constraint_tests.bal
@@ -32,14 +32,13 @@ public type StringConstraintConsumerRecord record {|
|};

public type IntConstraintConsumerRecord record {|
+*AnydataConsumerRecord;
int key?;
@constraint:Int {
maxValue: 100,
minValue: 10
}
int value;
-int timestamp;
-PartitionOffset offset;
|};

@constraint:Float {
31 changes: 31 additions & 0 deletions ballerina/tests/listener_client_tests.bal
@@ -30,6 +30,7 @@ string detachMsg1 = ""
string detachMsg2 = "";
string incorrectEndpointMsg = "";
string receivedTimeoutConfigValue = "";
+map<byte[]|byte[][]> receivedHeaders = {};

int receivedMsgCount = 0;

@@ -986,3 +987,33 @@ function listenerWithPollTimeoutConfigTest() returns error? {
check configListener.gracefulStop();
test:assertEquals(receivedTimeoutConfigValue, TEST_MESSAGE);
}

+@test:Config {enable: true}
+function listenerWithConsumerHeadersTest() returns error? {
+string topic = "listener-consumer-headers-test-topic";
+kafkaTopics.push(topic);
+map<byte[]|byte[][]>? headers = {"key1": ["header1".toBytes(), "header2".toBytes()], "key2": "header3".toBytes()};
+check sendMessage(TEST_MESSAGE, topic, (), headers);
+
+Service headersService =
+service object {
+remote function onConsumerRecord(BytesConsumerRecord[] records) returns error? {
+foreach int i in 0 ... records.length() - 1 {
+receivedHeaders = records[i].headers;
+}
+}
+};
+
+ConsumerConfiguration consumerConfiguration = {
+topics: topic,
+offsetReset: OFFSET_RESET_EARLIEST,
+groupId: "test-listener-group-29",
+clientId: "test-listener-29"
+};
+Listener headersListener = check new (DEFAULT_URL, consumerConfiguration);
+check headersListener.attach(headersService);
+check headersListener.'start();
+runtime:sleep(3);
+check headersListener.gracefulStop();
+test:assertEquals(receivedHeaders, headers);
+}
4 changes: 4 additions & 0 deletions ballerina/tests/listener_data_binding_tests.bal
@@ -91,6 +91,7 @@ public type IntConsumerRecord record {|
int value;
int timestamp;
PartitionOffset offset;
+map<byte[]|byte[][]> headers;
|};

public type FloatConsumerRecord record {|
@@ -139,13 +140,15 @@ public type TableConsumerRecord record {|
table<Person> value;
int timestamp;
PartitionOffset offset;
+map<byte[]|byte[][]> headers;
|};

public type JsonConsumerRecord record {|
PartitionOffset offset;
json key?;
int timestamp;
json value;
+map<byte[]|byte[][]> headers;
|};

public type PayloadConsumerRecord record {|
@@ -159,6 +162,7 @@ public type PayloadConsumerRecord record {|
int partition;
|} partition;
|} offset;
+map<byte[]|byte[][]> headers?;
|};

PayloadConsumerRecord payloadConsumerRecord = {
2 changes: 1 addition & 1 deletion ballerina/tests/producer_client_tests.bal
@@ -102,7 +102,7 @@ function producerKeyTypeMismatchErrorTest() returns error? {
string topic = "key-type-mismatch-error-test-topic";
Producer producer = check new (DEFAULT_URL, producerConfiguration);
string message = "Hello, Ballerina";
-error? result = trap sendByteArrayValues(producer, message.toBytes(), topic, MESSAGE_KEY, 0, (), SER_BYTE_ARRAY);
+error? result = trap sendByteArrayValues(producer, message.toBytes(), topic, (), MESSAGE_KEY, 0, (), SER_BYTE_ARRAY);
if result is error {
string expectedErr = "Invalid type found for Kafka key. Expected key type: 'byte[]'.";
test:assertEquals(result.message(), expectedErr);
13 changes: 12 additions & 1 deletion changelog.md
@@ -5,14 +5,25 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

+### Added
+
+- [Added header support for Consumer and Producer](https://github.com/ballerina-platform/ballerina-library/issues/6196)

## [3.10.0] - 2023-09-18

### Changed

- [Changed disallowing service level annotations in the compiler plugin](https://github.com/ballerina-platform/ballerina-standard-library/issues/4731)
- [Updated Apache Kafka client version to `3.5.1`](https://github.com/ballerina-platform/ballerina-standard-library/issues/4752)

### Fixed
- [Removed `client` keyword from the `kafka:Listener`](https://github.com/ballerina-platform/ballerina-standard-library/issues/4750)

## [3.9.1] - 2023-08-18

### Changed

- [Changed disallowing service level annotations in the compiler plugin](https://github.com/ballerina-platform/ballerina-standard-library/issues/4731)

## [3.8.0] - 2023-06-01

### Fixed
@@ -74,13 +74,15 @@ public type PayloadConsumerRecord record {|
int partition;
|} partition;
|} offset;
+map<byte[]|byte[][]> headers;
|};

public type PayloadConsumerRecordWithTypeReference record {|
string key?;
string value;
int timestamp;
kafka:PartitionOffset offset;
+map<byte[]|byte[][]> headers;
|};

public type Person record {|
@@ -162,13 +162,15 @@ public type PayloadConsumerRecord record {|
int partition;
|} partition;
|} offset;
+map<byte[]|byte[][]> headers;
|};

public type PayloadConsumerRecordWithTypeReference record {|
string key?;
string value;
int timestamp;
kafka:PartitionOffset offset;
+map<byte[]|byte[][]> headers;
|};

public type Person record {|
@@ -204,6 +204,7 @@ public type IntConsumerRecord record {|
int value;
int timestamp;
kafka:PartitionOffset offset;
+map<byte[]|byte[][]> headers;
|};

public type FloatConsumerRecord record {|
@@ -252,11 +253,13 @@ public type TableConsumerRecord record {|
table<Person> value;
int timestamp;
kafka:PartitionOffset offset;
+map<byte[]|byte[][]> headers;
|};

public type JsonConsumerRecord record {|
kafka:PartitionOffset offset;
json key?;
int timestamp;
json value;
+map<byte[]|byte[][]> headers;
|};