Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List all PITs API #4

Open
wants to merge 10 commits into
base: deletepit
Choose a base branch
from
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,14 @@
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -403,6 +405,7 @@
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestDeletePitAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestGetAllPitsAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
Expand Down Expand Up @@ -662,8 +665,11 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class);
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);

// Point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down Expand Up @@ -840,6 +846,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());
registerHandler.accept(new RestGetAllPitsAction());
for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
public class CreatePitAction extends ActionType<CreatePitResponse> {
public static final CreatePitAction INSTANCE = new CreatePitAction();
public static final String NAME = "indices:data/read/point_in_time";
public static final String NAME = "indices:data/read/point_in_time/create";

private CreatePitAction() {
super(NAME, CreatePitResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
public class DeletePitAction extends ActionType<DeletePitResponse> {

public static final DeletePitAction INSTANCE = new DeletePitAction();
public static final String NAME = "indices:admin/read/pit/delete";
public static final String NAME = "indices:data/read/point_in_time/delete";

private DeletePitAction() {
super(NAME, DeletePitResponse::new);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request to get all active PITs in a node
*/
public class GetAllPitNodeRequest extends BaseNodeRequest {
GetAllPitNodesRequest request;

@Inject
public GetAllPitNodeRequest(GetAllPitNodesRequest request) {
this.request = request;
}

public GetAllPitNodeRequest(StreamInput in) throws IOException {
super(in);
request = new GetAllPitNodesRequest(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Response which holds information about all PIT contexts in a node
*/
public class GetAllPitNodeResponse extends BaseNodeResponse implements ToXContentFragment {
private List<PitInfo> pitsInfo;

@Inject
public GetAllPitNodeResponse(StreamInput in, List<PitInfo> pitsInfo) throws IOException {
super(in);
this.pitsInfo = pitsInfo;
}

public GetAllPitNodeResponse(DiscoveryNode node, List<PitInfo> pitsInfo) {
super(node);
this.pitsInfo = pitsInfo;
}

public GetAllPitNodeResponse(StreamInput in) throws IOException {
super(in);
this.pitsInfo = Collections.unmodifiableList(in.readList(PitInfo::new));
}

public List<PitInfo> getPitsInfo() {
return pitsInfo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(pitsInfo);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("pitsInfo");
for (PitInfo pit : pitsInfo) {
pit.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request to get all active PIT IDs in set of nodes
*/
public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesRequest> {
@Inject
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
}

public GetAllPitNodesRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Response structure to hold all active PIT contexts information from all nodes
*/
public class GetAllPitNodesResponse extends BaseNodesResponse<GetAllPitNodeResponse> implements ToXContentObject {

List<PitInfo> pitsInfo = new ArrayList<>();

@Inject
public GetAllPitNodesResponse(StreamInput in) throws IOException {
super(in);
}

public GetAllPitNodesResponse(
ClusterName clusterName,
List<GetAllPitNodeResponse> getAllPitNodeRespons,
List<FailedNodeException> failures
) {
super(clusterName, getAllPitNodeRespons, failures);
Set<String> uniquePitIds = new HashSet<>();
pitsInfo.addAll(
getAllPitNodeRespons.stream()
.flatMap(p -> p.getPitsInfo().stream().filter(t -> uniquePitIds.add(t.getPitId())))
.collect(Collectors.toList())
);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("pitsInfo");
for (PitInfo pit : pitsInfo) {
pit.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public List<GetAllPitNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(GetAllPitNodeResponse::new);
}

@Override
public void writeNodesTo(StreamOutput out, List<GetAllPitNodeResponse> nodes) throws IOException {
out.writeList(nodes);
}

public List<PitInfo> getPITIDs() {
return new ArrayList<>(pitsInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

/**
* Action type for listing all PIT reader contexts
*/
public class GetAllPitsAction extends ActionType<GetAllPitNodesResponse> {
public static final GetAllPitsAction INSTANCE = new GetAllPitsAction();
public static final String NAME = "indices:data/read/point_in_time/readall";

private GetAllPitsAction() {
super(NAME, GetAllPitNodesResponse::new);
}
}
63 changes: 63 additions & 0 deletions server/src/main/java/org/opensearch/action/search/PitInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;

/**
* This holds information about pit reader context such as pit id and creation time
*/
public class PitInfo implements ToXContentFragment, Writeable {
private final String pitId;
private final long creationTime;
private final long keepAlive;

public PitInfo(String pitId, long creationTime,long keepAlive) {
this.pitId = pitId;
this.creationTime = creationTime;
this.keepAlive = keepAlive;
}

public PitInfo(StreamInput in) throws IOException {
this.pitId = in.readString();
this.creationTime = in.readLong();
this.keepAlive = in.readLong();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("pitId", pitId);
builder.field("creationTime", creationTime);
builder.field("keepALive", keepAlive);
builder.endObject();
return builder;
}

public String getPitId() {
return pitId;
}

public long getCreationTime() {
return creationTime;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(pitId);
out.writeLong(creationTime);
out.writeLong(keepAlive);
}
}
Loading