Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header support for consumer/producer records #1086

Merged
merged 17 commits into from
Mar 22, 2024

Conversation

dilanSachi
Copy link
Contributor

@dilanSachi dilanSachi commented Mar 19, 2024

Purpose

$subject
Fixes ballerina-platform/ballerina-library#6196

kafka:AnydataProducerRecord and kafka:AnydataConsumerRecord are updated with headers field as map<byte[]|byte[][]|string|string[]>. Users can use subtypes of this in their consumer records. The runtime will try to match the received header values to the best possible match and populate the header map.

As an example, if the received headers are like this - {"key1": ["header1", "header2"], "key2": "header3"} and the consumer record header is defined like this - map<string[]|byte[]>, the headers will be mapped as {"key1": ["header1", "header2"], "key2": [12, 212, 23]}.

We have defined kafka:BytesConsumerRecord with headers as type map<byte[]|byte[][]>. If there is only one header for a specifc key, the type of the value will be byte[]. Otherwise it will be byte[][].

For an instance, if a user defines the header type as map<string|byte[][]>, the headers will be mapped to match the number of headers for a given type.

Producer

type OrderProducerRecord record {|
    *kafka:AnydataProducerRecord;
    map<string|string[]> headers?;
|};

check orderProducer->send({
    topic: "order-topic",
    value: {
        orderId: 1,
        productName: "Laptop",
        price: 1000.00,
        isValid: true
    },
    headers: {
        "key1": "header1",
        "key2": ["header2", "header3"]
    }
});

Consumer

type OrderConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    map<string|string[]> headers?;
|};

OrderConsumerRecord[] records = check orderConsumer->poll(15);
from OrderConsumerRecord orderRecord in records
where orderRecord.value.isValid
do {
    string[] headerKeys = orderRecord.headers.keys();
    foreach string key in headerKeys {
        string|string[] headerValues = orderRecord.headers.get(key);
        if headerValues is string[] {
            io:println(string `Header key: ${key}, value: ${headerValues[0]}`);
        } else if headerValues is string {
            io:println(string `Header key: ${key}, value: ${headerValues}`);
        }
    }
};

The existing compiler plugin for the kafka:Service is not exactly strict and does not validate some of the scenarios. This needs to be improved with correct semantic api usages. Will create an issue for this.

Examples

Checklist

  • Linked to an issue
  • Updated the changelog
  • Added tests
  • Updated the spec
  • Checked native-image compatibility

@dilanSachi dilanSachi added the Skip GraalVM Check This will skip the GraalVM compatibility check label Mar 19, 2024
Copy link

codecov bot commented Mar 19, 2024

Codecov Report

Attention: Patch coverage is 84.84848% with 15 lines in your changes are missing coverage. Please review.

Project coverage is 82.34%. Comparing base (cd175fd) to head (2078638).
Report is 1 commits behind head on master.

Files Patch % Lines
ballerina/producer.bal 70.58% 5 Missing ⚠️
...na/stdlib/kafka/plugin/KafkaFunctionValidator.java 0.00% 1 Missing and 4 partials ⚠️
...va/io/ballerina/stdlib/kafka/utils/KafkaUtils.java 95.00% 0 Missing and 3 partials ⚠️
ballerina/producer_utils.bal 50.00% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1086      +/-   ##
============================================
+ Coverage     82.00%   82.34%   +0.34%     
- Complexity      464      485      +21     
============================================
  Files            48       48              
  Lines          2289     2373      +84     
  Branches        352      382      +30     
============================================
+ Hits           1877     1954      +77     
- Misses          264      268       +4     
- Partials        148      151       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@dilanSachi dilanSachi removed the Skip GraalVM Check This will skip the GraalVM compatibility check label Mar 20, 2024
@dilanSachi dilanSachi marked this pull request as ready for review March 20, 2024 04:31
ballerina/kafka_records.bal Outdated Show resolved Hide resolved
ballerina/kafka_records.bal Outdated Show resolved Hide resolved
ballerina/producer.bal Outdated Show resolved Hide resolved
TharmiganK
TharmiganK previously approved these changes Mar 20, 2024
Copy link
Contributor

@TharmiganK TharmiganK left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dilanSachi dilanSachi added the Skip GraalVM Check This will skip the GraalVM compatibility check label Mar 20, 2024
@dilanSachi dilanSachi removed the Skip GraalVM Check This will skip the GraalVM compatibility check label Mar 22, 2024
@dilanSachi dilanSachi merged commit fe64243 into ballerina-platform:master Mar 22, 2024
8 of 9 checks passed
@dilanSachi dilanSachi deleted the add-header-support branch March 22, 2024 14:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add header support for Kafka consumer/producer
3 participants