Skip to content

Commit

Permalink
Merge #1499
Browse files Browse the repository at this point in the history
1499: Snockets r=karknu a=coot

This PR introduce snockets which abstract the interface for Berkeley sockets and named pipes (windows).

The PR consist of smaller reviewable and mostly buildable patches.

Highlight of changes:

Win32-network
--------------------

A bug fixed in `System.Win32.Async.connect`

network-mux
-----------------

* `runMuxWithPipes` handles named pipes as well
* new tests which include posix pipes, named pipes and queues
* mux-demo - windows named pipe demo using mux: an echo server and client

ouroboros-network
-------------------------

* `IOManager` - system indepdendent IO manager.  On Windows using `System.Win32.Async.withIOManager`, on posix it's a no-op
* `Snockets` - abstraction which handles both Berkeley sockets and Windows named pipes, together with smart constructors for both.
* `Snocket` shim layer with type alliases for different paltforms (posix / win32)
* demo-chain-sync - can now run on Windows with named pipes

future work
---------------
* `NodeToClient` and `NodeToNode` which are using named pipes for local clients on windows.

Co-authored-by: Marcin Szamotulski <[email protected]>
  • Loading branch information
iohk-bors[bot] and coot authored Feb 18, 2020
2 parents 5f77e24 + 810dba2 commit 4dc5fab
Show file tree
Hide file tree
Showing 35 changed files with 2,070 additions and 812 deletions.
2 changes: 2 additions & 0 deletions Win32-network/src/System/Win32/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ module System.Win32.Async
, module System.Win32.Async.ErrCode
, module System.Win32.Async.IOManager
, module System.Win32.Async.Socket
, module System.Win32.Async.Socket.ByteString
) where

import System.Win32.Async.IOManager
import System.Win32.Async.File
import System.Win32.Async.ErrCode
import System.Win32.Async.Socket
import System.Win32.Async.Socket.ByteString
4 changes: 4 additions & 0 deletions Win32-network/src/System/Win32/Async/IOManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import System.Win32.Async.ErrCode
-- <https://docs.microsoft.com/en-us/windows/win32/fileio/createiocompletionport>
--
newtype IOCompletionPort = IOCompletionPort HANDLE
deriving Show

closeIOCompletionPort :: IOCompletionPort -> IO ()
closeIOCompletionPort (IOCompletionPort iocp) = Win32.closeHandle iocp
Expand All @@ -55,6 +56,9 @@ createIOCompletionPort concurrentThreads
foreign import ccall unsafe "windows.h CreateIoCompletionPort"
c_CreateIoCompletionPort :: HANDLE -> HANDLE -> Ptr () -> DWORD -> IO HANDLE

-- | Associate with I/O completion port. This can be used multiple times on
-- a file descriptor.
--
associateWithIOCompletionPort :: Either HANDLE Socket
-> IOCompletionPort
-> IO ()
Expand Down
2 changes: 1 addition & 1 deletion Win32-network/src/System/Win32/Async/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ connect :: Socket -> SockAddr -> IO ()
connect sock addr = do
v <- newEmptyMVar
_ <- mask_ $ forkIOWithUnmask $ \unmask ->
unmask (Socket.connect sock addr) >> putMVar v Nothing
unmask (Socket.connect sock addr >> putMVar v Nothing)
`catch` (\(e :: IOException) -> putMVar v (Just e))
r <- takeMVar v
case r of
Expand Down
154 changes: 154 additions & 0 deletions network-mux/demo/mux-demo.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}

-- | Demo application which for now is only using mux over named pipes on
-- Windows.
--
-- TODO: extend it to use unix sockets.
--
module Main (main) where

import Control.Concurrent (forkIO, threadDelay)
import Control.Exception (finally)
import Control.Tracer (Tracer (..), nullTracer, showTracing)
import Data.Bits
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BSC
import Data.Void

import qualified Network.Mux.Types as Mx
import qualified Network.Mux.Bearer.Pipe as Mx

import Test.Mux.ReqResp

import System.Win32
import System.Win32.NamedPipes
import qualified System.Win32.Async as Win32.Async

import System.IO
import System.Exit
import System.Environment

main :: IO ()
main = do
args <- getArgs
case args of
["server"] -> echoServer
["client", n, msg] -> client (read n) msg
_ -> usage

usage :: IO ()
usage = do
hPutStr stderr $ "usage: mux-demo server\n"
++" mux-demo client (n :: Int) (msg :: String)"
exitFailure

pipeName :: String
pipeName = "\\\\.\\pipe\\mux-demo"

putStrLn_ :: String -> IO ()
putStrLn_ = BSC.putStrLn . BSC.pack

debugTracer :: Show a => Tracer IO a
debugTracer = showTracing (Tracer putStrLn_)

--
-- Protocols
--

defaultProtocolLimits :: Mx.MiniProtocolLimits
defaultProtocolLimits =
Mx.MiniProtocolLimits {
Mx.maximumMessageSize = 3_000_000,
Mx.maximumIngressQueue = 3_000_000
}

--
-- server: accept loop, server loop
--


-- | Server accept loop.
--
echoServer :: IO ()
echoServer = Win32.Async.withIOManager $ \iocp -> do
hpipe <- createNamedPipe pipeName
(pIPE_ACCESS_DUPLEX .|. fILE_FLAG_OVERLAPPED)
(pIPE_TYPE_BYTE .|. pIPE_READMODE_BYTE)
pIPE_UNLIMITED_INSTANCES
1024
1024
0
Nothing
Win32.Async.associateWithIOCompletionPort (Left hpipe) iocp
Win32.Async.connectNamedPipe hpipe
_ <- forkIO $ do
serverLoop hpipe
`finally` closeHandle hpipe
threadDelay 1
echoServer


serverLoop :: HANDLE
-> IO ()
serverLoop h = do
let pipeChannel = Mx.pipeChannelFromNamedPipe h
Mx.runMuxWithPipes
nullTracer
app
pipeChannel
where
app :: Mx.MuxApplication 'Mx.ResponderApp IO Void ()
app = Mx.MuxApplication
[ Mx.MuxMiniProtocol {
Mx.miniProtocolNum = Mx.MiniProtocolNum 2,
Mx.miniProtocolLimits = defaultProtocolLimits,
Mx.miniProtocolRun = Mx.ResponderProtocolOnly
$ \channel -> runServer debugTracer channel serverApp
}
]

serverApp :: ReqRespServer ByteString ByteString IO ()
serverApp = ReqRespServer {
recvMsgReq = \req -> pure (req, serverApp),
recvMsgDone = pure ()
}


--
-- client
--


client :: Int -> String -> IO ()
client n msg = Win32.Async.withIOManager $ \iocp -> do
hpipe <- createFile pipeName
(gENERIC_READ .|. gENERIC_WRITE)
fILE_SHARE_NONE
Nothing
oPEN_EXISTING
fILE_FLAG_OVERLAPPED
Nothing
Win32.Async.associateWithIOCompletionPort (Left hpipe) iocp
let pipeChannel = Mx.pipeChannelFromNamedPipe hpipe
Mx.runMuxWithPipes
nullTracer
app
pipeChannel
where
app :: Mx.MuxApplication 'Mx.InitiatorApp IO () Void
app = Mx.MuxApplication
[ Mx.MuxMiniProtocol {
Mx.miniProtocolNum = Mx.MiniProtocolNum 2,
Mx.miniProtocolLimits = defaultProtocolLimits,
Mx.miniProtocolRun = Mx.InitiatorProtocolOnly
$ \channel -> runClient debugTracer channel (clientApp n (BSC.pack msg))
}
]

clientApp :: Int -> ByteString -> ReqRespClient ByteString ByteString IO ()
clientApp 0 _ = SendMsgDone (pure ())
clientApp m rawmsg = SendMsgReq rawmsg
(pure . clientApp (pred m)) -- send back request

33 changes: 33 additions & 0 deletions network-mux/network-mux.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ library
vector,
time >=1.6 && <1.10

if os(windows)
build-depends: Win32 >= 2.5.4.1 && <2.9,
Win32-network >=0.1 && <0.2

ghc-options: -Wall
-Wno-unticked-promoted-constructors
if flag(asserts)
Expand All @@ -60,6 +64,9 @@ library
Network.Mux.DeltaQ.TraceStatsSupport
Network.Mux.DeltaQ.TraceTransformer
Network.Mux.DeltaQ.TraceTypes
if os(windows)
exposed-modules:
Network.Mux.Bearer.NamedPipe
default-language: Haskell2010

test-suite test-network-mux
Expand Down Expand Up @@ -104,8 +111,34 @@ test-suite test-network-mux
tasty-hunit,
time

if os(windows)
build-depends: Win32 >= 2.5.4.1 && <2.9,
Win32-network >=0.1 && <0.2
ghc-options: -Wall
-Wno-unticked-promoted-constructors
-fno-ignore-asserts
-threaded
if flag(ipv6)
cpp-options: -DOUROBOROS_NETWORK_IPV6

executable mux-demo
if !os(windows)
buildable: False
hs-source-dirs: demo, test
main-is: mux-demo.hs
other-modules: Test.Mux.ReqResp
build-depends: base,
network-mux,
io-sim-classes,
io-sim,
contra-tracer,

binary,
bytestring,
cborg,
serialise,
Win32,
Win32-network
default-language: Haskell2010
ghc-options: -Wall
-threaded
85 changes: 85 additions & 0 deletions network-mux/src/Network/Mux/Bearer/NamedPipe.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{-# LANGUAGE ScopedTypeVariables #-}

module Network.Mux.Bearer.NamedPipe
( namedPipeAsBearer ) where

import Control.Monad (when)
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Data.Foldable (traverse_)

import GHC.Stack

import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer

import qualified Network.Mux as Mx
import Network.Mux.Types (MuxBearer)
import qualified Network.Mux.Types as Mx
import Network.Mux.Trace (MuxTrace)
import qualified Network.Mux.Trace as Mx
import qualified Network.Mux.Time as Mx
import qualified Network.Mux.Codec as Mx

import System.Win32 (HANDLE)
import qualified System.Win32.Async as Win32.Async


-- | Named pipe bearer. The 'HANDLE' must be associated with IO completion port
-- using 'System.Win32.Async.associateWithIOCompletionPort'.
--
namedPipeAsBearer :: Tracer IO MuxTrace
-> HANDLE
-> MuxBearer IO
namedPipeAsBearer tracer h =
Mx.MuxBearer {
Mx.read = readNamedPipe,
Mx.write = writeNamedPipe,
Mx.sduSize = 24576
}
where
readNamedPipe :: HasCallStack => IO (Mx.MuxSDU, Time)
readNamedPipe = do
traceWith tracer Mx.MuxTraceRecvHeaderStart
hbuf <- recvLen' True 8 []
case Mx.decodeMuxSDU hbuf of
Left e -> throwM e
Right header -> do
traceWith tracer $ Mx.MuxTraceRecvHeaderEnd header
traceWith tracer $ Mx.MuxTraceRecvPayloadStart (fromIntegral $ Mx.msLength header)
blob <- recvLen' False (fromIntegral $ Mx.msLength header) []
ts <- getMonotonicTime
traceWith tracer (Mx.MuxTraceRecvDeltaQObservation header ts)
traceWith tracer $ Mx.MuxTraceRecvPayloadEnd blob
return (header {Mx.msBlob = blob}, ts)

recvLen' :: Bool -> Int64 -> [BL.ByteString] -> IO BL.ByteString
recvLen' _ 0 bufs = return (BL.concat $ reverse bufs)
recvLen' waitingOnNextHeader l bufs = do
traceWith tracer $ Mx.MuxTraceRecvStart $ fromIntegral l
buf <- BL.fromStrict <$> Win32.Async.readHandle h (fromIntegral l)
`catch` Mx.handleIOException "readHandle errored"
if BL.null buf
then do
when waitingOnNextHeader
$ threadDelay 1
throwM $ Mx.MuxError Mx.MuxBearerClosed (show h ++
" closed when reading data, waiting on next header " ++
show waitingOnNextHeader) callStack
else do
traceWith tracer (Mx.MuxTraceRecvEnd buf)
recvLen' False (l - fromIntegral (BL.length buf)) (buf : bufs)

writeNamedPipe :: Mx.MuxSDU -> IO Time
writeNamedPipe sdu = do
ts <- getMonotonicTime
let ts32 = Mx.timestampMicrosecondsLow32Bits ts
sdu' = sdu { Mx.msTimestamp = Mx.RemoteClockModel ts32 }
buf = Mx.encodeMuxSDU sdu'
traceWith tracer $ Mx.MuxTraceSendStart sdu'
traverse_ (Win32.Async.writeHandle h) (BL.toChunks buf)
`catch` Mx.handleIOException "writeHandle errored"
traceWith tracer Mx.MuxTraceSendEnd
return ts
Loading

0 comments on commit 4dc5fab

Please sign in to comment.