Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated kafka:ConsumerRecord and kafka:ProducerRecord #1115

Merged
merged 4 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ kafka:ConsumerConfiguration consumerConfiguration = {
listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service on kafkaListener {
remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) {
// processes the records
...
// commits the offsets manually
Expand All @@ -85,7 +85,7 @@ string key = "my-key";
check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });
```
```ballerina
kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);
kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);

foreach var kafkaRecord in records {
byte[] messageContent = kafkaRecord.value;
Expand Down
4 changes: 2 additions & 2 deletions ballerina/Module.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ kafka:ConsumerConfiguration consumerConfiguration = {
listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service on kafkaListener {
remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) {
// processes the records
...
// commits the offsets manually
Expand All @@ -78,7 +78,7 @@ string key = "my-key";
check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });
```
```ballerina
kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);
kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);

foreach var kafkaRecord in records {
byte[] messageContent = kafkaRecord.value;
Expand Down
4 changes: 2 additions & 2 deletions ballerina/Package.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ kafka:ConsumerConfiguration consumerConfiguration = {
listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service on kafkaListener {
remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) {
// processes the records
...
// commits the offsets manually
Expand All @@ -77,7 +77,7 @@ string key = "my-key";
check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });
```
```ballerina
kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);
kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);

foreach var kafkaRecord in records {
byte[] messageContent = kafkaRecord.value;
Expand Down
2 changes: 1 addition & 1 deletion ballerina/consumer.bal
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public client isolated class Consumer {
'class: "io.ballerina.stdlib.kafka.nativeimpl.consumer.Poll"
} external;

# Polls the external broker to retrieve messages in the required data type without the `kafka:ConsumerRecord`
# Polls the external broker to retrieve messages in the required data type without the `kafka:AnydataConsumerRecord`
# information.
# ```ballerina
# Person[] persons = check consumer->pollPayload(10);
Expand Down
40 changes: 0 additions & 40 deletions ballerina/kafka_records.bal
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,6 @@ public type TopicPartition record {|
int partition;
|};

# Type related to consumer record.
#
# + key - Key that is included in the record
# + value - Record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + offset - Topic partition position in which the consumed record is stored
# + headers - Map of headers included with the record
# # Deprecated
# Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord
# instead to support data-binding
@deprecated
public type ConsumerRecord record {|
byte[] key?;
byte[] value;
int timestamp;
PartitionOffset offset;
map<byte[]|byte[][]|string|string[]> headers;
|};

# Type related to anydata consumer record.
#
# + key - Key that is included in the record
Expand All @@ -235,27 +216,6 @@ public type BytesConsumerRecord record {|
map<byte[]|byte[][]> headers;
|};

# Details related to the producer record.
#
# + topic - Topic to which the record will be appended
# + key - Key that is included in the record
# + value - Record content
# + timestamp - Timestamp of the record, in milliseconds since epoch
# + partition - Partition to which the record should be sent
# + headers - Map of headers to be included with the record
# # Deprecated
# Usage of this record is deprecated. Use subtypes of AnydataProducerRecord
# instead to support data-binding
@deprecated
public type ProducerRecord record {|
string topic;
byte[] key?;
byte[] value;
int timestamp?;
int partition?;
map<byte[]|byte[][]|string|string[]> headers?;
|};

# Details related to the anydata producer record.
#
# + topic - Topic to which the record will be appended
Expand Down
Loading
Loading