Skip to content

Commit

Permalink
Implements delete datasource API
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed May 9, 2023
1 parent 91c609b commit 2f0a0e4
Show file tree
Hide file tree
Showing 14 changed files with 648 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
* Ip2Geo datasource delete action
*/
public class DeleteDatasourceAction extends ActionType<AcknowledgedResponse> {
/**
* Delete datasource action instance
*/
public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction();
/**
* Delete datasource action name
*/
public static final String NAME = "cluster:admin/geospatial/datasource/delete";

private DeleteDatasourceAction() {
super(NAME, AcknowledgedResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

/**
* GeoIP datasource delete request
*/
@Getter
@Setter
@AllArgsConstructor
public class DeleteDatasourceRequest extends ActionRequest {
/**
* @param name the datasource name
* @return the datasource name
*/
private String name;

/**
* Constructor
*
* @param in the stream input
* @throws IOException IOException
*/
public DeleteDatasourceRequest(final StreamInput in) throws IOException {
super(in);
this.name = in.readString();
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = null;
if (name == null || name.isBlank()) {
errors = new ActionRequestValidationException();
errors.addValidationError("Datasource name should not be empty");
}
return errors;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport action to delete datasource
*/
@Log4j2
public class DeleteDatasourceTransportAction extends HandledTransportAction<DeleteDatasourceRequest, AcknowledgedResponse> {
private static final long LOCK_DURATION_IN_SECONDS = 300l;
private final Ip2GeoLockService lockService;
private final IngestService ingestService;
private final DatasourceFacade datasourceFacade;

/**
* Constructor
* @param transportService the transport service
* @param actionFilters the action filters
* @param lockService the lock service
* @param ingestService the ingest service
* @param datasourceFacade the datasource facade
*/
@Inject
public DeleteDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final Ip2GeoLockService lockService,
final IngestService ingestService,
final DatasourceFacade datasourceFacade
) {
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
this.lockService = lockService;
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
}

/**
* We delete datasource regardless of its state as long as we can acquire a lock
*
* @param task the task
* @param request the request
* @param listener the listener
*/
@Override
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
return;
}
try {
deleteDatasource(request.getName());
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(
released -> { log.info("Released lock for datasource[{}]", request.getName()); },
exception -> { log.error("Failed to release the lock", exception); }
)
);
}
}, exception -> { listener.onFailure(exception); }));
}

@VisibleForTesting
protected void deleteDatasource(final String datasourceName) throws IOException {
Datasource datasource = datasourceFacade.getDatasource(datasourceName);
if (datasource == null) {
throw new ResourceNotFoundException("no such datasource exist");
}

setDatasourceStateAsDeleting(datasource);
datasourceFacade.deleteDatasource(datasource);
}

private void setDatasourceStateAsDeleting(final Datasource datasource) throws IOException {
if (isSafeToDelete(datasource) == false) {
throw new OpenSearchException("datasource is being used by one of processors");
}

DatasourceState previousState = datasource.getState();
datasource.setState(DatasourceState.DELETING);
datasourceFacade.updateDatasource(datasource);

// Check again as processor might just have been created.
// If it fails to update the state back to the previous state, the new processor
// will fail to convert an ip to a geo data.
// In such case, user have to delete the processor and delete this datasource again.
if (isSafeToDelete(datasource) == false) {
datasource.setState(previousState);
datasourceFacade.updateDatasource(datasource);
throw new OpenSearchException("datasource is being used by one of processors");
}
}

private boolean isSafeToDelete(Datasource datasource) {
IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE);
return ingestMetadata.getPipelines()
.keySet()
.stream()
.flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream())
.filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasource.getName()))
.findAny()
.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class GetDatasourceAction extends ActionType<GetDatasourceResponse> {
*/
public static final GetDatasourceAction INSTANCE = new GetDatasourceAction();
/**
* Name of a get datasource action
* Get datasource action name
*/
public static final String NAME = "cluster:admin/geospatial/datasource/get";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER;
import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix;
import static org.opensearch.rest.RestRequest.Method.DELETE;

import java.util.List;
import java.util.Locale;

import org.opensearch.client.node.NodeClient;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

/**
* Rest handler for Ip2Geo datasource delete request
*/
public class RestDeleteDatasourceHandler extends BaseRestHandler {
private static final String ACTION_NAME = "ip2geo_datasource_delete";
private static final String PARAMS_NAME = "name";

@Override
public String getName() {
return ACTION_NAME;
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
final String name = request.param(PARAMS_NAME);
final DeleteDatasourceRequest deleteDatasourceRequest = new DeleteDatasourceRequest(name);

return channel -> client.executeLocally(
DeleteDatasourceAction.INSTANCE,
deleteDatasourceRequest,
new RestToXContentListener<>(channel)
);
}

@Override
public List<Route> routes() {
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), String.format(Locale.ROOT, "ip2geo/datasource/{%s}", PARAMS_NAME));
return List.of(new Route(DELETE, path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
import java.util.Objects;
import java.util.stream.Collectors;

import javax.swing.*;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetItemResponse;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -52,6 +53,7 @@
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchHit;

/**
Expand Down Expand Up @@ -164,6 +166,38 @@ public void putDatasource(final Datasource datasource, final ActionListener list
});
}

/**
* Delete datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
*
* @param datasource the datasource
*
*/
public void deleteDatasource(final Datasource datasource) {
if (client.admin()
.indices()
.prepareDelete(datasource.getIndices().toArray(new String[0]))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
.isAcknowledged() == false) {
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", datasource.getIndices()));
}
DeleteResponse response = client.prepareDelete()
.setIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));

if (response.status().equals(RestStatus.OK)) {
log.info("deleted datasource[{}] successfully", datasource.getName());
} else if (response.status().equals(RestStatus.NOT_FOUND)) {
throw new ResourceNotFoundException("datasource[{}] does not exist", datasource.getName());
} else {
throw new OpenSearchException("failed to delete datasource[{}] with status[{}]", datasource.getName(), response.status());
}
}

/**
* Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param name the name of a datasource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
Expand All @@ -44,6 +45,10 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
private static final String PROPERTY_IP = "ip";
private final String field;
private final String targetField;
/**
* @return The datasource name
*/
@Getter
private final String datasourceName;
private final Set<String> properties;
private final boolean ignoreMissing;
Expand Down
Loading

0 comments on commit 2f0a0e4

Please sign in to comment.