Skip to content

Commit

Permalink
Support bulk requests via format parameter and content negotiation
Browse files Browse the repository at this point in the history
See #91
  • Loading branch information
fsteeg committed Jun 19, 2018
1 parent db4b465 commit 8bec747
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 9 deletions.
3 changes: 2 additions & 1 deletion app/controllers/Accept.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ enum Format {
HTML("html", "text/html"), //
RDF_XML("rdf", "application/rdf+xml", "application/xml", "text/xml"), //
N_TRIPLE("nt", "application/n-triples", "text/plain"), //
TURTLE("ttl", "text/turtle", "application/x-turtle");
TURTLE("ttl", "text/turtle", "application/x-turtle"), //
BULK("bulk", "application/x-jsonlines");

String[] types;
String queryParamString;
Expand Down
63 changes: 59 additions & 4 deletions app/controllers/HomeController.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@
import org.apache.jena.riot.RiotNotFoundException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -44,6 +49,12 @@
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigObject;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import apps.Convert;
import controllers.Accept.Format;
import models.AuthorityResource;
Expand Down Expand Up @@ -232,8 +243,9 @@ public Result gnd(String id) {
}

public Result search(String q, String filter, int from, int size, String format) {
String responseFormat = Accept.formatFor(format, request().acceptedTypes()).queryParamString;
SearchResponse response = index.query(q.isEmpty() ? "*" : q, filter, from, size);
Format responseFormat = Accept.formatFor(format, request().acceptedTypes());
String queryString = (q == null || q.isEmpty()) ? "*" : q;
SearchResponse response = index.query(queryString, filter, from, size);
response().setHeader("Access-Control-Allow-Origin", "*");
String[] formatAndConfig = format == null ? new String[] {} : format.split(":");
boolean returnSuggestions = formatAndConfig.length == 2;
Expand All @@ -242,8 +254,51 @@ public Result search(String q, String filter, int from, int size, String format)
.map(hit -> hit.getSource()).collect(Collectors.toList());
return withCallback(toSuggestions(Json.toJson(hits), formatAndConfig[1]));
}
return responseFormat.equals("html") ? htmlSearch(q, filter, from, size, responseFormat, response)
: ok(returnAsJson(q, response)).as(config("index.content"));
switch (responseFormat) {
case HTML: {
return htmlSearch(q, filter, from, size, responseFormat.queryParamString, response);
}
case BULK: {
return jsonLines(queryString, response);
}
default: {
return ok(returnAsJson(q, response)).as(config("index.content"));
}
}
}

private Result jsonLines(String q, SearchResponse response) {
if (index.query(q).getHits().getTotalHits() > 0) {
Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
.mapMaterializedValue(actor -> {
scrollQuery(q, actor);
return NotUsed.getInstance();
});
return ok().chunked(source).as(Accept.Format.BULK.types[0]);
} else {
return ok().as(Accept.Format.BULK.types[0]);
}
}

private void scrollQuery(String q, ActorRef actor) {
QueryBuilder query = QueryBuilders.queryStringQuery(q);
Logger.debug("bulkGnd: q={}, query={}", q, query);
TimeValue keepAlive = new TimeValue(60000);
SearchResponse scrollResp = index.client().prepareSearch(config("index.name"))
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(keepAlive).setQuery(query)
.setSize(100 /* hits per shard for each scroll */).get();
String scrollId = scrollResp.getScrollId();
while (scrollResp.getHits().iterator().hasNext()) {
scrollResp.getHits().forEach((hit) -> {
actor.tell(ByteString.fromString(hit.getSourceAsString()), null);
actor.tell(ByteString.fromString("\n"), null);
});
scrollResp = index.client().prepareSearchScroll(scrollId).setScroll(keepAlive).execute().actionGet();
scrollId = scrollResp.getScrollId();
}
SearchResponse lastResponse = scrollResp;
Logger.debug("Last search response for bulk request: " + lastResponse);
actor.tell(new Status.Success(NotUsed.getInstance()), null);
}

private Result htmlSearch(String q, String type, int from, int size, String format, SearchResponse response) {
Expand Down
5 changes: 5 additions & 0 deletions test/controllers/AcceptIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public static Collection<Object[]> data() {
{ fakeRequest(GET, "/gnd/search?q=*&format=json"), /*->*/ "application/json" },
{ fakeRequest(GET, "/gnd/search?q=*&format=whatever"), /*->*/ "application/json" },
{ fakeRequest(GET, "/gnd/search?q=*").header("Accept", "text/plain"), /*->*/ "application/json" },
// search, bulk format: JSON lines
{ fakeRequest(GET, "/gnd/search?q=*").header("Accept", "application/x-jsonlines"), /*->*/ "application/x-jsonlines" },
{ fakeRequest(GET, "/gnd/search?format=bulk"), /*->*/ "application/x-jsonlines" },
{ fakeRequest(GET, "/gnd/search?q=*&format=bulk"), /*->*/ "application/x-jsonlines" },
{ fakeRequest(GET, "/gnd/search?q=vwxyz&format=bulk"), /*->*/ "application/x-jsonlines" },
// search, other formats as query param:
{ fakeRequest(GET, "/gnd/search?q=*&format=html"), /*->*/ "text/html" },
// search, other formats via header:
Expand Down
13 changes: 9 additions & 4 deletions test/controllers/AcceptUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ public static Collection<Object[]> data() {
{ fakeRequest().header("Accept", ""), null, /*->*/ "json" }, //
{ fakeRequest().header("Accept", "application/pdf"), null, /*->*/ "json" },
// no header, just format parameter:
{ fakeRequest(), "html", /*->*/ "html" }, { fakeRequest(), "json", /*->*/ "json" },
{ fakeRequest(), "json:preferredName", /*->*/ "json(.+)?" },
{ fakeRequest(), "ttl", /*->*/ "ttl" }, { fakeRequest(), "nt", /*->*/ "nt" },
{ fakeRequest(), "html", /*->*/ "html" }, //
{ fakeRequest(), "json", /*->*/ "json" }, //
{ fakeRequest(), "json:preferredName", /*->*/ "json(.+)?" }, //
{ fakeRequest(), "ttl", /*->*/ "ttl" }, //
{ fakeRequest(), "nt", /*->*/ "nt" }, //
{ fakeRequest(), "bulk", /*->*/ "bulk" },
// supported content types, no format parameter given:
{ fakeRequest().header("Accept", "text/html"), null, /*->*/ "html" },
{ fakeRequest().header("Accept", "application/json"), null, /*->*/ "json" },
Expand All @@ -53,11 +56,13 @@ public static Collection<Object[]> data() {
{ fakeRequest().header("Accept", "application/xml"), null, /*->*/ "rdf" },
{ fakeRequest().header("Accept", "application/rdf+xml"), null, /*->*/ "rdf" },
{ fakeRequest().header("Accept", "text/xml"), null, /*->*/ "rdf" },
{ fakeRequest().header("Accept", "application/x-jsonlines"), null, /*->*/ "bulk" },
// we pick the preferred content type:
{ fakeRequest().header("Accept", "text/html,application/json"), null, /*->*/"html" },
{ fakeRequest().header("Accept", "application/json,text/html"), null, /*->*/ "json" },
// format parameter overrides header:
{ fakeRequest().header("Accept", "text/html"), "json", /*->*/ "json" } });
{ fakeRequest().header("Accept", "text/html"), "json", /*->*/ "json" }, //
{ fakeRequest().header("Accept", "text/html"), "bulk", /*->*/ "bulk" } });
}

private RequestBuilder fakeRequest;
Expand Down

0 comments on commit 8bec747

Please sign in to comment.