Skip to content

Commit

Permalink
Rivers: Add back deletion river content on river removal.
Browse files Browse the repository at this point in the history
In elastic#8877, the deletion of the type associated with a river was
removed. This change adds back the removal using a scan search
along with bulk delete requests.
  • Loading branch information
rjernst committed Mar 25, 2015
1 parent 90dfd78 commit de8160a
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions src/main/java/org/elasticsearch/river/RiversService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -41,12 +46,16 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.river.cluster.RiverClusterChangedEvent;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
import org.elasticsearch.river.cluster.RiverClusterStateListener;
import org.elasticsearch.river.routing.RiverRouting;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Map;
Expand Down Expand Up @@ -200,6 +209,18 @@ public synchronized void closeRiver(RiverName riverName) throws ElasticsearchExc
}

private class ApplyRivers implements RiverClusterStateListener {

/* sends a bulk delete for the documents in the given search response
* returns true if the docs were deleted successfully, false otherwise
*/
private boolean deleteDocs(SearchResponse searchResponse) {
BulkRequestBuilder bulkReq = client.prepareBulk();
for (SearchHit hit : searchResponse.getHits().getHits()) {
bulkReq.add(new DeleteRequest(riverIndexName, hit.type(), hit.id()));
}
return bulkReq.get().hasFailures() == false;
}

@Override
public void riverClusterChanged(RiverClusterChangedEvent event) {
DiscoveryNode localNode = clusterService.localNode();
Expand All @@ -211,6 +232,40 @@ public void riverClusterChanged(RiverClusterChangedEvent event) {
if (routing == null || !localNode.equals(routing.node())) {
// not routed at all, and not allocated here, clean it (we delete the relevant ones before)
closeRiver(riverName);
// also, double check and delete the river content if it was deleted (_meta does not exists)
try {
client.prepareGet(riverIndexName, riverName.name(), "_meta").setListenerThreaded(true).execute(new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
if (getResponse.isExists()) return;

SearchResponse searchResponse = client.prepareSearch(riverIndexName)
.setTypes(riverName.name())
.setQuery(QueryBuilders.matchAllQuery())
.setSearchType(SearchType.SCAN)
.setScroll(TimeValue.timeValueMinutes(2)).get();
boolean hadFailures = searchResponse.getFailedShards() != 0;
while (searchResponse.getHits().totalHits() != 0) {
deleteDocs(searchResponse);
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).get();
hadFailures |= searchResponse.getFailedShards() != 0;
}

if (hadFailures) {
logger.debug("failed to delete some river [{}] content", riverName.name());
}
}

@Override
public void onFailure(Throwable e) {
logger.debug("failed to find river [{}] content for deletion", e, riverName.name());
}
});
} catch (IndexMissingException e) {
// all is well, the _river index was deleted
} catch (Exception e) {
logger.warn("unexpected failure when trying to verify river [{}] deleted", e, riverName.name());
}
}
}

Expand Down

0 comments on commit de8160a

Please sign in to comment.