Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snockets #1499

Merged
merged 33 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
27c8fac
Win32-network: export ByteString operations from System.Win32.Async
coot Jan 20, 2020
f7d7c6a
Fix a bug in `System.Win32.Async.connect`
coot Jan 20, 2020
ada1a72
network-mux: named pipe bearer
coot Jan 20, 2020
3f02632
network-mux: use System.Win32.Async in the socket bearer
coot Jan 20, 2020
53d43e0
network-mux: pipeAsMuxBearer using named pipes on Windows
coot Jan 20, 2020
f9bb986
ouroboros-network: pipe tests on windows
coot Jan 20, 2020
9784797
mux-demo application over named pipes
coot Jan 23, 2020
5a7d54d
runMuxWithQueues: let it be polymorphic over peerid
coot Oct 18, 2019
5baaf0f
Extended mux tests
coot Jan 23, 2020
ec269f0
ouroboros-network: IOManager
coot Jan 20, 2020
4733dfc
ConnectionId makde polymorphic over address type
coot Jan 20, 2020
261bd10
NetworkMutableState made polymorphic over address type
coot Jan 20, 2020
44e8d9c
NetworkServerTracers made polymorphic over address type
coot Jan 20, 2020
e6bc5e8
Snocket interface
coot Jan 21, 2020
cf96fa7
socketSnocket: Snocket interface for Berkeley sockets
coot Jan 21, 2020
f35038e
namedPipeSnocket: snocket interface for named pipes
coot Jan 21, 2020
36b4183
LocalSnocket shim layer
coot Jan 21, 2020
99c4e28
Snockets: integrate with ouroboros-network
coot Jan 21, 2020
edcad9f
demo-chain-sync - use snocket
coot Jan 21, 2020
0c93670
Win32-network: derive Show instance for IOCompletionPort
coot Jan 21, 2020
aff62d1
Rename Ouroboros.Network.Socket.fromSocket to fromSnocket
coot Jan 21, 2020
9682241
Added debugging tracers to Ouroboros.Network.Socket
coot Jan 23, 2020
0959074
NetworkDNSSubscriptionTracers take addr rather than peerid
coot Jan 23, 2020
3821baf
Updated ouroboros-consensus
coot Jan 24, 2020
2a603f8
fixup! demo-chain-sync - use snocket
coot Feb 14, 2020
44c7f80
Added docs to associateWithIOCompletionPort
coot Jan 31, 2020
36c5e79
Improved documentation and layout of the Snocket module
coot Jan 31, 2020
855f5f6
ouroboros-network: pipe tests on windows
coot Jan 20, 2020
8dbe1de
Diffusion - support local clients
coot Jan 24, 2020
d2205e2
Use ClientSnocket in NodeToClient
coot Jan 24, 2020
aea86dc
ErrorPolicies - remove return callback and simplify types
coot Jan 28, 2020
c59bb7a
clientSubscriptionWorker
coot Jan 28, 2020
810dba2
scripts/test.sh - run all ouroboros-network tests
coot Jan 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Win32-network/src/System/Win32/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ module System.Win32.Async
, module System.Win32.Async.ErrCode
, module System.Win32.Async.IOManager
, module System.Win32.Async.Socket
, module System.Win32.Async.Socket.ByteString
) where

import System.Win32.Async.IOManager
import System.Win32.Async.File
import System.Win32.Async.ErrCode
import System.Win32.Async.Socket
import System.Win32.Async.Socket.ByteString
4 changes: 4 additions & 0 deletions Win32-network/src/System/Win32/Async/IOManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import System.Win32.Async.ErrCode
-- <https://docs.microsoft.com/en-us/windows/win32/fileio/createiocompletionport>
--
newtype IOCompletionPort = IOCompletionPort HANDLE
deriving Show

closeIOCompletionPort :: IOCompletionPort -> IO ()
closeIOCompletionPort (IOCompletionPort iocp) = Win32.closeHandle iocp
Expand All @@ -55,6 +56,9 @@ createIOCompletionPort concurrentThreads
foreign import ccall unsafe "windows.h CreateIoCompletionPort"
c_CreateIoCompletionPort :: HANDLE -> HANDLE -> Ptr () -> DWORD -> IO HANDLE

-- | Associate with I/O completion port. This can be used multiple times on
-- a file descriptor.
--
associateWithIOCompletionPort :: Either HANDLE Socket
-> IOCompletionPort
-> IO ()
Expand Down
2 changes: 1 addition & 1 deletion Win32-network/src/System/Win32/Async/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ connect :: Socket -> SockAddr -> IO ()
connect sock addr = do
v <- newEmptyMVar
_ <- mask_ $ forkIOWithUnmask $ \unmask ->
unmask (Socket.connect sock addr) >> putMVar v Nothing
unmask (Socket.connect sock addr >> putMVar v Nothing)
`catch` (\(e :: IOException) -> putMVar v (Just e))
r <- takeMVar v
case r of
Expand Down
154 changes: 154 additions & 0 deletions network-mux/demo/mux-demo.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}

-- | Demo application which for now is only using mux over named pipes on
-- Windows.
--
-- TODO: extend it to use unix sockets.
--
module Main (main) where

import Control.Concurrent (forkIO, threadDelay)
import Control.Exception (finally)
import Control.Tracer (Tracer (..), nullTracer, showTracing)
import Data.Bits
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BSC
import Data.Void

import qualified Network.Mux.Types as Mx
import qualified Network.Mux.Bearer.Pipe as Mx

import Test.Mux.ReqResp

import System.Win32
import System.Win32.NamedPipes
import qualified System.Win32.Async as Win32.Async

import System.IO
import System.Exit
import System.Environment

main :: IO ()
main = do
args <- getArgs
case args of
["server"] -> echoServer
["client", n, msg] -> client (read n) msg
_ -> usage

usage :: IO ()
usage = do
hPutStr stderr $ "usage: mux-demo server\n"
++" mux-demo client (n :: Int) (msg :: String)"
exitFailure

pipeName :: String
pipeName = "\\\\.\\pipe\\mux-demo"

putStrLn_ :: String -> IO ()
putStrLn_ = BSC.putStrLn . BSC.pack

debugTracer :: Show a => Tracer IO a
debugTracer = showTracing (Tracer putStrLn_)

--
-- Protocols
--

defaultProtocolLimits :: Mx.MiniProtocolLimits
defaultProtocolLimits =
Mx.MiniProtocolLimits {
Mx.maximumMessageSize = 3_000_000,
Mx.maximumIngressQueue = 3_000_000
}

--
-- server: accept loop, server loop
--


-- | Server accept loop.
--
echoServer :: IO ()
echoServer = Win32.Async.withIOManager $ \iocp -> do
hpipe <- createNamedPipe pipeName
(pIPE_ACCESS_DUPLEX .|. fILE_FLAG_OVERLAPPED)
(pIPE_TYPE_BYTE .|. pIPE_READMODE_BYTE)
pIPE_UNLIMITED_INSTANCES
1024
1024
0
Nothing
Win32.Async.associateWithIOCompletionPort (Left hpipe) iocp
Win32.Async.connectNamedPipe hpipe
_ <- forkIO $ do
serverLoop hpipe
`finally` closeHandle hpipe
threadDelay 1
echoServer


serverLoop :: HANDLE
-> IO ()
serverLoop h = do
let pipeChannel = Mx.pipeChannelFromNamedPipe h
Mx.runMuxWithPipes
nullTracer
app
pipeChannel
where
app :: Mx.MuxApplication 'Mx.ResponderApp IO Void ()
app = Mx.MuxApplication
[ Mx.MuxMiniProtocol {
Mx.miniProtocolNum = Mx.MiniProtocolNum 2,
Mx.miniProtocolLimits = defaultProtocolLimits,
Mx.miniProtocolRun = Mx.ResponderProtocolOnly
$ \channel -> runServer debugTracer channel serverApp
}
]

serverApp :: ReqRespServer ByteString ByteString IO ()
serverApp = ReqRespServer {
recvMsgReq = \req -> pure (req, serverApp),
recvMsgDone = pure ()
}


--
-- client
--


client :: Int -> String -> IO ()
client n msg = Win32.Async.withIOManager $ \iocp -> do
hpipe <- createFile pipeName
(gENERIC_READ .|. gENERIC_WRITE)
fILE_SHARE_NONE
Nothing
oPEN_EXISTING
fILE_FLAG_OVERLAPPED
Nothing
Win32.Async.associateWithIOCompletionPort (Left hpipe) iocp
let pipeChannel = Mx.pipeChannelFromNamedPipe hpipe
Mx.runMuxWithPipes
nullTracer
app
pipeChannel
where
app :: Mx.MuxApplication 'Mx.InitiatorApp IO () Void
app = Mx.MuxApplication
[ Mx.MuxMiniProtocol {
Mx.miniProtocolNum = Mx.MiniProtocolNum 2,
Mx.miniProtocolLimits = defaultProtocolLimits,
Mx.miniProtocolRun = Mx.InitiatorProtocolOnly
$ \channel -> runClient debugTracer channel (clientApp n (BSC.pack msg))
}
]

clientApp :: Int -> ByteString -> ReqRespClient ByteString ByteString IO ()
clientApp 0 _ = SendMsgDone (pure ())
clientApp m rawmsg = SendMsgReq rawmsg
(pure . clientApp (pred m)) -- send back request

33 changes: 33 additions & 0 deletions network-mux/network-mux.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ library
vector,
time >=1.6 && <1.10

if os(windows)
build-depends: Win32 >= 2.5.4.1 && <2.9,
Win32-network >=0.1 && <0.2

ghc-options: -Wall
-Wno-unticked-promoted-constructors
if flag(asserts)
Expand All @@ -60,6 +64,9 @@ library
Network.Mux.DeltaQ.TraceStatsSupport
Network.Mux.DeltaQ.TraceTransformer
Network.Mux.DeltaQ.TraceTypes
if os(windows)
exposed-modules:
Network.Mux.Bearer.NamedPipe
default-language: Haskell2010

test-suite test-network-mux
Expand Down Expand Up @@ -104,8 +111,34 @@ test-suite test-network-mux
tasty-hunit,
time

if os(windows)
build-depends: Win32 >= 2.5.4.1 && <2.9,
Win32-network >=0.1 && <0.2
ghc-options: -Wall
-Wno-unticked-promoted-constructors
-fno-ignore-asserts
-threaded
if flag(ipv6)
cpp-options: -DOUROBOROS_NETWORK_IPV6

executable mux-demo
if !os(windows)
buildable: False
hs-source-dirs: demo, test
main-is: mux-demo.hs
other-modules: Test.Mux.ReqResp
build-depends: base,
network-mux,
io-sim-classes,
io-sim,
contra-tracer,

binary,
bytestring,
cborg,
serialise,
Win32,
Win32-network
default-language: Haskell2010
ghc-options: -Wall
-threaded
85 changes: 85 additions & 0 deletions network-mux/src/Network/Mux/Bearer/NamedPipe.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{-# LANGUAGE ScopedTypeVariables #-}

module Network.Mux.Bearer.NamedPipe
( namedPipeAsBearer ) where

import Control.Monad (when)
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Data.Foldable (traverse_)

import GHC.Stack

import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer

import qualified Network.Mux as Mx
import Network.Mux.Types (MuxBearer)
import qualified Network.Mux.Types as Mx
import Network.Mux.Trace (MuxTrace)
import qualified Network.Mux.Trace as Mx
import qualified Network.Mux.Time as Mx
import qualified Network.Mux.Codec as Mx

import System.Win32 (HANDLE)
import qualified System.Win32.Async as Win32.Async


-- | Named pipe bearer. The 'HANDLE' must be associated with IO completion port
-- using 'System.Win32.Async.associateWithIOCompletionPort'.
--
namedPipeAsBearer :: Tracer IO MuxTrace
-> HANDLE
-> MuxBearer IO
namedPipeAsBearer tracer h =
Mx.MuxBearer {
Mx.read = readNamedPipe,
Mx.write = writeNamedPipe,
Mx.sduSize = 24576
}
where
readNamedPipe :: HasCallStack => IO (Mx.MuxSDU, Time)
readNamedPipe = do
traceWith tracer Mx.MuxTraceRecvHeaderStart
hbuf <- recvLen' True 8 []
case Mx.decodeMuxSDU hbuf of
Left e -> throwM e
Right header -> do
traceWith tracer $ Mx.MuxTraceRecvHeaderEnd header
traceWith tracer $ Mx.MuxTraceRecvPayloadStart (fromIntegral $ Mx.msLength header)
blob <- recvLen' False (fromIntegral $ Mx.msLength header) []
ts <- getMonotonicTime
traceWith tracer (Mx.MuxTraceRecvDeltaQObservation header ts)
traceWith tracer $ Mx.MuxTraceRecvPayloadEnd blob
return (header {Mx.msBlob = blob}, ts)

recvLen' :: Bool -> Int64 -> [BL.ByteString] -> IO BL.ByteString
recvLen' _ 0 bufs = return (BL.concat $ reverse bufs)
recvLen' waitingOnNextHeader l bufs = do
traceWith tracer $ Mx.MuxTraceRecvStart $ fromIntegral l
buf <- BL.fromStrict <$> Win32.Async.readHandle h (fromIntegral l)
`catch` Mx.handleIOException "readHandle errored"
if BL.null buf
then do
when waitingOnNextHeader
$ threadDelay 1
throwM $ Mx.MuxError Mx.MuxBearerClosed (show h ++
" closed when reading data, waiting on next header " ++
show waitingOnNextHeader) callStack
else do
traceWith tracer (Mx.MuxTraceRecvEnd buf)
recvLen' False (l - fromIntegral (BL.length buf)) (buf : bufs)

writeNamedPipe :: Mx.MuxSDU -> IO Time
writeNamedPipe sdu = do
ts <- getMonotonicTime
let ts32 = Mx.timestampMicrosecondsLow32Bits ts
sdu' = sdu { Mx.msTimestamp = Mx.RemoteClockModel ts32 }
buf = Mx.encodeMuxSDU sdu'
traceWith tracer $ Mx.MuxTraceSendStart sdu'
traverse_ (Win32.Async.writeHandle h) (BL.toChunks buf)
`catch` Mx.handleIOException "writeHandle errored"
traceWith tracer Mx.MuxTraceSendEnd
return ts
Loading