From 767b19dfb4222b550affde62b77fa370f0337da3 Mon Sep 17 00:00:00 2001 From: Fabian Engelniederhammer Date: Thu, 18 Apr 2024 09:05:58 +0200 Subject: [PATCH] feat(lapis2)!: read data from SILO as NDJSON Refs: #741 --- .../org/genspectrum/lapis/silo/SiloClient.kt | 83 ++++++------ .../org/genspectrum/lapis/silo/SiloQuery.kt | 42 +++---- .../lapis/model/SiloQueryModelTest.kt | 12 +- .../genspectrum/lapis/silo/SiloClientTest.kt | 118 ++++-------------- 4 files changed, 93 insertions(+), 162 deletions(-) diff --git a/lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloClient.kt b/lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloClient.kt index c012a09d3..79681fbbd 100644 --- a/lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloClient.kt +++ b/lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloClient.kt @@ -22,15 +22,13 @@ import java.net.http.HttpResponse.BodyHandlers private val log = KotlinLogging.logger {} -const val SILO_RESPONSE_MAX_LOG_LENGTH = 10_000 - @Component class SiloClient( private val cachedSiloClient: CachedSiloClient, private val dataVersion: DataVersion, private val requestContext: RequestContext, ) { - fun sendQuery(query: SiloQuery): ResponseType { + fun sendQuery(query: SiloQuery): List { val result = cachedSiloClient.sendQuery(query) dataVersion.dataVersion = result.dataVersion @@ -71,32 +69,44 @@ class CachedSiloClient( log.info { "Calling SILO: $queryJson" } - val response = send(URI("$siloUrl/query")) { + val response = send( + uri = URI("$siloUrl/query"), + bodyHandler = BodyHandlers.ofLines(), + tryToReadSiloErrorFromBody = { tryToReadSiloErrorFromString(it.findFirst().orElse("")) }, + ) { it.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .POST(HttpRequest.BodyPublishers.ofString(queryJson)) } - try { - return WithDataVersion( - queryResult = objectMapper.readValue(response.body(), query.action.typeReference).queryResult, - dataVersion = getDataVersion(response), - ) - } catch (exception: Exception) { - val message = "Could not parse response from silo: " + exception::class.toString() + " " + exception.message - throw RuntimeException(message, exception) - } + return WithDataVersion( + queryResult = response.body() + .filter { it.isNotBlank() } + .map { + try { + objectMapper.readValue(it, query.action.typeReference) + } catch (exception: Exception) { + val message = "Could not parse response from silo: " + + exception::class.toString() + " " + exception.message + throw RuntimeException(message, exception) + } + } + .toList(), + dataVersion = getDataVersion(response), + ) } fun callInfo(): InfoData { - val response = send(URI("$siloUrl/info")) { it.GET() } + val response = send(URI("$siloUrl/info"), BodyHandlers.ofString(), ::tryToReadSiloErrorFromString) { it.GET() } return InfoData(getDataVersion(response)) } - private fun send( + private fun send( uri: URI, + bodyHandler: HttpResponse.BodyHandler, + tryToReadSiloErrorFromBody: (ResponseBodyType) -> SiloErrorResponse, buildRequest: (HttpRequest.Builder) -> Unit, - ): HttpResponse { + ): HttpResponse { val request = HttpRequest.newBuilder(uri) .apply(buildRequest) .apply { @@ -107,7 +117,7 @@ class CachedSiloClient( .build() val response = try { - httpClient.send(request, BodyHandlers.ofString()) + httpClient.send(request, bodyHandler) } catch (exception: Exception) { val message = "Could not connect to silo: " + exception::class.toString() + " " + exception.message throw RuntimeException(message, exception) @@ -116,34 +126,18 @@ class CachedSiloClient( if (!uri.toString().endsWith("info")) { log.info { "Response from SILO: ${response.statusCode()}" } } - log.debug { - val body = response.body() - val truncationPostfix = when { - body.length > SILO_RESPONSE_MAX_LOG_LENGTH -> "(...truncated)" - else -> "" - } - "Data from SILO: ${body.take(SILO_RESPONSE_MAX_LOG_LENGTH)}$truncationPostfix" - } if (response.statusCode() != 200) { - val siloErrorResponse = tryToReadSiloError(response) + val siloErrorResponse = tryToReadSiloErrorFromBody(response.body()) if (response.statusCode() == 503) { - val message = siloErrorResponse?.message ?: "Unknown reason." + val message = siloErrorResponse.message throw SiloUnavailableException( "SILO is currently unavailable: $message", response.headers().firstValue("retry-after").orElse(null), ) } - if (siloErrorResponse == null) { - throw SiloException( - HttpStatus.INTERNAL_SERVER_ERROR.value(), - "Internal Server Error", - "Unexpected error from SILO: ${response.body()}", - ) - } - throw SiloException( response.statusCode(), siloErrorResponse.error, @@ -154,15 +148,20 @@ class CachedSiloClient( return response } - private fun tryToReadSiloError(response: HttpResponse) = + private fun tryToReadSiloErrorFromString(responseBody: String) = try { - objectMapper.readValue(response.body()) + objectMapper.readValue(responseBody) } catch (e: Exception) { log.error { "Failed to deserialize error response from SILO: $e" } - null + + throw SiloException( + HttpStatus.INTERNAL_SERVER_ERROR.value(), + "Internal Server Error", + "Unexpected error from SILO: $responseBody", + ) } - private fun getDataVersion(response: HttpResponse): String { + private fun getDataVersion(response: HttpResponse<*>): String { return response.headers().firstValue("data-version").orElse("") } } @@ -171,13 +170,9 @@ class SiloException(val statusCode: Int, val title: String, override val message class SiloUnavailableException(override val message: String, val retryAfter: String?) : Exception(message) -data class SiloQueryResponse( - val queryResult: ResponseType, -) - data class WithDataVersion( val dataVersion: String, - val queryResult: ResponseType, + val queryResult: List, ) data class SiloErrorResponse(val error: String, val message: String) diff --git a/lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloQuery.kt b/lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloQuery.kt index 2854ddbf3..e71a282ae 100644 --- a/lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloQuery.kt +++ b/lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloQuery.kt @@ -14,17 +14,17 @@ import java.time.LocalDate data class SiloQuery(val action: SiloAction, val filterExpression: SiloFilterExpression) -class AggregationDataTypeReference : TypeReference>>() +class AggregationDataTypeReference : TypeReference() -class MutationDataTypeReference : TypeReference>>() +class MutationDataTypeReference : TypeReference() -class AminoAcidMutationDataTypeReference : TypeReference>>() +class AminoAcidMutationDataTypeReference : TypeReference() -class DetailsDataTypeReference : TypeReference>>() +class DetailsDataTypeReference : TypeReference() -class InsertionDataTypeReference : TypeReference>>() +class InsertionDataTypeReference : TypeReference() -class SequenceDataTypeReference : TypeReference>>() +class SequenceDataTypeReference : TypeReference() interface CommonActionFields { val orderByFields: List @@ -36,7 +36,7 @@ interface CommonActionFields { const val ORDER_BY_RANDOM_FIELD_NAME = "random" sealed class SiloAction( - @JsonIgnore val typeReference: TypeReference>, + @JsonIgnore val typeReference: TypeReference, @JsonIgnore val cacheable: Boolean, ) : CommonActionFields { companion object { @@ -45,7 +45,7 @@ sealed class SiloAction( orderByFields: List = emptyList(), limit: Int? = null, offset: Int? = null, - ): SiloAction> = + ): SiloAction = AggregatedAction( groupByFields = groupByFields, orderByFields = getNonRandomizedOrderByFields(orderByFields), @@ -59,7 +59,7 @@ sealed class SiloAction( orderByFields: List = emptyList(), limit: Int? = null, offset: Int? = null, - ): SiloAction> = + ): SiloAction = MutationsAction( minProportion = minProportion, orderByFields = getNonRandomizedOrderByFields(orderByFields), @@ -73,7 +73,7 @@ sealed class SiloAction( orderByFields: List = emptyList(), limit: Int? = null, offset: Int? = null, - ): SiloAction> = + ): SiloAction = AminoAcidMutationsAction( minProportion = minProportion, orderByFields = getNonRandomizedOrderByFields(orderByFields), @@ -87,7 +87,7 @@ sealed class SiloAction( orderByFields: List = emptyList(), limit: Int? = null, offset: Int? = null, - ): SiloAction> = + ): SiloAction = DetailsAction( fields = fields, orderByFields = getNonRandomizedOrderByFields(orderByFields), @@ -100,7 +100,7 @@ sealed class SiloAction( orderByFields: List = emptyList(), limit: Int? = null, offset: Int? = null, - ): SiloAction> = + ): SiloAction = NucleotideInsertionsAction( orderByFields = getNonRandomizedOrderByFields(orderByFields), limit = limit, @@ -112,7 +112,7 @@ sealed class SiloAction( orderByFields: List = emptyList(), limit: Int? = null, offset: Int? = null, - ): SiloAction> = + ): SiloAction = AminoAcidInsertionsAction( orderByFields = getNonRandomizedOrderByFields(orderByFields), limit = limit, @@ -126,7 +126,7 @@ sealed class SiloAction( orderByFields: List = emptyList(), limit: Int? = null, offset: Int? = null, - ): SiloAction> = + ): SiloAction = SequenceAction( type = type, sequenceName = sequenceName, @@ -151,7 +151,7 @@ sealed class SiloAction( override val limit: Int? = null, override val offset: Int? = null, val type: String = "Aggregated", - ) : SiloAction>(AggregationDataTypeReference(), cacheable = true) + ) : SiloAction(AggregationDataTypeReference(), cacheable = true) @JsonInclude(JsonInclude.Include.NON_EMPTY) private data class MutationsAction( @@ -161,7 +161,7 @@ sealed class SiloAction( override val limit: Int? = null, override val offset: Int? = null, val type: String = "Mutations", - ) : SiloAction>(MutationDataTypeReference(), cacheable = true) + ) : SiloAction(MutationDataTypeReference(), cacheable = true) @JsonInclude(JsonInclude.Include.NON_EMPTY) private data class AminoAcidMutationsAction( @@ -171,7 +171,7 @@ sealed class SiloAction( override val limit: Int? = null, override val offset: Int? = null, val type: String = "AminoAcidMutations", - ) : SiloAction>(AminoAcidMutationDataTypeReference(), cacheable = true) + ) : SiloAction(AminoAcidMutationDataTypeReference(), cacheable = true) @JsonInclude(JsonInclude.Include.NON_EMPTY) private data class DetailsAction( @@ -181,7 +181,7 @@ sealed class SiloAction( override val limit: Int? = null, override val offset: Int? = null, val type: String = "Details", - ) : SiloAction>(DetailsDataTypeReference(), cacheable = false) + ) : SiloAction(DetailsDataTypeReference(), cacheable = false) @JsonInclude(JsonInclude.Include.NON_EMPTY) private data class NucleotideInsertionsAction( @@ -190,7 +190,7 @@ sealed class SiloAction( override val limit: Int? = null, override val offset: Int? = null, val type: String = "Insertions", - ) : SiloAction>(InsertionDataTypeReference(), cacheable = true) + ) : SiloAction(InsertionDataTypeReference(), cacheable = true) @JsonInclude(JsonInclude.Include.NON_EMPTY) private data class AminoAcidInsertionsAction( @@ -199,7 +199,7 @@ sealed class SiloAction( override val limit: Int? = null, override val offset: Int? = null, val type: String = "AminoAcidInsertions", - ) : SiloAction>(InsertionDataTypeReference(), cacheable = true) + ) : SiloAction(InsertionDataTypeReference(), cacheable = true) @JsonInclude(JsonInclude.Include.NON_EMPTY) private data class SequenceAction( @@ -209,7 +209,7 @@ sealed class SiloAction( override val offset: Int? = null, val type: SequenceType, val sequenceName: String, - ) : SiloAction>(SequenceDataTypeReference(), cacheable = false) + ) : SiloAction(SequenceDataTypeReference(), cacheable = false) } sealed class SiloFilterExpression(val type: String) diff --git a/lapis2/src/test/kotlin/org/genspectrum/lapis/model/SiloQueryModelTest.kt b/lapis2/src/test/kotlin/org/genspectrum/lapis/model/SiloQueryModelTest.kt index 44692f450..d53dfab1f 100644 --- a/lapis2/src/test/kotlin/org/genspectrum/lapis/model/SiloQueryModelTest.kt +++ b/lapis2/src/test/kotlin/org/genspectrum/lapis/model/SiloQueryModelTest.kt @@ -89,7 +89,7 @@ class SiloQueryModelTest { @Test fun `computeNucleotideMutationProportions calls the SILO client with a mutations action`() { - every { siloClientMock.sendQuery(any>>()) } returns emptyList() + every { siloClientMock.sendQuery(any>()) } returns emptyList() every { siloFilterExpressionMapperMock.map(any()) } returns True every { referenceGenomeSchemaMock.isSingleSegmented() } returns true @@ -106,7 +106,7 @@ class SiloQueryModelTest { @Test fun `computeNucleotideMutationProportions ignores the segmentName if singleSegmentedSequenceFeature is enabled`() { - every { siloClientMock.sendQuery(any>>()) } returns listOf(someMutationData) + every { siloClientMock.sendQuery(any>()) } returns listOf(someMutationData) every { siloFilterExpressionMapperMock.map(any()) } returns True every { referenceGenomeSchemaMock.isSingleSegmented() } returns true @@ -128,7 +128,7 @@ class SiloQueryModelTest { @Test fun `computeNucleotideMutationProportions includes segmentName if singleSegmentedSequenceFeature is not enabled`() { - every { siloClientMock.sendQuery(any>>()) } returns listOf(someMutationData) + every { siloClientMock.sendQuery(any>()) } returns listOf(someMutationData) every { siloFilterExpressionMapperMock.map(any()) } returns True every { referenceGenomeSchemaMock.isSingleSegmented() } returns false @@ -150,7 +150,7 @@ class SiloQueryModelTest { @Test fun `computeAminoAcidMutationsProportions returns the sequenceName with the position`() { - every { siloClientMock.sendQuery(any>>()) } returns listOf(someMutationData) + every { siloClientMock.sendQuery(any>()) } returns listOf(someMutationData) every { siloFilterExpressionMapperMock.map(any()) } returns True val result = underTest.computeAminoAcidMutationProportions( @@ -171,7 +171,7 @@ class SiloQueryModelTest { @Test fun `getNucleotideInsertions ignores the field sequenceName if the nucleotide sequence has one segment`() { - every { siloClientMock.sendQuery(any>>()) } returns listOf(someInsertionData) + every { siloClientMock.sendQuery(any>()) } returns listOf(someInsertionData) every { siloFilterExpressionMapperMock.map(any()) } returns True every { referenceGenomeSchemaMock.isSingleSegmented() } returns true @@ -198,7 +198,7 @@ class SiloQueryModelTest { @Test fun `getAminoAcidInsertions returns the sequenceName with the position`() { - every { siloClientMock.sendQuery(any>>()) } returns listOf(someInsertionData) + every { siloClientMock.sendQuery(any>()) } returns listOf(someInsertionData) every { siloFilterExpressionMapperMock.map(any()) } returns True val result = underTest.getAminoAcidInsertions( diff --git a/lapis2/src/test/kotlin/org/genspectrum/lapis/silo/SiloClientTest.kt b/lapis2/src/test/kotlin/org/genspectrum/lapis/silo/SiloClientTest.kt index a99400d89..15e302feb 100644 --- a/lapis2/src/test/kotlin/org/genspectrum/lapis/silo/SiloClientTest.kt +++ b/lapis2/src/test/kotlin/org/genspectrum/lapis/silo/SiloClientTest.kt @@ -76,18 +76,10 @@ class SiloClientTest( response() .withContentType(MediaType.APPLICATION_JSON_UTF_8) .withBody( - """{ - "queryResult": [ - { - "count": 6, - "division": "Aargau" - }, - { - "count": 8, - "division": "Basel-Land" - } - ] - }""", + """ + {"count": 6,"division": "Aargau"} + {"count": 8,"division": "Basel-Land"} + """, ), ) @@ -107,35 +99,15 @@ class SiloClientTest( @ParameterizedTest @MethodSource("getMutationActions") - fun `given server returns mutations response then response can be deserialized`( - action: SiloAction>, - ) { + fun `given server returns mutations response then response can be deserialized`(action: SiloAction) { expectQueryRequestAndRespondWith( response() .withContentType(MediaType.APPLICATION_JSON_UTF_8) .withBody( - """{ - "queryResult": [ - { - "count": 51, - "mutation": "C3037T", - "mutationFrom": "C", - "mutationTo": "T", - "position": 3037, - "proportion": 1, - "sequenceName": "main" - }, - { - "count": 52, - "mutation": "C14408T", - "mutationFrom": "C", - "mutationTo": "T", - "position": 14408, - "proportion": 1, - "sequenceName": "main" - } - ] - }""", + """ +{"count": 51,"mutation": "C3037T","mutationFrom": "C","mutationTo": "T","position": 3037,"proportion": 1,"sequenceName": "main"} +{"count": 52,"mutation": "C14408T","mutationFrom": "C","mutationTo": "T","position": 14408,"proportion": 1,"sequenceName": "main"} + """, ), ) @@ -174,18 +146,10 @@ class SiloClientTest( response() .withContentType(MediaType.APPLICATION_JSON_UTF_8) .withBody( - """{ - "queryResult": [ - { - "primaryKey": "key1", - "someSequenceName": "ABCD" - }, - { - "primaryKey": "key2", - "someSequenceName": "DEFG" - } - ] - }""", + """ + {"primaryKey": "key1","someSequenceName": "ABCD"} + {"primaryKey": "key2","someSequenceName": "DEFG"} + """, ), ) @@ -211,24 +175,10 @@ class SiloClientTest( response() .withContentType(MediaType.APPLICATION_JSON_UTF_8) .withBody( - """{ - "queryResult": [ - { - "age": 50, - "country": "Switzerland", - "date": "2021-02-23", - "pango_lineage": "B.1.1.7", - "qc_value": 0.95 - }, - { - "age": 54, - "country": "Switzerland", - "date": "2021-03-19", - "pango_lineage": "B.1.1.7", - "qc_value": 0.94 - } - ] - }""", + """ +{ "age": 50, "country": "Switzerland", "date": "2021-02-23", "pango_lineage": "B.1.1.7", "qc_value": 0.95 } +{ "age": 54, "country": "Switzerland", "date": "2021-03-19", "pango_lineage": "B.1.1.7", "qc_value": 0.94 } + """, ), ) @@ -270,24 +220,10 @@ class SiloClientTest( response() .withContentType(MediaType.APPLICATION_JSON_UTF_8) .withBody( - """{ - "queryResult": [ - { - "count": 1, - "insertedSymbols": "SGE", - "position": 143, - "insertion": "ins_S:247:SGE", - "sequenceName": "S" - }, - { - "count": 2, - "insertedSymbols": "EPE", - "position": 214, - "insertion": "ins_S:214:EPE", - "sequenceName": "S" - } - ] - }""", + """ +{ "count": 1, "insertedSymbols": "SGE", "position": 143, "insertion": "ins_S:247:SGE", "sequenceName": "S" } +{ "count": 2, "insertedSymbols": "EPE", "position": 214, "insertion": "ins_S:214:EPE", "sequenceName": "S" } + """, ), ) @@ -358,7 +294,7 @@ class SiloClientTest( ) val exception = assertThrows { underTest.sendQuery(someQuery) } - assertThat(exception.message, containsString("value failed for JSON property")) + assertThat(exception.message, containsString("Could not parse response from silo")) } @Test @@ -405,7 +341,7 @@ class SiloClientTest( expectQueryRequestAndRespondWith( response() .withStatusCode(200) - .withBody("""{"queryResult": []}"""), + .withBody(""), Times.exactly(1), ) expectQueryRequestAndRespondWith( @@ -429,7 +365,7 @@ class SiloClientTest( expectQueryRequestAndRespondWith( response() .withStatusCode(200) - .withBody("""{"queryResult": []}"""), + .withBody(""), Times.once(), ) @@ -448,7 +384,7 @@ class SiloClientTest( response() .withStatusCode(200) .withHeader(DATA_VERSION_HEADER, dataVersionValue) - .withBody("""{"queryResult": []}"""), + .withBody(""), Times.once(), ) @@ -469,7 +405,7 @@ class SiloClientTest( expectQueryRequestAndRespondWith( response() .withStatusCode(200) - .withBody("""{"queryResult": []}"""), + .withBody(""), Times.once(), ) expectQueryRequestAndRespondWith( @@ -566,7 +502,7 @@ class SiloClientAndCacheInvalidatorTest( response() .withStatusCode(200) .withHeader(DATA_VERSION_HEADER, firstDataVersion) - .withBody("""{"queryResult": []}"""), + .withBody(""), Times.once(), )