Skip to content

Commit

Permalink
feature: Notify about DataBroker Disconnect
Browse files Browse the repository at this point in the history
Closes: #12
Signed-Off-By: Andre Weber <[email protected]>
  • Loading branch information
wba2hi authored and lukasmittag committed Oct 10, 2023
1 parent d724aba commit 47e378b
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.kuksa.CoroutineCallback;
import org.eclipse.kuksa.DataBrokerConnection;
import org.eclipse.kuksa.DataBrokerConnector;
import org.eclipse.kuksa.DisconnectListener;
import org.eclipse.kuksa.PropertyObserver;
import org.eclipse.kuksa.TimeoutConfig;
import org.eclipse.kuksa.model.Property;
Expand All @@ -41,7 +42,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
Expand All @@ -62,6 +65,8 @@ public class JavaDataBrokerEngine implements DataBrokerEngine {
@Nullable
private DataBrokerConnection dataBrokerConnection = null;

private final Set<DisconnectListener> disconnectListeners = new HashSet<>();

public JavaDataBrokerEngine(@NonNull AssetManager assetManager) {
this.assetManager = assetManager;
}
Expand Down Expand Up @@ -135,7 +140,12 @@ private void connect(
connector.connect(new CoroutineCallback<>() {
@Override
public void onSuccess(@Nullable DataBrokerConnection result) {
if (result == null) return;

JavaDataBrokerEngine.this.dataBrokerConnection = result;
for (DisconnectListener listener : disconnectListeners) {
result.getDisconnectListeners().register(listener);
}

callback.onSuccess(result);
}
Expand Down Expand Up @@ -186,6 +196,7 @@ public void disconnect() {
}

dataBrokerConnection.disconnect();
dataBrokerConnection = null;
}

@Nullable
Expand All @@ -198,4 +209,20 @@ public DataBrokerConnection getDataBrokerConnection() {
public void setDataBrokerConnection(@Nullable DataBrokerConnection dataBrokerConnection) {
this.dataBrokerConnection = dataBrokerConnection;
}

@Override
public void registerDisconnectListener(@NonNull DisconnectListener listener) {
disconnectListeners.add(listener);
if (dataBrokerConnection != null) {
dataBrokerConnection.getDisconnectListeners().register(listener);
}
}

@Override
public void unregisterDisconnectListener(@NonNull DisconnectListener listener) {
disconnectListeners.remove(listener);
if (dataBrokerConnection != null) {
dataBrokerConnection.getDisconnectListeners().unregister(listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import androidx.compose.ui.Modifier
import androidx.lifecycle.lifecycleScope
import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.extension.metadata
import org.eclipse.kuksa.model.Property
Expand Down Expand Up @@ -61,20 +62,26 @@ class KuksaDataBrokerActivity : ComponentActivity() {

private val dataBrokerConnectionCallback = object : CoroutineCallback<DataBrokerConnection>() {
override fun onSuccess(result: DataBrokerConnection?) {
outputViewModel.appendOutput("Connection to data broker was successful")
outputViewModel.appendOutput("Connection to DataBroker successful established")
connectionViewModel.updateConnectionState(ConnectionViewState.CONNECTED)
}

override fun onError(error: Throwable) {
outputViewModel.appendOutput("Connection to data broker failed: ${error.message}")
outputViewModel.appendOutput("Connection to DataBroker failed: ${error.message}")
connectionViewModel.updateConnectionState(ConnectionViewState.DISCONNECTED)
}
}

private val onDisconnectListener = DisconnectListener {
connectionViewModel.updateConnectionState(ConnectionViewState.DISCONNECTED)
outputViewModel.appendOutput("DataBroker disconnected")
}

private lateinit var dataBrokerEngine: DataBrokerEngine
private val kotlinDataBrokerEngine by lazy {
KotlinDataBrokerEngine(lifecycleScope, assets)
}

private val javaDataBrokerEngine by lazy {
JavaDataBrokerEngine(assets)
}
Expand Down Expand Up @@ -125,6 +132,18 @@ class KuksaDataBrokerActivity : ComponentActivity() {
}
}

override fun onPostCreate(savedInstanceState: Bundle?) {
super.onPostCreate(savedInstanceState)

dataBrokerEngine.registerDisconnectListener(onDisconnectListener)
}

override fun onDestroy() {
super.onDestroy()

dataBrokerEngine.unregisterDisconnectListener(onDisconnectListener)
}

private fun connect(connectionInfo: ConnectionInfo) {
Log.d(TAG, "Connecting to DataBroker: $connectionInfo")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.eclipse.kuksa.testapp.databroker

import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.model.Property
import org.eclipse.kuksa.proto.v1.KuksaValV1
Expand All @@ -41,4 +42,8 @@ interface DataBrokerEngine {

fun subscribe(property: Property, propertyObserver: PropertyObserver)
fun disconnect()

fun registerDisconnectListener(listener: DisconnectListener)

fun unregisterDisconnectListener(listener: DisconnectListener)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DataBrokerConnector
import org.eclipse.kuksa.DataBrokerException
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.TimeoutConfig
import org.eclipse.kuksa.model.Property
Expand All @@ -47,6 +48,8 @@ class KotlinDataBrokerEngine(
) : DataBrokerEngine {
override var dataBrokerConnection: DataBrokerConnection? = null

private val disconnectListeners = mutableSetOf<DisconnectListener>()

override fun connect(
connectionInfo: ConnectionInfo,
callback: CoroutineCallback<DataBrokerConnection>,
Expand Down Expand Up @@ -118,6 +121,10 @@ class KotlinDataBrokerEngine(
lifecycleScope.launch {
try {
dataBrokerConnection = connector.connect()
.also { connection ->
disconnectListeners.forEach { listener -> connection.disconnectListeners.register(listener) }
}

callback.onSuccess(dataBrokerConnection)
} catch (e: DataBrokerException) {
callback.onError(e)
Expand Down Expand Up @@ -158,6 +165,17 @@ class KotlinDataBrokerEngine(

override fun disconnect() {
dataBrokerConnection?.disconnect()
dataBrokerConnection = null
}

override fun registerDisconnectListener(listener: DisconnectListener) {
disconnectListeners.add(listener)
dataBrokerConnection?.disconnectListeners?.register(listener)
}

override fun unregisterDisconnectListener(listener: DisconnectListener) {
disconnectListeners.remove(listener)
dataBrokerConnection?.disconnectListeners?.unregister(listener)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ class DataBrokerConnection internal constructor(
private val managedChannel: ManagedChannel,
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default,
) {
val disconnectListeners = MultiListener<DisconnectListener>()

init {
val state = managedChannel.getState(false)
managedChannel.notifyWhenStateChanged(state) {
val listeners = disconnectListeners.get()
listeners.forEach { listener ->
listener.onDisconnect()
}
}
}

/**
* Subscribes to the specified vssPath with the provided propertyObserver. Once subscribed the application will be
Expand Down
31 changes: 31 additions & 0 deletions kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/DisconnectListener.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa

/**
* The [DisconnectListener] can be registered to [DataBrokerConnection.disconnectListeners]
* When registered it will notify about manual or unexpected connection disconnects from the DataBroker.
*/
fun interface DisconnectListener {
/**
* Will be triggered, when the connection to the DataBroker was closed manually or unexpectedly.
*/
fun onDisconnect()
}
50 changes: 50 additions & 0 deletions kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/MultiListener.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa

/**
* Generic Listener interface, to support multiple listeners.
*/
class MultiListener<T> {
private var listeners: MutableSet<T> = mutableSetOf()

/**
* Adds a new [listener] and returns true if the [listener] was successfully added, returns false otherwise.
* A [listener] can only be added once.
*/
fun register(listener: T): Boolean {
return listeners.add(listener)
}

/**
* Removes a [listener] and returns true if the [listener] was successfully removed, returns false otherwise.
*/
fun unregister(listener: T): Boolean {
return listeners.remove(listener)
}

/**
* Retrieves a defensive copy of the underlying list of listeners.
*/
@JvmSynthetic
internal fun get(): List<T> {
return listeners.toList()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.eclipse.kuksa

import io.grpc.ManagedChannel
import io.kotest.core.spec.style.BehaviorSpec
import io.kotest.matchers.shouldBe
import io.mockk.clearMocks
import io.mockk.mockk
import io.mockk.slot
Expand Down Expand Up @@ -178,6 +179,32 @@ class DataBrokerConnectionTest : BehaviorSpec({
}
}
}

// this test closes the connection, the connection can't be used afterward anymore
`when`("A DisconnectListener is registered successfully") {
val disconnectListener = mockk<DisconnectListener>()
val disconnectListeners = dataBrokerConnection.disconnectListeners
disconnectListeners.register(disconnectListener)

then("The number of registered DisconnectListeners should be 1") {
disconnectListeners.get().size shouldBe 1
}
`when`("Trying to register the same listener again") {
disconnectListeners.register(disconnectListener)

then("It is not added multiple times") {
disconnectListeners.get().size shouldBe 1
}
}
`when`("The Connection is closed manually") {
dataBrokerConnection.disconnect()

then("The DisconnectListener is triggered") {
verify { disconnectListener.onDisconnect() }
}
}
}
// connection is closed at this point
}
given("A DataBrokerConnection with a mocked ManagedChannel") {
val managedChannel = mockk<ManagedChannel>(relaxed = true)
Expand Down

0 comments on commit 47e378b

Please sign in to comment.