Skip to content

Commit

Permalink
Fix kroxylicious#518: API - sendRequest ought to accept header parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
k-wall committed Sep 26, 2023
1 parent 889fae2 commit 371ec3c
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,45 @@ public interface FilterContext {
* @see io.kroxylicious.proxy.filter Thread Safety
*/
@NonNull
@Deprecated(forRemoval = true, since = "0.3.0")
<T extends ApiMessage> CompletionStage<T> sendRequest(short apiVersion, @NonNull ApiMessage request);

/**
* Send a request from a filter towards the broker. The response to the request will be
* made available to the filter asynchronously, by way of the {@link CompletionStage}. The
* CompletionStage will contain the response header and response body, or null of the
* request does not have a response.
* <h4>Header</h4>
* <p>The caller is required to provide a {@link RequestHeaderData}. It is recommended that the
* caller specify the {@link RequestHeaderData#requestApiVersion()}. This can be done conveniently
* with forms such as:</p>
* <pre>{@code new RequestHeaderData().setRequestApiVersion(4)}</pre>
* <p>The caller may also provide a {@link RequestHeaderData#clientId()} an
* {@link RequestHeaderData#unknownTaggedFields()}. Kroxylicious will automatically set the
* {@link RequestHeaderData#requestApiKey()} to be consistent with the {@code request}.
* {@link RequestHeaderData#correlationId()} is ignored.</p>
* <h4>Filtering</h4>
* <p>The request will pass through all filters upstream of the filter that invoked the operation,
* invoking them, but not itself. Similarly, the response will pass through all filters upstream
* of the filter that invoked the operation, invoking them, but not itself. The response does not
* pass through filters downstream.
* </p>
* <h4>Chained Computation stages</h4>
* <p>Default and asynchronous default computation stages chained to the returned
* {@link java.util.concurrent.CompletionStage} are guaranteed to be executed by the thread associated with the
* connection. See {@link io.kroxylicious.proxy.filter} for more details.
* </p>
*
* @param <M> The type of the response
* @param header The request header.
* @param request The request data.
* @return CompletionStage that will yield the response.
* @see io.kroxylicious.proxy.filter Thread Safety
*/
@NonNull
<M extends ApiMessage> CompletionStage<ResponseHeaderAndApiMessage<M>> sendRequest(@NonNull RequestHeaderData header,
@NonNull ApiMessage request);

/**
* Generates a completed filter results containing the given header and response. When
* response filters implementations return this result, the response will be sent towards
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Kroxylicious Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.proxy.filter;

import java.util.Objects;

import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.protocol.ApiMessage;

/**
* Tuple encapsulating a response message and its header.
* @param header message header
* @param message message
* @param <M> The type of the message
*/
public record ResponseHeaderAndApiMessage<M extends ApiMessage>(ResponseHeaderData header, M message) {
public ResponseHeaderAndApiMessage {
Objects.requireNonNull(header);
Objects.requireNonNull(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -101,8 +102,10 @@ private CompletionStage<ApiVersions> getVersions(FilterContext context) {
return CompletableFuture.completedFuture(apiVersions);
}
else {
return context.sendRequest(ApiKeys.API_VERSIONS.latestVersion(), new ApiVersionsRequestData()).thenApply(message -> {
ApiVersionsResponseData apiVersionsResponseData = (ApiVersionsResponseData) message;
var data = new ApiVersionsRequestData();
var header = new RequestHeaderData().setRequestApiVersion(data.highestSupportedVersion());
return context.<ApiVersionsResponseData> sendRequest(header, data).thenApply(response -> {
var apiVersionsResponseData = response.message();
updateVersions(context.channelDescriptor(), apiVersionsResponseData);
return apiVersions;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.kroxylicious.proxy.filter.RequestFilterResultBuilder;
import io.kroxylicious.proxy.filter.ResponseFilterResult;
import io.kroxylicious.proxy.filter.ResponseFilterResultBuilder;
import io.kroxylicious.proxy.filter.ResponseHeaderAndApiMessage;
import io.kroxylicious.proxy.frame.DecodedFrame;
import io.kroxylicious.proxy.frame.DecodedRequestFrame;
import io.kroxylicious.proxy.frame.DecodedResponseFrame;
Expand All @@ -52,6 +53,8 @@
import io.kroxylicious.proxy.internal.util.ByteBufOutputStream;
import io.kroxylicious.proxy.model.VirtualCluster;

import edu.umd.cs.findbugs.annotations.NonNull;

/**
* A {@code ChannelInboundHandler} (for handling requests from downstream)
* that applies a single {@link Filter}.
Expand Down Expand Up @@ -402,13 +405,15 @@ private String channelDescriptor() {
return ctx.channel().toString();
}

@SuppressWarnings("unchecked")
private void completeInternalResponse(InternalResponseFrame<?> decodedFrame) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{}: Completing {} response for request sent by this filter{}: {}",
channelDescriptor(), decodedFrame.apiKey(), filterDescriptor(), decodedFrame);
}
CompletableFuture<ApiMessage> p = decodedFrame.promise();
p.complete(decodedFrame.body());
CompletableFuture<ResponseHeaderAndApiMessage<ApiMessage>> p = (CompletableFuture<ResponseHeaderAndApiMessage<ApiMessage>>) decodedFrame
.promise();
p.complete(new ResponseHeaderAndApiMessage<>(decodedFrame.header(), decodedFrame.body()));
}

private static <F extends FilterResult> F validateFilterResultNonNull(F f) {
Expand Down Expand Up @@ -474,23 +479,29 @@ public CompletionStage<ResponseFilterResult> forwardResponse(ResponseHeaderData
return responseFilterResultBuilder().forward(header, response).completed();
}

@NonNull
@Override
public <T extends ApiMessage> CompletionStage<T> sendRequest(short apiVersion, ApiMessage request) {
short key = request.apiKey();
var apiKey = ApiKeys.forId(key);
short headerVersion = apiKey.requestHeaderVersion(apiVersion);
var header = new RequestHeaderData()
.setCorrelationId(-1)
.setRequestApiKey(key)
.setRequestApiVersion(apiVersion);
if (headerVersion > 1) {
header.setClientId(filter.getClass().getSimpleName() + "@" + System.identityHashCode(filter));
}
boolean hasResponse = apiKey != ApiKeys.PRODUCE
|| ((ProduceRequestData) request).acks() != 0;
var filterPromise = new InternalCompletableFuture<T>(ctx.executor());
@Deprecated(forRemoval = true, since = "0.3.0")
public <T extends ApiMessage> CompletionStage<T> sendRequest(short apiVersion, @NonNull ApiMessage request) {
return sendRequest(new RequestHeaderData().setRequestApiVersion(apiVersion), request)
.thenApply(r -> r == null ? null : (T) r.message());
}

@NonNull
@Override
public <M extends ApiMessage> CompletionStage<ResponseHeaderAndApiMessage<M>> sendRequest(@NonNull RequestHeaderData header,
@NonNull ApiMessage request) {
Objects.requireNonNull(header);
Objects.requireNonNull(request);

var apiKey = ApiKeys.forId(request.apiKey());
header.setRequestApiKey(apiKey.id);
header.setCorrelationId(-1);

var hasResponse = apiKey != ApiKeys.PRODUCE || ((ProduceRequestData) request).acks() != 0;
var filterPromise = new InternalCompletableFuture<ResponseHeaderAndApiMessage<M>>(ctx.executor());
var frame = new InternalRequestFrame<>(
apiVersion, -1, hasResponse,
header.requestApiVersion(), header.correlationId(), hasResponse,
filter, filterPromise, header, request);

if (LOGGER.isDebugEnabled()) {
Expand All @@ -517,7 +528,8 @@ public <T extends ApiMessage> CompletionStage<T> sendRequest(short apiVersion, A
ctx.executor().schedule(() -> {
LOGGER.debug("{}: Timing out {} request after {}ms", ctx, apiKey, timeoutMs);
filterPromise
.completeExceptionally(new TimeoutException("Asynchronous %s request made by filter %s was timed-out.".formatted(apiKey, filterDescriptor())));
.completeExceptionally(
new TimeoutException("Asynchronous %s request made by filter %s was timed-out.".formatted(apiKey, filterDescriptor())));
}, timeoutMs, TimeUnit.MILLISECONDS);
return filterPromise.minimalCompletionStage();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ public Filter recipient() {
return recipient;
}

@SuppressWarnings("unchecked")
public <T extends ApiMessage> CompletableFuture<T> promise() {
return (CompletableFuture<T>) future;
public CompletableFuture<?> promise() {
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ public CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey, RequestHea
var request = useClientRequest ? (MetadataRequestData) body : new MetadataRequestData();

var future = new CompletableFuture<RequestFilterResult>();
var unused = context.<MetadataResponseData> sendRequest(apiVersion, request)
.thenAccept(metadataResponseData -> {
var unused = context.<MetadataResponseData> sendRequest(new RequestHeaderData().setRequestApiVersion(apiVersion), request)
.thenAccept(metadataResponse -> {
// closing the connection is important. This client connection is connected to bootstrap (it could
// be any broker or maybe not something else). we must close the connection to force the client to
// connect again.
var builder = context.requestFilterResultBuilder();
if (useClientRequest) {
// The client's request matched our out-of-band message, so we may as well return the
// response.
future.complete(builder.shortCircuitResponse(metadataResponseData).withCloseConnection().build());
future.complete(builder.shortCircuitResponse(metadataResponse.message()).withCloseConnection().build());
}
else {
future.complete(builder.withCloseConnection().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.message.FetchResponseData.PartitionData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType;
Expand All @@ -37,7 +38,7 @@
import io.kroxylicious.proxy.internal.util.MemoryRecordsHelper;

/**
* An filter for modifying the key/value/header/topic of {@link ApiKeys#FETCH} responses.
* A filter for modifying the key/value/header/topic of {@link ApiKeys#FETCH} responses.
*/
public class FetchResponseTransformationFilter implements FetchResponseFilter {

Expand Down Expand Up @@ -87,13 +88,14 @@ public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion, R
if (!requestTopics.isEmpty()) {
LOGGER.debug("Fetch response contains {} unknown topic ids, lookup via Metadata request: {}", requestTopics.size(), requestTopics);
var future = new CompletableFuture<ResponseFilterResult>();
// TODO Can't necessarily use HIGHEST_SUPPORTED_VERSION, must use highest supported version
context.<MetadataResponseData> sendRequest(MetadataRequestData.HIGHEST_SUPPORTED_VERSION,
new MetadataRequestData()
.setTopics(requestTopics))
// Version 12 required for topic id support.
var metadataHeader = new RequestHeaderData().setRequestApiVersion((short) 12);
var metadataRequest = new MetadataRequestData().setTopics(requestTopics);
context.<MetadataResponseData> sendRequest(metadataHeader, metadataRequest)
.thenAccept(metadataResponse -> {
Map<Uuid, String> uidToName = metadataResponse.topics().stream().collect(Collectors.toMap(MetadataResponseData.MetadataResponseTopic::topicId,
MetadataResponseData.MetadataResponseTopic::name));
Map<Uuid, String> uidToName = metadataResponse.message().topics().stream()
.collect(Collectors.toMap(MetadataResponseData.MetadataResponseTopic::topicId,
MetadataResponseData.MetadataResponseTopic::name));
LOGGER.debug("Metadata response yields {}, updating original Fetch response", uidToName);
for (var fetchableTopicResponse : fetchResponse.responses()) {
fetchableTopicResponse.setTopic(uidToName.get(fetchableTopicResponse.topicId()));
Expand Down
Loading

0 comments on commit 371ec3c

Please sign in to comment.