From f640e0827a0f5a25ea14b300ab3ac6d0ff57b66c Mon Sep 17 00:00:00 2001 From: Fabian Steeg Date: Tue, 19 Jun 2018 14:40:29 +0200 Subject: [PATCH] Create source using streams and Source.from, not Source.actorRef Details: https://www.playframework.com/documentation/2.6.x/JavaStream https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html https://doc.akka.io/japi/akka/current/akka/stream/javadsl/Source.html See: https://github.com/hbz/lobid-gnd/issues/91 --- app/controllers/HomeController.java | 42 +++++++++++------------------ 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index dda9030..1e06fb4 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -50,10 +50,6 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigObject; -import akka.NotUsed; -import akka.actor.ActorRef; -import akka.actor.Status; -import akka.stream.OverflowStrategy; import akka.stream.javadsl.Source; import akka.util.ByteString; import apps.Convert; @@ -271,34 +267,28 @@ public Result search(String q, String filter, int from, int size, String format) private Result jsonLines(String q, SearchResponse response) { long totalHits = index.query(q).getHits().getTotalHits(); if (totalHits > 0) { - Source source = Source.actorRef((int) totalHits, OverflowStrategy.fail()) - .mapMaterializedValue(actor -> { - scrollQuery(q, actor); - return NotUsed.getInstance(); - }); + QueryBuilder query = QueryBuilders.queryStringQuery(q); + Logger.debug("Scrolling with query: q={}, query={}", q, query); + TimeValue keepAlive = new TimeValue(60000); + SearchResponse scrollResponse = index.client().prepareSearch(config("index.name")) + .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(keepAlive).setQuery(query) + .setSize(100 /* hits per shard for each scroll */).get(); + Source source = Source.from(() -> scroller(scrollResponse, keepAlive).iterator()); return ok().chunked(source).as(Accept.Format.BULK.types[0]); } else { return ok().as(Accept.Format.BULK.types[0]); } } - private void scrollQuery(String q, ActorRef actor) { - QueryBuilder query = QueryBuilders.queryStringQuery(q); - Logger.debug("Scrolling with query: q={}, query={}", q, query); - TimeValue keepAlive = new TimeValue(60000); - SearchResponse scrollResponse = index.client().prepareSearch(config("index.name")) - .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(keepAlive).setQuery(query) - .setSize(100 /* hits per shard for each scroll */).get(); - do { - for (SearchHit hit : scrollResponse.getHits().getHits()) { - actor.tell(ByteString.fromString(hit.getSourceAsString() + "\n"), null); - } - Logger.trace("Scrolling, ID {}", scrollResponse.getScrollId()); - scrollResponse = index.client().prepareSearchScroll(scrollResponse.getScrollId()).setScroll(keepAlive) - .execute().actionGet(); - } while (scrollResponse.getHits().getHits().length != 0); - Logger.debug("Last scroll response for bulk request: {}", scrollResponse); - actor.tell(new Status.Success(NotUsed.getInstance()), null); + private Stream scroller(SearchResponse scrollResponse, TimeValue keepAlive) { + if (scrollResponse.getHits().getHits().length != 0) { + Stream thisScroll = Stream.of(scrollResponse.getHits().getHits()) + .map((SearchHit hit) -> ByteString.fromString(hit.getSourceAsString() + "\n")); + Stream nextScroll = scroller(index.client().prepareSearchScroll(scrollResponse.getScrollId()) + .setScroll(keepAlive).execute().actionGet(), keepAlive); + return Stream.concat(thisScroll, nextScroll); + } + return Stream.empty(); } private Result htmlSearch(String q, String type, int from, int size, String format, SearchResponse response) {