From a562cba73b5d4292023c3ae10f52182daaba7aef Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Tue, 23 Aug 2022 23:11:53 +0800 Subject: [PATCH] Support groupId in subscription mode in GET /records (#562) --- .../java/org/astraea/app/web/Handler.java | 42 +++-- .../org/astraea/app/web/RecordHandler.java | 67 ++++++-- .../java/org/astraea/app/web/Response.java | 8 + .../java/org/astraea/app/web/HandlerTest.java | 15 ++ .../astraea/app/web/RecordHandlerTest.java | 145 ++++++++++++++++-- docs/web_server/web_api_records_chinese.md | 6 +- 6 files changed, 246 insertions(+), 37 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/Handler.java b/app/src/main/java/org/astraea/app/web/Handler.java index 982dd9f311..907691b215 100644 --- a/app/src/main/java/org/astraea/app/web/Handler.java +++ b/app/src/main/java/org/astraea/app/web/Handler.java @@ -18,7 +18,6 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; -import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -27,7 +26,9 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; +import org.astraea.app.common.Utils; interface Handler extends HttpHandler { @@ -66,15 +67,36 @@ default Response process(HttpExchange exchange) { } @Override - default void handle(HttpExchange exchange) throws IOException { - Response response = process(exchange); - var responseData = response.json().getBytes(StandardCharsets.UTF_8); - exchange - .getResponseHeaders() - .set("Content-Type", String.format("application/json; charset=%s", StandardCharsets.UTF_8)); - exchange.sendResponseHeaders(response.code(), responseData.length); - try (var os = exchange.getResponseBody()) { - os.write(responseData); + default void handle(HttpExchange exchange) { + handleResponse( + (response) -> { + var responseData = response.json().getBytes(StandardCharsets.UTF_8); + exchange + .getResponseHeaders() + .set( + "Content-Type", + String.format("application/json; charset=%s", StandardCharsets.UTF_8)); + Utils.packException( + () -> { + exchange.sendResponseHeaders(response.code(), responseData.length); + try (var os = exchange.getResponseBody()) { + os.write(responseData); + } + }); + }, + process(exchange)); + } + + // visible for testing + static void handleResponse(Consumer responseConsumer, Response response) { + Throwable error = null; + try { + responseConsumer.accept(response); + } catch (Throwable e) { + e.printStackTrace(); + error = e; + } finally { + response.onComplete(error); } } diff --git a/app/src/main/java/org/astraea/app/web/RecordHandler.java b/app/src/main/java/org/astraea/app/web/RecordHandler.java index 3ecfab41e0..ac74114caf 100644 --- a/app/src/main/java/org/astraea/app/web/RecordHandler.java +++ b/app/src/main/java/org/astraea/app/web/RecordHandler.java @@ -50,6 +50,7 @@ import org.astraea.app.consumer.Builder; import org.astraea.app.consumer.Consumer; import org.astraea.app.consumer.Deserializer; +import org.astraea.app.consumer.SubscribedConsumer; import org.astraea.app.producer.Producer; import org.astraea.app.producer.Sender; import org.astraea.app.producer.Serializer; @@ -67,6 +68,7 @@ public class RecordHandler implements Handler { static final String LIMIT = "limit"; static final String TIMEOUT = "timeout"; static final String OFFSET = "offset"; + static final String GROUP_ID = "groupId"; private static final int MAX_CACHE_SIZE = 100; private static final Duration CACHE_EXPIRE_DURATION = Duration.ofMinutes(10); @@ -102,6 +104,7 @@ public Response get(Optional target, Map queries) { throw new IllegalArgumentException("only one seek strategy is allowed"); } + var limit = Integer.parseInt(queries.getOrDefault(LIMIT, "1")); var timeout = Optional.ofNullable(queries.get(TIMEOUT)) .map(DurationField::toDuration) @@ -114,7 +117,13 @@ public Response get(Optional target, Map queries) { partition -> (Builder) Consumer.forPartitions(Set.of(TopicPartition.of(topic, partition)))) - .orElseGet(() -> Consumer.forTopics(Set.of(topic))); + .orElseGet( + () -> { + // disable auto commit here since we commit manually in Records#onComplete + var builder = Consumer.forTopics(Set.of(topic)).disableAutoCommitOffsets(); + Optional.ofNullable(queries.get(GROUP_ID)).ifPresent(builder::groupId); + return builder; + }); var keyDeserializer = Optional.ofNullable(queries.get(KEY_DESERIALIZER)) @@ -149,10 +158,7 @@ public Response get(Optional target, Map queries) { .map(Long::parseLong) .ifPresent(seekTo -> consumerBuilder.seek(Builder.SeekStrategy.SEEK_TO, seekTo)); - try (var consumer = consumerBuilder.build()) { - var limit = Integer.parseInt(queries.getOrDefault(LIMIT, "1")); - return new Records(consumer.poll(limit, timeout).stream().map(Record::new).collect(toList())); - } + return new Records(consumerBuilder.build(), limit, timeout); } @Override @@ -326,10 +332,15 @@ static class Metadata implements Response { } static class Records implements Response { - final Collection data; - - Records(Collection data) { - this.data = data; + private final Consumer consumer; + private final int limit; + private final Duration timeout; + private RecordsData records; + + private Records(Consumer consumer, int limit, Duration timeout) { + this.consumer = requireNonNull(consumer); + this.limit = limit; + this.timeout = requireNonNull(timeout); } @Override @@ -339,7 +350,43 @@ public String json() { .disableHtmlEscaping() .registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()) .create() - .toJson(this); + .toJson(records()); + } + + @Override + public void onComplete(Throwable error) { + try { + if (error == null && consumer instanceof SubscribedConsumer) { + ((SubscribedConsumer) consumer).commitOffsets(Duration.ofSeconds(5)); + } + } finally { + consumer.close(); + } + } + + // visible for testing + RecordsData records() { + // make sure consumer poll data only once. + if (records == null) { + records = + new RecordsData( + consumer.poll(limit, timeout).stream().map(Record::new).collect(toList())); + } + return records; + } + + // visible for testing + Consumer consumer() { + return consumer; + } + } + + // this is a DTO that holds json response data and renders a format like {"data": ...} + static class RecordsData { + final Collection data; + + RecordsData(Collection data) { + this.data = data; } } diff --git a/app/src/main/java/org/astraea/app/web/Response.java b/app/src/main/java/org/astraea/app/web/Response.java index 2184ded878..85b7044183 100644 --- a/app/src/main/java/org/astraea/app/web/Response.java +++ b/app/src/main/java/org/astraea/app/web/Response.java @@ -53,6 +53,14 @@ default String json() { return new Gson().toJson(this); } + /** + * Callback fired when the response is sent, regardless of success or failure. + * + * @param error that cause sending response failed, will be null when sending response is + * successful. + */ + default void onComplete(Throwable error) {} + class ResponseImpl implements Response { final int code; final String message; diff --git a/app/src/test/java/org/astraea/app/web/HandlerTest.java b/app/src/test/java/org/astraea/app/web/HandlerTest.java index 8b54598c20..1080012ddf 100644 --- a/app/src/test/java/org/astraea/app/web/HandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/HandlerTest.java @@ -86,4 +86,19 @@ public Response process(HttpExchange exchange) { handler.handle(he); Mockito.verify(he).sendResponseHeaders(200, 0); } + + @Test + void testOnComplete() { + var response = Mockito.mock(Response.class); + var exception = new IllegalStateException("hello"); + Mockito.when(response.json()).thenThrow(exception); + Handler handler = (paths, queries) -> response; + + var exchange = Mockito.mock(HttpExchange.class); + Mockito.when(exchange.getRequestURI()).thenReturn(URI.create("http://localhost:8888/abc")); + Mockito.when(exchange.getRequestMethod()).thenReturn("get"); + + handler.handle(exchange); + Mockito.verify(response).onComplete(exception); + } } diff --git a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java index 5ac404aa1f..df8bcae79e 100644 --- a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java @@ -21,6 +21,7 @@ import static org.astraea.app.web.RecordHandler.ASYNC; import static org.astraea.app.web.RecordHandler.DISTANCE_FROM_BEGINNING; import static org.astraea.app.web.RecordHandler.DISTANCE_FROM_LATEST; +import static org.astraea.app.web.RecordHandler.GROUP_ID; import static org.astraea.app.web.RecordHandler.KEY_DESERIALIZER; import static org.astraea.app.web.RecordHandler.LIMIT; import static org.astraea.app.web.RecordHandler.OFFSET; @@ -42,6 +43,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -307,9 +309,13 @@ void testDistanceFromLatest() { Optional.of(topic), Map.of(DISTANCE_FROM_LATEST, "2", VALUE_DESERIALIZER, "integer"))); - Assertions.assertEquals(2, response.data.size()); + Assertions.assertEquals(2, response.records().data.size()); Assertions.assertEquals( - List.of(8, 9), response.data.stream().map(record -> record.value).collect(toList())); + List.of(8, 9), + response.records().data.stream().map(record -> record.value).collect(toList())); + + // close consumer + response.onComplete(null); } @Test @@ -325,9 +331,13 @@ void testDistanceFromBeginning() { Optional.of(topic), Map.of(DISTANCE_FROM_BEGINNING, "8", VALUE_DESERIALIZER, "integer"))); - Assertions.assertEquals(2, response.data.size()); + Assertions.assertEquals(2, response.records().data.size()); Assertions.assertEquals( - List.of(8, 9), response.data.stream().map(record -> record.value).collect(toList())); + List.of(8, 9), + response.records().data.stream().map(record -> record.value).collect(toList())); + + // close consumer + response.onComplete(null); } @Test @@ -341,9 +351,13 @@ void testSeekTo() { RecordHandler.Records.class, handler.get(Optional.of(topic), Map.of(SEEK_TO, "3", VALUE_DESERIALIZER, "integer"))); - Assertions.assertEquals(2, response.data.size()); + Assertions.assertEquals(2, response.records().data.size()); Assertions.assertEquals( - List.of(3, 4), response.data.stream().map(record -> record.value).collect(toList())); + List.of(3, 4), + response.records().data.stream().map(record -> record.value).collect(toList())); + + // close consumer + response.onComplete(null); } @Test @@ -375,7 +389,14 @@ void testGetRecordByPartition() { handler.get(Optional.of(topic), Map.of(DISTANCE_FROM_BEGINNING, "1", PARTITION, "1"))); Assertions.assertTrue( - response.data.stream().map(r -> r.partition).filter(p -> p != 1).findAny().isEmpty()); + response.records().data.stream() + .map(r -> r.partition) + .filter(p -> p != 1) + .findAny() + .isEmpty()); + + // close consumer + response.onComplete(null); } @Test @@ -392,10 +413,13 @@ void testLimit() { Map.of(DISTANCE_FROM_BEGINNING, "2", LIMIT, "3", VALUE_DESERIALIZER, "integer"))); // limit is just a recommended size here, we might get more records than limit - Assertions.assertEquals(8, response.data.size()); + Assertions.assertEquals(8, response.records().data.size()); Assertions.assertEquals( List.of(2, 3, 4, 5, 6, 7, 8, 9), - response.data.stream().map(record -> record.value).collect(toList())); + response.records().data.stream().map(record -> record.value).collect(toList())); + + // close consumer + response.onComplete(null); } @ParameterizedTest @@ -414,7 +438,7 @@ void testDeserializer(String valueDeserializer, byte[] value, Object expectedVal handler.get( Optional.of(topic), Map.of(DISTANCE_FROM_LATEST, "1", VALUE_DESERIALIZER, valueDeserializer))); - var records = List.copyOf(response.data); + var records = List.copyOf(response.records().data); Assertions.assertEquals(1, records.size()); if (valueDeserializer.equals("bytearray")) { @@ -422,6 +446,9 @@ void testDeserializer(String valueDeserializer, byte[] value, Object expectedVal } else { Assertions.assertEquals(expectedValue, records.get(0).value); } + + // close consumer + response.onComplete(null); } private static Stream forTestDeserializer() { @@ -474,8 +501,8 @@ void testGetResponse() { "string", VALUE_DESERIALIZER, "integer"))); - Assertions.assertEquals(1, response.data.size()); - var recordDto = response.data.iterator().next(); + Assertions.assertEquals(1, response.records().data.size()); + var recordDto = response.records().data.iterator().next(); Assertions.assertEquals(topic, recordDto.topic); Assertions.assertEquals(0, recordDto.partition); Assertions.assertEquals(0, recordDto.offset); @@ -490,6 +517,9 @@ void testGetResponse() { var headerDto = recordDto.headers.iterator().next(); Assertions.assertEquals("a", headerDto.key); Assertions.assertArrayEquals("b".getBytes(UTF_8), headerDto.value); + + // close consumer + response.onComplete(null); } @Test @@ -541,6 +571,9 @@ void testGetJsonResponse() { + "\"leaderEpoch\":0" + "}]}", response.json()); + + // close consumer + response.onComplete(null); } @Test @@ -598,7 +631,7 @@ void testPostAndGet() { "integer", PARTITION, "0"))); - var record = records.data.iterator().next(); + var record = records.records().data.iterator().next(); Assertions.assertEquals(topic, record.topic); Assertions.assertEquals(0, record.partition); Assertions.assertEquals(0, record.offset); @@ -609,6 +642,9 @@ var record = records.data.iterator().next(); Assertions.assertEquals(100, record.value); Assertions.assertEquals(currentTimestamp, record.timestamp); Assertions.assertEquals(List.of(), record.headers); + + // close consumer + records.onComplete(null); } @Test @@ -616,9 +652,10 @@ void testTimeout() { Assertions.assertThrows( IllegalArgumentException.class, () -> getRecordHandler().get(Optional.of("test"), Map.of(TIMEOUT, "foo"))); - Assertions.assertInstanceOf( - RecordHandler.Records.class, - getRecordHandler().get(Optional.of("test"), Map.of(TIMEOUT, "10s"))); + var response = getRecordHandler().get(Optional.of("test"), Map.of(TIMEOUT, "10s")); + Assertions.assertInstanceOf(RecordHandler.Records.class, response); + // close consumer + response.onComplete(null); } @Test @@ -711,6 +748,82 @@ void testDeletePartition() { } } + @Test + void testGetRecordsCommitOffsetWithGroupId() { + var topic = Utils.randomString(10); + var groupId = Utils.randomString(10); + var recordHandler = getRecordHandler(); + + Supplier getRecords = + () -> + Assertions.assertInstanceOf( + RecordHandler.Records.class, + recordHandler.get( + Optional.of(topic), Map.of(GROUP_ID, groupId, VALUE_DESERIALIZER, "integer"))); + + // send this request to register consumer group + var response = getRecords.get(); + Assertions.assertEquals(response.records().data.size(), 0); + Handler.handleResponse((ignored) -> {}, response); + + produceData(topic, 5); + response = getRecords.get(); + Assertions.assertEquals(response.records().data.size(), 5); + Handler.handleResponse((ignored) -> {}, response); + + produceData(topic, 2); + response = getRecords.get(); + Assertions.assertEquals(response.records().data.size(), 2); + // throw error here to fire onComplete with error + Handler.handleResponse( + (ignored) -> { + throw new RuntimeException(); + }, + response); + + // retry get records again, and get data that was supposed to send back last time + response = getRecords.get(); + Assertions.assertEquals(response.records().data.size(), 2); + Handler.handleResponse((ignored) -> {}, response); + } + + // test consumer in different modes, subscribe and assignment + private static Stream forTestGetRecordsCloseConsumer() { + return Stream.of( + arguments(Map.of(GROUP_ID, Utils.randomString(10))), arguments(Map.of(PARTITION, "0"))); + } + + @ParameterizedTest + @MethodSource("forTestGetRecordsCloseConsumer") + void testGetRecordsCloseConsumer(Map args) { + var topic = Utils.randomString(10); + var recordHandler = getRecordHandler(); + + var response = + Assertions.assertInstanceOf( + RecordHandler.Records.class, recordHandler.get(Optional.of(topic), args)); + Handler.handleResponse((ignored) -> {}, response); + @SuppressWarnings("resource") // consumer already closed in Response#onComplete + var error = + Assertions.assertThrows( + IllegalStateException.class, () -> response.consumer().poll(Duration.ofSeconds(1))); + Assertions.assertEquals(error.getMessage(), "This consumer has already been closed."); + + var response2 = + Assertions.assertInstanceOf( + RecordHandler.Records.class, recordHandler.get(Optional.of(topic), args)); + Handler.handleResponse( + (ignored) -> { + throw new RuntimeException(); + }, + response2); + @SuppressWarnings("resource") // consumer already closed in Response#onComplete + var error2 = + Assertions.assertThrows( + IllegalStateException.class, () -> response2.consumer().poll(Duration.ofSeconds(1))); + Assertions.assertEquals(error2.getMessage(), "This consumer has already been closed."); + } + private RecordHandler getRecordHandler() { return new RecordHandler(Admin.of(bootstrapServers()), bootstrapServers()); } diff --git a/docs/web_server/web_api_records_chinese.md b/docs/web_server/web_api_records_chinese.md index cddf76e722..983383383b 100644 --- a/docs/web_server/web_api_records_chinese.md +++ b/docs/web_server/web_api_records_chinese.md @@ -111,6 +111,7 @@ GET /records/{topic} | 名稱 | 說明 | 預設值 | |-----------------------|--------------------------------------------------------------------------------------------------------------|--------| | partition | (選填) 指定要讀取之 partition | 無 | +| groupId | (選填) 指定 consumer group id,注意若同時有指定 partition,則此參數不會生效 | 無 | | keyDeserializer | (選填) key deserializer | string | | valueDeserializer | (選填) value deserializer | string | | limit | (選填) 回傳資料筆數上限,注意此僅為一建議值,您仍有可能取得大於此值之資料筆數 | 1 | @@ -121,7 +122,10 @@ GET /records/{topic} - keyDeserializer/valueDeserializer 可選擇 `bytearray`/`string`/`long`/`integer`/`float`/`double` - 若 deserializer 選擇 `bytearray`,回傳的 key/value 值為經 base64 encoding 後之字串 -- distanceFromLatest / distanceFromBeginning / seekTo 僅能從三者中挑選一種使用。若三者都不填寫,預設行為與 consumer `auto.offset.reset=latest` 一致 +- distanceFromLatest / distanceFromBeginning / seekTo 僅能從三者中挑選一種使用。若三者都不填寫,預設行為與 + consumer `auto.offset.reset=latest` 一致 +- consumer 會在傳送資料至客戶端後再提交 offset,注意過程若發生任何錯誤(例:傳送失敗),或者使用者在請求時指定`partition` + ,則不會提交 offset。 以下範例均假設已經在僅有一個 partition 之 topic `test` 並插入 10 筆資料,key 型別為 string, value 型別為 integer,key/value 值 從第一筆開始為 `key: "test0", value: 0` 至第十筆為 `key: test9, value: 9`