Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add localContextPropagation to Task.Options, implement tracing Local vars #444

Merged
merged 74 commits into from
Nov 13, 2017

Conversation

leandrob13
Copy link
Contributor

@leandrob13 leandrob13 commented Oct 1, 2017

Fixes #246.

Update:

This PR introduces the concept of Local, inspired by Twitter's Local, which is a ThreadLocal that can be transported over asynchronous boundaries by supporting implementations — in Twitter's case being their Promise implementation.

So we are introducing in monix-execution:

  • monix.execution.misc.Local — our own implementation of Local, inspired by Twitter's implementation
  • monix.execution.schedulers.TracingScheduler — wraps any Scheduler reference into an implementation that can transport locals over asynchronous boundaries
  • monix.eval.TaskLocal: the pure, Task-enabled Local

Besides Local which has an implementation that literally keeps its state into a ThreadLocal, the challenge is to transport these locals over asynchronous boundaries. So we've got:

  • Task which is now capable of transporting these locals over the async boundaries managed by its own run-loop, provided that it gets executed with Options.localContextPropagation set to true (it's set to false by default)
  • TracingScheduler which works for Future and all abstractions that need an ExecutionContext for managing their async boundaries

An interesting implementation detail is that Task does not have this propagation enabled by default. This is because, for now at least, this is for users that want the propagation of locals and that know what they are doing.

One way of doing that is to use executeWithOptions:

task.executeWithOptions(_.enableLocalContextPropagation)
  // triggers the actual execution
  .runAsync

Another possibility is to use runAsyncOpt:

implicit val opts = Task.defaultOptions.copy(localContextPropagation = true)

// Options passed implicitly
val f = task.runAsyncOpt

In effect what ThreadLocal is for threads, TaskLocal is for tasks. Full example:

import monix.eval.{Task, TaskLocal}

val local = TaskLocal(0)

val task: Task[Unit] =
  for {
    value1 <- local.read // value1 == 0
    _ <- local.write(100)
    _ <- Task.shift      // async boundary
    value2 <- local.read // value2 == 100
    _ <- Task.shift      // async boundary
    value3 <- local.bind(200)(local.read.map(_ * 2)) // value3 == 200 * 2
    _ <- Task.shift      // async boundary
    value4 <- local.read // value4 == 100
    _ <- local.clear
    _ <- Task.shift      // async boundary
    value5 <- local.read // value5 == 0
  } yield {
    // Should print 0, 100, 400, 100, 0
    println("value1: " + value1)
    println("value2: " + value2)
    println("value3: " + value3)
    println("value4: " + value4)
    println("value5: " + value5)
  }

import monix.execution.Scheduler.Implicits.global
val opts = Task.defaultOptions.enableLocalContextPropagation

// Actual execution
val f = task.runAsyncOpt(global, opts)

This sample doesn't seem like much, but Local is a thread-safe variable that can be used in the context of Future, and TaskLocal is meant for thread-safe and pure variables that can be used in the context of Task.

Original:

Follow-up PR to compare the original TaskRunLoop implementation with the one adapted for Local propagation as proposed in #429. The benchmarks are presented below.

Tested on a 2.5 GHz Intel Dual Core i5, 16GB of RAM, SSD hard disk.
Benchmarks:

Master:

Prev:
TaskFlatMapLongLoopBenchmark.async   10000  thrpt   20   256.402 ±  18.459  ops/s
TaskFlatMapLongLoopBenchmark.eval    10000  thrpt   20  4598.770 ± 134.952  ops/s
TaskFlatMapLongLoopBenchmark.now     10000  thrpt   20  4825.676 ± 121.606  ops/s

Next:
TaskFlatMapLongLoopBenchmark.async   10000  thrpt   20   219.106 ±   8.732  ops/s
TaskFlatMapLongLoopBenchmark.eval    10000  thrpt   20  4609.198 ±  94.487  ops/s
TaskFlatMapLongLoopBenchmark.now     10000  thrpt   20  5270.542 ± 118.069  ops/s

feature/tracedOptions:

Prev:
TaskFlatMapLongLoopBenchmark.async   10000  thrpt   20   288.565 ±  41.238  ops/s
TaskFlatMapLongLoopBenchmark.eval    10000  thrpt   20  4630.486 ±  42.603  ops/s
TaskFlatMapLongLoopBenchmark.now     10000  thrpt   20  4767.912 ± 224.114  ops/s

Next:
TaskFlatMapLongLoopBenchmark.async   10000  thrpt   20   205.929 ±  20.505  ops/s
TaskFlatMapLongLoopBenchmark.eval    10000  thrpt   20  4633.065 ±  71.458  ops/s
TaskFlatMapLongLoopBenchmark.now     10000  thrpt   20  5259.749 ± 108.196  ops/s

leandrob13 and others added 30 commits September 11, 2017 15:16
@codecov
Copy link

codecov bot commented Oct 15, 2017

Codecov Report

Merging #444 into master will increase coverage by 0.1%.
The diff coverage is 97.45%.

@@            Coverage Diff            @@
##           master     #444     +/-   ##
=========================================
+ Coverage   89.21%   89.32%   +0.1%     
=========================================
  Files         351      356      +5     
  Lines        9525     9609     +84     
  Branches     1269     1264      -5     
=========================================
+ Hits         8498     8583     +85     
+ Misses       1027     1026      -1

Copy link
Member

@alexandru alexandru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes look good, but I prefer that Options parameter to have a default value, not a globally implicit one. This is because we are breaking source compatibility otherwise.

@@ -295,11 +295,11 @@ sealed abstract class Task[+A] extends Serializable { self =>
* @return a [[monix.execution.Cancelable Cancelable]] that can
* be used to cancel a running task
*/
def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler): Cancelable =
def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler, opts: Options): Cancelable =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The opts value needs a default parameter, otherwise we are breaking source compatibility.

A call like runOnComplete(f)(ec) should still work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be done without problems.

@@ -311,8 +311,8 @@ sealed abstract class Task[+A] extends Serializable { self =>
* that can be used to extract the result or to cancel
* a running task.
*/
def runAsync(implicit s: Scheduler): CancelableFuture[A] =
TaskRunLoop.startAsFuture(this, s)
def runAsync(implicit s: Scheduler, opts: Options): CancelableFuture[A] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, I want opts to have a default value, because we need calls like runAsync(ec) to still work.

So do what you did above with opts: Options = defaultOptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexandru just a note on this. I can only define a default value for only one overloaded runAsync function. If I define it on this one, the other one can't have it and that is why I chose the implicit default options val because it broke the code in many places for the runAsync with the callback function.

I can make that change but it will affect code in reactive module, so the impact is going to be bigger.

Right now the default value is defined in the first runAsync function:

def runAsync(cb: Callback[A])(implicit s: Scheduler, opts: Options = defaultOptions): Cancelable

@@ -346,8 +346,8 @@ sealed abstract class Task[+A] extends Serializable { self =>
* was hit and further async execution is needed or
* in case of failure
*/
def runSyncMaybe(implicit s: Scheduler): Either[CancelableFuture[A], A] = {
val future = this.runAsync(s)
def runSyncMaybe(implicit s: Scheduler, opts: Options): Either[CancelableFuture[A], A] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, we need runSyncMaybe(ec) to work, so opts needs to have a default value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This an be done without problem.

@@ -371,8 +371,8 @@ sealed abstract class Task[+A] extends Serializable { self =>
* value is available immediately, or `Left(future)` in case we
* have an asynchronous boundary or an error.
*/
def coeval(implicit s: Scheduler): Coeval[Either[CancelableFuture[A], A]] =
Coeval.eval(runSyncMaybe(s))
def coeval(implicit s: Scheduler, opts: Options): Coeval[Either[CancelableFuture[A], A]] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too, we need coeval(ec) to still work, otherwise we break source compatibility.

@@ -747,8 +747,8 @@ sealed abstract class Task[+A] extends Serializable { self =>
* The application of this function has strict behavior, as the
* task is immediately executed.
*/
def foreach(f: A => Unit)(implicit s: Scheduler): CancelableFuture[Unit] =
foreachL(f).runAsync(s)
def foreach(f: A => Unit)(implicit s: Scheduler, opts: Options): CancelableFuture[Unit] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here source compatibility is maybe less important, but we need to be consistent.

@@ -915,8 +915,8 @@ sealed abstract class Task[+A] extends Serializable { self =>
}

/** Converts the source `Task` to a `cats.effect.IO` value. */
def toIO(implicit s: Scheduler): IO[A] =
TaskConversions.toIO(this)(s)
def toIO(implicit s: Scheduler, opts: Options): IO[A] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not have this parameter in toIO, because it simply doesn't work and it's misleading the user.

IO is an equivalent data type, however we do not control its implementation. And the other option about enabling cancelable run-loops? IO is not cancelable.

So if this boundary is reached, we know that the user is not interested in local context propagation or cancelation or any future options we might have here.

// $COVERAGE-ON$
else
Options(
autoCancelableRunLoops =
Option(System.getProperty("monix.environment.autoCancelableRunLoops", ""))
.map(_.toLowerCase)
.exists(v => v == "yes" || v == "true" || v == "1"),
localContextPropagation =
Option(System.getProperty("monix.environment.localContextPropagation", ""))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The System.getProperty calls should be in Platform.scala, see:

It's fine to add stuff to it because it is private[monix].

@@ -27,13 +27,13 @@ import scala.util.{Failure, Success}

private[eval] object TaskConversions {
/** Implementation for `Task#toIO`. */
def toIO[A](source: Task[A])(implicit s: Scheduler): IO[A] =
def toIO[A](source: Task[A])(implicit s: Scheduler, opts: Task.Options): IO[A] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned, I don't want the opts parameter in toIO.

@alexandru
Copy link
Member

@leandrob13 btw, sorry for the delay man, I've also been busy.

@alexandru
Copy link
Member

As a status update, I took the liberty of doing some refactoring to move this ticket forward.

So first of all I took a look at Twitter's own Local implementation to see how they did it.

I'm actually pleased that you chose an immutable Map over an Array — initially I was worried, but I don't understand how their Array is reasonable in presence of multi-threading — I mean if you're transporting that Array over an async boundary, but then you modify it immediately after sending that Runnable in the thread-pool, you can mess it up. Plus their Local.let doesn't work correctly.

I do hope we made the right choice though. I see only benefits right now in using an immutable Map as our data structure and I hope we aren't wrong.

Anyway, refactoring for greater consistency and for simplification is what I do best. So I hope you won't get upset about modifying your PR:

  • I moved the implementation of TracingScheduler from jvm to shared, since JavaScript can benefit from this as well, along with the tests
  • I removed TracingContext, CorrelationId and Local.LocalContext — I think you had pretty good reasons for those, but they add complexity and for now they are out — Twitter's Local also doesn't require the elements to inherit from some LocalContext, so we now have a type Context = Map[Key, _] (aka anything goes in it)
    • plus the TracingContext trait stuff is a neat idea, but for a library like Monix, once you add an API in it, it's really hard to take it out or change it due to users suffering — Monix 3.0 breaks binary compatibility and even though I'm glad that we have an excuse to do it, I don't want to repeat this too often — so lets just sit on this for now
  • I've added TaskLocal in monix-eval, as the pure version of Local — basically without this data type our effort is for nothing, because people want to use Task for suspending side effects in FP code and direct usage of monix.execution.misc.Local is not acceptable
  • I've renamed Local.withValue to Local.bind and Local.bindClear respectively — not sure why, it's a cool name and made sense last night :-)
  • The static Local.bind / withValue is now a macro that accepts a null parameter and if null, then it does nothing — this makes it usable straight it TaskRunLoop
    • a null Context is no longer a valid value for storing in that ThreadLocal, which in case we are using Map, it's fine because we are simply clearing that with Map.empty, which always returns the same reference (being immutable and all that)

This PR is ready to be merged, however we've got two problems:

  1. I'm mindful of performance implications. Seeing the differences of master versus 2.3.2, it appears we have performance degradation in Task.apply. If this PR does not make that worse, then it's going through.
  2. I don't know what to say about the new signature of runAsync — it's fine I guess, but it's breaking source compatibility so now I'm thinking that it's maybe best to just have executeWithOptions and that's it, nothing settled, will think about it today after analyzing how the internals are built (the important thing is piping those options in the implementation of the operators like mapBoth)

@alexandru
Copy link
Member

alexandru commented Nov 13, 2017

I have reverted the implicit opts: Options strategy for now. It is a very difficult API design decision this one, I'm very conflicted about it.

The issue is this — implicit parameters infect everything, including conversions from one type to another. For example task.toReactivePublisher or task.toIO. And we also have for example stuff in monix.reactive.Consumer.

So by requiring an opts: Options in runAsync we are creating an expectation that people can override it. And even if we allow the possibility, the conversion to another data type is problematic — Observable will definitely not work, etc.

For example I'm using Task within the implementation of some Observable operators. There's no way we are going to pass Options to those Observable operators, because it wouldn't do any good in the context of Observable. But other users that want to use Task for such jobs will end up conflicted about what to do for Options — should they allow the possibility of override? I know that many are conflicted about passing around an ExecutionContext. And then if they do pass those Options implicitly, then they can discover that it doesn't work because the async boundaries you have once you execute runAsync are outside the control of Task.

Basically if we let executeWithOptions on a Task to be the only way to set those options, then at least we don't break the current API and we don't impose certain expectations. So I'm at this point thinking that this strategy does the least amount of damage.

In other news, I've ran benchmarks, comparing this with master. We have performance degradation since 2.3.2, however this feature isn't to blame. So will be merged soon, after another review and the tests passing.

I'm also sorry for modifying your code @leandrob13, but you got me excited about this feature and it's a tough design problem at the same time, so thought it would be easier if we both worked on it for pushing it in master.

@leandrob13
Copy link
Contributor Author

leandrob13 commented Nov 13, 2017

@alexandru No worries, I am glad that this is moving forward. Everything you mentioned is about performance, API consistency and not breaking signatures so those are valid arguments. There is a style to respect and my work was based on Finagle, there is no problem for you to adapt it monix style.

I am not upset about not using runAsync with the options. I understand it was a big change and as I manifested earlier, it was a big concern of mine impacting the code that much. I only have a question, can we still set the env variable to enable propagation of locals like I was doing it at the beginning? That can still be a valid temporary alternative for allowing the generalized propagation.

This is a first important step and I can assure you I'll be testing this and I hope it gets more people interested in this.

Thanks for your help and I'll be waiting for the merge!

@alexandru alexandru merged commit 0e4e941 into monix:master Nov 13, 2017
@alexandru alexandru added this to the 3.0.0 milestone Jan 21, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants