diff --git a/core/src/androidMain/kotlin/com/beepiz/bluetooth/gattcoroutines/GattConnectionImpl.kt b/core/src/androidMain/kotlin/com/beepiz/bluetooth/gattcoroutines/GattConnectionImpl.kt index fbe95c7..3738884 100644 --- a/core/src/androidMain/kotlin/com/beepiz/bluetooth/gattcoroutines/GattConnectionImpl.kt +++ b/core/src/androidMain/kotlin/com/beepiz/bluetooth/gattcoroutines/GattConnectionImpl.kt @@ -13,6 +13,7 @@ import com.beepiz.bluetooth.gattcoroutines.extensions.withCloseHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.ObsoleteCoroutinesApi import kotlinx.coroutines.channels.BroadcastChannel @@ -21,8 +22,7 @@ import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.consumeEach -import kotlinx.coroutines.channels.filter -import kotlinx.coroutines.channels.first +import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -66,12 +66,19 @@ internal class GattConnectionImpl( private val phyReadChannel = Channel>() private val isConnectedBroadcastChannel = ConflatedBroadcastChannel(false) - private val isConnectedChannel get() = isConnectedBroadcastChannel.openSubscription() + + private val isConnectedFlow + @UseExperimental(FlowPreview::class) //TODO: Copy it to dodge any future breaking changes + get() = isConnectedBroadcastChannel.asFlow() + override var isConnected: Boolean get() = runCatching { !isClosed && isConnectedBroadcastChannel.value }.getOrDefault(false) private set(value) = isConnectedBroadcastChannel.offerCatching(value).let { Unit } + private var isClosed = false + private var closedException: ConnectionClosedException? = null + private val stateChangeBroadcastChannel = ConflatedBroadcastChannel() @@ -108,7 +115,7 @@ internal class GattConnectionImpl( } gatt.connect().checkOperationInitiationSucceeded() } - isConnectedChannel.first { connected -> connected } + isConnectedFlow.first { connected -> connected } } override suspend fun disconnect() { @@ -117,7 +124,7 @@ internal class GattConnectionImpl( } checkNotClosed() requireGatt().disconnect() - isConnectedChannel.first { connected -> !connected } + isConnectedFlow.first { connected -> !connected } } override fun close(notifyStateChangeChannel: Boolean) { @@ -193,9 +200,10 @@ internal class GattConnectionImpl( "This characteristic doesn't support notification or doesn't come from discoverServices()." } setCharacteristicNotificationsEnabled(characteristic, enable = true) - val notificationChannel = characteristicChangedChannel.openSubscription().filter { + @UseExperimental(FlowPreview::class) //TODO: Copy it to dodge any future breaking changes. + val notificationChannel = characteristicChangedChannel.asFlow().filter { it.uuid == characteristic.uuid - } + }.produceIn(this) return if (disableNotificationsOnChannelClose) notificationChannel.withCloseHandler { setCharacteristicNotificationsEnabled(characteristic, enable = false) } else notificationChannel