Skip to content

Commit

Permalink
fix: /query rest endpoint should return valid JSON (#3819)
Browse files Browse the repository at this point in the history
NOTE: for the 5.4.x release the decision was made to NOT fix the badly formed JSON for existing push queries. This has only been fixed for pull queries :(

Previously, the RESTful `/query` endpoint was returning invalid json. For example:

```json
{"row":{"columns":["USER_1","PAGE_1",1,"1"], "errorMessage": null, "finalMessage": null,"terminal":true}}}
{"row":{"columns":["USER_2","PAGE_2",2,"2"], "errorMessage": null, "finalMessage": null,"terminal":true}}}
{"row": null, "errorMessage": null, "finalMessage":"Limit Reached","terminal":true}}"
```

Each _line_ contained a valid JSON object, but the payload as a whole was invalid, and parsing failed if a string contained an embedded new-line character.

The payload as a whole is now a valid JSON document, being a JSON array of rows, and can handle strings with embedded new line characters.
In addition, I've hidden null fields and the 'terminal' field, which was never supposed to be in the JSON. The output now looks like:

```json
[{"row":{"columns":["USER_1","PAGE_1",1,"1"]}},
{"row":{"columns":["USER_2","PAGE_2",2,"2"]}},
{"finalMessage":"Limit Reached"}]
```

The CLI is backwards compatible with older versions of the server, though it won't output column headings from older versions.

BREAKING CHANGE: the response from the RESTful API for push queries has changed: it is now a valid JSON document containing a JSON array, where each element is a JSON object containing either a row of data, an error message, or a final message. The `terminal` field has been removed.

(cherry picked from commit 13ced13)
  • Loading branch information
big-andy-coates committed Nov 14, 2019
1 parent 9a47eaf commit b278e83
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private Response handlePullQuery(
final StreamedRow header = StreamedRow.header(entity.getQueryId(), entity.getSchema());

final List<StreamedRow> rows = entity.getRows().stream()
.map(GenericRow::new)
.map(StreamedQueryResource::toGenericRow)
.map(StreamedRow::row)
.collect(Collectors.toList());

Expand Down Expand Up @@ -333,6 +333,11 @@ private static Collection<String> findPossibleTopicMatches(
.filter(name -> name.equalsIgnoreCase(topicName))
.collect(Collectors.toSet());
}

@SuppressWarnings("unchecked")
private static GenericRow toGenericRow(final List<?> values) {
return new GenericRow((List)values);
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;

import com.fasterxml.jackson.core.type.TypeReference;
import io.confluent.common.utils.IntegrationTest;
Expand Down Expand Up @@ -290,20 +290,19 @@ public void shouldExecutePullQueryOverRest() {
// When:
final Supplier<List<String>> call = () -> {
final String response = rawRestQueryRequest(
"SELECT * from " + AGG_TABLE + " WHERE ROWKEY='" + AN_AGG_KEY + "';"
"SELECT COUNT, ROWKEY from " + AGG_TABLE + " WHERE ROWKEY='" + AN_AGG_KEY + "';"
);
return Arrays.asList(response.split(System.lineSeparator()));
};

// Then:
final List<String> messages = assertThatEventually(call, hasSize(HEADER + 1));
final List<Map<String, Object>> parsed = parseRawRestQueryResponse(String.join("", messages));
assertThat(parsed, hasSize(HEADER + 1));
assertThat(parsed.get(0).get("header"), instanceOf(Map.class));
assertThat(((Map) parsed.get(0).get("header")).get("queryId"), is(notNullValue()));
assertThat(((Map) parsed.get(0).get("header")).get("schema"),
is("`ROWKEY` STRING KEY, `COUNT` BIGINT"));
assertThat(messages.get(1), is("{\"row\":{\"columns\":[[\"USER_1\",1]]}}]"));
assertThat(messages, hasSize(HEADER + 1));

assertThat(messages.get(0), startsWith("[{\"header\":{\"queryId\":\""));
assertThat(messages.get(0),
endsWith("\",\"schema\":\"`COUNT` BIGINT, `ROWKEY` STRING KEY\"}},"));
assertThat(messages.get(1), is("{\"row\":{\"columns\":[1,\"USER_1\"]}}]"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.codehaus.plexus.util.StringUtils;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpStatus.Code;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -340,14 +341,18 @@ public void shouldStreamRowsCorrectly() throws Throwable {
}
final String responseLine = responseScanner.nextLine();

if (responseLine.isEmpty()) {
String jsonLine = StringUtils.stripStart(responseLine, "[");
jsonLine = StringUtils.stripEnd(jsonLine, ",");
jsonLine = StringUtils.stripEnd(jsonLine, "]");

if (jsonLine.isEmpty()) {
i--;
continue;
}

if (i == 0) {
// Header:
assertThat(responseLine, is("{\"header\":{\"queryId\":\"none\",\"schema\":\"`f1` INTEGER\"}}"));
assertThat(jsonLine, is("{\"header\":{\"queryId\":\"none\",\"schema\":\"`f1` INTEGER\"}}"));
continue;
}

Expand All @@ -357,9 +362,9 @@ public void shouldStreamRowsCorrectly() throws Throwable {
}

final GenericRow testRow = objectMapper
.readValue(responseLine, StreamedRow.class)
.readValue(jsonLine, StreamedRow.class)
.getRow()
.orElse(null);
.get();

assertEquals(expectedRow, testRow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;

public final class QueryStream implements Closeable, Iterator<StreamedRow> {

Expand Down Expand Up @@ -112,17 +113,22 @@ private boolean bufferNextRow() {
try {
while (responseScanner.hasNextLine()) {
final String responseLine = responseScanner.nextLine().trim();
if (!responseLine.isEmpty()) {
try {
bufferedRow = objectMapper.readValue(responseLine, StreamedRow.class);
} catch (final IOException exception) {
if (closed) {
return false;
}
throw new RuntimeException(exception);

final String jsonMsg = toJsonMsg(responseLine);

if (jsonMsg.isEmpty()) {
continue;
}

try {
bufferedRow = objectMapper.readValue(jsonMsg, StreamedRow.class);
} catch (final IOException exception) {
if (closed) {
return false;
}
return true;
throw new RuntimeException(exception);
}
return true;
}

return false;
Expand All @@ -135,4 +141,27 @@ private boolean bufferNextRow() {
throw e;
}
}

/**
* Convert the single line within the full response into a valid JSON object.
*
* <p>The entire response is an array of JSON objects, e.g. in the form:
*
* <pre>
* {@code
* [{...stuff...},
* {...stuff...},
* ...more rows....
* {...stuff...}],
* }
* </pre>
*
* <p>This method trims any leading {@code [} or trailing {@code ,} or {@code ]}
*/
private static String toJsonMsg(final String responseLine) {
String result = StringUtils.removeStart(responseLine, "[");
result = StringUtils.removeEnd(result, "]");
result = StringUtils.removeEnd(result, ",");
return result.trim();
}
}

0 comments on commit b278e83

Please sign in to comment.