diff --git a/Win32-network/Win32-network.cabal b/Win32-network/Win32-network.cabal index 9f8df5891b7..b9ed12ce098 100644 --- a/Win32-network/Win32-network.cabal +++ b/Win32-network/Win32-network.cabal @@ -28,8 +28,10 @@ library System.Win32.Async.ErrCode System.Win32.Async.IOManager System.Win32.Async.Internal + System.Win32.Async.WSABuf System.Win32.Async.Socket System.Win32.Async.Socket.ByteString + System.Win32.Async.Socket.ByteString.Lazy c-sources: cbits/Win32Async.c cbits/Win32Socket.c build-depends: base >= 4.5 && < 4.13, diff --git a/Win32-network/cbits/Win32Socket.c b/Win32-network/cbits/Win32Socket.c index de0d4e9cd2a..4b780dbb382 100644 --- a/Win32-network/cbits/Win32Socket.c +++ b/Win32-network/cbits/Win32Socket.c @@ -11,7 +11,7 @@ typedef struct _WSA_PER_IO_DATA { } WSA_PER_IO_DATA; DllExport -void HsSendBuf(SOCKET s, CHAR *buf, ULONG len, HsStablePtr userData) +void HsSendBuf(SOCKET s, WSABUF *lpBuffers, DWORD dwBufferCount, HsStablePtr userData) { int res; WSA_PER_IO_DATA *perIoDataPtr; @@ -26,11 +26,7 @@ void HsSendBuf(SOCKET s, CHAR *buf, ULONG len, HsStablePtr userData) perIoDataPtr->WSAOverlapped.OffsetHigh = 0; perIoDataPtr->UserData = userData; - WSABUF wsaBuf; - wsaBuf.len = len; - wsaBuf.buf = buf; - - res = WSASend(s, &wsaBuf, 1, NULL, 0, &(perIoDataPtr->WSAOverlapped), NULL); + res = WSASend(s, lpBuffers, dwBufferCount, NULL, 0, &(perIoDataPtr->WSAOverlapped), NULL); if (!res) { DWORD error; error = WSAGetLastError(); diff --git a/Win32-network/src/System/Win32/Async.hs b/Win32-network/src/System/Win32/Async.hs index 43d3be6d572..4a97e79dd0e 100644 --- a/Win32-network/src/System/Win32/Async.hs +++ b/Win32-network/src/System/Win32/Async.hs @@ -1,13 +1,16 @@ +-- | This is the main module of `Win32-network` package which should be used +-- with "System.Win32.Async.Socket.ByteString", +-- "System.Win32.Async.Socket.ByteString.Lazy" or "System.Win32.Async.File" for +-- sending and/or receiving. +-- module System.Win32.Async ( module System.Win32.Async.File , module System.Win32.Async.ErrCode , module System.Win32.Async.IOManager , module System.Win32.Async.Socket - , module System.Win32.Async.Socket.ByteString ) where import System.Win32.Async.IOManager import System.Win32.Async.File import System.Win32.Async.ErrCode import System.Win32.Async.Socket -import System.Win32.Async.Socket.ByteString diff --git a/Win32-network/src/System/Win32/Async/File.hs b/Win32-network/src/System/Win32/Async/File.hs index ad318c30955..4b4cf4a8b61 100644 --- a/Win32-network/src/System/Win32/Async/File.hs +++ b/Win32-network/src/System/Win32/Async/File.hs @@ -29,7 +29,10 @@ import qualified System.Win32.Types as Win32 import System.Win32.Async.ErrCode import System.Win32.Async.Internal --- | Read a given number of bytes from a 'HANDLE'. +-- | Read a given number of bytes from a 'HANDLE'. The 'HANDLE' must be +-- opened with 'System.Win32.fILE_FLAG_OVERLAPPED' and associated with an IO +-- completion port via +-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'. -- readHandle :: HANDLE -> Int @@ -47,7 +50,9 @@ foreign import ccall safe "HsAsyncRead" -> IO () --- | Write a 'ByteString' to a HANDLE. +-- | Write a 'ByteString' to a 'HANDLE'. The 'HANDLE' must be opened with +-- 'System.Win32.fILE_FLAG_OVERLAPPED' and associated with an IO completion port +-- via 'System.Win32.Async.IOManager.associateWithIOCompletionPort'. -- writeHandle :: HANDLE -> ByteString @@ -64,16 +69,23 @@ foreign import ccall safe "HsAsyncWrite" -> IO () +-- | Connect named pipe aka accept a connection. The 'HANDLE' must be opened +-- with 'System.Win32.FILE_FLAG_OVERLLAPPED' and associated with IO completion +-- port via 'System.Win32.Async.IOManager.associateWithIOCompletionPort'. +-- +-- [msdn documentation](https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe) +-- connectNamedPipe :: HANDLE -> IO () connectNamedPipe h = void $ waitForCompletion "connectNamedPipe" $ \ptr -> do c_ConnectNamedPipe h ptr errCode <- Win32.getLastError -- connectNamedPipe can error with: - -- * 'ERROR_PIPE_LISTENING' - the pipe is listening, we need to wait more - -- * 'ERROR_PIPE_CONNECTED' - the pipe is already connected - -- * 'ERROR_NO_DATA' - previous client has not disconnected, we - -- should error - -- * 'ERROR_IO_PENDING' - IO is pending, 'waitForCompletion' should + -- + -- 'ERROR_PIPE_LISTENING' - the pipe is listening, we need to wait more + -- 'ERROR_PIPE_CONNECTED' - the pipe is already connected + -- 'ERROR_NO_DATA' - previous client has not disconnected, we + -- should error + -- 'ERROR_IO_PENDING' - IO is pending, 'waitForCompletion' should -- resolve this unless ( errCode == eRROR_PIPE_LISTENING || errCode == eRROR_PIPE_CONNECTED diff --git a/Win32-network/src/System/Win32/Async/IOManager.hs b/Win32-network/src/System/Win32/Async/IOManager.hs index 6d03b55f734..35d5db400ca 100644 --- a/Win32-network/src/System/Win32/Async/IOManager.hs +++ b/Win32-network/src/System/Win32/Async/IOManager.hs @@ -30,8 +30,8 @@ import qualified System.Win32.Types as Win32 import qualified System.Win32.File as Win32 (closeHandle) import System.Win32.Async.ErrCode --- | New type wrapper which holds 'HANDLE' of the I/O completion port. --- +-- | New type wrapper which holds 'HANDLE' of an [I/O completion +-- port](https://docs.microsoft.com/en-us/windows/win32/fileio/createiocompletionport). -- newtype IOCompletionPort = IOCompletionPort HANDLE deriving Show @@ -39,7 +39,7 @@ newtype IOCompletionPort = IOCompletionPort HANDLE closeIOCompletionPort :: IOCompletionPort -> IO () closeIOCompletionPort (IOCompletionPort iocp) = Win32.closeHandle iocp --- | Windows documentation: +-- | [msdn documentation](https://docs.microsoft.com/en-us/windows/win32/fileio/createiocompletionport) -- createIOCompletionPort :: DWORD -- ^ number of concurrent threads @@ -57,7 +57,9 @@ foreign import ccall unsafe "windows.h CreateIoCompletionPort" c_CreateIoCompletionPort :: HANDLE -> HANDLE -> Ptr () -> DWORD -> IO HANDLE -- | Associate with I/O completion port. This can be used multiple times on --- a file descriptor. +-- a file descriptor. A 'HANDLE' must be opend with +-- 'System.Win32.fILE_FLAG_OVERLAPPED' flag for this call to succeed; 'Socket's +-- do not require special initialisation. -- associateWithIOCompletionPort :: Either HANDLE Socket -> IOCompletionPort diff --git a/Win32-network/src/System/Win32/Async/Internal.hs b/Win32-network/src/System/Win32/Async/Internal.hs index 22b00da924c..0659b8dd2dc 100644 --- a/Win32-network/src/System/Win32/Async/Internal.hs +++ b/Win32-network/src/System/Win32/Async/Internal.hs @@ -1,12 +1,21 @@ -module System.Win32.Async.Internal where +module System.Win32.Async.Internal + ( SOCKET + , CInt (..) + , waitForCompletion + , wsaWaitForCompletion + ) where import Control.Concurrent +import Foreign.C (CInt (..)) import Foreign.StablePtr (StablePtr, newStablePtr) import System.Win32.Types (ErrCode) import qualified System.Win32.Types as Win32 import System.Win32.Async.ErrCode + +type SOCKET = CInt + waitForCompletion :: String -> (StablePtr (MVar (Either ErrCode Int)) -> IO ()) -> IO Int diff --git a/Win32-network/src/System/Win32/Async/Socket.hs b/Win32-network/src/System/Win32/Async/Socket.hs index aefb06e65fc..407536e3259 100644 --- a/Win32-network/src/System/Win32/Async/Socket.hs +++ b/Win32-network/src/System/Win32/Async/Socket.hs @@ -1,4 +1,5 @@ {-# LANGUAGE InterruptibleFFI #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} module System.Win32.Async.Socket @@ -16,15 +17,15 @@ import Data.Word import Foreign.C (CInt (..)) import Foreign.Ptr (Ptr) import Foreign.StablePtr (StablePtr) +import Foreign.Marshal.Alloc (alloca) +import Foreign.Storable (Storable (poke)) import Network.Socket (Socket, SockAddr) import qualified Network.Socket as Socket import System.Win32.Types import System.Win32.Async.Internal - - -type SOCKET = CInt +import System.Win32.Async.WSABuf sendBuf :: Socket @@ -34,12 +35,14 @@ sendBuf :: Socket sendBuf sock buf size = Socket.withFdSocket sock $ \fd -> -- on Windows sockets are Word32, GHC represents file descriptors with CInt -- which is Int32. - wsaWaitForCompletion "sendBuf" (c_sendBuf fd buf (fromIntegral size)) + alloca $ \bufs_ptr -> do + poke bufs_ptr WSABuf {buf, len = fromIntegral size} + wsaWaitForCompletion "sendBuf" (c_sendBuf fd bufs_ptr 1) foreign import ccall safe "HsSendBuf" c_sendBuf :: SOCKET - -> Ptr Word8 - -> Word32 + -> Ptr WSABuf -- ^ lpBuffers + -> DWORD -- ^ dwBufferCount -> StablePtr b -> IO () @@ -51,7 +54,7 @@ connect :: Socket -> SockAddr -> IO () connect sock addr = do v <- newEmptyMVar _ <- mask_ $ forkIOWithUnmask $ \unmask -> - unmask (Socket.connect sock addr >> putMVar v Nothing) + (unmask (Socket.connect sock addr) >> putMVar v Nothing) `catch` (\(e :: IOException) -> putMVar v (Just e)) r <- takeMVar v case r of @@ -73,7 +76,7 @@ accept :: Socket -> IO (Socket, SockAddr) accept sock = do v <- newEmptyMVar _ <- mask_ $ forkIOWithUnmask $ \unmask -> - unmask (Socket.accept sock >>= putMVar v . Right) + (unmask (Socket.accept sock) >>= putMVar v . Right) `catch` (\(e :: IOException) -> putMVar v (Left e)) r <- takeMVar v case r of diff --git a/Win32-network/src/System/Win32/Async/Socket/ByteString.hs b/Win32-network/src/System/Win32/Async/Socket/ByteString.hs index dfe4cbc2dac..f9342f05099 100644 --- a/Win32-network/src/System/Win32/Async/Socket/ByteString.hs +++ b/Win32-network/src/System/Win32/Async/Socket/ByteString.hs @@ -20,8 +20,10 @@ import Network.Socket (Socket) import System.Win32.Async.Socket --- | Send a 'ByteString' over a socket, which must be in a connected state. --- Returns number of bytes sent. +-- | Send a 'ByteString' over a socket, which must be in a connected state, and +-- must be associated with an IO completion port via +-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'. Returns number +-- of bytes sent. -- send :: Socket -> ByteString @@ -42,6 +44,11 @@ sendAll sock bs = do $ sendAll sock (BS.drop sent bs) +-- | Recv a 'ByteString' from a socket, which must be in a connected state, and +-- must be associated with an IO completion port via +-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'. It may return +-- less bytes than requested. +-- recv :: Socket -> Int -> IO ByteString diff --git a/Win32-network/src/System/Win32/Async/Socket/ByteString/Lazy.hs b/Win32-network/src/System/Win32/Async/Socket/ByteString/Lazy.hs new file mode 100644 index 00000000000..a7473a5bea3 --- /dev/null +++ b/Win32-network/src/System/Win32/Async/Socket/ByteString/Lazy.hs @@ -0,0 +1,98 @@ +{-# LANGUAGE BangPatterns #-} + +module System.Win32.Async.Socket.ByteString.Lazy + ( send + , sendAll + , recv + ) where + + +import Control.Monad (when) +import qualified Data.ByteString as BS +import Data.ByteString.Lazy (ByteString) +import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString.Lazy.Internal as BL (ByteString (..)) +import Data.ByteString.Unsafe (unsafeUseAsCStringLen) +import Data.Int (Int64) +import Foreign.C (CInt (..)) +import Foreign.Marshal.Array (allocaArray) +import Foreign.Ptr +import Foreign.StablePtr (StablePtr) +import Foreign.Storable + +import Network.Socket (Socket) +import qualified Network.Socket as Socket + +import System.Win32.Types (DWORD) + +import System.Win32.Async.WSABuf +import System.Win32.Async.Internal +import qualified System.Win32.Async.Socket.ByteString as Socket.ByteString + + +-- | Sending each chunk using vectored I/O. In one system call one can +-- transmit at most 1024 chunks, it is safe to transmit less than 4194304 +-- bytes. +-- +-- The socket must be in a connected state, and must be associated with an IO +-- completion port via +-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'. +-- +send :: Socket + -> ByteString + -> IO Int64 +send sock bs = do + let cs = take maxNumChunks (BL.toChunks bs) + size = length cs + siz <- Socket.withFdSocket sock $ \fd -> allocaArray size $ \ptr -> + withPokes cs ptr $ \nwsabuf -> + wsaWaitForCompletion "send" (c_sendBuf fd ptr nwsabuf) + return $ fromIntegral siz + where + withPokes ss p f = loop ss p 0 0 + where + loop (c:cs) q k !nwsabuf + | k < maxNumBytes = unsafeUseAsCStringLen c $ \(ptr, strlen) -> do + poke q $ WSABuf (fromIntegral strlen) (castPtr ptr) + loop cs + (q `plusPtr` sizeOf (undefined :: WSABuf)) + (k + fromIntegral strlen) + (nwsabuf + 1) + | otherwise = f nwsabuf + loop _ _ _ nwsabuf = f nwsabuf + + maxNumBytes, maxNumChunks :: Int + maxNumBytes = 4194304 -- maximum number of bytes to transmit in one system call + maxNumChunks = 1024 -- maximum number of chunks to transmit in one system call + + +foreign import ccall safe "HsSendBuf" + c_sendBuf :: SOCKET + -> Ptr WSABuf -- ^ lpBuffers + -> DWORD -- ^ dwBufferCount + -> StablePtr b + -> IO () + +sendAll :: Socket + -> ByteString + -> IO () +sendAll _sock bs | BL.null bs = return () +sendAll sock bs = do + sent <- send sock bs + -- it is simpler than `Network.Socket.Lazy.sendAll` - waiting for sending + -- all the chunks is already perfomed by 'send'. + let bs' = BL.drop sent bs + when (sent >= 0 && not (BL.null bs')) $ sendAll sock bs' + +-- | Receive bytes from a socket, which must be in a connected state, and must +-- be associated with an IO completion port via +-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'. +-- It can return less bytes than requested. +-- +recv :: Socket + -> Int + -> IO ByteString +recv sock size = toChunk <$> Socket.ByteString.recv sock size + where + toChunk bs | BS.null bs = BL.Empty + | otherwise = BL.Chunk bs BL.Empty diff --git a/Win32-network/src/System/Win32/Async/WSABuf.hsc b/Win32-network/src/System/Win32/Async/WSABuf.hsc new file mode 100644 index 00000000000..47d9182148e --- /dev/null +++ b/Win32-network/src/System/Win32/Async/WSABuf.hsc @@ -0,0 +1,33 @@ +{-# LANGUAGE NamedFieldPuns #-} + +#include + +module System.Win32.Async.WSABuf + ( WSABuf (..) + ) where + +import Data.Word (Word8) +import Foreign.Ptr (Ptr) +import Foreign.Storable (Storable (..)) +import System.Win32.Types (ULONG) + + +-- | 'WSABuf' is Haskell representation of 'WSABUF' struct. +-- +data WSABuf = WSABuf { + len :: ULONG, + buf :: Ptr Word8 + } + +instance Storable WSABuf where + sizeOf _ = (#const sizeof(WSABUF)) + alignment _ = (#alignment WSABUF) + + peek p = do + len <- (#peek WSABUF, len) p + buf <- (#peek WSABUF, buf) p + return $ WSABuf len buf + + poke p WSABuf {len, buf} = do + (#poke WSABUF, len) p len + (#poke WSABUF, buf) p buf diff --git a/Win32-network/src/System/Win32/NamedPipes.hsc b/Win32-network/src/System/Win32/NamedPipes.hsc index d8a1075ab18..9e5e5fb5fe8 100644 --- a/Win32-network/src/System/Win32/NamedPipes.hsc +++ b/Win32-network/src/System/Win32/NamedPipes.hsc @@ -21,6 +21,7 @@ module System.Win32.NamedPipes ( pIPE_ACCESS_DUPLEX, pIPE_ACCESS_INBOUND, pIPE_ACCESS_OUTBOUND, + fILE_FLAG_OVERLAPPED, PipeMode, pIPE_TYPE_BYTE, pIPE_TYPE_MESSAGE, @@ -104,6 +105,20 @@ pIPE_UNLIMITED_INSTANCES = #const PIPE_UNLIMITED_INSTANCES -- For full details see -- -- +-- To create a named pipe which can be associate with IO completion port on +-- needs to pass 'fILE_FLAG_OVERLAPPED' to 'OpenMode' argument, +-- e.g. +-- +-- > Win32.createNamedPipe pipeName +-- > (pIPE_ACCESS_DUPLEX .|. fILE_FLAG_OVERLAPPED) +-- > (pIPE_TYPE_BYTE .|. pIPE_READMODE_BYTE) +-- > pIPE_UNLIMITED_INSTANCES +-- > 512 +-- > 512 +-- > 0 +-- > Nothing +-- +-- createNamedPipe :: String -- ^ pipe name of form @\.\pipe\{pipename}@ -> OpenMode -> PipeMode diff --git a/Win32-network/test/Test/Async/Socket.hs b/Win32-network/test/Test/Async/Socket.hs index fc3207da1d9..18a02d6befe 100644 --- a/Win32-network/test/Test/Async/Socket.hs +++ b/Win32-network/test/Test/Async/Socket.hs @@ -1,6 +1,7 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} @@ -17,9 +18,10 @@ import Data.Functor (void) import Data.Foldable (foldl', traverse_) import GHC.IO.Exception (IOException (..)) -import qualified System.Win32.Async.IOManager as Async -import qualified System.Win32.Async.Socket as Async -import qualified System.Win32.Async.Socket.ByteString as Async +import qualified System.Win32.Async.IOManager as Async +import qualified System.Win32.Async.Socket as Async +import qualified System.Win32.Async.Socket.ByteString as Async +import qualified System.Win32.Async.Socket.ByteString.Lazy as Async.Lazy import Network.Socket (Socket, SockAddr (..)) import qualified Network.Socket as Socket @@ -45,9 +47,15 @@ tests = , testProperty "send and recv" (ioProperty . prop_send_recv) , testProperty "PingPong test" - $ withMaxSuccess 50 prop_PingPong + $ withMaxSuccess 100 prop_PingPong , testProperty "PingPongPipelined test" - $ withMaxSuccess 50 prop_PingPongPipelined + $ withMaxSuccess 100 prop_PingPongPipelined + , testGroup "vectored io" + [ testProperty "PingPong test" + $ withMaxSuccess 100 prop_PingPongLazy + , testProperty "PingPongPipelined test" + $ withMaxSuccess 100 prop_PingPongPipelinedLazy + ] ] -- The stock 'connect' is not interruptible. This tests is not reliable on @@ -190,6 +198,16 @@ prop_send_recv (LargeNonEmptyBS bs _size) = pure $ bs == bs' +-- +-- BinaryChannels using 'System.Win32.Socket.Bytestring' or +-- 'System.Win32.Socket.ByteString.Lazy' (vectored io). +-- + + +-- | 'BinaryChannel' defined in terms of +-- 'System.Win32.Socket.Bytestring.sendAll' and +-- 'System.Win32.Socket.Bytestring.recv' +-- socketToBinaryChannel :: Binary a => Socket -> BinaryChannel a @@ -214,12 +232,58 @@ socketToBinaryChannel sock = BinaryChannel { readChannel, writeChannel, closeCha closeChannel = Socket.close sock -prop_PingPong :: Positive Int +-- | Like 'socketToBinaryChannel' but using +-- 'System.Win32.Async.Socket.ByteString.Lazy' (vectored io). +-- +socketToLazyBinaryChannel :: Binary a + => Socket + -> BinaryChannel a +socketToLazyBinaryChannel sock = BinaryChannel { readChannel, writeChannel, closeChannel } + where + recvLazyLen :: Int -> IO BL.ByteString + recvLazyLen = go [] + where + go bufs !l | l <= 0 = return $ BL.concat (reverse bufs) + | otherwise = do + buf <- Async.Lazy.recv sock l + go (buf : bufs) (l - fromIntegral (BL.length buf)) + + readChannel b = do + -- putStrLn "readChannel: header" + s <- decode <$> Async.Lazy.recv sock 8 + -- putStrLn $ "readChannel: header: " ++ show s + if b + then do + -- putStrLn $ "recvLazyLen: " ++ show s + bs' <- recvLazyLen s + -- putStrLn $ "recvLazyLen: done" + pure $ Just $ decode bs' + else pure Nothing + + writeChannel b a = + do + let bs :: BL.ByteString + bs = encode a + size :: Int + size = bool (+1) id b (fromIntegral $ BL.length bs) + Async.Lazy.sendAll sock (encode size) + Async.Lazy.sendAll sock bs + `catch` (\(e :: IOException) -> putStrLn (show e) >> throwIO e) + + closeChannel = Socket.close sock + + +-- +-- Ping Pong Tests +-- + +test_PingPong :: (forall a. Binary a => Socket -> BinaryChannel a) + -> Int -> Blocking - -> LargeNonEmptyBS - -> Property -prop_PingPong (Positive n) blocking (LargeNonEmptyBS bs _bufSize) = - ioProperty $ Async.withIOManager $ \iocp -> + -> ByteString + -> IO Bool +test_PingPong createBinaryChannel n blocking bs = + Async.withIOManager $ \iocp -> bracket ((,) <$> Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol <*> Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol) @@ -236,7 +300,7 @@ prop_PingPong (Positive n) blocking (LargeNonEmptyBS bs _bufSize) = do (socket, _) <- Socket.accept sockIn Async.associateWithIOCompletionPort (Right socket) iocp - let channel = socketToBinaryChannel socket + let channel = createBinaryChannel socket unmask (runPingPongServer channel (constPingPongServer @ByteString)) `finally` putMVar lock () @@ -244,7 +308,7 @@ prop_PingPong (Positive n) blocking (LargeNonEmptyBS bs _bufSize) = -- listening state accepting connections. Socket.connect sockOut addr' Async.associateWithIOCompletionPort (Right sockOut) iocp - let channelOut = socketToBinaryChannel sockOut + let channelOut = createBinaryChannel sockOut res <- runPingPongClient channelOut blocking tid (constPingPongClient n bs) -- this lock asserts that the server was terminated @@ -255,18 +319,36 @@ prop_PingPong (Positive n) blocking (LargeNonEmptyBS bs _bufSize) = _ -> res == replicate (pred n) bs +prop_PingPong :: Positive Int + -> Blocking + -> LargeNonEmptyBS + -> Property +prop_PingPong (Positive n) blocking (LargeNonEmptyBS bs _bufSize) = + ioProperty $ test_PingPong socketToBinaryChannel n blocking bs + +prop_PingPongLazy :: Positive Int + -> Blocking + -> LargeNonEmptyBS + -> Property +prop_PingPongLazy (Positive n) blocking (LargeNonEmptyBS bs _bufSize) = + ioProperty $ test_PingPong socketToLazyBinaryChannel n blocking bs + -prop_PingPongPipelined :: Blocking - -> NonEmptyList LargeNonEmptyBS - -> Property -prop_PingPongPipelined blocking (NonEmpty bss0) = - ioProperty $ Async.withIOManager $ \iocp -> +-- +-- Pipelined Ping Pong Tests +-- + +test_PingPongPipelined :: (forall a. Binary a => Socket -> BinaryChannel a) + -> Blocking + -> [ByteString] + -> IO Bool +test_PingPongPipelined createBinaryChannel blocking bss = + Async.withIOManager $ \iocp -> bracket ((,) <$> Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol <*> Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol) (\(x, y) -> Socket.close x >> Socket.close y) $ \(sockIn, sockOut) -> do - let bss = map getLargeNonEmptyBS bss0 lock <- newEmptyMVar -- fork a PingPong server @@ -278,7 +360,7 @@ prop_PingPongPipelined blocking (NonEmpty bss0) = do (socket, _) <- Socket.accept sockIn Async.associateWithIOCompletionPort (Right socket) iocp - let channel = socketToBinaryChannel socket + let channel = createBinaryChannel socket unmask (runPingPongServer channel (constPingPongServer @ByteString)) `finally` putMVar lock () @@ -286,7 +368,7 @@ prop_PingPongPipelined blocking (NonEmpty bss0) = -- listening state accepting connections. Socket.connect sockOut addr' Async.associateWithIOCompletionPort (Right sockOut) iocp - let channelOut = socketToBinaryChannel sockOut + let channelOut = createBinaryChannel sockOut res <- runPingPongClientPipelined channelOut blocking tid bss -- this lock asserts that the server was terminated @@ -299,3 +381,17 @@ prop_PingPongPipelined blocking (NonEmpty bss0) = _ -> True -- if we evalute this case branch, it means that -- killing blocked thread did not deadlock. + +prop_PingPongPipelined :: Blocking + -> NonEmptyList LargeNonEmptyBS + -> Property +prop_PingPongPipelined blocking (NonEmpty bss) = + ioProperty $ + test_PingPongPipelined socketToBinaryChannel blocking (map getLargeNonEmptyBS bss) + +prop_PingPongPipelinedLazy :: Blocking + -> NonEmptyList LargeNonEmptyBS + -> Property +prop_PingPongPipelinedLazy blocking (NonEmpty bss) = + ioProperty $ + test_PingPongPipelined socketToLazyBinaryChannel blocking (map getLargeNonEmptyBS bss) diff --git a/network-mux/src/Network/Mux/Bearer/Socket.hs b/network-mux/src/Network/Mux/Bearer/Socket.hs index f11d4cdb54a..f07d1fda861 100644 --- a/network-mux/src/Network/Mux/Bearer/Socket.hs +++ b/network-mux/src/Network/Mux/Bearer/Socket.hs @@ -21,8 +21,7 @@ import qualified Network.Socket as Socket #if !defined(mingw32_HOST_OS) import qualified Network.Socket.ByteString.Lazy as Socket (recv, sendAll) #else -import Data.Foldable (traverse_) -import qualified System.Win32.Async as Win32.Async +import qualified System.Win32.Async.Socket.ByteString.Lazy as Win32.Async #endif import qualified Network.Mux as Mx @@ -36,9 +35,10 @@ import qualified Network.Mux.Time as Mx -- | -- Create @'MuxBearer'@ from a socket. -- --- On Windows 'System.Win32.Async` operations are used to read and write. This --- means that the socket must be associated with the I/O completion port with --- 'System.Win32.Async.associateWithIOCompletionPort'. +-- On Windows 'System.Win32.Async` operations are used to read and write from +-- a socket. This means that the socket must be associated with the I/O +-- completion port with +-- 'System.Win32.Async.IOManager.associateWithIOCompletionPort'. -- -- Note: 'IOException's thrown by 'sendAll' and 'recv' are wrapped in -- 'MuxError'. @@ -74,7 +74,7 @@ socketAsMuxBearer tracer sd = recvLen' waitingOnNxtHeader l bufs = do traceWith tracer $ Mx.MuxTraceRecvStart $ fromIntegral l #if defined(mingw32_HOST_OS) - buf <- BL.fromStrict <$> Win32.Async.recv sd (fromIntegral l) + buf <- Win32.Async.recv sd (fromIntegral l) #else buf <- Socket.recv sd l #endif @@ -102,8 +102,7 @@ socketAsMuxBearer tracer sd = buf = Mx.encodeMuxSDU sdu' traceWith tracer $ Mx.MuxTraceSendStart sdu' #if defined(mingw32_HOST_OS) - -- TODO: issue #1430: vectored I/O on Windows - traverse_ (Win32.Async.sendAll sd) (BL.toChunks buf) + Win32.Async.sendAll sd buf #else Socket.sendAll sd buf #endif diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index 23b31652554..4c918b83e6c 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -261,10 +261,6 @@ test-suite test-network build-depends: Win32-network <0.2.0.0, Win32 >= 2.5.4.1 && <2.9 - if os(windows) - build-depends: Win32-network <0.2.0.0, - Win32 >= 2.5.4.1 && <2.9 - ghc-options: -Wall -Wno-unticked-promoted-constructors -fno-ignore-asserts diff --git a/ouroboros-network/test/Test/Pipe.hs b/ouroboros-network/test/Test/Pipe.hs index 8842ec1665b..5823628efee 100644 --- a/ouroboros-network/test/Test/Pipe.hs +++ b/ouroboros-network/test/Test/Pipe.hs @@ -40,7 +40,7 @@ import System.Process (createPipe) import System.IO (hClose) #endif -import Ouroboros.Network.Block (decodeTip, encodeTip) +import Ouroboros.Network.Block (encodeTip, decodeTip) import Ouroboros.Network.MockChain.Chain (Chain, ChainUpdate, Point) import qualified Ouroboros.Network.MockChain.Chain as Chain import qualified Ouroboros.Network.MockChain.ProducerState as CPS