Skip to content

Commit

Permalink
Support groupId in subscription mode in GET /records (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
brandboat authored Aug 23, 2022
1 parent ca8b8f1 commit a562cba
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 37 deletions.
42 changes: 32 additions & 10 deletions app/src/main/java/org/astraea/app/web/Handler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
Expand All @@ -27,7 +26,9 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.astraea.app.common.Utils;

interface Handler extends HttpHandler {

Expand Down Expand Up @@ -66,15 +67,36 @@ default Response process(HttpExchange exchange) {
}

@Override
default void handle(HttpExchange exchange) throws IOException {
Response response = process(exchange);
var responseData = response.json().getBytes(StandardCharsets.UTF_8);
exchange
.getResponseHeaders()
.set("Content-Type", String.format("application/json; charset=%s", StandardCharsets.UTF_8));
exchange.sendResponseHeaders(response.code(), responseData.length);
try (var os = exchange.getResponseBody()) {
os.write(responseData);
default void handle(HttpExchange exchange) {
handleResponse(
(response) -> {
var responseData = response.json().getBytes(StandardCharsets.UTF_8);
exchange
.getResponseHeaders()
.set(
"Content-Type",
String.format("application/json; charset=%s", StandardCharsets.UTF_8));
Utils.packException(
() -> {
exchange.sendResponseHeaders(response.code(), responseData.length);
try (var os = exchange.getResponseBody()) {
os.write(responseData);
}
});
},
process(exchange));
}

// visible for testing
static void handleResponse(Consumer<Response> responseConsumer, Response response) {
Throwable error = null;
try {
responseConsumer.accept(response);
} catch (Throwable e) {
e.printStackTrace();
error = e;
} finally {
response.onComplete(error);
}
}

Expand Down
67 changes: 57 additions & 10 deletions app/src/main/java/org/astraea/app/web/RecordHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.astraea.app.consumer.Builder;
import org.astraea.app.consumer.Consumer;
import org.astraea.app.consumer.Deserializer;
import org.astraea.app.consumer.SubscribedConsumer;
import org.astraea.app.producer.Producer;
import org.astraea.app.producer.Sender;
import org.astraea.app.producer.Serializer;
Expand All @@ -67,6 +68,7 @@ public class RecordHandler implements Handler {
static final String LIMIT = "limit";
static final String TIMEOUT = "timeout";
static final String OFFSET = "offset";
static final String GROUP_ID = "groupId";
private static final int MAX_CACHE_SIZE = 100;
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofMinutes(10);

Expand Down Expand Up @@ -102,6 +104,7 @@ public Response get(Optional<String> target, Map<String, String> queries) {
throw new IllegalArgumentException("only one seek strategy is allowed");
}

var limit = Integer.parseInt(queries.getOrDefault(LIMIT, "1"));
var timeout =
Optional.ofNullable(queries.get(TIMEOUT))
.map(DurationField::toDuration)
Expand All @@ -114,7 +117,13 @@ public Response get(Optional<String> target, Map<String, String> queries) {
partition ->
(Builder<byte[], byte[]>)
Consumer.forPartitions(Set.of(TopicPartition.of(topic, partition))))
.orElseGet(() -> Consumer.forTopics(Set.of(topic)));
.orElseGet(
() -> {
// disable auto commit here since we commit manually in Records#onComplete
var builder = Consumer.forTopics(Set.of(topic)).disableAutoCommitOffsets();
Optional.ofNullable(queries.get(GROUP_ID)).ifPresent(builder::groupId);
return builder;
});

var keyDeserializer =
Optional.ofNullable(queries.get(KEY_DESERIALIZER))
Expand Down Expand Up @@ -149,10 +158,7 @@ public Response get(Optional<String> target, Map<String, String> queries) {
.map(Long::parseLong)
.ifPresent(seekTo -> consumerBuilder.seek(Builder.SeekStrategy.SEEK_TO, seekTo));

try (var consumer = consumerBuilder.build()) {
var limit = Integer.parseInt(queries.getOrDefault(LIMIT, "1"));
return new Records(consumer.poll(limit, timeout).stream().map(Record::new).collect(toList()));
}
return new Records(consumerBuilder.build(), limit, timeout);
}

@Override
Expand Down Expand Up @@ -326,10 +332,15 @@ static class Metadata implements Response {
}

static class Records implements Response {
final Collection<Record> data;

Records(Collection<Record> data) {
this.data = data;
private final Consumer<byte[], byte[]> consumer;
private final int limit;
private final Duration timeout;
private RecordsData records;

private Records(Consumer<byte[], byte[]> consumer, int limit, Duration timeout) {
this.consumer = requireNonNull(consumer);
this.limit = limit;
this.timeout = requireNonNull(timeout);
}

@Override
Expand All @@ -339,7 +350,43 @@ public String json() {
.disableHtmlEscaping()
.registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter())
.create()
.toJson(this);
.toJson(records());
}

@Override
public void onComplete(Throwable error) {
try {
if (error == null && consumer instanceof SubscribedConsumer) {
((SubscribedConsumer<byte[], byte[]>) consumer).commitOffsets(Duration.ofSeconds(5));
}
} finally {
consumer.close();
}
}

// visible for testing
RecordsData records() {
// make sure consumer poll data only once.
if (records == null) {
records =
new RecordsData(
consumer.poll(limit, timeout).stream().map(Record::new).collect(toList()));
}
return records;
}

// visible for testing
Consumer<byte[], byte[]> consumer() {
return consumer;
}
}

// this is a DTO that holds json response data and renders a format like {"data": ...}
static class RecordsData {
final Collection<Record> data;

RecordsData(Collection<Record> data) {
this.data = data;
}
}

Expand Down
8 changes: 8 additions & 0 deletions app/src/main/java/org/astraea/app/web/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ default String json() {
return new Gson().toJson(this);
}

/**
* Callback fired when the response is sent, regardless of success or failure.
*
* @param error that cause sending response failed, will be null when sending response is
* successful.
*/
default void onComplete(Throwable error) {}

class ResponseImpl implements Response {
final int code;
final String message;
Expand Down
15 changes: 15 additions & 0 deletions app/src/test/java/org/astraea/app/web/HandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,19 @@ public Response process(HttpExchange exchange) {
handler.handle(he);
Mockito.verify(he).sendResponseHeaders(200, 0);
}

@Test
void testOnComplete() {
var response = Mockito.mock(Response.class);
var exception = new IllegalStateException("hello");
Mockito.when(response.json()).thenThrow(exception);
Handler handler = (paths, queries) -> response;

var exchange = Mockito.mock(HttpExchange.class);
Mockito.when(exchange.getRequestURI()).thenReturn(URI.create("http://localhost:8888/abc"));
Mockito.when(exchange.getRequestMethod()).thenReturn("get");

handler.handle(exchange);
Mockito.verify(response).onComplete(exception);
}
}
Loading

0 comments on commit a562cba

Please sign in to comment.