RxHelper::blockingScheduler should order by Worker instead of by Context #245

Closed
Gattag opened this issue Jan 23, 2021 · 11 comments · May be fixed by #246

Comments

@Gattag

Gattag commented Jan 23, 2021

Describe the feature (might be a bug? I don't know what to call it)

Currently, RxHelper::blockingScheduler with ordered execution enabled uses the ContextScheduler. The ContextScheduler always executes tasks based on the current context: all tasks on the current context are ordered one after another, irrespective of which Worker they were scheduled on.
The RxJava Scheduler.Worker class documentation reads: "Represents an isolated, sequential worker of a parent Scheduler for executing Runnable tasks." It appears to impose no requirement beyond tasks submitted to the same worker being executed in order. The default RxJava scheduler for io tasks is io.reactivex.internal.schedulers.IoScheduler, and each Worker it creates is backed by a single-threaded ScheduledExecutorService.

The point is simply that tasks should be ordered with respect to the Worker, not the Context they were created from, and this is supported by both the documentation and the implementation of RxJava2. I would like RxJava2 io tasks to run in order only when the spec requires it, and no more. I would also like more parallelism without breaking the expectation that tasks submitted to a Worker are executed in order, which is exactly the expectation that setting ordered to false on RxHelper::blockingScheduler currently breaks. I believe this should improve the parallelism of io tasks without breaking the spec.

Contribution

I'll implement it, as long as I'm not crazy about the premise/rationale (am I?). I've already sort of done it in a scheduler I wrote, by storing a queue of tasks in each worker; you get the rest.

@tsegismont
Contributor

If calling blockingScheduler(Vertx vertx, boolean ordered) with ordered set to false does what you want, what is it you're missing exactly?

@Gattag
Author

Gattag commented Jan 25, 2021

If ordered is set to true, tasks are scheduled in the order of the context, so if two workers that share a context are created, only one task can run at a time across the two workers. This doesn't break anything, but RxJava only expects tasks submitted to a worker to be ordered with respect to other tasks submitted to that same worker, not with respect to a context that may be (and often is) shared by multiple workers.
But on the flip side, when ordered is set to false, tasks are not scheduled in any order at all. The problem is that RxJava workers are still being used with the premise that they order tasks that are submitted to the same worker, at least that's what the docs say should be expected and that's how the default scheduler operates.

The scheduler should be able to run tasks without requiring context order to be maintained, while still preserving the worker order.

@Gattag
Author

Gattag commented Jan 25, 2021

Specifically, if I ran:

// Blocking scheduler with ordered execution (ordered = true)
Scheduler s = RxHelper.blockingScheduler(vertx, true);
Context context = vertx.getOrCreateContext();
context.runOnContext(ignored -> {
    // Two workers created on the same Vert.x context
    Scheduler.Worker w1 = s.createWorker();
    Scheduler.Worker w2 = s.createWorker();
    w1.schedule(() -> { /* Runnable A */ });
    w1.schedule(() -> { /* Runnable B */ });
    w2.schedule(() -> { /* Runnable C */ });
    w2.schedule(() -> { /* Runnable D */ });
});

Currently, the tasks run strictly one after the other: A -> B -> C -> D.
But I think the expected behavior is that C can run at the same time as A; B runs after A; D runs after C.
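The expected behavior can be sketched in plain Java, without RxJava or Vert.x. This is a minimal illustration under stated assumptions: `OrderedWorker` and `WorkerOrderDemo` are hypothetical names, and a `CompletableFuture` chain on a shared thread pool stands in for a per-worker task queue. Tasks on one worker run in submission order, while tasks on different workers may overlap.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an RxJava Scheduler.Worker: tasks submitted to the same
// worker run one after another; different workers share the pool and may overlap.
final class OrderedWorker {
    private final ExecutorService pool;
    // Tail of this worker's private chain; the next task starts only after it completes.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    OrderedWorker(ExecutorService pool) {
        this.pool = pool;
    }

    synchronized void schedule(Runnable task) {
        // Chain onto this worker's previous task only; exceptionally(...) keeps
        // the chain alive if a task throws.
        tail = tail.thenRunAsync(task, pool).exceptionally(e -> null);
    }

    synchronized CompletableFuture<Void> idle() {
        return tail;
    }
}

final class WorkerOrderDemo {
    // Runs the A/B/C/D scenario from the comment and returns the completion order.
    static List<String> run() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> finished = new CopyOnWriteArrayList<>();
        OrderedWorker w1 = new OrderedWorker(pool);
        OrderedWorker w2 = new OrderedWorker(pool);
        w1.schedule(() -> finished.add("A"));
        w1.schedule(() -> finished.add("B"));
        w2.schedule(() -> finished.add("C"));
        w2.schedule(() -> finished.add("D"));
        // Wait for both workers to drain, then release the pool threads.
        CompletableFuture.allOf(w1.idle(), w2.idle()).join();
        pool.shutdown();
        return finished;
    }
}
```

Only the A -> B and C -> D orderings are guaranteed here; C is free to start while A is still running.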

@tsegismont
Contributor

Thanks for the details.

The purpose of ordered=true is to ensure that two tasks cannot run concurrently (relative to the Vert.x context). From this perspective, the implementation is fine, and it complies with the RxJava Worker contract.

That being said, when ordered=false, you are right that the implementation is not compliant with the Worker contract. It should order tasks relative to the Worker instance.

Would you mind contributing a fix?

@Gattag
Author

Gattag commented Jan 29, 2021

@tsegismont I'm on it. Should the current ordered=false impl be maintained as a new third option for legacy reasons or no?

@tsegismont
Contributor

@Gattag no, if it's not compliant, it's a bug and should be fixed.

For the fix, it might be useful to create a TaskQueue per worker when ordered is set to false.
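One possible reading of that suggestion, sketched in plain Java rather than with the internal Vert.x `TaskQueue` (whose API is not shown in this thread): a hypothetical `SketchScheduler` hands every worker the same serial queue when `ordered` is true, standing in for the context's queue, and a fresh queue per worker when `ordered` is false. `SerialChain` is likewise a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical serial queue: tasks run one at a time, in submission order, on a shared pool.
final class SerialChain {
    private final Executor pool;
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    SerialChain(Executor pool) {
        this.pool = pool;
    }

    synchronized CompletableFuture<Void> submit(Runnable task) {
        // exceptionally(...) keeps the chain alive if a task throws.
        tail = tail.thenRunAsync(task, pool).exceptionally(e -> null);
        return tail;
    }
}

// Hypothetical scheduler illustrating where the queue lives in each mode.
final class SketchScheduler {
    private final Executor pool;
    private final boolean ordered;
    // Stand-in for the per-context queue used when ordered == true.
    private final SerialChain contextChain;

    SketchScheduler(Executor pool, boolean ordered) {
        this.pool = pool;
        this.ordered = ordered;
        this.contextChain = new SerialChain(pool);
    }

    // ordered=true: all workers share the context chain (context-wide ordering).
    // ordered=false: each worker gets its own chain (per-worker ordering only).
    SerialChain createWorker() {
        return ordered ? contextChain : new SerialChain(pool);
    }
}
```

With ordered=false, each worker's tasks still run in order, but two workers no longer wait on each other.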

@Gattag
Author

Gattag commented Feb 1, 2021

I would also like to note that the scheduler introduces a potential source of drift in schedulePeriodically. The documentation says: "The default implementation schedules and reschedules the Runnable task via the schedule(Runnable, long, TimeUnit) method over and over and at a fixed rate, that is, the first execution will be after the initialDelay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on."

      private void run(Object arg) {
        synchronized (TimedAction.this) {
          if (disposed) {
            return;
          }
        }
        action.run();
        /*Thread could pause here for any length of time*/
        synchronized (TimedAction.this) {
          if (!disposed) {
            if (periodMillis > 0) {
              schedule(periodMillis);
            } else {
              disposed = true;
              actions.remove(this);
            }
          }
        }
      }

It reschedules the task after each run with the same given period. Even though the task is rescheduled before the execution of the task is completed, this still introduces a potential accumulation of drift in the periodicity of its execution.
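The accumulation can be made concrete with a small deterministic calculation. This is a model, not the vertx-rx code: it assumes a constant run duration and ignores timer latency, and it compares scheduling the next run `period` after `action.run()` returns (the order of calls in `run(Object)` above) with the fixed-rate timetable `initialDelay + k * period` quoted from the documentation. `DriftDemo` is a hypothetical name.

```java
// Compares start times of a periodic task under two scheduling strategies (simplified model).
final class DriftDemo {
    // Fixed rate: the k-th run is due at initialDelay + k * period, regardless of run time.
    static long[] fixedRate(long initialDelay, long period, int runs) {
        long[] starts = new long[runs];
        for (int k = 0; k < runs; k++) {
            starts[k] = initialDelay + k * period;
        }
        return starts;
    }

    // Reschedule-after-run: the next run is scheduled `period` after the previous run
    // finished, so each run's duration is added to every later start time.
    static long[] rescheduleAfterRun(long initialDelay, long period, long runDuration, int runs) {
        long[] starts = new long[runs];
        long next = initialDelay;
        for (int k = 0; k < runs; k++) {
            starts[k] = next;
            next = starts[k] + runDuration + period;
        }
        return starts;
    }
}
```

With a 5 ms task and a 1000 ms period, `fixedRate(100, 1000, 4)` gives starts at 100, 1100, 2100, 3100, while `rescheduleAfterRun(100, 1000, 5, 4)` gives 100, 1105, 2110, 3115: the fourth run is already 15 ms late, and the gap grows by the run duration every period.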

I'm not sure if I'm being nitpicky, but I think this should have just used Vertx::setPeriodic instead of reimplementing the functionality, as I believe Vertx::setPeriodic does not introduce any long-term drift (I dug into the implementation, and it relies on EventExecutorGroup::scheduleAtFixedRate, which has the same timing language in its documentation and hopefully doesn't introduce any drift). As before, I don't mind implementing this too; I'd like to contribute (and I'll be fixing it for myself regardless).

@tsegismont
Contributor

Please go ahead. Make sure the periodic timer is canceled with the subscription.
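A minimal sketch of that requirement, assuming `ScheduledExecutorService.scheduleAtFixedRate` as a stand-in for `Vertx::setPeriodic` (whose returned timer id would instead be passed to `Vertx::cancelTimer`): the periodic timer is created once and cancelled when the subscription handle is disposed. `PeriodicHandle` is a hypothetical name, not an RxJava or Vert.x type.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical disposable handle for a fixed-rate periodic task.
final class PeriodicHandle {
    private final ScheduledFuture<?> future;
    private final AtomicBoolean disposed = new AtomicBoolean();

    private PeriodicHandle(ScheduledFuture<?> future) {
        this.future = future;
    }

    // Fixed rate: runs are due at initialDelay, initialDelay + period, initialDelay + 2 * period, ...
    // Run duration is not added to the next due time (a run longer than the period may
    // delay later starts, but the timetable itself stays fixed).
    static PeriodicHandle schedulePeriodically(ScheduledExecutorService timer, Runnable action,
                                               long initialDelay, long period, TimeUnit unit) {
        return new PeriodicHandle(timer.scheduleAtFixedRate(action, initialDelay, period, unit));
    }

    // Disposing the subscription cancels the underlying periodic timer exactly once.
    void dispose() {
        if (disposed.compareAndSet(false, true)) {
            future.cancel(false);
        }
    }

    boolean isDisposed() {
        return disposed.get();
    }
}
```

Tying cancellation to `dispose()` means no further runs are started once the subscription is gone, which is the property requested above.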

@Gattag
Author

Gattag commented Feb 1, 2021

@tsegismont Unless I'm missing something, there is currently no way to provide a custom TaskQueue to any WorkerExecutor::executeBlocking method, and a Context cannot be forcibly assigned to the execution of a WorkerExecutor. Without this, I cannot implement the queueing with a TaskQueue for a WorkerExecutor. Also (this next point might not be strictly necessary, but for now I am presuming it is), I have to wrap WorkerExecutor::executeBlocking in a Context::runOnContext to ensure that it is executed on the correct context when ordered=true. The rationale for this wrapping is that a worker might be accessed from more than one context, and I need to maintain the ordering at both the Worker level and the Context level.

Currently, I think I have a complete scheduler that fixes everything that needs to be fixed, and there is no more blocking code anywhere. The issue is that it implements its own queue and wraps the WorkerExecutor::executeBlocking call with a Context::runOnContext. (I first thought this wasn't that bad, just improvable; in fact it's very bad and breaks things, and I need to move to using TaskQueue and setting the Context internally.)

From here, I'm not sure how to proceed on the two points above (I'm very new to contributing to Vert.x): should I stick with what I've got, or should I look into making changes to core? And I apologize for all the questions.

@tsegismont
Contributor

Unless I'm missing something, currently there is no way to provide a custom TaskQueue to any WorkerExecutor::executeBlocking method and a Context cannot be forcibly assigned to the execution of a WorkerExecutor. Without this, I cannot implement the queueing with a TaskQueue for a WorkerExecutor

Right. We need a PR to Vert.x core that:

  • adds an executeBlocking method to WorkerExecutorImpl that takes a task queue parameter
  • exposes this method in WorkerExecutorInternal (as ContextInternal does)

I have to wrap WorkerExecutor::executeBlocking in a Context::runOnContext, to ensure that it is executed on the correct context for when ordered=true. The rationale for this wrapping is that a worker might be accessed across more than one context and I need to maintain both the ordering at the Worker level and at the Context level.

We agreed in previous comments that the current implementation is fine when ordered is set to true. So why do we need to change it?

should I look into making changes to core?

Yes please.

And I apologize for all the questions

No worries

@Gattag
Author

Gattag commented Feb 4, 2021

We agreed in previous comments that the current implementation is fine when ordered is set to true. So why do we need to change it?

So I talked about this more in the PR I made, but I have a dilemma. When ordered=true, tasks should be ordered by the Context they are associated with, but that does not mean we can ignore the fact that they must still obey the Worker order. The issue I see is this: if you schedule two tasks on the same ContextWorker, each from a different context, they will run in the order of their respective contexts, but they are no longer ordered by the worker, which is not allowed. What I was talking about above was (partly) an incorrect attempt to fix that, and it does not work. In fact, the Vert.x concurrency model has no way to support using a ContextWorker across multiple contexts (I can go into why if necessary, but I will refrain for now). The solution is to force all scheduled tasks onto the context that the ContextWorker was created in. If all tasks are ordered on the same context, they will be ordered with respect to the worker too.
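The proposed fix can be sketched in plain Java, assuming a single-threaded executor as a stand-in for the Vert.x context (this does not use the real `Context` API): the hypothetical `PinnedWorker` captures the context it was created on and forwards every task there, whichever thread calls `schedule`, so worker order and context order coincide.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical worker bound to the "context" it was created on.
final class PinnedWorker {
    // Serial executor representing the creating context (e.g. a single-threaded event loop).
    private final Executor home;

    PinnedWorker(Executor creatingContext) {
        this.home = creatingContext;
    }

    // Every task is forwarded to the home context, regardless of the calling thread,
    // so all tasks of this worker go through the home context's single FIFO queue.
    CompletableFuture<Void> schedule(Runnable task) {
        return CompletableFuture.runAsync(task, home);
    }
}
```

Because the home executor has a single FIFO queue, the order in which tasks reach the worker is the order in which they run, even when `schedule` is called from another thread.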

To be honest, I made more of a fuss about this little sub-issue than I should have, as I can't see any reason why RxJava would share workers across contexts, but maybe I'm wrong; I will do a bit of investigating and see.

Right. We need a PR to Vert.x core that:

  • adds an executeBlocking method to WorkerExecutorImpl that takes a task queue parameter
  • exposes this method in WorkerExecutorInternal (as ContextInternal does)

I'm already on this, it will also expose methods to force the context to ensure that when it runs the blocking handler, the thread has the correct context. (This does have a purpose, albeit small)

@tsegismont closed this as not planned on Nov 6, 2023

2 participants