diff --git a/app/controllers/Accept.java b/app/controllers/Accept.java index bfc989f..7df024d 100644 --- a/app/controllers/Accept.java +++ b/app/controllers/Accept.java @@ -23,7 +23,8 @@ enum Format { HTML("html", "text/html"), // RDF_XML("rdf", "application/rdf+xml", "application/xml", "text/xml"), // N_TRIPLE("nt", "application/n-triples", "text/plain"), // - TURTLE("ttl", "text/turtle", "application/x-turtle"); + TURTLE("ttl", "text/turtle", "application/x-turtle"), // + BULK("bulk", "application/x-jsonlines"); String[] types; String queryParamString; diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index 0978497..49ad738 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -30,9 +30,14 @@ import org.apache.jena.riot.RiotNotFoundException; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -44,6 +49,12 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigObject; +import akka.NotUsed; +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Source; +import akka.util.ByteString; import apps.Convert; import controllers.Accept.Format; import models.AuthorityResource; @@ -232,8 +243,9 @@ public Result gnd(String id) { } public Result search(String q, String filter, int from, int size, String format) { - String responseFormat = Accept.formatFor(format, request().acceptedTypes()).queryParamString; - SearchResponse response = index.query(q.isEmpty() ? "*" : q, filter, from, size); + Format responseFormat = Accept.formatFor(format, request().acceptedTypes()); + String queryString = (q == null || q.isEmpty()) ? "*" : q; + SearchResponse response = index.query(queryString, filter, from, size); response().setHeader("Access-Control-Allow-Origin", "*"); String[] formatAndConfig = format == null ? new String[] {} : format.split(":"); boolean returnSuggestions = formatAndConfig.length == 2; @@ -242,8 +254,51 @@ public Result search(String q, String filter, int from, int size, String format) .map(hit -> hit.getSource()).collect(Collectors.toList()); return withCallback(toSuggestions(Json.toJson(hits), formatAndConfig[1])); } - return responseFormat.equals("html") ? htmlSearch(q, filter, from, size, responseFormat, response) - : ok(returnAsJson(q, response)).as(config("index.content")); + switch (responseFormat) { + case HTML: { + return htmlSearch(q, filter, from, size, responseFormat.queryParamString, response); + } + case BULK: { + return jsonLines(queryString, response); + } + default: { + return ok(returnAsJson(q, response)).as(config("index.content")); + } + } + } + + private Result jsonLines(String q, SearchResponse response) { + if (index.query(q).getHits().getTotalHits() > 0) { + Source source = Source.actorRef(256, OverflowStrategy.dropNew()) + .mapMaterializedValue(actor -> { + scrollQuery(q, actor); + return NotUsed.getInstance(); + }); + return ok().chunked(source).as(Accept.Format.BULK.types[0]); + } else { + return ok().as(Accept.Format.BULK.types[0]); + } + } + + private void scrollQuery(String q, ActorRef actor) { + QueryBuilder query = QueryBuilders.queryStringQuery(q); + Logger.debug("bulkGnd: q={}, query={}", q, query); + TimeValue keepAlive = new TimeValue(60000); + SearchResponse scrollResp = index.client().prepareSearch(config("index.name")) + .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(keepAlive).setQuery(query) + .setSize(100 /* hits per shard for each scroll */).get(); + String scrollId = scrollResp.getScrollId(); + while (scrollResp.getHits().iterator().hasNext()) { + scrollResp.getHits().forEach((hit) -> { + actor.tell(ByteString.fromString(hit.getSourceAsString()), null); + actor.tell(ByteString.fromString("\n"), null); + }); + scrollResp = index.client().prepareSearchScroll(scrollId).setScroll(keepAlive).execute().actionGet(); + scrollId = scrollResp.getScrollId(); + } + SearchResponse lastResponse = scrollResp; + Logger.debug("Last search response for bulk request: " + lastResponse); + actor.tell(new Status.Success(NotUsed.getInstance()), null); } private Result htmlSearch(String q, String type, int from, int size, String format, SearchResponse response) { diff --git a/test/controllers/AcceptIntegrationTest.java b/test/controllers/AcceptIntegrationTest.java index ddd9fcd..880698f 100644 --- a/test/controllers/AcceptIntegrationTest.java +++ b/test/controllers/AcceptIntegrationTest.java @@ -43,6 +43,11 @@ public static Collection data() { { fakeRequest(GET, "/gnd/search?q=*&format=json"), /*->*/ "application/json" }, { fakeRequest(GET, "/gnd/search?q=*&format=whatever"), /*->*/ "application/json" }, { fakeRequest(GET, "/gnd/search?q=*").header("Accept", "text/plain"), /*->*/ "application/json" }, + // search, bulk format: JSON lines + { fakeRequest(GET, "/gnd/search?q=*").header("Accept", "application/x-jsonlines"), /*->*/ "application/x-jsonlines" }, + { fakeRequest(GET, "/gnd/search?format=bulk"), /*->*/ "application/x-jsonlines" }, + { fakeRequest(GET, "/gnd/search?q=*&format=bulk"), /*->*/ "application/x-jsonlines" }, + { fakeRequest(GET, "/gnd/search?q=vwxyz&format=bulk"), /*->*/ "application/x-jsonlines" }, // search, other formats as query param: { fakeRequest(GET, "/gnd/search?q=*&format=html"), /*->*/ "text/html" }, // search, other formats via header: diff --git a/test/controllers/AcceptUnitTest.java b/test/controllers/AcceptUnitTest.java index 814358a..a8cbf53 100644 --- a/test/controllers/AcceptUnitTest.java +++ b/test/controllers/AcceptUnitTest.java @@ -39,9 +39,12 @@ public static Collection data() { { fakeRequest().header("Accept", ""), null, /*->*/ "json" }, // { fakeRequest().header("Accept", "application/pdf"), null, /*->*/ "json" }, // no header, just format parameter: - { fakeRequest(), "html", /*->*/ "html" }, { fakeRequest(), "json", /*->*/ "json" }, - { fakeRequest(), "json:preferredName", /*->*/ "json(.+)?" }, - { fakeRequest(), "ttl", /*->*/ "ttl" }, { fakeRequest(), "nt", /*->*/ "nt" }, + { fakeRequest(), "html", /*->*/ "html" }, // + { fakeRequest(), "json", /*->*/ "json" }, // + { fakeRequest(), "json:preferredName", /*->*/ "json(.+)?" }, // + { fakeRequest(), "ttl", /*->*/ "ttl" }, // + { fakeRequest(), "nt", /*->*/ "nt" }, // + { fakeRequest(), "bulk", /*->*/ "bulk" }, // supported content types, no format parameter given: { fakeRequest().header("Accept", "text/html"), null, /*->*/ "html" }, { fakeRequest().header("Accept", "application/json"), null, /*->*/ "json" }, @@ -53,11 +56,13 @@ public static Collection data() { { fakeRequest().header("Accept", "application/xml"), null, /*->*/ "rdf" }, { fakeRequest().header("Accept", "application/rdf+xml"), null, /*->*/ "rdf" }, { fakeRequest().header("Accept", "text/xml"), null, /*->*/ "rdf" }, + { fakeRequest().header("Accept", "application/x-jsonlines"), null, /*->*/ "bulk" }, // we pick the preferred content type: { fakeRequest().header("Accept", "text/html,application/json"), null, /*->*/"html" }, { fakeRequest().header("Accept", "application/json,text/html"), null, /*->*/ "json" }, // format parameter overrides header: - { fakeRequest().header("Accept", "text/html"), "json", /*->*/ "json" } }); + { fakeRequest().header("Accept", "text/html"), "json", /*->*/ "json" }, // + { fakeRequest().header("Accept", "text/html"), "bulk", /*->*/ "bulk" } }); } private RequestBuilder fakeRequest;