Skip to content

Commit

Permalink
Create source using streams and Source.from, not Source.actorRef
Browse files Browse the repository at this point in the history
  • Loading branch information
fsteeg committed Jun 19, 2018
1 parent b52479f commit f640e08
Showing 1 changed file with 16 additions and 26 deletions.
42 changes: 16 additions & 26 deletions app/controllers/HomeController.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigObject;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import apps.Convert;
Expand Down Expand Up @@ -271,34 +267,28 @@ public Result search(String q, String filter, int from, int size, String format)
private Result jsonLines(String q, SearchResponse response) {
long totalHits = index.query(q).getHits().getTotalHits();
if (totalHits > 0) {
Source<ByteString, ?> source = Source.<ByteString>actorRef((int) totalHits, OverflowStrategy.fail())
.mapMaterializedValue(actor -> {
scrollQuery(q, actor);
return NotUsed.getInstance();
});
QueryBuilder query = QueryBuilders.queryStringQuery(q);
Logger.debug("Scrolling with query: q={}, query={}", q, query);
TimeValue keepAlive = new TimeValue(60000);
SearchResponse scrollResponse = index.client().prepareSearch(config("index.name"))
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(keepAlive).setQuery(query)
.setSize(100 /* hits per shard for each scroll */).get();
Source<ByteString, ?> source = Source.from(() -> scroller(scrollResponse, keepAlive).iterator());
return ok().chunked(source).as(Accept.Format.BULK.types[0]);
} else {
return ok().as(Accept.Format.BULK.types[0]);
}
}

private void scrollQuery(String q, ActorRef actor) {
QueryBuilder query = QueryBuilders.queryStringQuery(q);
Logger.debug("Scrolling with query: q={}, query={}", q, query);
TimeValue keepAlive = new TimeValue(60000);
SearchResponse scrollResponse = index.client().prepareSearch(config("index.name"))
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(keepAlive).setQuery(query)
.setSize(100 /* hits per shard for each scroll */).get();
do {
for (SearchHit hit : scrollResponse.getHits().getHits()) {
actor.tell(ByteString.fromString(hit.getSourceAsString() + "\n"), null);
}
Logger.trace("Scrolling, ID {}", scrollResponse.getScrollId());
scrollResponse = index.client().prepareSearchScroll(scrollResponse.getScrollId()).setScroll(keepAlive)
.execute().actionGet();
} while (scrollResponse.getHits().getHits().length != 0);
Logger.debug("Last scroll response for bulk request: {}", scrollResponse);
actor.tell(new Status.Success(NotUsed.getInstance()), null);
private Stream<ByteString> scroller(SearchResponse scrollResponse, TimeValue keepAlive) {
if (scrollResponse.getHits().getHits().length != 0) {
Stream<ByteString> thisScroll = Stream.of(scrollResponse.getHits().getHits())
.map((SearchHit hit) -> ByteString.fromString(hit.getSourceAsString() + "\n"));
Stream<ByteString> nextScroll = scroller(index.client().prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(keepAlive).execute().actionGet(), keepAlive);
return Stream.concat(thisScroll, nextScroll);
}
return Stream.empty();
}

private Result htmlSearch(String q, String type, int from, int size, String format, SearchResponse response) {
Expand Down

0 comments on commit f640e08

Please sign in to comment.