Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
colombod committed Feb 24, 2021
1 parent 682d49c commit d05b3d6
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

namespace Microsoft.DotNet.Interactive.Tests
{
public class KernelCommandSchedulerTests : IDisposable
public class KernelSchedulerTests : IDisposable
{
private readonly CompositeDisposable _disposables = new();

public KernelCommandSchedulerTests(ITestOutputHelper output)
public KernelSchedulerTests(ITestOutputHelper output)
{
DisposeAfterTest(output.SubscribeToPocketLogger());
}
Expand All @@ -35,7 +35,7 @@ public void Dispose()
}
catch (Exception ex)
{
Logger<KernelCommandSchedulerTests>.Log.Error(exception: ex);
Logger<KernelSchedulerTests>.Log.Error(exception: ex);
}
}

Expand Down
33 changes: 1 addition & 32 deletions src/Microsoft.DotNet.Interactive/CompositeKernel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void Add(Kernel kernel, IReadOnlyCollection<string> aliases = null)
}

kernel.ParentKernel = this;
kernel.SetScheduler(Scheduler);
kernel.AddMiddleware(LoadExtensions);

AddChooseKernelDirective(kernel, aliases);
Expand Down Expand Up @@ -202,38 +203,6 @@ private Kernel GetHandlingKernel(
return kernel ?? this;
}

internal override async Task HandleAsync(
KernelCommand command,
KernelInvocationContext context)
{
var kernel = context.HandlingKernel;

if (kernel is null)
{
throw new NoSuitableKernelException(command);
}

switch (command)
{
case Cancel _:
CancelCommands();
kernel.CancelCommands();
break;
}

await kernel.RunDeferredCommandsAsync();

if (kernel != this)
{
// route to a subkernel
await kernel.Pipeline.SendAsync(command, context);
}
else
{
await base.HandleAsync(command, context);
}
}

private protected override IEnumerable<Parser> GetDirectiveParsersForCompletion(
DirectiveNode directiveNode,
int requestPosition)
Expand Down
106 changes: 82 additions & 24 deletions src/Microsoft.DotNet.Interactive/Kernel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.CommandLine;
using System.CommandLine.Parsing;
Expand All @@ -24,11 +25,12 @@ public abstract partial class Kernel : IDisposable
{
private readonly Subject<KernelEvent> _kernelEvents = new();
private readonly CompositeDisposable _disposables;

private readonly ConcurrentQueue<KernelCommand> _deferredCommands = new();
private readonly Dictionary<Type, KernelCommandInvocation> _dynamicHandlers = new();
private FrontendEnvironment _frontendEnvironment;
private ChooseKernelDirective _chooseKernelDirective;
private readonly KernelCommandScheduler _scheduler;

private KernelScheduler<KernelCommand, KernelCommandResult> _commandScheduler = null;

protected Kernel(string name)
{
Expand All @@ -45,8 +47,7 @@ protected Kernel(string name)

Pipeline = new KernelCommandPipeline(this);

_scheduler = new KernelCommandScheduler();


AddSetKernelMiddleware();

AddDirectiveMiddlewareAndCommonCommandHandlers();
Expand All @@ -56,7 +57,17 @@ protected Kernel(string name)
));
}

internal KernelCommandScheduler Scheduler => _scheduler;
internal KernelScheduler<KernelCommand, KernelCommandResult> Scheduler
{
get
{
if(_commandScheduler is null)
{
SetScheduler(new ());
}
return _commandScheduler;
}
}

internal KernelCommandPipeline Pipeline { get; }

Expand All @@ -76,7 +87,13 @@ public void DeferCommand(KernelCommand command)
}

command.SetToken($"deferredCommand::{Guid.NewGuid():N}");
Scheduler.DeferCommand(command, this);

if(string.IsNullOrWhiteSpace(command.TargetKernelName))
{
command.TargetKernelName = Name;
}

_deferredCommands.Enqueue(command);
}

private void AddSetKernelMiddleware()
Expand Down Expand Up @@ -235,35 +252,20 @@ internal virtual async Task HandleAsync(
public Task<KernelCommandResult> SendAsync(
KernelCommand command,
CancellationToken cancellationToken)
{
return SendAsync(command, cancellationToken, null);
}

internal Task<KernelCommandResult> SendAsync(
KernelCommand command,
CancellationToken cancellationToken,
Action onDone)
{
if (command == null)
{
throw new ArgumentNullException(nameof(command));
}

return Scheduler.Schedule(command, this, cancellationToken, onDone);

return Scheduler.Schedule(command, OnExecuteAsync, Name);
}

protected internal void CancelCommands()
{
Scheduler.CancelCommands();
}

internal Task RunDeferredCommandsAsync()
{
return Scheduler.RunDeferredCommandsAsync(this);

Scheduler.Cancel();
}

protected internal void PublishEvent(KernelEvent kernelEvent)
{
if (kernelEvent == null)
Expand Down Expand Up @@ -436,5 +438,61 @@ protected virtual ChooseKernelDirective CreateChooseKernelDirective()
}

internal ChooseKernelDirective ChooseKernelDirective => _chooseKernelDirective ??= CreateChooseKernelDirective();

internal void SetScheduler(KernelScheduler<KernelCommand, KernelCommandResult> scheduler)
{

_commandScheduler = scheduler;

IEnumerable<KernelCommand> GetDeferredOperations(KernelCommand command, string scope)
{
if (scope != Name)
{
yield break;
}

while (_deferredCommands.TryDequeue(out var kernelCommand))
{
yield return kernelCommand;
}
}

Scheduler.RegisterDeferredOperationSource(GetDeferredOperations, OnExecuteAsync);
}

internal async Task<KernelCommandResult> OnExecuteAsync(KernelCommand command)
{
var context = KernelInvocationContext.Establish(command);

// only subscribe for the root command
using var _ = context.Command == command
? context.KernelEvents.Subscribe(PublishEvent)
: Disposable.Empty;

try
{
await Pipeline.SendAsync(command, context);

if (command == context.Command)
{
await context.DisposeAsync();
}
else
{
context.Complete(command);
}

return context.Result;
}
catch (Exception exception)
{
if (!context.IsComplete)
{
context.Fail(exception);
}

throw;
}
}
}
}
21 changes: 5 additions & 16 deletions src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ public class KernelCommandScheduler

private readonly ConcurrentQueue<KernelOperation> _commandQueue = new();

public Task<KernelCommandResult> Schedule(KernelCommand command, Kernel kernel)
{
return Schedule(command, kernel, CancellationToken.None, () => { });
}


public Task<KernelCommandResult> Schedule(KernelCommand command, Kernel kernel, CancellationToken cancellationToken, Action onDone)
public Task<KernelCommandResult> Schedule(KernelCommand command, Kernel kernel, CancellationToken cancellationToken)
{


switch (command)
{
case Cancel _:
Expand All @@ -46,15 +41,14 @@ public Task<KernelCommandResult> Schedule(KernelCommand command, Kernel kernel,

_commandQueue.Enqueue(operation);

ProcessCommandQueue(_commandQueue, cancellationToken, onDone);
ProcessCommandQueue(_commandQueue, cancellationToken);

return kernelCommandResultSource.Task;
}

private void ProcessCommandQueue(
ConcurrentQueue<KernelOperation> commandQueue,
CancellationToken cancellationToken,
Action onDone)
CancellationToken cancellationToken)
{
if (commandQueue.TryDequeue(out var currentOperation))
{
Expand All @@ -64,13 +58,9 @@ private void ProcessCommandQueue(

await ExecuteCommand(currentOperation);

ProcessCommandQueue(commandQueue, cancellationToken, onDone);
ProcessCommandQueue(commandQueue, cancellationToken);
}, cancellationToken).ConfigureAwait(false);
}
else
{
onDone?.Invoke();
}
}

private async Task ExecuteCommand(KernelOperation operation)
Expand Down Expand Up @@ -142,8 +132,7 @@ internal Task RunDeferredCommandsAsync(Kernel kernel)
UndeferCommandsFor(kernel);
ProcessCommandQueue(
_commandQueue,
CancellationToken.None,
() => tcs.SetResult(Unit.Default));
CancellationToken.None);
return tcs.Task;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.DotNet.Interactive/KernelScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void Dispose()

public delegate Task<U> OnExecuteDelegate(T value);

public delegate IEnumerable<T> GetDeferredOperationsDelegate(T operationToExecute, string queueName);
public delegate IEnumerable<T> GetDeferredOperationsDelegate(T state, string scope);

private class ScheduledOperation
{
Expand Down
56 changes: 0 additions & 56 deletions src/Microsoft.DotNet.Interactive/KernelSchedulerExtensions.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ internal static KernelCommandInvocation DoNugetRestore()
};

await invocationContext.QueueAction(restore);
var kernel = invocationContext.HandlingKernel;
await kernel.RunDeferredCommandsAsync();
};

static string InstallingPackageMessage(PackageReference package)
Expand Down

0 comments on commit d05b3d6

Please sign in to comment.