Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

windows vectored io for sockets #1552

Merged
merged 9 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Win32-network/Win32-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ library
System.Win32.Async.ErrCode
System.Win32.Async.IOManager
System.Win32.Async.Internal
System.Win32.Async.WSABuf
System.Win32.Async.Socket
System.Win32.Async.Socket.ByteString
System.Win32.Async.Socket.ByteString.Lazy
c-sources: cbits/Win32Async.c
cbits/Win32Socket.c
build-depends: base >= 4.5 && < 4.13,
Expand Down
8 changes: 2 additions & 6 deletions Win32-network/cbits/Win32Socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ typedef struct _WSA_PER_IO_DATA {
} WSA_PER_IO_DATA;

DllExport
void HsSendBuf(SOCKET s, CHAR *buf, ULONG len, HsStablePtr userData)
void HsSendBuf(SOCKET s, WSABUF *lpBuffers, DWORD dwBufferCount, HsStablePtr userData)
{
int res;
WSA_PER_IO_DATA *perIoDataPtr;
Expand All @@ -26,11 +26,7 @@ void HsSendBuf(SOCKET s, CHAR *buf, ULONG len, HsStablePtr userData)
perIoDataPtr->WSAOverlapped.OffsetHigh = 0;
perIoDataPtr->UserData = userData;

WSABUF wsaBuf;
wsaBuf.len = len;
wsaBuf.buf = buf;

res = WSASend(s, &wsaBuf, 1, NULL, 0, &(perIoDataPtr->WSAOverlapped), NULL);
res = WSASend(s, lpBuffers, dwBufferCount, NULL, 0, &(perIoDataPtr->WSAOverlapped), NULL);
if (!res) {
DWORD error;
error = WSAGetLastError();
Expand Down
7 changes: 5 additions & 2 deletions Win32-network/src/System/Win32/Async.hs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
-- | This is the main module of `Win32-network` package which should be used
-- with "System.Win32.Async.Socket.ByteString",
-- "System.Win32.Async.Socket.ByteString.Lazy" or "System.Win32.Async.File" for
-- sending and/or receiving.
--
module System.Win32.Async
( module System.Win32.Async.File
, module System.Win32.Async.ErrCode
, module System.Win32.Async.IOManager
, module System.Win32.Async.Socket
, module System.Win32.Async.Socket.ByteString
) where

import System.Win32.Async.IOManager
import System.Win32.Async.File
import System.Win32.Async.ErrCode
import System.Win32.Async.Socket
import System.Win32.Async.Socket.ByteString
26 changes: 19 additions & 7 deletions Win32-network/src/System/Win32/Async/File.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import qualified System.Win32.Types as Win32
import System.Win32.Async.ErrCode
import System.Win32.Async.Internal

-- | Read a given number of bytes from a 'HANDLE'.
-- | Read a given number of bytes from a 'HANDLE'. The 'HANDLE' must be
-- opened with 'System.Win32.fILE_FLAG_OVERLAPPED' and associated with an IO
-- completion port via
-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'.
--
readHandle :: HANDLE
-> Int
Expand All @@ -47,7 +50,9 @@ foreign import ccall safe "HsAsyncRead"
-> IO ()


-- | Write a 'ByteString' to a HANDLE.
-- | Write a 'ByteString' to a 'HANDLE'. The 'HANDLE' must be opened with
-- 'System.Win32.fILE_FLAG_OVERLAPPED' and associated with an IO completion port
-- via 'System.Win32.Async.IOManager.associateWithIOCompletionPort'.
--
writeHandle :: HANDLE
-> ByteString
Expand All @@ -64,16 +69,23 @@ foreign import ccall safe "HsAsyncWrite"
-> IO ()


-- | Connect named pipe aka accept a connection. The 'HANDLE' must be opened
-- with 'System.Win32.FILE_FLAG_OVERLLAPPED' and associated with IO completion
-- port via 'System.Win32.Async.IOManager.associateWithIOCompletionPort'.
--
-- [msdn documentation](https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe)
--
connectNamedPipe :: HANDLE -> IO ()
connectNamedPipe h = void $ waitForCompletion "connectNamedPipe" $ \ptr -> do
c_ConnectNamedPipe h ptr
errCode <- Win32.getLastError
-- connectNamedPipe can error with:
-- * 'ERROR_PIPE_LISTENING' - the pipe is listening, we need to wait more
-- * 'ERROR_PIPE_CONNECTED' - the pipe is already connected
-- * 'ERROR_NO_DATA' - previous client has not disconnected, we
-- should error
-- * 'ERROR_IO_PENDING' - IO is pending, 'waitForCompletion' should
--
-- 'ERROR_PIPE_LISTENING' - the pipe is listening, we need to wait more
-- 'ERROR_PIPE_CONNECTED' - the pipe is already connected
-- 'ERROR_NO_DATA' - previous client has not disconnected, we
-- should error
-- 'ERROR_IO_PENDING' - IO is pending, 'waitForCompletion' should
-- resolve this
unless ( errCode == eRROR_PIPE_LISTENING
|| errCode == eRROR_PIPE_CONNECTED
Expand Down
10 changes: 6 additions & 4 deletions Win32-network/src/System/Win32/Async/IOManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ import qualified System.Win32.Types as Win32
import qualified System.Win32.File as Win32 (closeHandle)
import System.Win32.Async.ErrCode

-- | New type wrapper which holds 'HANDLE' of the I/O completion port.
-- <https://docs.microsoft.com/en-us/windows/win32/fileio/createiocompletionport>
-- | New type wrapper which holds 'HANDLE' of an [I/O completion
-- port](https://docs.microsoft.com/en-us/windows/win32/fileio/createiocompletionport).
--
newtype IOCompletionPort = IOCompletionPort HANDLE
deriving Show

closeIOCompletionPort :: IOCompletionPort -> IO ()
closeIOCompletionPort (IOCompletionPort iocp) = Win32.closeHandle iocp

-- | Windows documentation:
-- | [msdn documentation](https://docs.microsoft.com/en-us/windows/win32/fileio/createiocompletionport)
--
createIOCompletionPort
:: DWORD -- ^ number of concurrent threads
Expand All @@ -57,7 +57,9 @@ foreign import ccall unsafe "windows.h CreateIoCompletionPort"
c_CreateIoCompletionPort :: HANDLE -> HANDLE -> Ptr () -> DWORD -> IO HANDLE

-- | Associate with I/O completion port. This can be used multiple times on
-- a file descriptor.
-- a file descriptor. A 'HANDLE' must be opend with
-- 'System.Win32.fILE_FLAG_OVERLAPPED' flag for this call to succeed; 'Socket's
-- do not require special initialisation.
--
associateWithIOCompletionPort :: Either HANDLE Socket
-> IOCompletionPort
Expand Down
11 changes: 10 additions & 1 deletion Win32-network/src/System/Win32/Async/Internal.hs
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
module System.Win32.Async.Internal where
module System.Win32.Async.Internal
( SOCKET
, CInt (..)
, waitForCompletion
, wsaWaitForCompletion
) where

import Control.Concurrent

import Foreign.C (CInt (..))
import Foreign.StablePtr (StablePtr, newStablePtr)
import System.Win32.Types (ErrCode)
import qualified System.Win32.Types as Win32
import System.Win32.Async.ErrCode


type SOCKET = CInt

waitForCompletion :: String
-> (StablePtr (MVar (Either ErrCode Int)) -> IO ())
-> IO Int
Expand Down
19 changes: 11 additions & 8 deletions Win32-network/src/System/Win32/Async/Socket.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE InterruptibleFFI #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

module System.Win32.Async.Socket
Expand All @@ -16,15 +17,15 @@ import Data.Word
import Foreign.C (CInt (..))
import Foreign.Ptr (Ptr)
import Foreign.StablePtr (StablePtr)
import Foreign.Marshal.Alloc (alloca)
import Foreign.Storable (Storable (poke))

import Network.Socket (Socket, SockAddr)
import qualified Network.Socket as Socket

import System.Win32.Types
import System.Win32.Async.Internal


type SOCKET = CInt
import System.Win32.Async.WSABuf


sendBuf :: Socket
Expand All @@ -34,12 +35,14 @@ sendBuf :: Socket
sendBuf sock buf size = Socket.withFdSocket sock $ \fd ->
-- on Windows sockets are Word32, GHC represents file descriptors with CInt
-- which is Int32.
wsaWaitForCompletion "sendBuf" (c_sendBuf fd buf (fromIntegral size))
alloca $ \bufs_ptr -> do
poke bufs_ptr WSABuf {buf, len = fromIntegral size}
wsaWaitForCompletion "sendBuf" (c_sendBuf fd bufs_ptr 1)

foreign import ccall safe "HsSendBuf"
c_sendBuf :: SOCKET
-> Ptr Word8
-> Word32
-> Ptr WSABuf -- ^ lpBuffers
-> DWORD -- ^ dwBufferCount
-> StablePtr b
-> IO ()

Expand All @@ -51,7 +54,7 @@ connect :: Socket -> SockAddr -> IO ()
connect sock addr = do
v <- newEmptyMVar
_ <- mask_ $ forkIOWithUnmask $ \unmask ->
unmask (Socket.connect sock addr >> putMVar v Nothing)
(unmask (Socket.connect sock addr) >> putMVar v Nothing)
`catch` (\(e :: IOException) -> putMVar v (Just e))
r <- takeMVar v
case r of
Expand All @@ -73,7 +76,7 @@ accept :: Socket -> IO (Socket, SockAddr)
accept sock = do
v <- newEmptyMVar
_ <- mask_ $ forkIOWithUnmask $ \unmask ->
unmask (Socket.accept sock >>= putMVar v . Right)
(unmask (Socket.accept sock) >>= putMVar v . Right)
`catch` (\(e :: IOException) -> putMVar v (Left e))
r <- takeMVar v
case r of
Expand Down
11 changes: 9 additions & 2 deletions Win32-network/src/System/Win32/Async/Socket/ByteString.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import Network.Socket (Socket)
import System.Win32.Async.Socket


-- | Send a 'ByteString' over a socket, which must be in a connected state.
-- Returns number of bytes sent.
-- | Send a 'ByteString' over a socket, which must be in a connected state, and
-- must be associated with an IO completion port via
-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'. Returns number
-- of bytes sent.
--
send :: Socket
-> ByteString
Expand All @@ -42,6 +44,11 @@ sendAll sock bs = do
$ sendAll sock (BS.drop sent bs)


-- | Recv a 'ByteString' from a socket, which must be in a connected state, and
-- must be associated with an IO completion port via
-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'. It may return
-- less bytes than requested.
--
recv :: Socket
-> Int
-> IO ByteString
Expand Down
98 changes: 98 additions & 0 deletions Win32-network/src/System/Win32/Async/Socket/ByteString/Lazy.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
{-# LANGUAGE BangPatterns #-}

module System.Win32.Async.Socket.ByteString.Lazy
( send
, sendAll
, recv
) where


import Control.Monad (when)
import qualified Data.ByteString as BS
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Internal as BL (ByteString (..))
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)
import Data.Int (Int64)
import Foreign.C (CInt (..))
import Foreign.Marshal.Array (allocaArray)
import Foreign.Ptr
import Foreign.StablePtr (StablePtr)
import Foreign.Storable

import Network.Socket (Socket)
import qualified Network.Socket as Socket

import System.Win32.Types (DWORD)

import System.Win32.Async.WSABuf
import System.Win32.Async.Internal
import qualified System.Win32.Async.Socket.ByteString as Socket.ByteString


-- | Sending each chunk using vectored I/O. In one system call one can
-- transmit at most 1024 chunks, it is safe to transmit less than 4194304
-- bytes.
--
-- The socket must be in a connected state, and must be associated with an IO
-- completion port via
-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'.
--
send :: Socket
-> ByteString
-> IO Int64
send sock bs = do
let cs = take maxNumChunks (BL.toChunks bs)
size = length cs
siz <- Socket.withFdSocket sock $ \fd -> allocaArray size $ \ptr ->
withPokes cs ptr $ \nwsabuf ->
wsaWaitForCompletion "send" (c_sendBuf fd ptr nwsabuf)
return $ fromIntegral siz
where
withPokes ss p f = loop ss p 0 0
where
loop (c:cs) q k !nwsabuf
| k < maxNumBytes = unsafeUseAsCStringLen c $ \(ptr, strlen) -> do
poke q $ WSABuf (fromIntegral strlen) (castPtr ptr)
loop cs
(q `plusPtr` sizeOf (undefined :: WSABuf))
(k + fromIntegral strlen)
(nwsabuf + 1)
| otherwise = f nwsabuf
loop _ _ _ nwsabuf = f nwsabuf

maxNumBytes, maxNumChunks :: Int
maxNumBytes = 4194304 -- maximum number of bytes to transmit in one system call
maxNumChunks = 1024 -- maximum number of chunks to transmit in one system call


foreign import ccall safe "HsSendBuf"
c_sendBuf :: SOCKET
-> Ptr WSABuf -- ^ lpBuffers
-> DWORD -- ^ dwBufferCount
-> StablePtr b
-> IO ()

sendAll :: Socket
-> ByteString
-> IO ()
sendAll _sock bs | BL.null bs = return ()
sendAll sock bs = do
sent <- send sock bs
-- it is simpler than `Network.Socket.Lazy.sendAll` - waiting for sending
-- all the chunks is already perfomed by 'send'.
let bs' = BL.drop sent bs
when (sent >= 0 && not (BL.null bs')) $ sendAll sock bs'

-- | Receive bytes from a socket, which must be in a connected state, and must
-- be associated with an IO completion port via
-- 'System.Win32.Async.IOManager.associateWithIOCompletionProt'.
-- It can return less bytes than requested.
--
recv :: Socket
-> Int
-> IO ByteString
recv sock size = toChunk <$> Socket.ByteString.recv sock size
where
toChunk bs | BS.null bs = BL.Empty
| otherwise = BL.Chunk bs BL.Empty
33 changes: 33 additions & 0 deletions Win32-network/src/System/Win32/Async/WSABuf.hsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{-# LANGUAGE NamedFieldPuns #-}

#include <winsock2.h>

module System.Win32.Async.WSABuf
( WSABuf (..)
) where

import Data.Word (Word8)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable (..))
import System.Win32.Types (ULONG)


-- | 'WSABuf' is Haskell representation of 'WSABUF' struct.
--
data WSABuf = WSABuf {
len :: ULONG,
coot marked this conversation as resolved.
Show resolved Hide resolved
buf :: Ptr Word8
}

instance Storable WSABuf where
sizeOf _ = (#const sizeof(WSABUF))
alignment _ = (#alignment WSABUF)

peek p = do
len <- (#peek WSABUF, len) p
buf <- (#peek WSABUF, buf) p
return $ WSABuf len buf

poke p WSABuf {len, buf} = do
(#poke WSABUF, len) p len
(#poke WSABUF, buf) p buf
Loading