Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunked encoding for CCR APIs #92526

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@

package org.elasticsearch.xpack.ccr.rest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;

import java.util.List;
Expand All @@ -19,6 +23,8 @@

public class RestCcrStatsAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCcrStatsAction.class);

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/_ccr/stats"));
Expand All @@ -32,7 +38,17 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final CcrStatsAction.Request request = new CcrStatsAction.Request();
return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(
CcrStatsAction.INSTANCE,
request,
new ThreadedActionListener<>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do I still need a threaded listener with chunked encoding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The toXContent implementation collects all the shard responses into a map keyed by index up-front - I think that should be done elsewhere.

// sort by index name, then shard ID
final Map<String, Map<Integer, StatsResponse>> taskResponsesByIndex = new TreeMap<>();
for (final StatsResponse response : statsResponse) {
taskResponsesByIndex.computeIfAbsent(response.status().followerIndex(), k -> new TreeMap<>())
.put(response.status().getShardId(), response);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On reflection this could reasonably be using the CCR pool, adjusted in a194c11.

logger,
client.threadPool(),
Ccr.CCR_THREAD_POOL_NAME,
new RestChunkedToXContentListener<>(channel),
false
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;

import java.util.List;
Expand All @@ -34,7 +34,7 @@ public String getName() {
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final FollowInfoAction.Request request = new FollowInfoAction.Request();
request.setFollowerIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestChunkedToXContentListener<>(channel));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@

package org.elasticsearch.xpack.ccr.rest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

import java.util.List;
Expand All @@ -20,6 +24,8 @@

public class RestFollowStatsAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestFollowStatsAction.class);

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/{index}/_ccr/stats"));
Expand All @@ -34,7 +40,17 @@ public String getName() {
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
return channel -> client.execute(FollowStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(
FollowStatsAction.INSTANCE,
request,
new ThreadedActionListener<>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here why does this need threading if it's chunked encoding?

logger,
client.threadPool(),
Ccr.CCR_THREAD_POOL_NAME,
new RestChunkedToXContentListener<>(channel),
false
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

import java.io.IOException;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomReadExceptions;
import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomTrackingClusters;
import static org.elasticsearch.xpack.ccr.action.StatsResponsesTests.createStatsResponse;
Expand All @@ -35,4 +40,19 @@ protected CcrStatsAction.Response createTestInstance() {
FollowStatsAction.StatsResponses statsResponse = createStatsResponse();
return new CcrStatsAction.Response(autoFollowStats, statsResponse);
}

public void testChunking() throws IOException {
final var instance = createTestInstance();
int chunkCount = 0;
try (var builder = jsonBuilder()) {
final var iterator = instance.toXContentChunked(EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
} // closing the builder verifies that the XContent is well-formed

var indexCount = instance.getFollowStats().getStatsResponses().stream().map(s -> s.status().followerIndex()).distinct().count();
assertEquals(instance.getFollowStats().getStatsResponses().size() + indexCount * 2 + 4, chunkCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
Expand All @@ -18,20 +19,23 @@
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FOLLOWER_INDICES_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;

public class FollowInfoResponseTests extends AbstractXContentSerializingTestCase<FollowInfoAction.Response> {
public class FollowInfoResponseTests extends AbstractChunkedSerializingTestCase<FollowInfoAction.Response> {

static final ConstructingObjectParser<FollowerInfo, Void> INFO_PARSER = new ConstructingObjectParser<>("info_parser", args -> {
return new FollowerInfo(
static final ConstructingObjectParser<FollowerInfo, Void> INFO_PARSER = new ConstructingObjectParser<>(
"info_parser",
args -> new FollowerInfo(
(String) args[0],
(String) args[1],
(String) args[2],
Status.fromString((String) args[3]),
(FollowParameters) args[4]
);
});
)
);

static {
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.FOLLOWER_INDEX_FIELD);
Expand All @@ -48,15 +52,15 @@ public class FollowInfoResponseTests extends AbstractXContentSerializingTestCase
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<FollowInfoAction.Response, Void> PARSER = new ConstructingObjectParser<>(
"response",
args -> { return new FollowInfoAction.Response((List<FollowerInfo>) args[0]); }
args -> new FollowInfoAction.Response((List<FollowerInfo>) args[0])
);

static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), INFO_PARSER, FOLLOWER_INDICES_FIELD);
}

@Override
protected FollowInfoAction.Response doParseInstance(XContentParser parser) throws IOException {
protected FollowInfoAction.Response doParseInstance(XContentParser parser) {
return PARSER.apply(parser, null);
}

Expand Down Expand Up @@ -87,4 +91,18 @@ protected FollowInfoAction.Response createTestInstance() {
}
return new FollowInfoAction.Response(infos);
}

public void testChunking() throws IOException {
final var instance = createTestInstance();
int chunkCount = 0;
try (var builder = jsonBuilder()) {
final var iterator = instance.toXContentChunked(EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
} // closing the builder verifies that the XContent is well-formed

assertEquals(instance.getFollowInfos().size() + 2, chunkCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

public class StatsResponsesTests extends AbstractWireSerializingTestCase<FollowStatsAction.StatsResponses> {

@Override
Expand Down Expand Up @@ -67,4 +72,19 @@ static FollowStatsAction.StatsResponses createStatsResponse() {
}
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);
}

public void testChunking() throws IOException {
final var instance = createTestInstance();
int chunkCount = 0;
try (var builder = jsonBuilder()) {
final var iterator = instance.toXContentChunked(EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
} // closing the builder verifies that the XContent is well-formed

var indexCount = instance.getStatsResponses().stream().map(s -> s.status().followerIndex()).distinct().count();
assertEquals(instance.getStatsResponses().size() + indexCount * 2 + 2, chunkCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;

public class CcrStatsAction extends ActionType<CcrStatsAction.Response> {
Expand Down Expand Up @@ -48,7 +51,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public static class Response extends ActionResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ChunkedToXContent {

private final AutoFollowStats autoFollowStats;
private final FollowStatsAction.StatsResponses followStats;
Expand Down Expand Up @@ -79,14 +82,14 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field("auto_follow_stats", autoFollowStats, params);
builder.field("follow_stats", followStats, params);
}
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.concat(
Iterators.single(
(builder, params) -> builder.startObject().field("auto_follow_stats", autoFollowStats, params).field("follow_stats")
),
followStats.toXContentChunked(outerParams),
ChunkedToXContentHelper.endObject()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -77,7 +81,7 @@ public int hashCode() {
}
}

public static class Response extends ActionResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ChunkedToXContent {

public static final ParseField FOLLOWER_INDICES_FIELD = new ParseField("follower_indices");

Expand All @@ -102,15 +106,12 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(FOLLOWER_INDICES_FIELD.getPreferredName());
for (FollowerInfo followInfo : followInfos) {
followInfo.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
public Iterator<ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startObject().startArray(FOLLOWER_INDICES_FIELD.getPreferredName())),
followInfos.iterator(),
Iterators.single((builder, params) -> builder.endArray().endObject())
);
}

@Override
Expand Down
Loading