diff --git a/lib/src/main/java/tech/relaycorp/awaladroid/messaging/ReceiveMessages.kt b/lib/src/main/java/tech/relaycorp/awaladroid/messaging/ReceiveMessages.kt index 8455906b..0076ef43 100644 --- a/lib/src/main/java/tech/relaycorp/awaladroid/messaging/ReceiveMessages.kt +++ b/lib/src/main/java/tech/relaycorp/awaladroid/messaging/ReceiveMessages.kt @@ -1,10 +1,10 @@ package tech.relaycorp.awaladroid.messaging -import java.util.logging.Level import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.onCompletion import tech.relaycorp.awaladroid.Awala import tech.relaycorp.awaladroid.GatewayException import tech.relaycorp.awaladroid.GatewayProtocolException @@ -23,6 +23,7 @@ import tech.relaycorp.relaynet.bindings.pdc.StreamingMode import tech.relaycorp.relaynet.messages.InvalidMessageException import tech.relaycorp.relaynet.ramf.RAMFException import tech.relaycorp.relaynet.wrappers.cms.EnvelopedDataException +import java.util.logging.Level internal class ReceiveMessages( private val pdcClientBuilder: () -> PDCClient = { PoWebClient.initLocal(Awala.POWEB_PORT) } @@ -36,16 +37,19 @@ internal class ReceiveMessages( fun receive(): Flow = getNonceSigners() .flatMapLatest { nonceSigners -> - pdcClientBuilder().use { - try { - collectParcels(it, nonceSigners) - } catch (exp: ServerException) { - throw ReceiveMessageException("Server error", exp) - } catch (exp: ClientBindingException) { - throw GatewayProtocolException("Client error", exp) - } catch (exp: NonceSignerException) { - throw GatewayProtocolException("Client signing error", exp) - } + val pdcClient = pdcClientBuilder() + try { + collectParcels(pdcClient, nonceSigners) + .onCompletion { + @Suppress("BlockingMethodInNonBlockingContext") + pdcClient.close() + } + } catch (exp: ServerException) { + throw ReceiveMessageException("Server error", exp) + } catch (exp: ClientBindingException) { + throw GatewayProtocolException("Client error", exp) + } catch (exp: NonceSignerException) { + throw GatewayProtocolException("Client signing error", exp) } }