Skip to content

Commit

Permalink
deferred command that match the kernel used during scheduling of a co…
Browse files Browse the repository at this point in the history
…mmand are executed
  • Loading branch information
colombod committed Feb 17, 2021
1 parent 00eaa50 commit 8a2ce09
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Microsoft.DotNet.Interactive.Tests
{
public class KernelCommandSchedulerTests
public class KernelCommandSchedulerTests : IDisposable
{
private readonly CompositeDisposable _disposables = new();

Expand Down Expand Up @@ -54,15 +54,15 @@ public async Task command_execute_on_kernel_specified_at_scheduling_time()

var kernel1 = new FakeKernel("kernel1")
{
Handle = (command, context) =>
Handle = (command, _) =>
{
commandsHandledOnKernel1.Add(command);
return Task.CompletedTask;
}
};
var kernel2 = new FakeKernel("kernel2")
{
Handle = (command, context) =>
Handle = (command, _) =>
{
commandsHandledOnKernel2.Add(command);
return Task.CompletedTask;
Expand All @@ -78,5 +78,82 @@ public async Task command_execute_on_kernel_specified_at_scheduling_time()
commandsHandledOnKernel1.Should().ContainSingle().Which.Should().Be(command1);
commandsHandledOnKernel2.Should().ContainSingle().Which.Should().Be(command2);
}

[Fact]
public async Task scheduling_a_command_will_defer_deferred_commands_scheduled_on_same_kernel()
{
var commandsHandledOnKernel1 = new List<KernelCommand>();

var scheduler = new KernelCommandScheduler();

var kernel1 = new FakeKernel("kernel1")
{
Handle = (command, _) =>
{
commandsHandledOnKernel1.Add(command);
return Task.CompletedTask;
}
};
var kernel2 = new FakeKernel("kernel2")
{
Handle = (_, _) => Task.CompletedTask
};

var deferredCommand1 = new SubmitCode("deferred for kernel 1");
var deferredCommand2 = new SubmitCode("deferred for kernel 2");
var deferredCommand3 = new SubmitCode("deferred for kernel 1");
var command1 = new SubmitCode("for kernel 1");

scheduler.DeferCommand(deferredCommand1, kernel1);
scheduler.DeferCommand(deferredCommand2, kernel2);
scheduler.DeferCommand(deferredCommand3, kernel1);
await scheduler.Schedule(command1, kernel1);

commandsHandledOnKernel1.Should().NotContain(deferredCommand2);
commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1);
}

[Fact]
public async Task deferred_command_not_executed_are_still_in_deferred_queue()
{
var commandsHandledOnKernel1 = new List<KernelCommand>();
var commandsHandledOnKernel2 = new List<KernelCommand>();

var scheduler = new KernelCommandScheduler();

var kernel1 = new FakeKernel("kernel1")
{
Handle = (command, _) =>
{
commandsHandledOnKernel1.Add(command);
return Task.CompletedTask;
}
};
var kernel2 = new FakeKernel("kernel2")
{
Handle = (command, _) =>
{
commandsHandledOnKernel2.Add(command);
return Task.CompletedTask;
}
};

var deferredCommand1 = new SubmitCode("deferred for kernel 1");
var deferredCommand2 = new SubmitCode("deferred for kernel 2");
var deferredCommand3 = new SubmitCode("deferred for kernel 1");
var command1 = new SubmitCode("for kernel 1");
var command2 = new SubmitCode("for kernel 2");

scheduler.DeferCommand(deferredCommand1, kernel1);
scheduler.DeferCommand(deferredCommand2, kernel2);
scheduler.DeferCommand(deferredCommand3, kernel1);
await scheduler.Schedule(command1, kernel1);

commandsHandledOnKernel2.Should().BeEmpty();
commandsHandledOnKernel1.Should().NotContain(deferredCommand2);
commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1);
await scheduler.Schedule(command2, kernel2);
commandsHandledOnKernel2.Should().BeEquivalentSequenceTo(deferredCommand2, command2);
}
}
}
41 changes: 40 additions & 1 deletion src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Task<KernelCommandResult> Schedule(KernelCommand command, Kernel kernel,
CancelCommands();
break;
default:
UndeferCommands();
UndeferCommandsFor(kernel);
break;
}

Expand Down Expand Up @@ -162,7 +162,46 @@ private void UndeferCommands()
}
}

private void UndeferCommandsFor(Kernel kernel)
{
var commandsToKeepInDeferredList = new ConcurrentQueue<(KernelCommand command, Kernel kernel)>();
while (_deferredCommands.TryDequeue(out var deferredCommand))
{
if (IsInPath(kernel, deferredCommand.kernel))
{
_commandQueue.Enqueue(
new KernelOperation(
deferredCommand.command,
new TaskCompletionSource<KernelCommandResult>(),
deferredCommand.kernel,
true));
}
else
{
commandsToKeepInDeferredList.Enqueue(deferredCommand);
}
}

while (commandsToKeepInDeferredList.TryDequeue(out var deferredCommand))
{
_deferredCommands.Enqueue(deferredCommand);
}


bool IsInPath(Kernel toTest, Kernel deferredCommandKernel)
{
while (toTest is not null)
{
if (toTest == deferredCommandKernel)
{
return true;
}

toTest = toTest.ParentKernel;
}
return false;
}
}

private static bool CanCancel(KernelCommand command)
{
Expand Down

0 comments on commit 8a2ce09

Please sign in to comment.