Skip to content

Commit

Permalink
feat(lapis2)!: read data from SILO as NDJSON
Browse files Browse the repository at this point in the history
Refs: #741
  • Loading branch information
fengelniederhammer committed Apr 18, 2024
1 parent c150d92 commit 767b19d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 162 deletions.
83 changes: 39 additions & 44 deletions lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ import java.net.http.HttpResponse.BodyHandlers

private val log = KotlinLogging.logger {}

const val SILO_RESPONSE_MAX_LOG_LENGTH = 10_000

@Component
class SiloClient(
private val cachedSiloClient: CachedSiloClient,
private val dataVersion: DataVersion,
private val requestContext: RequestContext,
) {
fun <ResponseType> sendQuery(query: SiloQuery<ResponseType>): ResponseType {
fun <ResponseType> sendQuery(query: SiloQuery<ResponseType>): List<ResponseType> {
val result = cachedSiloClient.sendQuery(query)
dataVersion.dataVersion = result.dataVersion

Expand Down Expand Up @@ -71,32 +69,44 @@ class CachedSiloClient(

log.info { "Calling SILO: $queryJson" }

val response = send(URI("$siloUrl/query")) {
val response = send(
uri = URI("$siloUrl/query"),
bodyHandler = BodyHandlers.ofLines(),
tryToReadSiloErrorFromBody = { tryToReadSiloErrorFromString(it.findFirst().orElse("")) },
) {
it.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.POST(HttpRequest.BodyPublishers.ofString(queryJson))
}

try {
return WithDataVersion(
queryResult = objectMapper.readValue(response.body(), query.action.typeReference).queryResult,
dataVersion = getDataVersion(response),
)
} catch (exception: Exception) {
val message = "Could not parse response from silo: " + exception::class.toString() + " " + exception.message
throw RuntimeException(message, exception)
}
return WithDataVersion(
queryResult = response.body()
.filter { it.isNotBlank() }
.map {
try {
objectMapper.readValue(it, query.action.typeReference)
} catch (exception: Exception) {
val message = "Could not parse response from silo: " +
exception::class.toString() + " " + exception.message
throw RuntimeException(message, exception)
}
}
.toList(),
dataVersion = getDataVersion(response),
)
}

fun callInfo(): InfoData {
val response = send(URI("$siloUrl/info")) { it.GET() }
val response = send(URI("$siloUrl/info"), BodyHandlers.ofString(), ::tryToReadSiloErrorFromString) { it.GET() }

return InfoData(getDataVersion(response))
}

private fun send(
private fun <ResponseBodyType> send(
uri: URI,
bodyHandler: HttpResponse.BodyHandler<ResponseBodyType>,
tryToReadSiloErrorFromBody: (ResponseBodyType) -> SiloErrorResponse,
buildRequest: (HttpRequest.Builder) -> Unit,
): HttpResponse<String> {
): HttpResponse<ResponseBodyType> {
val request = HttpRequest.newBuilder(uri)
.apply(buildRequest)
.apply {
Expand All @@ -107,7 +117,7 @@ class CachedSiloClient(
.build()

val response = try {
httpClient.send(request, BodyHandlers.ofString())
httpClient.send(request, bodyHandler)
} catch (exception: Exception) {
val message = "Could not connect to silo: " + exception::class.toString() + " " + exception.message
throw RuntimeException(message, exception)
Expand All @@ -116,34 +126,18 @@ class CachedSiloClient(
if (!uri.toString().endsWith("info")) {
log.info { "Response from SILO: ${response.statusCode()}" }
}
log.debug {
val body = response.body()
val truncationPostfix = when {
body.length > SILO_RESPONSE_MAX_LOG_LENGTH -> "(...truncated)"
else -> ""
}
"Data from SILO: ${body.take(SILO_RESPONSE_MAX_LOG_LENGTH)}$truncationPostfix"
}

if (response.statusCode() != 200) {
val siloErrorResponse = tryToReadSiloError(response)
val siloErrorResponse = tryToReadSiloErrorFromBody(response.body())

if (response.statusCode() == 503) {
val message = siloErrorResponse?.message ?: "Unknown reason."
val message = siloErrorResponse.message
throw SiloUnavailableException(
"SILO is currently unavailable: $message",
response.headers().firstValue("retry-after").orElse(null),
)
}

if (siloErrorResponse == null) {
throw SiloException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Internal Server Error",
"Unexpected error from SILO: ${response.body()}",
)
}

throw SiloException(
response.statusCode(),
siloErrorResponse.error,
Expand All @@ -154,15 +148,20 @@ class CachedSiloClient(
return response
}

private fun tryToReadSiloError(response: HttpResponse<String>) =
private fun tryToReadSiloErrorFromString(responseBody: String) =
try {
objectMapper.readValue<SiloErrorResponse>(response.body())
objectMapper.readValue<SiloErrorResponse>(responseBody)
} catch (e: Exception) {
log.error { "Failed to deserialize error response from SILO: $e" }
null

throw SiloException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Internal Server Error",
"Unexpected error from SILO: $responseBody",
)
}

private fun getDataVersion(response: HttpResponse<String>): String {
private fun getDataVersion(response: HttpResponse<*>): String {
return response.headers().firstValue("data-version").orElse("")
}
}
Expand All @@ -171,13 +170,9 @@ class SiloException(val statusCode: Int, val title: String, override val message

/**
 * Signals that SILO cannot serve requests right now.
 *
 * Raised by the HTTP client code when SILO answers with status 503.
 *
 * @property message human-readable reason, also passed to [Exception] as its message.
 * @property retryAfter value of the `retry-after` response header, or `null` when SILO did not send one.
 */
class SiloUnavailableException(
    override val message: String,
    val retryAfter: String?,
) : Exception(message)

data class SiloQueryResponse<ResponseType>(
val queryResult: ResponseType,
)

data class WithDataVersion<ResponseType>(
val dataVersion: String,
val queryResult: ResponseType,
val queryResult: List<ResponseType>,
)

/**
 * Error payload of a failed SILO request, deserialized from the response body.
 *
 * @property error short error title reported by SILO.
 * @property message detailed description of what went wrong.
 */
data class SiloErrorResponse(
    val error: String,
    val message: String,
)
42 changes: 21 additions & 21 deletions lapis2/src/main/kotlin/org/genspectrum/lapis/silo/SiloQuery.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ import java.time.LocalDate

/**
 * Request object handed to `sendQuery`: an action together with a filter expression.
 *
 * @param ResponseType the response type parameter of the wrapped [SiloAction].
 * @property action the SILO action to run.
 * @property filterExpression the filter expression sent along with the action.
 */
data class SiloQuery<ResponseType>(
    val action: SiloAction<ResponseType>,
    val filterExpression: SiloFilterExpression,
)

class AggregationDataTypeReference : TypeReference<SiloQueryResponse<List<AggregationData>>>()
class AggregationDataTypeReference : TypeReference<AggregationData>()

class MutationDataTypeReference : TypeReference<SiloQueryResponse<List<MutationData>>>()
class MutationDataTypeReference : TypeReference<MutationData>()

class AminoAcidMutationDataTypeReference : TypeReference<SiloQueryResponse<List<MutationData>>>()
class AminoAcidMutationDataTypeReference : TypeReference<MutationData>()

class DetailsDataTypeReference : TypeReference<SiloQueryResponse<List<DetailsData>>>()
class DetailsDataTypeReference : TypeReference<DetailsData>()

class InsertionDataTypeReference : TypeReference<SiloQueryResponse<List<InsertionData>>>()
class InsertionDataTypeReference : TypeReference<InsertionData>()

class SequenceDataTypeReference : TypeReference<SiloQueryResponse<List<SequenceData>>>()
class SequenceDataTypeReference : TypeReference<SequenceData>()

interface CommonActionFields {
val orderByFields: List<OrderByField>
Expand All @@ -36,7 +36,7 @@ interface CommonActionFields {
// NOTE(review): presumably the order-by field name that requests random ordering.
// The action factories pass their orderByFields through getNonRandomizedOrderByFields,
// whose body is not visible here — confirm that it filters on this value.
const val ORDER_BY_RANDOM_FIELD_NAME = "random"

sealed class SiloAction<ResponseType>(
@JsonIgnore val typeReference: TypeReference<SiloQueryResponse<ResponseType>>,
@JsonIgnore val typeReference: TypeReference<ResponseType>,
@JsonIgnore val cacheable: Boolean,
) : CommonActionFields {
companion object {
Expand All @@ -45,7 +45,7 @@ sealed class SiloAction<ResponseType>(
orderByFields: List<OrderByField> = emptyList(),
limit: Int? = null,
offset: Int? = null,
): SiloAction<List<AggregationData>> =
): SiloAction<AggregationData> =
AggregatedAction(
groupByFields = groupByFields,
orderByFields = getNonRandomizedOrderByFields(orderByFields),
Expand All @@ -59,7 +59,7 @@ sealed class SiloAction<ResponseType>(
orderByFields: List<OrderByField> = emptyList(),
limit: Int? = null,
offset: Int? = null,
): SiloAction<List<MutationData>> =
): SiloAction<MutationData> =
MutationsAction(
minProportion = minProportion,
orderByFields = getNonRandomizedOrderByFields(orderByFields),
Expand All @@ -73,7 +73,7 @@ sealed class SiloAction<ResponseType>(
orderByFields: List<OrderByField> = emptyList(),
limit: Int? = null,
offset: Int? = null,
): SiloAction<List<MutationData>> =
): SiloAction<MutationData> =
AminoAcidMutationsAction(
minProportion = minProportion,
orderByFields = getNonRandomizedOrderByFields(orderByFields),
Expand All @@ -87,7 +87,7 @@ sealed class SiloAction<ResponseType>(
orderByFields: List<OrderByField> = emptyList(),
limit: Int? = null,
offset: Int? = null,
): SiloAction<List<DetailsData>> =
): SiloAction<DetailsData> =
DetailsAction(
fields = fields,
orderByFields = getNonRandomizedOrderByFields(orderByFields),
Expand All @@ -100,7 +100,7 @@ sealed class SiloAction<ResponseType>(
orderByFields: List<OrderByField> = emptyList(),
limit: Int? = null,
offset: Int? = null,
): SiloAction<List<InsertionData>> =
): SiloAction<InsertionData> =
NucleotideInsertionsAction(
orderByFields = getNonRandomizedOrderByFields(orderByFields),
limit = limit,
Expand All @@ -112,7 +112,7 @@ sealed class SiloAction<ResponseType>(
orderByFields: List<OrderByField> = emptyList(),
limit: Int? = null,
offset: Int? = null,
): SiloAction<List<InsertionData>> =
): SiloAction<InsertionData> =
AminoAcidInsertionsAction(
orderByFields = getNonRandomizedOrderByFields(orderByFields),
limit = limit,
Expand All @@ -126,7 +126,7 @@ sealed class SiloAction<ResponseType>(
orderByFields: List<OrderByField> = emptyList(),
limit: Int? = null,
offset: Int? = null,
): SiloAction<List<SequenceData>> =
): SiloAction<SequenceData> =
SequenceAction(
type = type,
sequenceName = sequenceName,
Expand All @@ -151,7 +151,7 @@ sealed class SiloAction<ResponseType>(
override val limit: Int? = null,
override val offset: Int? = null,
val type: String = "Aggregated",
) : SiloAction<List<AggregationData>>(AggregationDataTypeReference(), cacheable = true)
) : SiloAction<AggregationData>(AggregationDataTypeReference(), cacheable = true)

@JsonInclude(JsonInclude.Include.NON_EMPTY)
private data class MutationsAction(
Expand All @@ -161,7 +161,7 @@ sealed class SiloAction<ResponseType>(
override val limit: Int? = null,
override val offset: Int? = null,
val type: String = "Mutations",
) : SiloAction<List<MutationData>>(MutationDataTypeReference(), cacheable = true)
) : SiloAction<MutationData>(MutationDataTypeReference(), cacheable = true)

@JsonInclude(JsonInclude.Include.NON_EMPTY)
private data class AminoAcidMutationsAction(
Expand All @@ -171,7 +171,7 @@ sealed class SiloAction<ResponseType>(
override val limit: Int? = null,
override val offset: Int? = null,
val type: String = "AminoAcidMutations",
) : SiloAction<List<MutationData>>(AminoAcidMutationDataTypeReference(), cacheable = true)
) : SiloAction<MutationData>(AminoAcidMutationDataTypeReference(), cacheable = true)

@JsonInclude(JsonInclude.Include.NON_EMPTY)
private data class DetailsAction(
Expand All @@ -181,7 +181,7 @@ sealed class SiloAction<ResponseType>(
override val limit: Int? = null,
override val offset: Int? = null,
val type: String = "Details",
) : SiloAction<List<DetailsData>>(DetailsDataTypeReference(), cacheable = false)
) : SiloAction<DetailsData>(DetailsDataTypeReference(), cacheable = false)

@JsonInclude(JsonInclude.Include.NON_EMPTY)
private data class NucleotideInsertionsAction(
Expand All @@ -190,7 +190,7 @@ sealed class SiloAction<ResponseType>(
override val limit: Int? = null,
override val offset: Int? = null,
val type: String = "Insertions",
) : SiloAction<List<InsertionData>>(InsertionDataTypeReference(), cacheable = true)
) : SiloAction<InsertionData>(InsertionDataTypeReference(), cacheable = true)

@JsonInclude(JsonInclude.Include.NON_EMPTY)
private data class AminoAcidInsertionsAction(
Expand All @@ -199,7 +199,7 @@ sealed class SiloAction<ResponseType>(
override val limit: Int? = null,
override val offset: Int? = null,
val type: String = "AminoAcidInsertions",
) : SiloAction<List<InsertionData>>(InsertionDataTypeReference(), cacheable = true)
) : SiloAction<InsertionData>(InsertionDataTypeReference(), cacheable = true)

@JsonInclude(JsonInclude.Include.NON_EMPTY)
private data class SequenceAction(
Expand All @@ -209,7 +209,7 @@ sealed class SiloAction<ResponseType>(
override val offset: Int? = null,
val type: SequenceType,
val sequenceName: String,
) : SiloAction<List<SequenceData>>(SequenceDataTypeReference(), cacheable = false)
) : SiloAction<SequenceData>(SequenceDataTypeReference(), cacheable = false)
}

sealed class SiloFilterExpression(val type: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class SiloQueryModelTest {

@Test
fun `computeNucleotideMutationProportions calls the SILO client with a mutations action`() {
every { siloClientMock.sendQuery(any<SiloQuery<List<MutationData>>>()) } returns emptyList()
every { siloClientMock.sendQuery(any<SiloQuery<MutationData>>()) } returns emptyList()
every { siloFilterExpressionMapperMock.map(any<CommonSequenceFilters>()) } returns True
every { referenceGenomeSchemaMock.isSingleSegmented() } returns true

Expand All @@ -106,7 +106,7 @@ class SiloQueryModelTest {

@Test
fun `computeNucleotideMutationProportions ignores the segmentName if singleSegmentedSequenceFeature is enabled`() {
every { siloClientMock.sendQuery(any<SiloQuery<List<MutationData>>>()) } returns listOf(someMutationData)
every { siloClientMock.sendQuery(any<SiloQuery<MutationData>>()) } returns listOf(someMutationData)
every { siloFilterExpressionMapperMock.map(any<CommonSequenceFilters>()) } returns True
every { referenceGenomeSchemaMock.isSingleSegmented() } returns true

Expand All @@ -128,7 +128,7 @@ class SiloQueryModelTest {

@Test
fun `computeNucleotideMutationProportions includes segmentName if singleSegmentedSequenceFeature is not enabled`() {
every { siloClientMock.sendQuery(any<SiloQuery<List<MutationData>>>()) } returns listOf(someMutationData)
every { siloClientMock.sendQuery(any<SiloQuery<MutationData>>()) } returns listOf(someMutationData)
every { siloFilterExpressionMapperMock.map(any<CommonSequenceFilters>()) } returns True
every { referenceGenomeSchemaMock.isSingleSegmented() } returns false

Expand All @@ -150,7 +150,7 @@ class SiloQueryModelTest {

@Test
fun `computeAminoAcidMutationsProportions returns the sequenceName with the position`() {
every { siloClientMock.sendQuery(any<SiloQuery<List<MutationData>>>()) } returns listOf(someMutationData)
every { siloClientMock.sendQuery(any<SiloQuery<MutationData>>()) } returns listOf(someMutationData)
every { siloFilterExpressionMapperMock.map(any<CommonSequenceFilters>()) } returns True

val result = underTest.computeAminoAcidMutationProportions(
Expand All @@ -171,7 +171,7 @@ class SiloQueryModelTest {

@Test
fun `getNucleotideInsertions ignores the field sequenceName if the nucleotide sequence has one segment`() {
every { siloClientMock.sendQuery(any<SiloQuery<List<InsertionData>>>()) } returns listOf(someInsertionData)
every { siloClientMock.sendQuery(any<SiloQuery<InsertionData>>()) } returns listOf(someInsertionData)
every { siloFilterExpressionMapperMock.map(any<CommonSequenceFilters>()) } returns True
every { referenceGenomeSchemaMock.isSingleSegmented() } returns true

Expand All @@ -198,7 +198,7 @@ class SiloQueryModelTest {

@Test
fun `getAminoAcidInsertions returns the sequenceName with the position`() {
every { siloClientMock.sendQuery(any<SiloQuery<List<InsertionData>>>()) } returns listOf(someInsertionData)
every { siloClientMock.sendQuery(any<SiloQuery<InsertionData>>()) } returns listOf(someInsertionData)
every { siloFilterExpressionMapperMock.map(any<CommonSequenceFilters>()) } returns True

val result = underTest.getAminoAcidInsertions(
Expand Down
Loading

0 comments on commit 767b19d

Please sign in to comment.