diff --git a/app/src/main/java/io/homeassistant/companion/android/controls/HaControlsProviderService.kt b/app/src/main/java/io/homeassistant/companion/android/controls/HaControlsProviderService.kt index 8ed0001d9ae..57d433fbff9 100644 --- a/app/src/main/java/io/homeassistant/companion/android/controls/HaControlsProviderService.kt +++ b/app/src/main/java/io/homeassistant/companion/android/controls/HaControlsProviderService.kt @@ -1,21 +1,19 @@ package io.homeassistant.companion.android.controls import android.os.Build -import android.os.Handler -import android.os.Looper import android.service.controls.Control import android.service.controls.ControlsProviderService import android.service.controls.actions.ControlAction import android.util.Log import androidx.annotation.RequiresApi -import androidx.core.os.postDelayed import io.homeassistant.companion.android.common.dagger.GraphComponentAccessor import io.homeassistant.companion.android.common.data.integration.Entity import io.homeassistant.companion.android.common.data.integration.IntegrationRepository import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import java.util.concurrent.Flow import java.util.function.Consumer import javax.inject.Inject @@ -32,31 +30,6 @@ class HaControlsProviderService : ControlsProviderService() { private val ioScope: CoroutineScope = CoroutineScope(Dispatchers.IO) - private val monitoredEntities = mutableListOf() - private val handler = Handler(Looper.getMainLooper()) - // This is the poor mans way to do this. We should really connect via websocket and update - // on events. But now we get updates every 5 seconds while on power menu. - private val refresh = object : Runnable { - override fun run() { - monitoredEntities.forEach { entityId -> - ioScope.launch { - try { - val entity = integrationRepository.getEntity(entityId) - val domain = entity.entityId.split(".")[0] - val control = - domainToHaControl[domain]?.createControl(applicationContext, entity) - updateSubscriber?.onNext(control) - } catch (e: Exception) { - Log.e(TAG, "Unable to get entity information", e) - } - } - } - handler.postDelayed(this, 5000) - } - } - - private var updateSubscriber: Flow.Subscriber? = null - private val domainToHaControl = mapOf( "automation" to DefaultSwitchControl, "camera" to null, @@ -100,10 +73,10 @@ class HaControlsProviderService : ControlsProviderService() { .forEach { subscriber.onNext(it) } - subscriber.onComplete() } catch (e: Exception) { Log.e(TAG, "Error getting list of entities", e) } + subscriber.onComplete() } } } @@ -112,19 +85,43 @@ class HaControlsProviderService : ControlsProviderService() { Log.d(TAG, "publisherFor $controlIds") return Flow.Publisher { subscriber -> subscriber.onSubscribe(object : Flow.Subscription { + var running = true override fun request(n: Long) { Log.d(TAG, "request $n") - updateSubscriber = subscriber + ioScope.launch { + val entityFlow = integrationRepository.getEntityUpdates() + // Load up initial values + // This should use the cached values that we should store in the DB. + // For now we'll use the rest API + controlIds.forEach { + val entity = integrationRepository.getEntity(it) + val domain = it.split(".")[0] + val control = domainToHaControl[domain]?.createControl( + applicationContext, + entity + ) + subscriber.onNext(control) + } + + // Listen for the state changed events. + entityFlow.takeWhile { running }.collect { + if (controlIds.contains(it.entityId)) { + val domain = it.entityId.split(".")[0] + val control = domainToHaControl[domain]?.createControl( + applicationContext, + it as Entity> + ) + subscriber.onNext(control) + } + } + } } override fun cancel() { Log.d(TAG, "cancel") - updateSubscriber = null - handler.removeCallbacks(refresh) + running = false } }) - monitoredEntities.addAll(controlIds) - handler.post(refresh) } } @@ -139,25 +136,10 @@ class HaControlsProviderService : ControlsProviderService() { var actionSuccess = false if (haControl != null) { - runBlocking { - try { - actionSuccess = haControl.performAction(integrationRepository, action) - - val entity = integrationRepository.getEntity(controlId) - updateSubscriber?.onNext(haControl.createControl(applicationContext, entity)) - handler.postDelayed(750) { - // This is here because the state isn't aways instantly updated. This should - // cause us to update a second time rapidly to ensure we display the correct state - updateSubscriber?.onNext( - haControl.createControl( - applicationContext, - entity - ) - ) - } - } catch (e: Exception) { - Log.e(TAG, "Unable to control or get entity information", e) - } + try { + actionSuccess = haControl.performAction(integrationRepository, action) + } catch (e: Exception) { + Log.e(TAG, "Unable to control or get entity information", e) } } if (actionSuccess) { diff --git a/common/src/main/java/io/homeassistant/companion/android/common/data/integration/IntegrationRepository.kt b/common/src/main/java/io/homeassistant/companion/android/common/data/integration/IntegrationRepository.kt index ad7ff7356df..e82b6687e5f 100644 --- a/common/src/main/java/io/homeassistant/companion/android/common/data/integration/IntegrationRepository.kt +++ b/common/src/main/java/io/homeassistant/companion/android/common/data/integration/IntegrationRepository.kt @@ -1,6 +1,7 @@ package io.homeassistant.companion.android.common.data.integration import io.homeassistant.companion.android.common.data.integration.impl.entities.RateLimitResponse +import kotlinx.coroutines.flow.Flow interface IntegrationRepository { @@ -47,6 +48,7 @@ interface IntegrationRepository { suspend fun getEntities(): List> suspend fun getEntity(entityId: String): Entity> + suspend fun getEntityUpdates(): Flow> suspend fun callService(domain: String, service: String, serviceData: HashMap) diff --git a/common/src/main/java/io/homeassistant/companion/android/common/data/integration/impl/IntegrationRepositoryImpl.kt b/common/src/main/java/io/homeassistant/companion/android/common/data/integration/impl/IntegrationRepositoryImpl.kt index aa59daedeaa..aa0c44a0548 100644 --- a/common/src/main/java/io/homeassistant/companion/android/common/data/integration/impl/IntegrationRepositoryImpl.kt +++ b/common/src/main/java/io/homeassistant/companion/android/common/data/integration/impl/IntegrationRepositoryImpl.kt @@ -25,6 +25,8 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities. import io.homeassistant.companion.android.common.data.url.UrlRepository import io.homeassistant.companion.android.common.data.websocket.WebSocketRepository import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import okhttp3.HttpUrl.Companion.toHttpUrlOrNull import org.json.JSONArray import java.util.regex.Pattern @@ -462,6 +464,19 @@ class IntegrationRepositoryImpl @Inject constructor( ) } + override suspend fun getEntityUpdates(): Flow> { + return webSocketRepository.getStateChanges().map { + Entity( + it.newState.entityId, + it.newState.state, + it.newState.attributes, + it.newState.lastChanged, + it.newState.lastUpdated, + it.newState.context + ) + } + } + private suspend fun canRegisterEntityCategoryStateClass(): Boolean { val version = getHomeAssistantVersion() val matches = VERSION_PATTERN.matcher(version) diff --git a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/WebSocketRepository.kt b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/WebSocketRepository.kt index cdaed2ff6b4..ca8938047c4 100644 --- a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/WebSocketRepository.kt +++ b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/WebSocketRepository.kt @@ -4,6 +4,8 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities. import io.homeassistant.companion.android.common.data.integration.impl.entities.ServiceCallRequest import io.homeassistant.companion.android.common.data.websocket.impl.entities.DomainResponse import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse +import io.homeassistant.companion.android.common.data.websocket.impl.entities.StateChangedEvent +import kotlinx.coroutines.flow.Flow interface WebSocketRepository { suspend fun sendPing(): Boolean @@ -12,4 +14,5 @@ interface WebSocketRepository { suspend fun getServices(): List suspend fun getPanels(): List suspend fun callService(request: ServiceCallRequest) + suspend fun getStateChanges(): Flow } diff --git a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketRepositoryImpl.kt b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketRepositoryImpl.kt index 4c5362d48a4..cdf31e7d929 100644 --- a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketRepositoryImpl.kt +++ b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketRepositoryImpl.kt @@ -12,12 +12,22 @@ import io.homeassistant.companion.android.common.data.integration.impl.entities. import io.homeassistant.companion.android.common.data.url.UrlRepository import io.homeassistant.companion.android.common.data.websocket.WebSocketRepository import io.homeassistant.companion.android.common.data.websocket.impl.entities.DomainResponse +import io.homeassistant.companion.android.common.data.websocket.impl.entities.EventResponse import io.homeassistant.companion.android.common.data.websocket.impl.entities.GetConfigResponse import io.homeassistant.companion.android.common.data.websocket.impl.entities.SocketResponse +import io.homeassistant.companion.android.common.data.websocket.impl.entities.StateChangedEvent import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.ProducerScope +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withTimeout @@ -44,10 +54,13 @@ class WebSocketRepositoryImpl @Inject constructor( private val mapper = jacksonObjectMapper() .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) private val responseCallbackJobs = mutableMapOf>() - private val subscriptionCallbacks = mutableMapOf Unit>() private val id = AtomicLong(1) private var connection: WebSocket? = null private var connected = Job() + private var stateChangedFlow: SharedFlow? = null + + @ExperimentalCoroutinesApi + private var producerScope: ProducerScope? = null override suspend fun sendPing(): Boolean { val socketResponse = sendMessage( @@ -107,6 +120,38 @@ class WebSocketRepositoryImpl @Inject constructor( TODO("Not yet implemented") } + @ExperimentalCoroutinesApi + override suspend fun getStateChanges(): Flow { + if (stateChangedFlow == null) { + + val response = sendMessage( + mapOf( + "type" to "subscribe_events", + "event_type" to "state_changed" + ) + ) + + stateChangedFlow = callbackFlow { + producerScope = this + awaitClose { + Log.d(TAG, "Unsubscribing from state_changes") + ioScope.launch { + sendMessage( + mapOf( + "type" to "unsubscribe_events", + "subscription" to response.id + ) + ) + } + producerScope = null + stateChangedFlow = null + } + }.shareIn(ioScope, SharingStarted.WhileSubscribed()) + } + + return stateChangedFlow!! + } + /** * This method will */ @@ -181,6 +226,15 @@ class WebSocketRepositoryImpl @Inject constructor( responseCallbackJobs.remove(id) } + @ExperimentalCoroutinesApi + private suspend fun handleEvent(response: SocketResponse) { + val eventResponse = mapper.convertValue( + response.event, + object : TypeReference() {} + ) + producerScope?.send(eventResponse.data) + } + override fun onOpen(webSocket: WebSocket, response: Response) { Log.d(TAG, "Websocket: onOpen") } @@ -196,6 +250,7 @@ class WebSocketRepositoryImpl @Inject constructor( "auth_ok" -> handleAuthComplete(true) "auth_invalid" -> handleAuthComplete(false) "pong", "result" -> handleMessage(message) + "event" -> handleEvent(message) else -> Log.d(TAG, "Unknown message type: $text") } } diff --git a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/EventResponse.kt b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/EventResponse.kt new file mode 100644 index 00000000000..52b41d9d2ab --- /dev/null +++ b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/EventResponse.kt @@ -0,0 +1,10 @@ +package io.homeassistant.companion.android.common.data.websocket.impl.entities + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties + +@JsonIgnoreProperties(ignoreUnknown = true) +data class EventResponse( + val eventType: String, + val timeFired: String, + val data: StateChangedEvent +) diff --git a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/SocketResponse.kt b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/SocketResponse.kt index 543bae7b261..5a997ff40b8 100644 --- a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/SocketResponse.kt +++ b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/SocketResponse.kt @@ -8,5 +8,6 @@ data class SocketResponse( val id: Long?, val type: String, val success: Boolean?, - val result: JsonNode? + val result: JsonNode?, + val event: JsonNode? ) diff --git a/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/StateChangedEvent.kt b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/StateChangedEvent.kt new file mode 100644 index 00000000000..9f93dfb9454 --- /dev/null +++ b/common/src/main/java/io/homeassistant/companion/android/common/data/websocket/impl/entities/StateChangedEvent.kt @@ -0,0 +1,11 @@ +package io.homeassistant.companion.android.common.data.websocket.impl.entities + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import io.homeassistant.companion.android.common.data.integration.Entity + +@JsonIgnoreProperties(ignoreUnknown = true) +data class StateChangedEvent( + val entityId: String, + val oldState: Entity<*>, + val newState: Entity<*> +)