
Commit

Merge pull request #1115 from dilanSachi/remove-deprecated
Remove deprecated `kafka:ConsumerRecord` and `kafka:ProducerRecord`
dilanSachi authored Apr 19, 2024
2 parents b1fbdbb + 44ad26c commit e5675da
Showing 55 changed files with 196 additions and 234 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -59,7 +59,7 @@ kafka:ConsumerConfiguration consumerConfiguration = {
listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);
service on kafkaListener {
-remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
+remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) {
// processes the records
...
// commits the offsets manually
@@ -85,7 +85,7 @@ string key = "my-key";
check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });
```
```ballerina
-kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);
+kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);
foreach var kafkaRecord in records {
byte[] messageContent = kafkaRecord.value;
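
For context, a minimal, self-contained sketch of the migrated listener service shown in this README hunk; the configuration values (`groupId`, `topics`, `autoCommit`), the logging, and the manual `'commit()` call are illustrative assumptions rather than part of this change:

```ballerina
import ballerina/log;
import ballerinax/kafka;

// Assumed configuration values; only the BytesConsumerRecord parameter type
// comes from this commit.
kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "test-group",
    topics: ["test-kafka-topic"],
    autoCommit: false // offsets are committed manually below
};

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns error? {
        foreach kafka:BytesConsumerRecord kafkaRecord in records {
            // The record value is a byte[] in the bytes-typed record.
            string message = check string:fromBytes(kafkaRecord.value);
            log:printInfo("Received: " + message);
        }
        // Commit the consumed offsets manually.
        check caller->'commit();
    }
}
```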
4 changes: 2 additions & 2 deletions ballerina/Module.md
@@ -52,7 +52,7 @@ kafka:ConsumerConfiguration consumerConfiguration = {
listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);
service on kafkaListener {
-remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
+remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) {
// processes the records
...
// commits the offsets manually
@@ -78,7 +78,7 @@ string key = "my-key";
check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });
```
```ballerina
-kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);
+kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);
foreach var kafkaRecord in records {
byte[] messageContent = kafkaRecord.value;
4 changes: 2 additions & 2 deletions ballerina/Package.md
@@ -51,7 +51,7 @@ kafka:ConsumerConfiguration consumerConfiguration = {
listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);
service on kafkaListener {
-remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
+remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) {
// processes the records
...
// commits the offsets manually
@@ -77,7 +77,7 @@ string key = "my-key";
check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });
```
```ballerina
-kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);
+kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);
foreach var kafkaRecord in records {
byte[] messageContent = kafkaRecord.value;
2 changes: 1 addition & 1 deletion ballerina/consumer.bal
@@ -262,7 +262,7 @@ public client isolated class Consumer {
'class: "io.ballerina.stdlib.kafka.nativeimpl.consumer.Poll"
} external;

-# Polls the external broker to retrieve messages in the required data type without the `kafka:ConsumerRecord`
+# Polls the external broker to retrieve messages in the required data type without the `kafka:AnydataConsumerRecord`
# information.
# ```ballerina
# Person[] persons = check consumer->pollPayload(10);
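
Since the doc comment above now refers to `kafka:AnydataConsumerRecord`, here is a hedged sketch of how `pollPayload` binds payloads directly to an application type; the `Person` record, topic, and group id are assumptions borrowed from the doc comment's own example, not part of this change:

```ballerina
import ballerina/io;
import ballerinax/kafka;

// Hypothetical application type; `Person` appears only in the doc comment example.
type Person record {|
    string name;
    int age;
|};

public function main() returns error? {
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: "person-group", // assumed value
        topics: ["person-topic"] // assumed value
    };
    kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);

    // Binds the payloads straight to Person[], without the surrounding
    // kafka:AnydataConsumerRecord metadata (offset, timestamp, headers).
    Person[] persons = check kafkaConsumer->pollPayload(10);
    foreach Person person in persons {
        io:println(person.name);
    }
}
```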
40 changes: 0 additions & 40 deletions ballerina/kafka_records.bal
@@ -191,25 +191,6 @@ public type TopicPartition record {|
int partition;
|};

-# Type related to consumer record.
-#
-# + key - Key that is included in the record
-# + value - Record content
-# + timestamp - Timestamp of the record, in milliseconds since epoch
-# + offset - Topic partition position in which the consumed record is stored
-# + headers - Map of headers included with the record
-# # Deprecated
-# Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord
-# instead to support data-binding
-@deprecated
-public type ConsumerRecord record {|
-    byte[] key?;
-    byte[] value;
-    int timestamp;
-    PartitionOffset offset;
-    map<byte[]|byte[][]|string|string[]> headers;
-|};

# Type related to anydata consumer record.
#
# + key - Key that is included in the record
@@ -235,27 +216,6 @@ public type BytesConsumerRecord record {|
map<byte[]|byte[][]> headers;
|};

-# Details related to the producer record.
-#
-# + topic - Topic to which the record will be appended
-# + key - Key that is included in the record
-# + value - Record content
-# + timestamp - Timestamp of the record, in milliseconds since epoch
-# + partition - Partition to which the record should be sent
-# + headers - Map of headers to be included with the record
-# # Deprecated
-# Usage of this record is deprecated. Use subtypes of AnydataProducerRecord
-# instead to support data-binding
-@deprecated
-public type ProducerRecord record {|
-    string topic;
-    byte[] key?;
-    byte[] value;
-    int timestamp?;
-    int partition?;
-    map<byte[]|byte[][]|string|string[]> headers?;
-|};

# Details related to the anydata producer record.
#
# + topic - Topic to which the record will be appended
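
With the deprecated `ConsumerRecord` and `ProducerRecord` gone, the byte-oriented usage from the README above is the direct replacement. A minimal migration sketch, assuming a local broker at `kafka:DEFAULT_URL` and illustrative message and group-id values (the topic and key mirror the README excerpt):

```ballerina
import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    // Producing: the anonymous record literal binds to the anydata-based
    // producer record types that replace the removed kafka:ProducerRecord.
    kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL);
    check kafkaProducer->send({
        topic: "test-kafka-topic",
        key: "my-key".toBytes(),
        value: "Hello World".toBytes() // assumed message content
    });

    // Consuming: kafka:BytesConsumerRecord[] replaces the removed
    // kafka:ConsumerRecord[] for byte[]-valued records.
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: "test-group", // assumed value
        topics: ["test-kafka-topic"]
    };
    kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
    kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);
    foreach kafka:BytesConsumerRecord kafkaRecord in records {
        string message = check string:fromBytes(kafkaRecord.value);
        io:println(message);
    }
}
```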
