Fixes #1 JobCancellationException #11

Merged
merged 1 commit into from
Feb 22, 2021

Conversation

AChep
Contributor

@AChep AChep commented Feb 18, 2021

No description provided.

@AChep
Contributor Author

AChep commented Feb 18, 2021

I actually have a reproducible bug caused by this issue :) Would be nice to see this merged.

@tfcporciuncula
Owner

Would you be able to share your reproducible bug with a minimum reproducible example? Based on the limited context I currently have, swallowing the exception doesn't really feel like a good solution.

@AChep
Contributor Author

AChep commented Feb 18, 2021

Yeah, I'll do that in an hour or two. But the docs at https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html already have a statement about that.

@AChep
Contributor Author

AChep commented Feb 18, 2021

It happens in this scenario: the system decides to notify us about the change, and the next line to run is the offer call. Before that line executes, the scheduler takes control away from that thread and gives it to the one that cancels the flow.

Boom. We get a crash once the first thread regains control.

+---------+            +---------+      
| Thread1 |            | Thread2 |      
+---------+            +---------+      
     |                      |           
     | New event            |           
     |----------            |           
     |         |            |           
     |<---------            |           
     |                      |           
     | Invoking the...      |           
     |----------------      |           
     |               |      |           
     |<---------------      |           
     |                      |           
     |                      | Cancelled 
     |                      |---------- 
     |                      |         | 
     |                      |<--------- 
     |                      |           
     | ...callback          |           
     |------------          |           
     |           |          |           
     |<-----------          |           
     |                      |           
     | Crash                |           
     |------                |           
     |     |                |           
     |<-----                |           
     |                      |  

@AChep
Contributor Author

AChep commented Feb 18, 2021

@tfcporciuncula is this a good minimum reproducible example? :P

@tfcporciuncula
Owner

I'm sorry, I'm not sure I follow it 😅 I'd rather rely on an isolated example that I can run and reproduce the issue, so we're sure we're solving an existing problem in the library.

Seems like this exception might be thrown if we try to offer after the underlying channel has been closed, right? I don't understand what would be causing it to close, though.

@AChep
Contributor Author

AChep commented Feb 18, 2021

> Seems like this exception might be thrown if we try to offer after the underlying channel has been closed, right? I don't understand what would be causing it to close, though.

Exactly

It's likely to happen in the following use case:

  private val job = launch(Dispatchers.Default) {
    pref.asFlow().collect()
  }
  
  suspend fun foo() {
    job.cancel()
    pref.setAndCommit("foo")
  }

@AChep
Contributor Author

AChep commented Feb 18, 2021

@tfcporciuncula It happens because the cancellation of the flow and the callback are invoked on different threads.

  internal val keyFlow: KeyFlow = callbackFlow {
    // called on thread A # 1
    val listener = SharedPreferences.OnSharedPreferenceChangeListener { _, key ->
      // called on thread B # 2
      kotlin.runCatching {
        // called on thread B # 4
        offer(key)
      }
    }
    sharedPreferences.registerOnSharedPreferenceChangeListener(listener)
    awaitClose {
      // called on thread A # 3
      sharedPreferences.unregisterOnSharedPreferenceChangeListener(listener) 
    }
  }

I numbered the steps in one possible order of execution.

@tfcporciuncula
Owner

So it's a race condition between unregistering the listener and notifying a last change. I'll spend some time doing some experiments here and I'll get back to you.

@tfcporciuncula
Owner

I'm still unable to repro this. In my experiments, I'm always on the same thread in those scenarios, and I was also unable to reproduce it with the snippet from #11 (comment).

The only reason I'm resisting this change is that I feel this scenario might only happen under specific circumstances on the client side that shouldn't happen in the first place, and thus shouldn't be handled by the lib -- the case reported in the issue was an example of that.

I would still be happy to revisit this given a proper minimum reproducible example, though. It's hard for me to judge this without being able to repro the issue.

@AChep
Contributor Author

AChep commented Feb 19, 2021

Well, it's really easy to reproduce. Run this:

    onStart() {
        (0..100).forEach {
            shoot()
        }
    }

    val sharedPreferences by lazy { PreferenceManager.getDefaultSharedPreferences(context) }

    val flowSharedPreferences by lazy { FlowSharedPreferences(sharedPreferences) }

    private fun shoot() {
        val key = Math.random().toString()
        val pref = flowSharedPreferences.getInt(key)

        val j = GlobalScope.launch(Dispatchers.Default) {
            pref.asFlow().collect()
        }

        GlobalScope.launch(Dispatchers.Default) {
            j.cancel()
            pref.setAndCommit(1)
        }
    }

@tfcporciuncula
Owner

If it's "really easy" to reproduce, you should've started with a reproducible example from the beginning. I'll give it a try with this new snippet and get back to you again.

@AChep
Contributor Author

AChep commented Feb 20, 2021

@tfcporciuncula sorry for being annoying, but have you had a chance to try that snippet? If it's still not reproducible, I'd like to try to write something that fails more reliably.

@tfcporciuncula
Owner

@AChep That example was enough for me to reproduce it, thanks. I currently have two thoughts:

  1. If we really decide to avoid the crash, instead of wrapping the offer() call within a runCatching, I'd rather check if isActive first. So it'd just be like if (isActive) offer(key). This should be enough to avoid the crashes, but could you please verify it on your end as well?
  2. I'm wondering if the example you gave me is simply an incorrect usage of the library or not. You're deliberately setting a value in the preference after cancelling the job that was observing the preference value. I guess you might still want to change the preference value even though you're not interested anymore in being notified of any changes? Is that a valid use case? Do you have any thoughts?

@AChep
Contributor Author

AChep commented Feb 22, 2021

@tfcporciuncula

> If we really decide to avoid the crash, instead of wrapping the offer() call within a runCatching, I'd rather check if isActive first. So it'd just be like if (isActive) offer(key). This should be enough to avoid the crashes, but could you please verify it on your end as well?

Unfortunately, due to the multithreaded nature of the bug, that won't help us. The cancellation can still land between the check and the offer call:

  internal val keyFlow: KeyFlow = callbackFlow {
    // called on thread A # 1
    val listener = SharedPreferences.OnSharedPreferenceChangeListener { _, key ->
      // called on thread B # 2
      kotlin.runCatching {
        // called on thread B # 3
        if (isActive) {
          // called on thread B # 5
          offer(key)
        }
      }
    }
    sharedPreferences.registerOnSharedPreferenceChangeListener(listener)
    awaitClose {
      // called on thread A # 4
      sharedPreferences.unregisterOnSharedPreferenceChangeListener(listener) 
    }
  }

> I'm wondering if the example you gave me is simply an incorrect usage of the library or not. You're deliberately setting a value in the preference after cancelling the job that was observing the preference value. I guess you might still want to change the preference value even though you're not interested anymore in being notified of any changes? Is that a valid use case? Do you have any thoughts?

Yes, it is. Simplest example: saving the preference in #onStop() (where coincidentally the observing stops).

@tfcporciuncula
Owner

Do you think you can come up with an example where it crashes even with the isActive check? I can verify that the example you gave me has multiple threads involved (even though it seems like #1 runs on the same thread as #2, #3 and #5, and just #4 is different), but I'm unable to repro the crash there if I have the isActive check. I've increased the loop to 1000 and 10000 and ran it multiple times, but it seems to work fine -- and the crash always happens if the check isn't there.

@AChep
Contributor Author

AChep commented Feb 22, 2021

> Do you think you can come up with an example where it crashes even with the isActive check? I can verify that the example you gave me has multiple threads involved (even though seems like #1 is the same as #2, #3 and #5, and just #4 is different), but I'm unable to repro the crash there if I have the isActive check. I've increased the loop to 1000 and 10000 and ran it multiple times, but it seems to work fine -- and the crash always happens if the check isn't there.

We are just making it less likely to happen with that check.

Looking at the docs, there's nothing wrong with guarding against that crash :p
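
The point that the isActive check only narrows the window can be illustrated without coroutines at all. The following is a minimal sketch using only the Kotlin standard library; `ClosableSink`, `guardedOffer` and the `beforeOffer` hook are hypothetical names invented for this illustration and are not part of kotlinx.coroutines or of this library. The hook stands in for the moment where another thread may run between the check and the call.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the channel behind callbackFlow (not the real type).
class ClosableSink {
    private val closed = AtomicBoolean(false)

    val isActive: Boolean get() = !closed.get()

    fun close() = closed.set(true)

    // Mirrors the behaviour discussed above: offering after close throws.
    fun offer(element: String): Boolean {
        check(!closed.get()) { "sink was closed before '$element' was offered" }
        return true
    }
}

// `beforeOffer` marks the gap between the check and the call,
// which is where a cancelling thread may be scheduled.
fun guardedOffer(
    sink: ClosableSink,
    key: String,
    beforeOffer: () -> Unit = {}
): Result<Boolean> = runCatching {
    if (sink.isActive) {
        beforeOffer()
        sink.offer(key)
    } else {
        false
    }
}

fun main() {
    // No interference: the check passes and the offer succeeds.
    val quiet = ClosableSink()
    println(guardedOffer(quiet, "key").isSuccess)

    // Simulated interleaving: the sink is closed inside the gap,
    // so the offer throws even though the check passed.
    val raced = ClosableSink()
    println(guardedOffer(raced, "key") { raced.close() }.isFailure)
}
```

In a real run the gap is only a few instructions wide, which would be consistent with the crash not showing up in a loop of 10000 iterations while still being possible.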

@tfcporciuncula
Owner

I'm just not a fan of a catch-all approach. Worst case, I would prefer a proper try/catch block that only swallows JobCancellationException.

@AChep
Contributor Author

AChep commented Feb 22, 2021

@tfcporciuncula should I replace that with catching only ClosedSendChannelException?

@tfcporciuncula
Owner

Why not JobCancellationException?

@AChep
Contributor Author

AChep commented Feb 22, 2021

> Why not JobCancellationException?

Doesn't it throw the ClosedSendChannelException? As written in the docs https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html

@AChep
Contributor Author

AChep commented Feb 22, 2021

I'd love it if you could test it with ClosedSendChannelException... I don't really want to pull the lib into the sample.

@tfcporciuncula
Owner

JobCancellationException is internal and we actually can't catch it 😅 now I understand why the workaround proposed here uses runCatching.

Catching ClosedSendChannelException doesn't work, I think you're looking at the wrong doc. You should probably look here:

> Throws an exception if the channel is closed for send (see close for details).

It doesn't specify what is thrown, but I can easily verify it's the JobCancellationException.

@AChep
Contributor Author

AChep commented Feb 22, 2021

I'd go for the CancellationException then :P
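
Catching the public supertype is the usual way around an exception class that callers cannot name. Below is a minimal standard-library sketch of that idea; `HiddenCancellation`, `offerAfterCancel` and `offerOrFalse` are hypothetical stand-ins made up for this illustration, with `HiddenCancellation` playing the role of the library's internal exception.

```kotlin
import java.util.concurrent.CancellationException

// Hypothetical stand-in: private, so code outside this file cannot name it in a catch clause.
private class HiddenCancellation(message: String) : CancellationException(message)

// Simulates an offer that fails because the job was already cancelled.
fun offerAfterCancel(): Boolean = throw HiddenCancellation("job was cancelled")

// Swallows only cancellation; any other exception still propagates.
fun offerOrFalse(offer: () -> Boolean): Boolean =
    try {
        offer()
    } catch (e: CancellationException) {
        false
    }

fun main() {
    // Normal case: the result of the offer is returned unchanged.
    println(offerOrFalse { true })

    // The hidden subclass is caught through its public supertype.
    println(offerOrFalse(::offerAfterCancel))

    // An unrelated failure is not swallowed.
    println(runCatching { offerOrFalse { error("unrelated bug") } }.isFailure)
}
```

Compared with a catch-all, this keeps unrelated failures visible, at the cost of relying on the thrown type staying a CancellationException.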

@AChep
Contributor Author

AChep commented Feb 22, 2021

I'll update the PR now.

@tfcporciuncula
Owner

Yes, CancellationException would work, but by now I'm honestly sold on the runCatching. This is what I would propose we do, though:

internal val keyFlow: KeyFlow = callbackFlow {
  val listener = SharedPreferences.OnSharedPreferenceChangeListener { _, key -> offerCatching(key) }
  sharedPreferences.registerOnSharedPreferenceChangeListener(listener)
  awaitClose { sharedPreferences.unregisterOnSharedPreferenceChangeListener(listener) }
}

// https://github.com/Kotlin/kotlinx.coroutines/issues/974
private fun <E> SendChannel<E>.offerCatching(element: E): Boolean {
  return runCatching { offer(element) }.getOrDefault(false)
}

I don't want to have the runCatching (or even the try catch for CancellationException) there without any clear reasoning behind it. Having the extension in place with a link to the issue makes things clearer, what do you think?

It also looks like this will be a temporary solution, since they seem likely to fix this on their side, hopefully soon. That's one more reason I want to have the link to the issue there.
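
The behaviour of the proposed extension can be tried in isolation. This is a minimal sketch that runs on the standard library alone; `Sink` is a hypothetical one-method stand-in for `SendChannel`, introduced only so the example needs no coroutines dependency.

```kotlin
// Hypothetical stand-in for SendChannel<E>; only the one method the sketch needs.
fun interface Sink<E> {
    fun offer(element: E): Boolean
}

// Same shape as the proposed extension: any failure is reported as "not delivered".
fun <E> Sink<E>.offerCatching(element: E): Boolean =
    runCatching { offer(element) }.getOrDefault(false)

fun main() {
    val accepting = Sink<String> { true }  // element accepted
    val full = Sink<String> { false }      // element rejected without an exception
    val closed = Sink<String> { throw IllegalStateException("closed") }

    // Prints the three results in order.
    println(listOf(accepting, full, closed).map { it.offerCatching("key") })
}
```

One consequence of this design is that a rejected element and a closed sink look identical to the caller. That seems acceptable for a change listener, where a notification arriving after cancellation has no one left to receive it.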

@AChep
Contributor Author

AChep commented Feb 22, 2021

Okay, one more sec then :P

@AChep
Contributor Author

AChep commented Feb 22, 2021

Should I put that in a SendChannelExt file? Or what should I name the file?

@tfcporciuncula
Owner

Oh no I would actually leave the extension in the same file right below the keyFlow definition, and make it a private extension since we'll only be using it there. No need to have it in a separate file IMHO.

@AChep
Contributor Author

AChep commented Feb 22, 2021

Done 🎉

Owner

@tfcporciuncula tfcporciuncula left a comment

Thanks a lot for your patience and for helping me understand the issue! Will have a new version out later today.
