Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header support for consumer/producer records #1086

Merged
merged 17 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "kafka"
version = "3.10.0"
version = "3.10.1"
authors = ["Ballerina"]
keywords = ["kafka", "event streaming", "network", "messaging"]
repository = "https://github.com/ballerina-platform/module-ballerinax-kafka"
Expand All @@ -15,8 +15,8 @@ graalvmCompatible = true
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "kafka-native"
version = "3.10.0"
path = "../native/build/libs/kafka-native-3.10.0.jar"
version = "3.10.1"
path = "../native/build/libs/kafka-native-3.10.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "org.apache.kafka"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "kafka-compiler-plugin"
class = "io.ballerina.stdlib.kafka.plugin.KafkaCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.0.jar"
path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.1-SNAPSHOT.jar"
9 changes: 5 additions & 4 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "cache"
version = "3.7.0"
version = "3.7.1"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "constraint"},
Expand Down Expand Up @@ -71,7 +71,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "http"
version = "2.10.0"
version = "2.10.12"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "auth"},
Expand Down Expand Up @@ -283,7 +283,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.2.0"
version = "1.2.2"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
Expand Down Expand Up @@ -325,6 +325,7 @@ modules = [
org = "ballerina"
name = "time"
version = "2.4.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
Expand Down Expand Up @@ -389,7 +390,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "kafka"
version = "3.10.0"
version = "3.10.1"
dependencies = [
{org = "ballerina", name = "constraint"},
{org = "ballerina", name = "crypto"},
Expand Down
22 changes: 17 additions & 5 deletions ballerina/kafka_records.bal
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public type TopicPartition record {|
# + value - Record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + offset - Topic partition position in which the consumed record is stored
# + headers - Map of headers included with the record
# # Deprecated
# Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord
# instead to support data-binding
Expand All @@ -206,6 +207,7 @@ public type ConsumerRecord record {|
byte[] value;
int timestamp;
PartitionOffset offset;
map<byte[]|byte[][]|string|string[]> headers;
|};

# Type related to anydata consumer record.
Expand All @@ -214,19 +216,23 @@ public type ConsumerRecord record {|
# + value - Anydata record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + offset - Topic partition position in which the consumed record is stored
# + headers - Map of headers included with the record
public type AnydataConsumerRecord record {|
anydata key?;
anydata value;
int timestamp;
PartitionOffset offset;
map<byte[]|byte[][]|string|string[]> headers;
|};

# Subtype related to `kafka:AnydataConsumerRecord` record.
#
# + value - Record content in bytes
# + headers - Headers as a byte[] or byte[][]
public type BytesConsumerRecord record {|
*AnydataConsumerRecord;
byte[] value;
map<byte[]|byte[][]> headers;
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
|};

# Details related to the producer record.
Expand All @@ -236,6 +242,7 @@ public type BytesConsumerRecord record {|
# + value - Record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + partition - Partition to which the record should be sent
# + headers - Map of headers to be included with the record
# # Deprecated
# Usage of this record is deprecated. Use subtypes of AnydataProducerRecord
# instead to support data-binding
Expand All @@ -246,29 +253,34 @@ public type ProducerRecord record {|
byte[] value;
int timestamp?;
int partition?;
map<byte[]|byte[][]|string|string[]> headers?;
|};

# Details related to the anydata producer record.
#
# + topic - Topic to which the record will be appended
# + key - Key that is included in the record
# + value - Anydata record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + partition - Partition to which the record should be sent
# + topic - Topic to which the record will be appended
# + key - Key that is included in the record
# + value - Anydata record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + partition - Partition to which the record should be sent
# + headers - Map of headers to be included with the record
public type AnydataProducerRecord record {|
string topic;
anydata key?;
anydata value;
int timestamp?;
int partition?;
map<byte[]|byte[][]|string|string[]> headers?;
|};

# Subtype related to `kafka:AnydataProducerRecord` record.
#
# + value - Record content in bytes
# + headers - Headers as a byte[] or byte[][]
public type BytesProducerRecord record {|
*AnydataProducerRecord;
byte[] value;
map<byte[]|byte[][]> headers?;
|};

// Producer-related records
Expand Down
25 changes: 24 additions & 1 deletion ballerina/producer.bal
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,30 @@ public client isolated class Producer {
} else if anydataKey !is () {
key = anydataKey.toJsonString().toBytes();
}
return sendByteArrayValues(self, value, producerRecord.topic, key,
return sendByteArrayValues(self, value, producerRecord.topic, self.getHeaderValueAsByteArrayList(producerRecord?.headers), key,
producerRecord?.partition, producerRecord?.timestamp, self.keySerializerType);
}

private isolated function getHeaderValueAsByteArrayList(map<byte[]|byte[][]|string|string[]>? headers) returns [string, byte[]][] {
[string, byte[]][] bytesHeaderList = [];
if headers is map<byte[]|byte[][]|string|string[]> {
foreach string key in headers.keys() {
byte[]|byte[][]|string|string[] values = headers.get(key);
if values is byte[] {
bytesHeaderList.push([key, values]);
} else if values is byte[][] {
foreach byte[] headerValue in values {
bytesHeaderList.push([key, headerValue]);
}
} else if values is string {
bytesHeaderList.push([key, values.toBytes()]);
} else if values is string[] {
foreach string headerValue in values {
bytesHeaderList.push([key, headerValue.toBytes()]);
}
}
}
}
return bytesHeaderList;
}
}
10 changes: 5 additions & 5 deletions ballerina/producer_utils.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,28 @@

import ballerina/jballerina.java;

isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, anydata? key, int? partition,
isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, [string, byte[]][] headers, anydata? key, int? partition,
int? timestamp, string keySerializerType) returns Error? {
if key is () {
return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp);
return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp, headers);
}
if keySerializerType == SER_BYTE_ARRAY {
if key is byte[] {
return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp);
return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp, headers);
}
panic getKeyTypeMismatchError(BYTE_ARRAY);
}
}

//Send byte[] values with different types of keys
isolated function sendByteArrayValuesNilKeys(Producer producer, byte[] value, string topic, int? partition = (),
int? timestamp = ()) returns Error? =
int? timestamp = (), [string, byte[]][] headers = []) returns Error? =

Check warning on line 34 in ballerina/producer_utils.bal

View check run for this annotation

Codecov / codecov/patch

ballerina/producer_utils.bal#L34

Added line #L34 was not covered by tests
@java:Method {
'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
} external;

isolated function sendByteArrayValuesByteArrayKeys(Producer producer, byte[] value, string topic, byte[] key,
int? partition = (), int? timestamp = ()) returns Error? =
int? partition = (), int? timestamp = (), [string, byte[]][] headers = []) returns Error? =

Check warning on line 40 in ballerina/producer_utils.bal

View check run for this annotation

Codecov / codecov/patch

ballerina/producer_utils.bal#L40

Added line #L40 was not covered by tests
@java:Method {
'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
} external;
5 changes: 3 additions & 2 deletions ballerina/tests/consumer_client_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ ProducerConfiguration producerConfiguration = {
requestTimeout: 2,
retryCount: 3
};

Producer producer = check new (DEFAULT_URL, producerConfiguration);

@test:Config {enable: true}
Expand Down Expand Up @@ -1483,6 +1484,6 @@ function commitOffsetWithPolledOffsetValue() returns error? {
check consumer->close();
}

function sendMessage(anydata message, string topic, anydata? key = ()) returns error? {
return producer->send({ topic: topic, value: message, key });
function sendMessage(anydata message, string topic, anydata? key = (), map<byte[]|byte[][]|string|string[]>? headers = ()) returns error? {
return producer->send({ topic: topic, value: message, key, headers });
}
3 changes: 1 addition & 2 deletions ballerina/tests/consumer_constraint_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ public type StringConstraintConsumerRecord record {|
|};

public type IntConstraintConsumerRecord record {|
*AnydataConsumerRecord;
int key?;
@constraint:Int {
maxValue: 100,
minValue: 10
}
int value;
int timestamp;
PartitionOffset offset;
|};

@constraint:Float {
Expand Down
Loading
Loading