
HTTP data chunking support for kafka buffer #4475

Merged: 6 commits into opensearch-project:main on May 2, 2024

Conversation

@kkondaka (Collaborator) commented Apr 30, 2024

Description

HTTP data chunking support for kafka buffer

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@@ -75,6 +76,32 @@ public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, fi
return requestProcessDuration.recordCallable(() -> processRequest(aggregatedHttpRequest));
}

private void sendChunks(List<String> jsonList, int maxRequestLength) throws Exception {
Review comment from a Member:

We should move this logic into the codec class. It defines how we parse the input data. So it should also decide how to put it back together.

I have some ideas to improve the internal performance. We can do that in a follow-on PR.

Please also add some unit tests as part of the non-draft PR.
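
For context, a minimal sketch of what the chunking logic might look like once it lives in a codec-style helper. The class name, method signature, and size accounting below are illustrative assumptions, not the PR's actual code:

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split serialized JSON events into sub-lists whose
// combined serialized form ("[" + comma-joined events + "]") stays within
// maxRequestLength.
public final class JsonChunker {
    public static List<List<String>> chunk(final List<String> jsonEvents, final int maxRequestLength) {
        final List<List<String>> chunks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int size = 2; // account for the enclosing "[" and "]"
        for (final String event : jsonEvents) {
            final int eventSize = event.length() + 1; // +1 for the separating comma
            if (!current.isEmpty() && size + eventSize > maxRequestLength) {
                chunks.add(current); // current chunk is full, start a new one
                current = new ArrayList<>();
                size = 2;
            }
            current.add(event);
            size += eventSize;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }
}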

Signed-off-by: Kondaka <[email protected]>
@@ -83,6 +83,10 @@ default boolean isByteBuffer() {
return false;
}

default Integer getMaxRequestSize() {
Review comment from a Member:

Let's use Optional<Integer> instead for clarity.
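
A sketch of how that suggestion could look on the buffer interface; this is a simplified stand-in, not the real Data Prepper Buffer interface:

import java.util.Optional;

public interface Buffer<T> {
    // Optional.empty() signals that this buffer imposes no request-size limit,
    // so callers never have to null-check.
    default Optional<Integer> getMaxRequestSize() {
        return Optional.empty();
    }
}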

Integer maxRequestSize = buffer.getMaxRequestSize();
if (maxRequestSize != null) {
    if (result == null || result > maxRequestSize)
        result = maxRequestSize;
}
Review comment from a Member:

Shouldn't we get the minimum max request size?

Also, a stream would probably work nicely here:

allBuffers.stream().mapToLong(b -> b.getMaxRequestSize()).min();
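
Building on the two suggestions above, a sketch of how the smallest limit across all buffers could be computed with a stream, skipping buffers that set no limit. It uses the simplified Buffer interface sketched earlier; the class and method names are illustrative:

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

final class MaxRequestSizeResolver {
    static OptionalInt smallestMaxRequestSize(final List<Buffer<?>> allBuffers) {
        return allBuffers.stream()
                .map(Buffer::getMaxRequestSize)   // Optional<Integer> per buffer
                .filter(Optional::isPresent)      // drop buffers with no limit
                .mapToInt(Optional::get)
                .min();                           // empty if no buffer sets a limit
    }
}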

@@ -56,21 +57,23 @@ public KafkaCustomProducerFactory(
public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig,
final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics,
final DLQSink dlqSink,
AtomicInteger maxRequestSize,
Review comment from a Member:

Mutating an existing object is not a very maintainable practice.

Can you add the maxRequestSize as a getter on the KafkaCustomProducer which this will return?
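
A sketch of the getter-based alternative: the factory computes the value and the produced KafkaCustomProducer exposes it, instead of the caller passing in a mutable AtomicInteger. The class shape here is simplified for illustration:

public class KafkaCustomProducer {
    private final int maxRequestSize;

    public KafkaCustomProducer(final int maxRequestSize) {
        this.maxRequestSize = maxRequestSize;
    }

    // Callers read the limit after createProducer(...) returns,
    // rather than receiving it through an out-parameter.
    public int getMaxRequestSize() {
        return maxRequestSize;
    }
}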

List<List<String>> jsonList = new ArrayList<>();
final List<Map<String, Object>> logList = mapper.readValue(httpData.toInputStream(),
LIST_OF_MAP_TYPE_REFERENCE);
int size = 2; // To account for "[" and "]" when the list is converted to String
Review comment from a Member:

You can make this a constant string and include that data here.

private static final String OVERHEAD_CHARACTERS = "[]";

and then:

int size = OVERHEAD_CHARACTERS.length();

@kkondaka marked this pull request as ready for review on May 2, 2024 07:14
@dlvenable previously approved these changes on May 2, 2024
@dlvenable (Member) left a comment:

Thanks for making these changes!

@@ -49,14 +53,15 @@ public class LogHTTPService {
private final Counter successRequestsCounter;
private final DistributionSummary payloadSizeSummary;
private final Timer requestProcessDuration;
private Optional<Integer> maxRequestLength;
Review comment from a Member:

I'd keep this as an Integer and allow it to be nullable.
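
For illustration, a sketch of the nullable-Integer variant the comment suggests, where null means no chunking limit; the constructor and helper method here are hypothetical, not the PR's actual code:

public class LogHTTPService {
    private final Integer maxRequestLength; // null means no chunking limit

    public LogHTTPService(final Integer maxRequestLength) {
        this.maxRequestLength = maxRequestLength;
    }

    private boolean needsChunking(final int payloadSize) {
        return maxRequestLength != null && payloadSize > maxRequestLength;
    }
}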

Signed-off-by: Kondaka <[email protected]>
@kkondaka merged commit 0eec807 into opensearch-project:main on May 2, 2024
48 of 50 checks passed
@kkondaka added this to the v2.8 milestone on May 14, 2024
@kkondaka deleted the kafka-http-chunks branch on July 30, 2024 21:23