Skip to content

Commit

Permalink
Initial support for websocket state changed events! (#1906)
Browse files Browse the repository at this point in the history
  • Loading branch information
JBassett authored Nov 11, 2021
1 parent 3d909c6 commit 70e441f
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package io.homeassistant.companion.android.controls

import android.os.Build
import android.os.Handler
import android.os.Looper
import android.service.controls.Control
import android.service.controls.ControlsProviderService
import android.service.controls.actions.ControlAction
import android.util.Log
import androidx.annotation.RequiresApi
import androidx.core.os.postDelayed
import io.homeassistant.companion.android.common.dagger.GraphComponentAccessor
import io.homeassistant.companion.android.common.data.integration.Entity
import io.homeassistant.companion.android.common.data.integration.IntegrationRepository
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Flow
import java.util.function.Consumer
import javax.inject.Inject
Expand All @@ -32,31 +30,6 @@ class HaControlsProviderService : ControlsProviderService() {

private val ioScope: CoroutineScope = CoroutineScope(Dispatchers.IO)

private val monitoredEntities = mutableListOf<String>()
private val handler = Handler(Looper.getMainLooper())
// This is the poor mans way to do this. We should really connect via websocket and update
// on events. But now we get updates every 5 seconds while on power menu.
private val refresh = object : Runnable {
override fun run() {
monitoredEntities.forEach { entityId ->
ioScope.launch {
try {
val entity = integrationRepository.getEntity(entityId)
val domain = entity.entityId.split(".")[0]
val control =
domainToHaControl[domain]?.createControl(applicationContext, entity)
updateSubscriber?.onNext(control)
} catch (e: Exception) {
Log.e(TAG, "Unable to get entity information", e)
}
}
}
handler.postDelayed(this, 5000)
}
}

private var updateSubscriber: Flow.Subscriber<in Control>? = null

private val domainToHaControl = mapOf(
"automation" to DefaultSwitchControl,
"camera" to null,
Expand Down Expand Up @@ -100,10 +73,10 @@ class HaControlsProviderService : ControlsProviderService() {
.forEach {
subscriber.onNext(it)
}
subscriber.onComplete()
} catch (e: Exception) {
Log.e(TAG, "Error getting list of entities", e)
}
subscriber.onComplete()
}
}
}
Expand All @@ -112,19 +85,43 @@ class HaControlsProviderService : ControlsProviderService() {
Log.d(TAG, "publisherFor $controlIds")
return Flow.Publisher { subscriber ->
subscriber.onSubscribe(object : Flow.Subscription {
var running = true
override fun request(n: Long) {
Log.d(TAG, "request $n")
updateSubscriber = subscriber
ioScope.launch {
val entityFlow = integrationRepository.getEntityUpdates()
// Load up initial values
// This should use the cached values that we should store in the DB.
// For now we'll use the rest API
controlIds.forEach {
val entity = integrationRepository.getEntity(it)
val domain = it.split(".")[0]
val control = domainToHaControl[domain]?.createControl(
applicationContext,
entity
)
subscriber.onNext(control)
}

// Listen for the state changed events.
entityFlow.takeWhile { running }.collect {
if (controlIds.contains(it.entityId)) {
val domain = it.entityId.split(".")[0]
val control = domainToHaControl[domain]?.createControl(
applicationContext,
it as Entity<Map<String, Any>>
)
subscriber.onNext(control)
}
}
}
}

override fun cancel() {
Log.d(TAG, "cancel")
updateSubscriber = null
handler.removeCallbacks(refresh)
running = false
}
})
monitoredEntities.addAll(controlIds)
handler.post(refresh)
}
}

Expand All @@ -139,25 +136,10 @@ class HaControlsProviderService : ControlsProviderService() {

var actionSuccess = false
if (haControl != null) {
runBlocking {
try {
actionSuccess = haControl.performAction(integrationRepository, action)

val entity = integrationRepository.getEntity(controlId)
updateSubscriber?.onNext(haControl.createControl(applicationContext, entity))
handler.postDelayed(750) {
// This is here because the state isn't aways instantly updated. This should
// cause us to update a second time rapidly to ensure we display the correct state
updateSubscriber?.onNext(
haControl.createControl(
applicationContext,
entity
)
)
}
} catch (e: Exception) {
Log.e(TAG, "Unable to control or get entity information", e)
}
try {
actionSuccess = haControl.performAction(integrationRepository, action)
} catch (e: Exception) {
Log.e(TAG, "Unable to control or get entity information", e)
}
}
if (actionSuccess) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.homeassistant.companion.android.common.data.integration

import io.homeassistant.companion.android.common.data.integration.impl.entities.RateLimitResponse
import kotlinx.coroutines.flow.Flow

interface IntegrationRepository {

Expand Down Expand Up @@ -47,6 +48,7 @@ interface IntegrationRepository {

suspend fun getEntities(): List<Entity<Any>>
suspend fun getEntity(entityId: String): Entity<Map<String, Any>>
suspend fun getEntityUpdates(): Flow<Entity<*>>

suspend fun callService(domain: String, service: String, serviceData: HashMap<String, Any>)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities.
import io.homeassistant.companion.android.common.data.url.UrlRepository
import io.homeassistant.companion.android.common.data.websocket.WebSocketRepository
import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import org.json.JSONArray
import java.util.regex.Pattern
Expand Down Expand Up @@ -462,6 +464,19 @@ class IntegrationRepositoryImpl @Inject constructor(
)
}

override suspend fun getEntityUpdates(): Flow<Entity<*>> {
return webSocketRepository.getStateChanges().map {
Entity(
it.newState.entityId,
it.newState.state,
it.newState.attributes,
it.newState.lastChanged,
it.newState.lastUpdated,
it.newState.context
)
}
}

private suspend fun canRegisterEntityCategoryStateClass(): Boolean {
val version = getHomeAssistantVersion()
val matches = VERSION_PATTERN.matcher(version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities.
import io.homeassistant.companion.android.common.data.integration.impl.entities.ServiceCallRequest
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DomainResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.StateChangedEvent
import kotlinx.coroutines.flow.Flow

interface WebSocketRepository {
suspend fun sendPing(): Boolean
Expand All @@ -12,4 +14,5 @@ interface WebSocketRepository {
suspend fun getServices(): List<DomainResponse>
suspend fun getPanels(): List<String>
suspend fun callService(request: ServiceCallRequest)
suspend fun getStateChanges(): Flow<StateChangedEvent>
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities.
import io.homeassistant.companion.android.common.data.url.UrlRepository
import io.homeassistant.companion.android.common.data.websocket.WebSocketRepository
import io.homeassistant.companion.android.common.data.websocket.impl.entities.DomainResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.EventResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.SocketResponse
import io.homeassistant.companion.android.common.data.websocket.impl.entities.StateChangedEvent
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeout
Expand All @@ -44,10 +54,13 @@ class WebSocketRepositoryImpl @Inject constructor(
private val mapper = jacksonObjectMapper()
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
private val responseCallbackJobs = mutableMapOf<Long, CancellableContinuation<SocketResponse>>()
private val subscriptionCallbacks = mutableMapOf<Long, (Boolean) -> Unit>()
private val id = AtomicLong(1)
private var connection: WebSocket? = null
private var connected = Job()
private var stateChangedFlow: SharedFlow<StateChangedEvent>? = null

@ExperimentalCoroutinesApi
private var producerScope: ProducerScope<StateChangedEvent>? = null

override suspend fun sendPing(): Boolean {
val socketResponse = sendMessage(
Expand Down Expand Up @@ -107,6 +120,38 @@ class WebSocketRepositoryImpl @Inject constructor(
TODO("Not yet implemented")
}

@ExperimentalCoroutinesApi
override suspend fun getStateChanges(): Flow<StateChangedEvent> {
if (stateChangedFlow == null) {

val response = sendMessage(
mapOf(
"type" to "subscribe_events",
"event_type" to "state_changed"
)
)

stateChangedFlow = callbackFlow {
producerScope = this
awaitClose {
Log.d(TAG, "Unsubscribing from state_changes")
ioScope.launch {
sendMessage(
mapOf(
"type" to "unsubscribe_events",
"subscription" to response.id
)
)
}
producerScope = null
stateChangedFlow = null
}
}.shareIn(ioScope, SharingStarted.WhileSubscribed())
}

return stateChangedFlow!!
}

/**
* This method will
*/
Expand Down Expand Up @@ -181,6 +226,15 @@ class WebSocketRepositoryImpl @Inject constructor(
responseCallbackJobs.remove(id)
}

@ExperimentalCoroutinesApi
private suspend fun handleEvent(response: SocketResponse) {
val eventResponse = mapper.convertValue(
response.event,
object : TypeReference<EventResponse>() {}
)
producerScope?.send(eventResponse.data)
}

override fun onOpen(webSocket: WebSocket, response: Response) {
Log.d(TAG, "Websocket: onOpen")
}
Expand All @@ -196,6 +250,7 @@ class WebSocketRepositoryImpl @Inject constructor(
"auth_ok" -> handleAuthComplete(true)
"auth_invalid" -> handleAuthComplete(false)
"pong", "result" -> handleMessage(message)
"event" -> handleEvent(message)
else -> Log.d(TAG, "Unknown message type: $text")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.homeassistant.companion.android.common.data.websocket.impl.entities

import com.fasterxml.jackson.annotation.JsonIgnoreProperties

@JsonIgnoreProperties(ignoreUnknown = true)
data class EventResponse(
val eventType: String,
val timeFired: String,
val data: StateChangedEvent
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ data class SocketResponse(
val id: Long?,
val type: String,
val success: Boolean?,
val result: JsonNode?
val result: JsonNode?,
val event: JsonNode?
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.homeassistant.companion.android.common.data.websocket.impl.entities

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import io.homeassistant.companion.android.common.data.integration.Entity

@JsonIgnoreProperties(ignoreUnknown = true)
data class StateChangedEvent(
val entityId: String,
val oldState: Entity<*>,
val newState: Entity<*>
)

0 comments on commit 70e441f

Please sign in to comment.