Skip to content

Commit

Permalink
KernelInfo broadcast (#2469)
Browse files Browse the repository at this point in the history
* KernelInfo has always a Uri

* do not re enter commands while broadcasting

* align ts behaviour
  • Loading branch information
colombod authored Nov 15, 2022
1 parent 5e1b5fd commit 73634e8
Show file tree
Hide file tree
Showing 15 changed files with 252 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,10 @@ Microsoft.DotNet.Interactive
public System.Collections.Generic.IReadOnlyList<System.String> ProbingPaths { get;}
public System.String ToString()
public abstract class RoutingSlip
public System.Boolean Contains(System.Uri uri)
public System.Boolean Contains(System.Uri uri, System.Boolean includeTags = True)
public System.Boolean Contains(System.String uri)
protected System.Boolean Contains(Entry entry)
public System.Boolean ContainsUriWithoutTags(System.String uri)
public System.Void ContinueWith(RoutingSlip other)
protected System.Collections.Generic.ICollection<Entry> get_Entries()
public System.Void Stamp(System.Uri uri)
Expand Down
58 changes: 58 additions & 0 deletions src/Microsoft.DotNet.Interactive.Tests/KernelInfoTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,62 @@ public async Task It_returns_the_list_of_dynamic_kernel_commands()
nameof(CustomCommandTypes.FirstSubmission.MyCommand));
}
}

[Fact]
public async Task when_hosts_have_bidirectional_proxies_RequestKernelInfo_is_not_forwarded_back_to_the_host_that_initiated_the_request()
{
using var localCompositeKernel = new CompositeKernel("LOCAL")
{
new FakeKernel("fsharp")
};


using var remoteCompositeKernel = new CompositeKernel("REMOTE")
{
new CSharpKernel(),
new FakeKernel("fsharp", languageName: "fsharp")
};

ConnectHost.ConnectInProcessHost(
localCompositeKernel,
remoteCompositeKernel);

var remoteKernelUri = new Uri("kernel://remote/fsharp");

await localCompositeKernel
.Host
.ConnectProxyKernelOnDefaultConnectorAsync(
"proxied-fsharp",
remoteKernelUri);

// make a proxy from local to the remote composite kernel
await localCompositeKernel
.Host
.ConnectProxyKernelOnDefaultConnectorAsync(
"proxied-remote",
remoteCompositeKernel.KernelInfo.Uri);

// make a proxy from remote to the local composite kernel
await remoteCompositeKernel
.Host
.ConnectProxyKernelOnDefaultConnectorAsync(
"proxied-local",
localCompositeKernel.KernelInfo.Uri);

var result = await localCompositeKernel.SendAsync(
new RequestKernelInfo());

var events = result.KernelEvents.ToSubscribedList();

events.Should()
.ContainSingle<KernelInfoProduced>(e => e.KernelInfo.LocalName == "proxied-fsharp")
.Which
.KernelInfo
.Should()
.BeEquivalentTo(new
{
LanguageName = "fsharp",
RemoteUri = remoteKernelUri
}, c => c.ExcludingMissingMembers());
}
}
4 changes: 3 additions & 1 deletion src/Microsoft.DotNet.Interactive/CompositeKernel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ private protected override async Task HandleRequestKernelInfoAsync(
{
if (childKernel.SupportsCommand(command))
{
await childKernel.HandleAsync(command, context);
var childCommand = new RequestKernelInfo(childKernel.Name);
childCommand.RoutingSlip.ContinueWith(command.RoutingSlip);
await childKernel.HandleAsync(childCommand, context);
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/Microsoft.DotNet.Interactive/Connection/ProxyKernel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ private Task HandleByForwardingToRemoteAsync(KernelCommand command, KernelInvoca
command.GetOrCreateId();

command.OriginUri ??= KernelInfo.Uri;

if (command.DestinationUri is null)
{
command.DestinationUri = KernelInfo.RemoteUri;
}

if (command is RequestKernelInfo requestKernelInfo)
{
if (requestKernelInfo.RoutingSlip.Contains(KernelInfo.RemoteUri, false))
{
return Task.CompletedTask;
}
}

var targetKernelName = command.TargetKernelName;
command.TargetKernelName = null;
Expand Down
3 changes: 1 addition & 2 deletions src/Microsoft.DotNet.Interactive/Kernel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public abstract partial class Kernel :
private readonly HashSet<Type> _supportedCommandTypes;

private readonly Subject<KernelEvent> _kernelEvents = new();
private readonly CompositeDisposable _disposables;
private readonly CompositeDisposable _disposables = new();
private readonly ConcurrentDictionary<Type, KernelCommandInvocation> _dynamicHandlers = new();
private readonly ImmediateScheduler<KernelCommand, KernelCommandResult> _fastPathScheduler = new();
private FrontendEnvironment _frontendEnvironment;
Expand All @@ -63,7 +63,6 @@ protected Kernel(

SubmissionParser = new SubmissionParser(this);

_disposables = new CompositeDisposable();
_disposables.Add(Disposable.Create(() => _kernelEvents.OnCompleted()));

Pipeline = new KernelCommandPipeline(this);
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.DotNet.Interactive/KernelCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ private void UpdateKernelInfoAndIndex(
kernel.KernelInfo.Uri = new Uri(host.Uri, kernel.Name);
_kernelsByLocalUri.TryAdd(kernel.KernelInfo.Uri, kernel);
}
else
{
kernel.KernelInfo.Uri = new Uri(_compositeKernel.KernelInfo.Uri, kernel.Name);
_kernelsByLocalUri.TryAdd(kernel.KernelInfo.Uri, kernel);
}

if (kernel is ProxyKernel proxyKernel)
{
Expand Down
1 change: 1 addition & 0 deletions src/Microsoft.DotNet.Interactive/KernelInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public KernelInfo(
LanguageName = languageName;
LanguageVersion = languageVersion;
NameAndAliases = new HashSet<string> { LocalName };
Uri = new Uri($"kernel://local/{LocalName}");

if (aliases is not null)
{
Expand Down
7 changes: 4 additions & 3 deletions src/Microsoft.DotNet.Interactive/RoutingSlip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ public string[] ToUriArray()
{
var entries = _entries.Select(e => e.AbsoluteUri).ToArray();
return entries;
}
}

public bool Contains(Uri uri) => Contains(uri.AbsoluteUri);
public bool Contains(Uri uri, bool includeTags = true) => includeTags ? Contains(uri.AbsoluteUri) : ContainsUriWithoutTags(GetAbsoluteUriWithoutQuery(uri));

public bool Contains(string uri) => _entries.Any(e => e.AbsoluteUri == uri);

public bool ContainsUriWithoutTags(string uri) => _entries.Any(e => e.Uri == uri);

public bool StartsWith(RoutingSlip other) => StartsWith(other._entries);

public bool StartsWith(params string[] uris)
Expand Down
21 changes: 17 additions & 4 deletions src/microsoft-dotnet-interactive/src/compositeKernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,15 @@ export class CompositeKernel extends Kernel {

for (let kernel of this._childKernels) {
if (kernel.supportsCommand(invocation.commandEnvelope.commandType)) {
await kernel.handleCommand({ command: {}, commandType: contracts.RequestKernelInfoType });
const childCommand: contracts.KernelCommandEnvelope = {
commandType: contracts.RequestKernelInfoType,
command: {
targetKernelName: kernel.kernelInfo.localName
},
routingSlip: []
};
routingslip.continueCommandRoutingSlip(childCommand, invocation.commandEnvelope.routingSlip || []);
await kernel.handleCommand(childCommand);
}
}
}
Expand Down Expand Up @@ -321,10 +329,15 @@ class KernelCollection implements Iterable<Kernel> {
this._kernelsByNameOrAlias.set(alias, kernel);
});

if (this._compositeKernel.host) {
kernel.kernelInfo.uri = routingslip.createKernelUri(`${this._compositeKernel.host.uri}${kernel.kernelInfo.localName}`);//?
this._kernelsByLocalUri.set(kernel.kernelInfo.uri, kernel);
let baseUri = this._compositeKernel.host?.uri || this._compositeKernel.kernelInfo.uri;

if (!baseUri!.endsWith("/")) {
baseUri += "/";

}
kernel.kernelInfo.uri = routingslip.createKernelUri(`${baseUri}${kernel.kernelInfo.localName}`);//?
this._kernelsByLocalUri.set(kernel.kernelInfo.uri, kernel);


if (kernel.kernelType === KernelType.proxy) {
this._kernelsByRemoteUri.set(kernel.kernelInfo.remoteUri!, kernel);
Expand Down
1 change: 1 addition & 0 deletions src/microsoft-dotnet-interactive/src/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export class Kernel {
localName: name,
languageName: languageName,
aliases: [],
uri: routingslip.createKernelUri(`kernel://local/${name}`),
languageVersion: languageVersion,
supportedDirectives: [],
supportedKernelCommands: []
Expand Down
7 changes: 7 additions & 0 deletions src/microsoft-dotnet-interactive/src/proxyKernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ export class ProxyKernel extends Kernel {
}

commandInvocation.commandEnvelope.routingSlip;//?

if (commandInvocation.commandEnvelope.commandType === contracts.RequestKernelInfoType) {
const destinationUri = this.kernelInfo.remoteUri!;
if (routingSlip.commandRoutingSlipContains(commandInvocation.commandEnvelope, destinationUri)) {
return Promise.resolve();
}
}
Logger.default.info(`proxy ${this.name}[local uri:${this.kernelInfo.uri}, remote uri:${this.kernelInfo.remoteUri}] forwarding command ${commandInvocation.commandEnvelope.commandType} to ${commandInvocation.commandEnvelope.command.destinationUri}`);
this._sender.send(commandInvocation.commandEnvelope);
Logger.default.info(`proxy ${this.name}[local uri:${this.kernelInfo.uri}, remote uri:${this.kernelInfo.remoteUri}] about to await with token ${commandToken} and commandid ${commandId}`);
Expand Down
12 changes: 6 additions & 6 deletions src/microsoft-dotnet-interactive/src/routingslip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,15 @@ function routingSlipStartsWith(thisKernelUris: string[], otherKernelUris: string
return startsWith;
}

export function eventRoutingSlipContains(kernlEvent: contracts.KernelEventEnvelope, kernelUri: string): boolean {
return routingSlipContains(kernlEvent, kernelUri);
export function eventRoutingSlipContains(kernlEvent: contracts.KernelEventEnvelope, kernelUri: string, includeTags: boolean = true): boolean {
return routingSlipContains(kernlEvent, kernelUri, includeTags);
}

export function commandRoutingSlipContains(kernlEvent: contracts.KernelCommandEnvelope, kernelUri: string): boolean {
return routingSlipContains(kernlEvent, kernelUri);
export function commandRoutingSlipContains(kernlEvent: contracts.KernelCommandEnvelope, kernelUri: string, includeTags: boolean = true): boolean {
return routingSlipContains(kernlEvent, kernelUri, includeTags);
}

function routingSlipContains(kernelCommandOrEventEnvelope: KernelCommandOrEventEnvelope, kernelUri: string) {
function routingSlipContains(kernelCommandOrEventEnvelope: KernelCommandOrEventEnvelope, kernelUri: string, includeTags: boolean = true): boolean {
const normalizedUri = createKernelUri(kernelUri);
return kernelCommandOrEventEnvelope?.routingSlip?.find(e => normalizedUri === createKernelUri(e)) !== undefined;
return kernelCommandOrEventEnvelope?.routingSlip?.find(e => normalizedUri === (includeTags ? createKernelUriWithQuery(e) : createKernelUri(e))) !== undefined;
}
Loading

0 comments on commit 73634e8

Please sign in to comment.