From 8a2ce09072c7b27e48f4d0e0b4ff8abf27e8e626 Mon Sep 17 00:00:00 2001 From: Diego Colombo Date: Wed, 17 Feb 2021 20:16:59 +0000 Subject: [PATCH] deferred command that match the kernel used during scheduling of a command are executed --- .../KernelCommandSchedulerTests.cs | 83 ++++++++++++++++++- .../KernelCommandScheduler.cs | 41 ++++++++- 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/src/Microsoft.DotNet.Interactive.Tests/KernelCommandSchedulerTests.cs b/src/Microsoft.DotNet.Interactive.Tests/KernelCommandSchedulerTests.cs index 6d1420b55f..0985a58f28 100644 --- a/src/Microsoft.DotNet.Interactive.Tests/KernelCommandSchedulerTests.cs +++ b/src/Microsoft.DotNet.Interactive.Tests/KernelCommandSchedulerTests.cs @@ -13,7 +13,7 @@ namespace Microsoft.DotNet.Interactive.Tests { - public class KernelCommandSchedulerTests + public class KernelCommandSchedulerTests : IDisposable { private readonly CompositeDisposable _disposables = new(); @@ -54,7 +54,7 @@ public async Task command_execute_on_kernel_specified_at_scheduling_time() var kernel1 = new FakeKernel("kernel1") { - Handle = (command, context) => + Handle = (command, _) => { commandsHandledOnKernel1.Add(command); return Task.CompletedTask; @@ -62,7 +62,7 @@ public async Task command_execute_on_kernel_specified_at_scheduling_time() }; var kernel2 = new FakeKernel("kernel2") { - Handle = (command, context) => + Handle = (command, _) => { commandsHandledOnKernel2.Add(command); return Task.CompletedTask; @@ -78,5 +78,82 @@ public async Task command_execute_on_kernel_specified_at_scheduling_time() commandsHandledOnKernel1.Should().ContainSingle().Which.Should().Be(command1); commandsHandledOnKernel2.Should().ContainSingle().Which.Should().Be(command2); } + + [Fact] + public async Task scheduling_a_command_will_defer_deferred_commands_scheduled_on_same_kernel() + { + var commandsHandledOnKernel1 = new List(); + + var scheduler = new KernelCommandScheduler(); + + var kernel1 = new FakeKernel("kernel1") + { + Handle = (command, _) => + { + commandsHandledOnKernel1.Add(command); + return Task.CompletedTask; + } + }; + var kernel2 = new FakeKernel("kernel2") + { + Handle = (_, _) => Task.CompletedTask + }; + + var deferredCommand1 = new SubmitCode("deferred for kernel 1"); + var deferredCommand2 = new SubmitCode("deferred for kernel 2"); + var deferredCommand3 = new SubmitCode("deferred for kernel 1"); + var command1 = new SubmitCode("for kernel 1"); + + scheduler.DeferCommand(deferredCommand1, kernel1); + scheduler.DeferCommand(deferredCommand2, kernel2); + scheduler.DeferCommand(deferredCommand3, kernel1); + await scheduler.Schedule(command1, kernel1); + + commandsHandledOnKernel1.Should().NotContain(deferredCommand2); + commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1); + } + + [Fact] + public async Task deferred_command_not_executed_are_still_in_deferred_queue() + { + var commandsHandledOnKernel1 = new List(); + var commandsHandledOnKernel2 = new List(); + + var scheduler = new KernelCommandScheduler(); + + var kernel1 = new FakeKernel("kernel1") + { + Handle = (command, _) => + { + commandsHandledOnKernel1.Add(command); + return Task.CompletedTask; + } + }; + var kernel2 = new FakeKernel("kernel2") + { + Handle = (command, _) => + { + commandsHandledOnKernel2.Add(command); + return Task.CompletedTask; + } + }; + + var deferredCommand1 = new SubmitCode("deferred for kernel 1"); + var deferredCommand2 = new SubmitCode("deferred for kernel 2"); + var deferredCommand3 = new SubmitCode("deferred for kernel 1"); + var command1 = new SubmitCode("for kernel 1"); + var command2 = new SubmitCode("for kernel 2"); + + scheduler.DeferCommand(deferredCommand1, kernel1); + scheduler.DeferCommand(deferredCommand2, kernel2); + scheduler.DeferCommand(deferredCommand3, kernel1); + await scheduler.Schedule(command1, kernel1); + + commandsHandledOnKernel2.Should().BeEmpty(); + commandsHandledOnKernel1.Should().NotContain(deferredCommand2); + commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1); + await scheduler.Schedule(command2, kernel2); + commandsHandledOnKernel2.Should().BeEquivalentSequenceTo(deferredCommand2, command2); + } } } \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs b/src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs index aec02558e0..4c0bf649f9 100644 --- a/src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs +++ b/src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs @@ -36,7 +36,7 @@ public Task Schedule(KernelCommand command, Kernel kernel, CancelCommands(); break; default: - UndeferCommands(); + UndeferCommandsFor(kernel); break; } @@ -162,7 +162,46 @@ private void UndeferCommands() } } + private void UndeferCommandsFor(Kernel kernel) + { + var commandsToKeepInDeferredList = new ConcurrentQueue<(KernelCommand command, Kernel kernel)>(); + while (_deferredCommands.TryDequeue(out var deferredCommand)) + { + if (IsInPath(kernel, deferredCommand.kernel)) + { + _commandQueue.Enqueue( + new KernelOperation( + deferredCommand.command, + new TaskCompletionSource(), + deferredCommand.kernel, + true)); + } + else + { + commandsToKeepInDeferredList.Enqueue(deferredCommand); + } + } + while (commandsToKeepInDeferredList.TryDequeue(out var deferredCommand)) + { + _deferredCommands.Enqueue(deferredCommand); + } + + + bool IsInPath(Kernel toTest, Kernel deferredCommandKernel) + { + while (toTest is not null) + { + if (toTest == deferredCommandKernel) + { + return true; + } + + toTest = toTest.ParentKernel; + } + return false; + } + } private static bool CanCancel(KernelCommand command) {