Merge pull request #1086 from dilanSachi/add-header-support
Add header support for consumer/producer records
dilanSachi authored Mar 22, 2024
2 parents f1c219e + 2078638 commit fe64243
Showing 22 changed files with 480 additions and 124 deletions.
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "kafka"
version = "3.10.0"
version = "3.10.1"
authors = ["Ballerina"]
keywords = ["kafka", "event streaming", "network", "messaging"]
repository = "https://github.com/ballerina-platform/module-ballerinax-kafka"
@@ -15,8 +15,8 @@ graalvmCompatible = true
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "kafka-native"
version = "3.10.0"
path = "../native/build/libs/kafka-native-3.10.0.jar"
version = "3.10.1"
path = "../native/build/libs/kafka-native-3.10.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "org.apache.kafka"
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
@@ -3,4 +3,4 @@ id = "kafka-compiler-plugin"
class = "io.ballerina.stdlib.kafka.plugin.KafkaCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.0.jar"
path = "../compiler-plugin/build/libs/kafka-compiler-plugin-3.10.1-SNAPSHOT.jar"
9 changes: 5 additions & 4 deletions ballerina/Dependencies.toml
@@ -23,7 +23,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "cache"
version = "3.7.0"
version = "3.7.1"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "constraint"},
@@ -71,7 +71,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "http"
version = "2.10.0"
version = "2.10.12"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "auth"},
@@ -283,7 +283,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.2.0"
version = "1.2.2"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
@@ -325,6 +325,7 @@ modules = [
org = "ballerina"
name = "time"
version = "2.4.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
@@ -389,7 +390,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "kafka"
version = "3.10.0"
version = "3.10.1"
dependencies = [
{org = "ballerina", name = "constraint"},
{org = "ballerina", name = "crypto"},
22 changes: 17 additions & 5 deletions ballerina/kafka_records.bal
@@ -197,6 +197,7 @@ public type TopicPartition record {|
# + value - Record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + offset - Topic partition position in which the consumed record is stored
+# + headers - Map of headers included with the record
# # Deprecated
# Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord
# instead to support data-binding
@@ -206,6 +207,7 @@ public type ConsumerRecord record {|
byte[] value;
int timestamp;
PartitionOffset offset;
map<byte[]|byte[][]|string|string[]> headers;
|};

# Type related to anydata consumer record.
@@ -214,19 +216,23 @@ public type ConsumerRecord record {|
# + value - Anydata record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + offset - Topic partition position in which the consumed record is stored
+# + headers - Map of headers included with the record
public type AnydataConsumerRecord record {|
anydata key?;
anydata value;
int timestamp;
PartitionOffset offset;
map<byte[]|byte[][]|string|string[]> headers;
|};

# Subtype related to `kafka:AnydataConsumerRecord` record.
#
# + value - Record content in bytes
+# + headers - Headers as a byte[] or byte[][]
public type BytesConsumerRecord record {|
*AnydataConsumerRecord;
byte[] value;
map<byte[]|byte[][]> headers;
|};

# Details related to the producer record.
@@ -236,6 +242,7 @@ public type BytesConsumerRecord record {|
# + value - Record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + partition - Partition to which the record should be sent
+# + headers - Map of headers to be included with the record
# # Deprecated
# Usage of this record is deprecated. Use subtypes of AnydataProducerRecord
# instead to support data-binding
@@ -246,29 +253,34 @@ public type ProducerRecord record {|
byte[] value;
int timestamp?;
int partition?;
map<byte[]|byte[][]|string|string[]> headers?;
|};

# Details related to the anydata producer record.
#
-# + topic - Topic to which the record will be appended
-# + key - Key that is included in the record
-# + value - Anydata record content
-# + timestamp - Timestamp of the record, in milliseconds since epoch
-# + partition - Partition to which the record should be sent
+# + topic - Topic to which the record will be appended
+# + key - Key that is included in the record
+# + value - Anydata record content
+# + timestamp - Timestamp of the record, in milliseconds since epoch
+# + partition - Partition to which the record should be sent
+# + headers - Map of headers to be included with the record
public type AnydataProducerRecord record {|
string topic;
anydata key?;
anydata value;
int timestamp?;
int partition?;
map<byte[]|byte[][]|string|string[]> headers?;
|};

# Subtype related to `kafka:AnydataProducerRecord` record.
#
# + value - Record content in bytes
+# + headers - Headers as a byte[] or byte[][]
public type BytesProducerRecord record {|
*AnydataProducerRecord;
byte[] value;
map<byte[]|byte[][]> headers?;
|};

// Producer-related records
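With these changes, `headers` can be set on any producer record and read back from any consumer record. A minimal end-to-end sketch against the 3.10.1 API (broker URL, topic, and group id are illustrative):

```ballerina
import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    kafka:Producer producer = check new (kafka:DEFAULT_URL);
    // Header values may be byte[], byte[][], string, or string[];
    // array values are sent as repeated headers under the same key.
    check producer->send({
        topic: "orders",
        value: "order-created".toBytes(),
        headers: {"correlation-id": "abc-123", "retry-tags": ["first", "second"]}
    });
    check producer->close();

    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {
        groupId: "order-group",
        topics: ["orders"],
        offsetReset: kafka:OFFSET_RESET_EARLIEST
    });
    kafka:BytesConsumerRecord[] records = check consumer->poll(5);
    foreach kafka:BytesConsumerRecord consumed in records {
        // Consumed headers arrive as byte[] (single value) or byte[][] (repeated).
        io:println(consumed.headers);
    }
    check consumer->close();
}
```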
25 changes: 24 additions & 1 deletion ballerina/producer.bal
@@ -116,7 +116,30 @@ public client isolated class Producer {
        } else if anydataKey !is () {
            key = anydataKey.toJsonString().toBytes();
        }
-        return sendByteArrayValues(self, value, producerRecord.topic, key,
+        return sendByteArrayValues(self, value, producerRecord.topic, self.getHeaderValueAsByteArrayList(producerRecord?.headers), key,
                producerRecord?.partition, producerRecord?.timestamp, self.keySerializerType);
    }
+
+    private isolated function getHeaderValueAsByteArrayList(map<byte[]|byte[][]|string|string[]>? headers) returns [string, byte[]][] {
+        [string, byte[]][] bytesHeaderList = [];
+        if headers is map<byte[]|byte[][]|string|string[]> {
+            foreach string key in headers.keys() {
+                byte[]|byte[][]|string|string[] values = headers.get(key);
+                if values is byte[] {
+                    bytesHeaderList.push([key, values]);
+                } else if values is byte[][] {
+                    foreach byte[] headerValue in values {
+                        bytesHeaderList.push([key, headerValue]);
+                    }
+                } else if values is string {
+                    bytesHeaderList.push([key, values.toBytes()]);
+                } else if values is string[] {
+                    foreach string headerValue in values {
+                        bytesHeaderList.push([key, headerValue.toBytes()]);
+                    }
+                }
+            }
+        }
+        return bytesHeaderList;
+    }
}
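The helper above flattens the user-facing header map into `[string, byte[]]` pairs for the native layer: strings are UTF-8 encoded with `toBytes()`, and each element of an array value becomes its own pair, matching Kafka's repeated-header model. A standalone sketch of the same rule (the `flattenHeaders` name is hypothetical; the module keeps this logic private):

```ballerina
import ballerina/io;

// Hypothetical standalone version of the private helper above.
isolated function flattenHeaders(map<byte[]|byte[][]|string|string[]> headers) returns [string, byte[]][] {
    [string, byte[]][] flattened = [];
    foreach [string, byte[]|byte[][]|string|string[]] [key, values] in headers.entries() {
        if values is byte[] {
            flattened.push([key, values]); // already raw bytes
        } else if values is byte[][] {
            foreach byte[] value in values {
                flattened.push([key, value]); // one pair per element
            }
        } else if values is string {
            flattened.push([key, values.toBytes()]); // UTF-8 encode
        } else {
            foreach string value in values {
                flattened.push([key, value.toBytes()]);
            }
        }
    }
    return flattened;
}

public function main() {
    // "tags" expands into two pairs; "trace-id" stays a single pair.
    io:println(flattenHeaders({"tags": ["a", "b"], "trace-id": "xyz"}));
}
```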
10 changes: 5 additions & 5 deletions ballerina/producer_utils.bal
@@ -16,28 +16,28 @@

import ballerina/jballerina.java;

-isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, anydata? key, int? partition,
+isolated function sendByteArrayValues(Producer producer, byte[] value, string topic, [string, byte[]][] headers, anydata? key, int? partition,
int? timestamp, string keySerializerType) returns Error? {
if key is () {
-return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp);
+return sendByteArrayValuesNilKeys(producer, value, topic, partition, timestamp, headers);
}
if keySerializerType == SER_BYTE_ARRAY {
if key is byte[] {
-return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp);
+return sendByteArrayValuesByteArrayKeys(producer, value, topic, key, partition, timestamp, headers);
}
panic getKeyTypeMismatchError(BYTE_ARRAY);
}
}

//Send byte[] values with different types of keys
isolated function sendByteArrayValuesNilKeys(Producer producer, byte[] value, string topic, int? partition = (),
-int? timestamp = ()) returns Error? =
+int? timestamp = (), [string, byte[]][] headers = []) returns Error? =
@java:Method {
'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
} external;

isolated function sendByteArrayValuesByteArrayKeys(Producer producer, byte[] value, string topic, byte[] key,
-int? partition = (), int? timestamp = ()) returns Error? =
+int? partition = (), int? timestamp = (), [string, byte[]][] headers = []) returns Error? =
@java:Method {
'class: "io.ballerina.stdlib.kafka.nativeimpl.producer.SendByteArrayValues"
} external;
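Giving the new `headers` parameter a `[]` default keeps every pre-existing call site of these external functions compiling unchanged. A toy sketch of the defaultable-parameter pattern (names hypothetical):

```ballerina
// Hypothetical illustration of the pattern used above.
isolated function sendSketch(string topic, [string, byte[]][] headers = []) {
    // An empty list means "no headers", so old callers need no changes.
}

public function main() {
    sendSketch("t1");                                // pre-header call shape
    sendSketch("t2", [["trace-id", "x".toBytes()]]); // new call shape
}
```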
5 changes: 3 additions & 2 deletions ballerina/tests/consumer_client_tests.bal
@@ -99,6 +99,7 @@ ProducerConfiguration producerConfiguration = {
requestTimeout: 2,
retryCount: 3
};

Producer producer = check new (DEFAULT_URL, producerConfiguration);

@test:Config {enable: true}
@@ -1483,6 +1484,6 @@ function commitOffsetWithPolledOffsetValue() returns error? {
check consumer->close();
}

-function sendMessage(anydata message, string topic, anydata? key = ()) returns error? {
-    return producer->send({ topic: topic, value: message, key });
+function sendMessage(anydata message, string topic, anydata? key = (), map<byte[]|byte[][]|string|string[]>? headers = ()) returns error? {
+    return producer->send({ topic: topic, value: message, key, headers });
}
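A sketch of how the extended `sendMessage` helper could be exercised (test name, topic, and group id are illustrative, and it assumes a single-valued header is delivered back as `byte[]`):

```ballerina
@test:Config {enable: true}
function producerSendWithHeadersTest() returns error? {
    string topic = "send-with-headers-test-topic";
    check sendMessage("Hello".toBytes(), topic, headers = {"event-type": "greeting"});
    ConsumerConfiguration consumerConfiguration = {
        topics: [topic],
        offsetReset: OFFSET_RESET_EARLIEST,
        groupId: "send-with-headers-test-group",
        clientId: "test-consumer-headers"
    };
    Consumer consumer = check new (DEFAULT_URL, consumerConfiguration);
    BytesConsumerRecord[] consumerRecords = check consumer->poll(5);
    test:assertEquals(consumerRecords.length(), 1);
    // Assumes the single-valued header comes back as byte[]:
    test:assertEquals(consumerRecords[0].headers.get("event-type"), "greeting".toBytes());
    check consumer->close();
}
```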
3 changes: 1 addition & 2 deletions ballerina/tests/consumer_constraint_tests.bal
@@ -32,14 +32,13 @@ public type StringConstraintConsumerRecord record {|
|};

public type IntConstraintConsumerRecord record {|
+*AnydataConsumerRecord;
int key?;
@constraint:Int {
maxValue: 100,
minValue: 10
}
int value;
-int timestamp;
-PartitionOffset offset;
|};

@constraint:Float {
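With record inclusion, the constrained type inherits `timestamp`, `offset`, and the new `headers` field from `AnydataConsumerRecord` instead of redeclaring them, so constraint subtypes pick up header support for free. A sketch (type name illustrative, assuming `ballerina/constraint` is imported):

```ballerina
public type ScoreConsumerRecord record {|
    *AnydataConsumerRecord;
    @constraint:Int {maxValue: 100, minValue: 0}
    int value;
|};
// Instances expose `headers` (map<byte[]|byte[][]|string|string[]>)
// inherited from AnydataConsumerRecord, plus the constrained payload.
```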