Skip to content

Commit

Permalink
new scheduler api
Browse files Browse the repository at this point in the history
  • Loading branch information
colombod committed Feb 20, 2021
1 parent ccc8c0d commit e3aa6bd
Show file tree
Hide file tree
Showing 2 changed files with 322 additions and 135 deletions.
330 changes: 195 additions & 135 deletions src/Microsoft.DotNet.Interactive.Tests/KernelCommandSchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,164 +50,224 @@ private void DisposeAfterTest(Action action)
}

[Fact]
public async Task command_execute_on_kernel_specified_at_scheduling_time()
public async Task scheduled_work_is_completed_in_order()
{
var commandsHandledOnKernel1 = new List<KernelCommand>();
var commandsHandledOnKernel2 = new List<KernelCommand>();
var scheduler = new KernelScheduler<int, int>();

var scheduler = new KernelCommandScheduler();
var executionList = new List<int>();

var kernel1 = new FakeKernel("kernel1")
{
Handle = (command, _) =>
{
commandsHandledOnKernel1.Add(command);
return Task.CompletedTask;
}
};
var kernel2 = new FakeKernel("kernel2")
{
Handle = (command, _) =>
{
commandsHandledOnKernel2.Add(command);
return Task.CompletedTask;
}
};
await scheduler.Schedule(1, (v) => executionList.Add(v));
await scheduler.Schedule(2, (v) => executionList.Add(v));
await scheduler.Schedule(3, (v) => executionList.Add(v));

var command1 = new SubmitCode("for kernel 1");
var command2 = new SubmitCode("for kernel 2");

await scheduler.Schedule(command1, kernel1);
await scheduler.Schedule(command2, kernel2);
executionList.Should().BeEquivalentSequenceTo(1, 2, 3);
}

commandsHandledOnKernel1.Should().ContainSingle().Which.Should().Be(command1);
commandsHandledOnKernel2.Should().ContainSingle().Which.Should().Be(command2);
[Fact]
public void scheduled_work_does_not_execute_in_parallel()
{
throw new NotImplementedException();
}

[Fact]
public async Task scheduling_a_command_will_defer_deferred_commands_scheduled_on_same_kernel()
public async Task deferred_work_is_executed_before_new_work()
{
var commandsHandledOnKernel1 = new List<KernelCommand>();
var executionList = new List<int>();

var scheduler = new KernelCommandScheduler();
var scheduler = new KernelScheduler<int, int>();
scheduler.RegisterDeferredOperationSource(
v => Enumerable.Repeat(v * 10, v), (v) => executionList.Add(v));

var kernel1 = new FakeKernel("kernel1")
{
Handle = (command, _) =>
{
commandsHandledOnKernel1.Add(command);
return Task.CompletedTask;
}
};
var kernel2 = new FakeKernel("kernel2")
{
Handle = (_, _) => Task.CompletedTask
};

var deferredCommand1 = new SubmitCode("deferred for kernel 1");
var deferredCommand2 = new SubmitCode("deferred for kernel 2");
var deferredCommand3 = new SubmitCode("deferred for kernel 1");
var command1 = new SubmitCode("for kernel 1");


scheduler.DeferCommand(deferredCommand1, kernel1);
scheduler.DeferCommand(deferredCommand2, kernel2);
scheduler.DeferCommand(deferredCommand3, kernel1);
await scheduler.Schedule(command1, kernel1);
await scheduler.Schedule(1, (v) => executionList.Add(v));
await scheduler.Schedule(2, (v) => executionList.Add(v));
await scheduler.Schedule(3, (v) => executionList.Add(v));

commandsHandledOnKernel1.Should().NotContain(deferredCommand2);
commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1);
executionList.Should().BeEquivalentSequenceTo(10,1,20,20, 2,30,30,30, 3);
}

[Fact]
public async Task deferred_command_not_executed_are_still_in_deferred_queue()
public async Task awaiting_one_operation_does_not_wait_all()
{
var commandsHandledOnKernel1 = new List<KernelCommand>();
var commandsHandledOnKernel2 = new List<KernelCommand>();
var executionList = new List<int>();

var scheduler = new KernelCommandScheduler();
var scheduler = new KernelScheduler<int, int>();

var kernel1 = new FakeKernel("kernel1")
{
Handle = (command, _) =>
{
commandsHandledOnKernel1.Add(command);
return Task.CompletedTask;
}
};
var kernel2 = new FakeKernel("kernel2")
{
Handle = (command, _) =>
{
commandsHandledOnKernel2.Add(command);
return Task.CompletedTask;
}
};

var deferredCommand1 = new SubmitCode("deferred for kernel 1");
var deferredCommand2 = new SubmitCode("deferred for kernel 2");
var deferredCommand3 = new SubmitCode("deferred for kernel 1");
var command1 = new SubmitCode("for kernel 1");
var command2 = new SubmitCode("for kernel 2");

scheduler.DeferCommand(deferredCommand1, kernel1);
scheduler.DeferCommand(deferredCommand2, kernel2);
scheduler.DeferCommand(deferredCommand3, kernel1);
await scheduler.Schedule(command1, kernel1);

commandsHandledOnKernel2.Should().BeEmpty();
commandsHandledOnKernel1.Should().NotContain(deferredCommand2);
commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1);
await scheduler.Schedule(command2, kernel2);
commandsHandledOnKernel2.Should().BeEquivalentSequenceTo(deferredCommand2, command2);

await scheduler.Schedule(1, (v) => executionList.Add(v));
await scheduler.Schedule(2, (v) => executionList.Add(v));
scheduler.Schedule(3, (v) => executionList.Add(v));

executionList.Should().BeEquivalentSequenceTo(10, 1, 20, 20, 2);
}

[Fact]
public async Task deferred_command_on_parent_kernel_are_executed_when_scheduling_command_on_child_kernel()
public void new_work_is_executed_after_all_require()
{
var commandHandledList = new List<(KernelCommand command, Kernel kernel)>();

var scheduler = new KernelCommandScheduler();

var childKernel = new FakeKernel("kernel1")
{
Handle = (command, context) => command.InvokeAsync(context)
};
var parentKernel = new CompositeKernel
{
childKernel
};

parentKernel.DefaultKernelName = childKernel.Name;

var deferredCommand1 = new TestCommand((command, context) =>
{
commandHandledList.Add((command, context.HandlingKernel));
return Task.CompletedTask;
}, childKernel.Name);
var deferredCommand2 = new TestCommand((command, context) =>
{
commandHandledList.Add((command, context.HandlingKernel));
return Task.CompletedTask;
}, parentKernel.Name);
var deferredCommand3 = new TestCommand((command, context) =>
{
commandHandledList.Add((command, context.HandlingKernel));
return Task.CompletedTask;
}, childKernel.Name);
var command1 = new TestCommand((command, context) =>
{
commandHandledList.Add((command, context.HandlingKernel));
return Task.CompletedTask;
}, childKernel.Name);

scheduler.DeferCommand(deferredCommand1, childKernel);
scheduler.DeferCommand(deferredCommand2, parentKernel);
scheduler.DeferCommand(deferredCommand3, childKernel);
await scheduler.Schedule(command1, childKernel);

commandHandledList.Select(e => e.command).Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand2, deferredCommand3, command1);

commandHandledList.Select(e => e.kernel).Should().BeEquivalentSequenceTo(childKernel, parentKernel, childKernel, childKernel);
throw new NotImplementedException();
}

//[Fact]
//public async Task command_execute_on_kernel_specified_at_scheduling_time()
//{
// var commandsHandledOnKernel1 = new List<KernelCommand>();
// var commandsHandledOnKernel2 = new List<KernelCommand>();

// var scheduler = new KernelCommandScheduler();

// var kernel1 = new FakeKernel("kernel1")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel1.Add(command);
// return Task.CompletedTask;
// }
// };
// var kernel2 = new FakeKernel("kernel2")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel2.Add(command);
// return Task.CompletedTask;
// }
// };

// var command1 = new SubmitCode("for kernel 1", kernel1.Name);
// var command2 = new SubmitCode("for kernel 2", kernel2.Name);

// await scheduler.Schedule(command1);
// await scheduler.Schedule(command2);

// commandsHandledOnKernel1.Should().ContainSingle().Which.Should().Be(command1);
// commandsHandledOnKernel2.Should().ContainSingle().Which.Should().Be(command2);
//}

//[Fact]
//public async Task scheduling_a_command_will_defer_deferred_commands_scheduled_on_same_kernel()
//{
// var commandsHandledOnKernel1 = new List<KernelCommand>();

// var scheduler = new KernelCommandScheduler();

// var kernel1 = new FakeKernel("kernel1")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel1.Add(command);
// return Task.CompletedTask;
// }
// };
// var kernel2 = new FakeKernel("kernel2")
// {
// Handle = (_, _) => Task.CompletedTask
// };

// var deferredCommand1 = new SubmitCode("deferred for kernel 1", kernel1.Name);
// var deferredCommand2 = new SubmitCode("deferred for kernel 2", kernel2.Name);
// var deferredCommand3 = new SubmitCode("deferred for kernel 1", kernel1.Name);
// var command1 = new SubmitCode("for kernel 1", kernel1.Name);

// scheduler.DeferCommand(deferredCommand1);
// scheduler.DeferCommand(deferredCommand2);
// scheduler.DeferCommand(deferredCommand3);
// await scheduler.Schedule(command1);

// commandsHandledOnKernel1.Should().NotContain(deferredCommand2);
// commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1);
//}

//[Fact]
//public async Task deferred_command_not_executed_are_still_in_deferred_queue()
//{
// var commandsHandledOnKernel1 = new List<KernelCommand>();
// var commandsHandledOnKernel2 = new List<KernelCommand>();

// var scheduler = new KernelCommandScheduler();

// var kernel1 = new FakeKernel("kernel1")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel1.Add(command);
// return Task.CompletedTask;
// }
// };
// var kernel2 = new FakeKernel("kernel2")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel2.Add(command);
// return Task.CompletedTask;
// }
// };

// var deferredCommand1 = new SubmitCode("deferred for kernel 1");
// var deferredCommand2 = new SubmitCode("deferred for kernel 2");
// var deferredCommand3 = new SubmitCode("deferred for kernel 1");
// var command1 = new SubmitCode("for kernel 1");
// var command2 = new SubmitCode("for kernel 2");

// scheduler.DeferCommand(deferredCommand1, kernel1);
// scheduler.DeferCommand(deferredCommand2, kernel2);
// scheduler.DeferCommand(deferredCommand3, kernel1);
// await scheduler.Schedule(command1, kernel1);

// commandsHandledOnKernel2.Should().BeEmpty();
// commandsHandledOnKernel1.Should().NotContain(deferredCommand2);
// commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1);
// await scheduler.Schedule(command2, kernel2);
// commandsHandledOnKernel2.Should().BeEquivalentSequenceTo(deferredCommand2, command2);
//}

//[Fact]
//public async Task deferred_command_on_parent_kernel_are_executed_when_scheduling_command_on_child_kernel()
//{
// var commandHandledList = new List<(KernelCommand command, Kernel kernel)>();

// var scheduler = new KernelCommandScheduler();

// var childKernel = new FakeKernel("kernel1")
// {
// Handle = (command, context) => command.InvokeAsync(context)
// };
// var parentKernel = new CompositeKernel
// {
// childKernel
// };

// parentKernel.DefaultKernelName = childKernel.Name;

// var deferredCommand1 = new TestCommand((command, context) =>
// {
// commandHandledList.Add((command, context.HandlingKernel));
// return Task.CompletedTask;
// }, childKernel.Name);
// var deferredCommand2 = new TestCommand((command, context) =>
// {
// commandHandledList.Add((command, context.HandlingKernel));
// return Task.CompletedTask;
// }, parentKernel.Name);
// var deferredCommand3 = new TestCommand((command, context) =>
// {
// commandHandledList.Add((command, context.HandlingKernel));
// return Task.CompletedTask;
// }, childKernel.Name);
// var command1 = new TestCommand((command, context) =>
// {
// commandHandledList.Add((command, context.HandlingKernel));
// return Task.CompletedTask;
// }, childKernel.Name);

// scheduler.DeferCommand(deferredCommand1, childKernel);
// scheduler.DeferCommand(deferredCommand2, parentKernel);
// scheduler.DeferCommand(deferredCommand3, childKernel);
// await scheduler.Schedule(command1, childKernel);

// commandHandledList.Select(e => e.command).Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand2, deferredCommand3, command1);

// commandHandledList.Select(e => e.kernel).Should().BeEquivalentSequenceTo(childKernel, parentKernel, childKernel, childKernel);
//}
}
}
}
Loading

0 comments on commit e3aa6bd

Please sign in to comment.