Skip to content

Commit

Permalink
SSE Reconnect (#9)
Browse files Browse the repository at this point in the history
expose maxRetries for reconnection attempts, showRetryEvents
controller Ktor log level from config
  • Loading branch information
IdanAizikNissim authored Oct 25, 2024
1 parent 1ee18f4 commit fdc6e81
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 10 deletions.
5 changes: 4 additions & 1 deletion shared/src/commonMain/kotlin/io/pocketbase/ClientConfig.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.pocketbase

import io.pocketbase.http.LogLevel
import io.pocketbase.http.Protocol
import kotlin.time.Duration.Companion.seconds

Expand All @@ -8,7 +9,9 @@ data class ClientConfig(
val protocol: Protocol = Protocol.HTTP,
val port: Int?,
val lang: String = "en-US",
val reconnectionTime: Long = 30.seconds.inWholeMilliseconds,
val reconnectionTime: Long = 3.seconds.inWholeMilliseconds,
val maxReconnectionRetries: Long = Long.MAX_VALUE,
val logLevel: LogLevel = LogLevel.NONE,
) {
companion object {
fun from(url: String): ClientConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ internal class HttpClient(
protocol = config.protocol,
port = config.port,
lang = config.lang,
logLevel = config.logLevel,
authStore = authStore,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.pocketbase.http
import io.ktor.client.HttpClient
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.client.plugins.defaultRequest
import io.ktor.client.plugins.logging.LogLevel
import io.ktor.client.plugins.logging.Logging
import io.ktor.client.plugins.sse.SSE
import io.ktor.client.request.header
Expand All @@ -18,6 +17,7 @@ internal class HttpClientBuilder(
protocol: Protocol,
port: Int?,
lang: String,
logLevel: LogLevel,
authStore: AuthStore,
): HttpClient =
factory.create { config ->
Expand All @@ -29,7 +29,7 @@ internal class HttpClientBuilder(
}

install(Logging) {
level = LogLevel.ALL
level = logLevel.toKtorLogLevel()
}

defaultRequest {
Expand All @@ -47,3 +47,12 @@ internal class HttpClientBuilder(
}
}
}

internal fun LogLevel.toKtorLogLevel(): io.ktor.client.plugins.logging.LogLevel =
when (this) {
LogLevel.ALL -> io.ktor.client.plugins.logging.LogLevel.ALL
LogLevel.HEADERS -> io.ktor.client.plugins.logging.LogLevel.HEADERS
LogLevel.BODY -> io.ktor.client.plugins.logging.LogLevel.BODY
LogLevel.INFO -> io.ktor.client.plugins.logging.LogLevel.INFO
LogLevel.NONE -> io.ktor.client.plugins.logging.LogLevel.NONE
}
9 changes: 9 additions & 0 deletions shared/src/commonMain/kotlin/io/pocketbase/http/LogLevel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.pocketbase.http

enum class LogLevel {
ALL,
HEADERS,
BODY,
INFO,
NONE,
}
38 changes: 31 additions & 7 deletions shared/src/commonMain/kotlin/io/pocketbase/http/SSEClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.ktor.client.plugins.sse.sse
import io.ktor.sse.ServerSentEvent
import io.pocketbase.ClientConfig
import io.pocketbase.auth.AuthStore
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlin.time.Duration.Companion.milliseconds

Expand All @@ -13,6 +14,8 @@ internal class SSEClient(
private val authStore: AuthStore,
) {
private var client: HttpClient? = null
private var retryAttempts = 0

var clientId: String? = null
private set

Expand All @@ -29,22 +32,28 @@ internal class SSEClient(
protocol = config.protocol,
port = config.port,
lang = config.lang,
logLevel = config.logLevel,
authStore = authStore,
)

client?.sse(
path = url,
reconnectionTime = config.reconnectionTime.milliseconds,
showRetryEvents = true,
) {
incoming.collect { msg ->
when (msg.event) {
"PB_CONNECT" -> {
clientId = msg.id
Incoming.PBConnect(clientId)
if (msg.retry != null) {
reconnect(url, msg.retry ?: 0)
} else {
when (msg.event) {
"PB_CONNECT" -> {
clientId = msg.id
Incoming.PBConnect(clientId)
}
else -> Incoming.Message(msg)
}.let {
emit(it)
}
else -> Incoming.Message(msg)
}.let {
emit(it)
}
}
}
Expand All @@ -55,6 +64,21 @@ internal class SSEClient(
client = null
clientId = null
}

private suspend fun reconnect(
url: String,
retry: Long,
) {
if (retryAttempts > config.maxReconnectionRetries) {
disconnect()
return
}

delay(retry)
retryAttempts++

connect(url)
}
}

internal sealed interface Incoming {
Expand Down

0 comments on commit fdc6e81

Please sign in to comment.