Skip to content

Commit

Permalink
Create source with custom Iterator to avoid memory issues
Browse files Browse the repository at this point in the history
See #91
  • Loading branch information
fsteeg committed Jun 20, 2018
1 parent 0ae8c51 commit fff8268
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions app/controllers/HomeController.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -30,6 +31,7 @@
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.riot.RiotNotFoundException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -272,30 +274,34 @@ public Result search(String q, String filter, int from, int size, String format)
}

private Result jsonLines(String q, SearchResponse response) {
long totalHits = index.query(q).getHits().getTotalHits();
if (totalHits > 0) {
QueryBuilder query = QueryBuilders.queryStringQuery(q);
Logger.debug("Scrolling with query: q={}, query={}", q, query);
TimeValue keepAlive = new TimeValue(60000);
SearchResponse scrollResponse = index.client().prepareSearch(config("index.name"))
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(keepAlive).setQuery(query)
.setSize(100 /* hits per shard for each scroll */).get();
Source<ByteString, ?> source = Source.from(() -> scroller(scrollResponse, keepAlive).iterator());
return ok().chunked(source).as(Accept.Format.BULK.types[0]);
} else {
return ok().as(Accept.Format.BULK.types[0]);
}
QueryBuilder query = QueryBuilders.queryStringQuery(q);
TimeValue keepAlive = new TimeValue(60000);
SearchRequestBuilder scrollRequest = index.client().prepareSearch(config("index.name"))
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(keepAlive).setQuery(query)
.setSize(100 /* hits per shard for each scroll */);
Logger.debug("Scrolling with query: q={}, request={}", q, scrollRequest);
Source<ByteString, ?> source = Source.from(() -> hitIterator(scrollRequest.get(), keepAlive));
return ok().chunked(source).as(Accept.Format.BULK.types[0]);
}

private Stream<ByteString> scroller(SearchResponse scrollResponse, TimeValue keepAlive) {
if (scrollResponse.getHits().getHits().length != 0) {
Stream<ByteString> thisScroll = Stream.of(scrollResponse.getHits().getHits())
.map((SearchHit hit) -> ByteString.fromString(hit.getSourceAsString() + "\n"));
Stream<ByteString> nextScroll = scroller(index.client().prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(keepAlive).execute().actionGet(), keepAlive);
return Stream.concat(thisScroll, nextScroll);
}
return Stream.empty();
private Iterator<ByteString> hitIterator(SearchResponse scrollResponse, TimeValue keepAlive) {
return new Iterator<ByteString>() {
Iterator<SearchHit> iterator = scrollResponse.getHits().iterator();

@Override
public boolean hasNext() {
if (!iterator.hasNext()) {
iterator = index.client().prepareSearchScroll(scrollResponse.getScrollId())//
.setScroll(keepAlive).execute().actionGet().getHits().iterator();
}
return iterator.hasNext();
}

@Override
public ByteString next() {
return ByteString.fromString(iterator.next().getSourceAsString() + "\n");
}
};
}

private Result htmlSearch(String q, String type, int from, int size, String format, SearchResponse response) {
Expand Down

0 comments on commit fff8268

Please sign in to comment.