Skip to content

Commit

Permalink
Delete PIT API
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Apr 18, 2022
2 parents 57232fb + 0fcd1df commit ece4c00
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -46,6 +49,7 @@ public class CreatePITController implements Runnable {
private final Task task;
private final ActionListener<CreatePITResponse> listener;
private final CreatePITRequest request;
private static final Logger logger = LogManager.getLogger(CreatePITController.class);

public CreatePITController(
CreatePITRequest request,
Expand Down Expand Up @@ -88,7 +92,10 @@ public void executeCreatePit() {

final StepListener<CreatePITResponse> createPitListener = new StepListener<>();

final ActionListener<CreatePITResponse> updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), listener::onFailure);
final ActionListener<CreatePITResponse> updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> {
logger.error("PIT creation failed while updating PIT ID", e);
listener.onFailure(e);
});
/**
* Phase 1 of create PIT
*/
Expand Down Expand Up @@ -144,7 +151,8 @@ public void executeUpdatePitId(
final ActionListener<UpdatePitContextResponse> groupedActionListener = getGroupedListener(
updatePitIdListener,
createPITResponse,
contextId.shards().size()
contextId.shards().size(),
contextId.shards().values()
);
/**
* store the create time ( same create time for all PIT contexts across shards ) to be used
Expand All @@ -169,6 +177,7 @@ public void executeUpdatePitId(
groupedActionListener
);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Create pit update phase failed on node [{}]", node), e);
groupedActionListener.onFailure(new OpenSearchException("Create pit failed on node[" + node + "]", e));
}
}
Expand Down Expand Up @@ -199,7 +208,8 @@ private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLoo
private ActionListener<UpdatePitContextResponse> getGroupedListener(
ActionListener<CreatePITResponse> updatePitIdListener,
CreatePITResponse createPITResponse,
int size
int size,
Collection<SearchContextIdForNode> contexts
) {
return new GroupedActionListener<>(new ActionListener<>() {
@Override
Expand All @@ -209,11 +219,31 @@ public void onResponse(final Collection<UpdatePitContextResponse> responses) {

@Override
public void onFailure(final Exception e) {
cleanupContexts(contexts);
updatePitIdListener.onFailure(e);
}
}, size);
}

/**
* Cleanup all created PIT contexts in case of failure
*/
private void cleanupContexts(Collection<SearchContextIdForNode> contexts) {
ActionListener<Integer> deleteListener = new ActionListener<>() {
@Override
public void onResponse(Integer freed) {
// log the number of freed contexts - this is invoke and forget call
logger.debug(() -> new ParameterizedMessage("Cleaned up {} contexts out of {}", freed, contexts.size()));
}

@Override
public void onFailure(Exception e) {
logger.debug("Cleaning up PIT contexts failed ", e);
}
};
ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener);
}

@Override
public void run() {
runner.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

import java.io.IOException;

/**
* Create point in time response with point in time id and success / failures
*/
public class CreatePITResponse extends ActionResponse implements StatusToXContentObject {
// point in time id
private final String id;
private final int totalShards;
private final int successfulShards;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,28 @@

import java.io.IOException;

/**
* Request used to update PIT reader contexts with pitId, keepAlive and creationTime
*/
public class UpdatePITContextRequest extends TransportRequest {
private final String pitId;
private final long keepAlive;

private final long createTime;
private final long creationTime;
private final ShardSearchContextId searchContextId;

public UpdatePITContextRequest(ShardSearchContextId searchContextId, String pitId, long keepAlive, long createTime) {
public UpdatePITContextRequest(ShardSearchContextId searchContextId, String pitId, long keepAlive, long creationTime) {
this.pitId = pitId;
this.searchContextId = searchContextId;
this.keepAlive = keepAlive;
this.createTime = createTime;
this.creationTime = creationTime;
}

UpdatePITContextRequest(StreamInput in) throws IOException {
super(in);
pitId = in.readString();
keepAlive = in.readLong();
createTime = in.readLong();
creationTime = in.readLong();
searchContextId = new ShardSearchContextId(in);
}

Expand All @@ -42,7 +45,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(pitId);
out.writeLong(keepAlive);
out.writeLong(createTime);
out.writeLong(creationTime);
searchContextId.writeTo(out);
}

Expand All @@ -54,8 +57,8 @@ public String getPitId() {
return pitId;
}

public long getCreateTime() {
return createTime;
public long getCreationTime() {
return creationTime;
}

public long getKeepAlive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@
public class UpdatePitContextResponse extends TransportResponse {
private final String pitId;

private final long createTime;
private final long creationTime;

private final long keepAlive;

UpdatePitContextResponse(StreamInput in) throws IOException {
super(in);
pitId = in.readString();
createTime = in.readLong();
creationTime = in.readLong();
keepAlive = in.readLong();
}

public UpdatePitContextResponse(String pitId, long createTime, long keepAlive) {
public UpdatePitContextResponse(String pitId, long creationTime, long keepAlive) {
this.pitId = pitId;
this.keepAlive = keepAlive;
this.createTime = createTime;
this.creationTime = creationTime;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(pitId);
out.writeLong(createTime);
out.writeLong(creationTime);
out.writeLong(keepAlive);
}

Expand All @@ -49,7 +49,7 @@ public long getKeepAlive() {
return keepAlive;
}

public long getCreateTime() {
return createTime;
public long getCreationTime() {
return creationTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ default void onNewPitContext(ReaderContext readerContext) {}

/**
* Executed when a Point-In-Time search {@link SearchContext} is freed.
* This happens on deleteion of a Point-In-Time or on it's keep-alive expiring.
* This happens on deletion of a Point-In-Time or on it's keep-alive is expiring.
* @param readerContext the freed search context
*/
default void onFreePitContext(ReaderContext readerContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(POST, "/{index}/_pit")));
return unmodifiableList(asList(new Route(POST, "/{index}/_search/_point_in_time")));
}

}
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1051,9 +1051,9 @@ public void updatePitIdAndKeepAlive(UpdatePITContextRequest request, ActionListe
}
Releasable updatePit = null;
try {
updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreateTime());
updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreationTime());
updatePit.close();
listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreateTime(), request.getKeepAlive()));
listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreationTime(), request.getKeepAlive()));
} catch (Exception e) {
freeReaderContext(readerContext.id());
if (updatePit != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public class PitReaderContext extends ReaderContext {

// Storing the encoded PIT ID as part of PIT reader context for use cases such as list pit API
private final SetOnce<String> pitId = new SetOnce<>();
private final SetOnce<Long> createTime = new SetOnce<>();
// Creation time of PIT contexts which helps users to differentiate between multiple PIT reader contexts
private final SetOnce<Long> creationTime = new SetOnce<>();

public PitReaderContext(
ShardSearchContextId id,
Expand Down Expand Up @@ -52,18 +53,18 @@ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId,
refCounted.incRef();
tryUpdateKeepAlive(keepAliveInMillis);
setPitId(pitId);
setCreateTime(createTime);
setCreationTime(createTime);
return Releasables.releaseOnce(() -> {
this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis()));
refCounted.decRef();
});
}

public long getCreateTime() {
return this.createTime.get();
public long getCreationTime() {
return this.creationTime.get();
}

public void setCreateTime(final long createTime) {
this.createTime.set(createTime);
public void setCreationTime(final long creationTime) {
this.creationTime.set(creationTime);
}
}
Loading

0 comments on commit ece4c00

Please sign in to comment.