Skip to content

Commit

Permalink
Support websocket message coalescing (#2829)
Browse files Browse the repository at this point in the history
- Add support for the new feature where core might combine multiple responses in to one single websocket message if it's faster. This might return a JSON array of objects instead of a single object.
 - Adjust logging for websocket, no longer prints entire result on non-debug builds.
  • Loading branch information
jpelgrom authored Aug 30, 2022
1 parent 3908a07 commit 84aa445
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.homeassistant.companion.android.common.data

import java.util.regex.Pattern

data class HomeAssistantVersion(
val year: Int,
val month: Int,
val release: Int
) {

companion object {
private val VERSION_PATTERN = Pattern.compile("([0-9]{4})\\.([0-9]{1,2})\\.([0-9]{1,2}).*")

fun fromString(versionString: String): HomeAssistantVersion? {
val matches = VERSION_PATTERN.matcher(versionString)
return if (matches.find() && matches.matches()) {
val coreYear = matches.group(1)?.toIntOrNull() ?: 0
val coreMonth = matches.group(2)?.toIntOrNull() ?: 0
val coreRelease = matches.group(3)?.toIntOrNull() ?: 0
HomeAssistantVersion(coreYear, coreMonth, coreRelease)
} else { // Invalid version
null
}
}
}

fun isAtLeast(minYear: Int, minMonth: Int, minRelease: Int = 0): Boolean =
year > minYear || (year == minYear && (month > minMonth || (month == minMonth && release >= minRelease)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.homeassistant.companion.android.common.data.integration.impl

import android.util.Log
import io.homeassistant.companion.android.common.BuildConfig
import io.homeassistant.companion.android.common.data.HomeAssistantVersion
import io.homeassistant.companion.android.common.data.LocalStorage
import io.homeassistant.companion.android.common.data.authentication.AuthenticationRepository
import io.homeassistant.companion.android.common.data.integration.DeviceRegistration
Expand Down Expand Up @@ -31,7 +32,6 @@ import kotlinx.coroutines.flow.map
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import org.json.JSONArray
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import javax.inject.Inject
import javax.inject.Named

Expand Down Expand Up @@ -77,8 +77,6 @@ class IntegrationRepositoryImpl @Inject constructor(
private const val PREF_SEC_WARNING_NEXT = "sec_warning_last"
private const val TAG = "IntegrationRepository"
private const val RATE_LIMIT_URL = BuildConfig.RATE_LIMIT_URL

private val VERSION_PATTERN = Pattern.compile("([0-9]{4})\\.([0-9]{1,2})\\.([0-9]{1,2}).*")
}

override suspend fun registerDevice(deviceRegistration: DeviceRegistration) {
Expand Down Expand Up @@ -492,17 +490,8 @@ class IntegrationRepositoryImpl @Inject constructor(
): Boolean {
if (!isRegistered()) return false

val version = getHomeAssistantVersion()
val matches = VERSION_PATTERN.matcher(version)
var result = false
if (matches.find() && matches.matches()) {
val coreYear = matches.group(1)?.toIntOrNull() ?: 0
val coreMonth = matches.group(2)?.toIntOrNull() ?: 0
val coreRelease = matches.group(3)?.toIntOrNull() ?: 0
result =
coreYear > year || (coreYear == year && (coreMonth > month || (coreMonth == month && coreRelease >= release)))
}
return result
val version = HomeAssistantVersion.fromString(getHomeAssistantVersion())
return version?.isAtLeast(year, month, release) ?: false
}

override suspend fun getConfig(): GetConfigResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.fasterxml.jackson.module.kotlin.contains
import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import io.homeassistant.companion.android.common.BuildConfig
import io.homeassistant.companion.android.common.data.HomeAssistantVersion
import io.homeassistant.companion.android.common.data.authentication.AuthenticationRepository
import io.homeassistant.companion.android.common.data.authentication.AuthorizationException
import io.homeassistant.companion.android.common.data.integration.ServiceData
Expand Down Expand Up @@ -82,6 +84,7 @@ class WebSocketRepositoryImpl @Inject constructor(
private val id = AtomicLong(1)
private var connection: WebSocket? = null
private var connectionState: WebSocketState? = null
private var connectionHaVersion: HomeAssistantVersion? = null
private val connectedMutex = Mutex()
private var connected = CompletableDeferred<Boolean>()
private val eventSubscriptionMutex = Mutex()
Expand Down Expand Up @@ -327,7 +330,23 @@ class WebSocketRepositoryImpl @Inject constructor(
// Wait up to 30 seconds for auth response
return true == withTimeoutOrNull(30000) {
return@withTimeoutOrNull try {
connected.await()
val didConnect = connected.await()
if (didConnect && connectionHaVersion?.isAtLeast(2022, 9) == true) {
connection?.let {
val supportedFeaturesMessage = mapOf(
"type" to "supported_features",
"id" to id.getAndIncrement(),
"features" to mapOf(
"coalesce_messages" to 1
)
)
Log.d(TAG, "Sending message ${supportedFeaturesMessage["id"]}: $supportedFeaturesMessage")
it.send(
mapper.writeValueAsString(supportedFeaturesMessage)
)
}
}
didConnect
} catch (e: Exception) {
Log.e(TAG, "Unable to authenticate", e)
false
Expand Down Expand Up @@ -363,7 +382,8 @@ class WebSocketRepositoryImpl @Inject constructor(
private inline fun <reified T> mapResponse(response: SocketResponse?): T? =
if (response?.result != null) mapper.convertValue(response.result) else null

private fun handleAuthComplete(successful: Boolean) {
private fun handleAuthComplete(successful: Boolean, haVersion: String?) {
connectionHaVersion = haVersion?.let { HomeAssistantVersion.fromString(it) }
if (successful) {
connectionState = WebSocketState.ACTIVE
connected.complete(true)
Expand Down Expand Up @@ -436,6 +456,7 @@ class WebSocketRepositoryImpl @Inject constructor(
connectedMutex.withLock {
connected = CompletableDeferred()
connection = null
connectionHaVersion = null
if (connectionState != WebSocketState.CLOSED_AUTH)
connectionState = WebSocketState.CLOSED_OTHER
}
Expand Down Expand Up @@ -477,18 +498,26 @@ class WebSocketRepositoryImpl @Inject constructor(
}

override fun onMessage(webSocket: WebSocket, text: String) {
Log.d(TAG, "Websocket: onMessage (text)")
val message: SocketResponse = mapper.readValue(text)
Log.d(TAG, "Message number ${message.id} received: $text")
Log.d(TAG, "Websocket: onMessage (${if (BuildConfig.DEBUG) "text: $text" else "text"})")
val textTree = mapper.readTree(text)
val messages: List<SocketResponse> = if (textTree.isArray) {
textTree.elements().asSequence().toList().map { mapper.convertValue(it) }
} else {
listOf(mapper.readValue(text))
}

ioScope.launch {
when (message.type) {
"auth_required" -> Log.d(TAG, "Auth Requested")
"auth_ok" -> handleAuthComplete(true)
"auth_invalid" -> handleAuthComplete(false)
"pong", "result" -> handleMessage(message)
"event" -> handleEvent(message)
else -> Log.d(TAG, "Unknown message type: $text")
messages.forEach { message ->
Log.d(TAG, "Message number ${message.id} received")

ioScope.launch {
when (message.type) {
"auth_required" -> Log.d(TAG, "Auth Requested")
"auth_ok" -> handleAuthComplete(true, message.haVersion)
"auth_invalid" -> handleAuthComplete(false, message.haVersion)
"pong", "result" -> handleMessage(message)
"event" -> handleEvent(message)
else -> Log.d(TAG, "Unknown message type: ${message.type}")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ data class SocketResponse(
val type: String,
val success: Boolean?,
val result: JsonNode?,
val event: JsonNode?
val event: JsonNode?,
val haVersion: String?
)

0 comments on commit 84aa445

Please sign in to comment.