Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

COMM messages support #182

Merged
merged 1 commit into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 116 additions & 1 deletion src/IfSharp.Kernel/Kernel.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,28 @@ open Newtonsoft.Json
open NetMQ
open NetMQ.Sockets

type IfSharpKernel(connectionInformation : ConnectionInformation) =
/// A function that by it's side effect sends the received dict as a comm_message
type SendCommMessage = Dictionary<string,obj> -> unit

type CommOpenCallback = SendCommMessage -> CommOpen -> unit
type CommMessageCallback = SendCommMessage -> CommMessage -> unit
type CommCloseCallback = CommTearDown -> unit

type CommId = string
type CommTargetName = string

/// The set of callbacks which define comm registration at the kernell side
type CommCallbacks = {
/// called upon comm creation
onOpen : CommOpenCallback
/// called to handle every received message while the come is opened
onMessage : CommMessageCallback
/// called upon comm close
onClose: CommCloseCallback
}


type IfSharpKernel(connectionInformation : ConnectionInformation) =
// heartbeat
let hbSocket = new RouterSocket()
let hbSocketURL = String.Format("{0}://{1}:{2}", connectionInformation.transport, connectionInformation.ip, connectionInformation.hb_port)
Expand Down Expand Up @@ -44,6 +64,11 @@ type IfSharpKernel(connectionInformation : ConnectionInformation) =
let mutable executionCount = 0
let mutable lastMessage : Option<KernelMessage> = None

/// Registered comm difinitions (can be activated from Frontend side by comm_open message containing registered comm_target name)
let mutable registeredComms : Map<CommTargetName,CommCallbacks> = Map.empty;
/// Comms that are in the open state
let mutable activeComms : Map<CommId,CommTargetName> = Map.empty;

/// Gets the header code to prepend to all items
let headerCode =
let file = FileInfo(Assembly.GetEntryAssembly().Location)
Expand Down Expand Up @@ -471,6 +496,66 @@ type IfSharpKernel(connectionInformation : ConnectionInformation) =
sendMessage shellSocket msg "inspect_reply" reply
()

let sendCommData sourceEnvelope commId (data:Dictionary<string,obj>) =
let message : CommMessage = {comm_id=commId; data = data}
sendMessage ioSocket sourceEnvelope "comm_msg" message

let commOpen (msg : KernelMessage) (content : CommOpen) =
if String.IsNullOrEmpty(content.target_name) then
// as defined in protocol
let reply: CommTearDown = {comm_id = content.comm_id; data = Dictionary<string,obj>();}
sendMessage ioSocket msg "comm_close" reply
match Map.tryFind content.target_name registeredComms with
| Some callbacks ->
// executing open callback
let onOpen = callbacks.onOpen
let sendOnjectWithComm = sendCommData msg content.comm_id
onOpen sendOnjectWithComm content
// saving comm_id for created instance
activeComms <- Map.add content.comm_id content.target_name activeComms
logMessage (sprintf "comm opened id=%s target_name=%s" content.comm_id content.target_name)
| None ->
logMessage (sprintf "received comOpen request for the unknown com target_name \"%s\". Please register comm with this target_name first." content.target_name)
let reply: CommTearDown = {comm_id = content.comm_id; data = Dictionary<string,obj>();}
sendMessage ioSocket msg "comm_close" reply

let commMessage (msg : KernelMessage) (content : CommMessage) =
match Map.tryFind content.comm_id activeComms with
| Some comm_target ->
// finding corresponding callback
let callbacks = Map.find comm_target registeredComms
// and executing it
let onMessage = callbacks.onMessage
let sendOnjectWithComm = sendCommData msg content.comm_id
onMessage sendOnjectWithComm content
logMessage (sprintf "comm message handled id=%s target_name=%s" content.comm_id comm_target)
| None -> logMessage (sprintf "Got comm message (comm_id=%s), but there is nor opened comms with such comm_id. Ignoring" content.comm_id)

let commClose (msg : KernelMessage) (content : CommTearDown) =
match Map.tryFind content.comm_id activeComms with
| Some target_name ->
// executing close callback
let callbacks = Map.find target_name registeredComms
callbacks.onClose content
// removing comm from opened comms
activeComms <- Map.remove content.comm_id activeComms
logMessage (sprintf "comm closed id=%s target_name=%s" content.comm_id target_name)
| None -> logMessage (sprintf "Got comm close request (comm_id=%s), but there is nor opened comms with such comm_id" content.comm_id)

let commInfoRequest (msg : KernelMessage) (content : CommInfoRequest) =
// returning all open comms
let pairToDict pair =
let comm_id,target_name = pair
let dict = new Dictionary<string,string>();
dict.Add("target_name",target_name)
comm_id,dict
let openedCommsDict = Dictionary<string,Dictionary<string,string>>()
activeComms |> Map.toSeq |> Seq.map pairToDict |> Seq.iter (fun entry -> let key,value = entry in openedCommsDict.Add(key,value))
let reply = { comms = openedCommsDict}
sendMessage shellSocket msg "comm_info_reply" reply
logMessage (sprintf "Reporting %d opened comms" openedCommsDict.Count)


/// Loops forever receiving messages from the client and processing them
let doShell() =

Expand All @@ -493,6 +578,10 @@ type IfSharpKernel(connectionInformation : ConnectionInformation) =
| HistoryRequest(r) -> historyRequest msg r
| ObjectInfoRequest(r) -> objectInfoRequest msg r
| InspectRequest(r) -> inspectRequest msg r
| CommOpen(r) -> commOpen msg r
| CommMessage(r) -> commMessage msg r
| CommTearDown(r) -> commClose msg r
| CommInfoRequest(r) -> commInfoRequest msg r
| _ -> logMessage (String.Format("Unknown content type on shell. msg_type is `{0}`", msg.Header.msg_type))
with
| ex -> handleException ex
Expand All @@ -516,6 +605,32 @@ type IfSharpKernel(connectionInformation : ConnectionInformation) =
hbSocket.SendMultipartBytes hb
with
| ex -> handleException ex

/// Regesters a comm with specified target_name and callbacks
member __.RegisterComm(target_name, onOpen,onMessage,onClose) =
if Map.containsKey target_name registeredComms then
logMessage (sprintf "Warning! The comm with target_name \"%s\" is already registered. Overriding previous registration" target_name)
let callbacks :CommCallbacks =
{
onOpen = onOpen
onMessage = onMessage
onClose = onClose
}
registeredComms <- Map.add target_name callbacks registeredComms

/// Removes comm registration by specified comm target_name
member __.UnregisterComm(target_name) =
registeredComms <- Map.remove target_name registeredComms

/// Returns the constructed comm_id, send function. Comm target_name must be registered with RegisterComm prior to this call
//member __.OpenComm(target_name) =
// match Map.tryFind target_name registeredComms with
// | Some callbacks ->
// sendMessage

// | Nonw ->
// failwith (sprintf "the comm target_name \"%s\" is not registered. Register it befor opening" target_name)


/// Clears the display
member __.ClearDisplay () =
Expand Down
38 changes: 34 additions & 4 deletions src/IfSharp.Kernel/ShellMessages.fs
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,35 @@ type ConnectReply =
hb_port: int; // # The port the heartbeat socket is listening on.
}

type CommOpen = obj
type CommOpen =
{
comm_id: string; // # unique Com instance ID that has been just created by frontend
target_name: string; // # Which type of comm instance to construct at kernel side
data: Dictionary<string,obj>; // # initialization specific data
}

type CommInfoRequest = obj
type CommMessage =
{
comm_id: string; // # unique Com instance ID
data: Dictionary<string,obj>; // # payload
}

type CommTearDown =
{
comm_id: string; // # unique Com instance ID
data: Dictionary<string,obj>; // # payload
}

type CommInfoRequest =
{
target_name: string; // Optional, the target name
}

type CommInfoReply =
{
// A dictionary of the comms, indexed by uuids.
comms: Dictionary<string,Dictionary<string,string>>
}

type KernelRequest = obj

Expand Down Expand Up @@ -498,9 +524,12 @@ type ShellMessage =
| ConnectRequest of ConnectRequest
| ConnectReply of ConnectReply

// comm open?
// Comm related
| CommOpen of CommOpen
| CommMessage of CommMessage
| CommTearDown of CommTearDown
| CommInfoRequest of CommInfoRequest
| CommInfoReply of CommInfoReply

// kernel info
| KernelRequest of KernelRequest
Expand Down Expand Up @@ -561,8 +590,9 @@ module ShellMessages =
| "shutdown_request" -> ShutdownRequest (JsonConvert.DeserializeObject<ShutdownRequest>(messageJson))
| "shutdown_reply" -> ShutdownReply (JsonConvert.DeserializeObject<ShutdownReply>(messageJson))

//Jupyter 4.x support, do we need to do anything with this?
| "comm_open" -> CommOpen (JsonConvert.DeserializeObject<CommOpen>(messageJson))
| "comm_msg" -> CommMessage (JsonConvert.DeserializeObject<CommMessage>(messageJson))
| "comm_close" -> CommTearDown (JsonConvert.DeserializeObject<CommTearDown>(messageJson))

| "comm_info_request" -> CommInfoRequest (JsonConvert.DeserializeObject<CommInfoRequest>(messageJson))

Expand Down