Skip to content

Commit

Permalink
Merge pull request #66 from kazu-yamamoto/sendto
Browse files Browse the repository at this point in the history
The new archtecure
  • Loading branch information
kazu-yamamoto authored Jul 29, 2024
2 parents 1b7779d + 6265552 commit d2ce1b0
Show file tree
Hide file tree
Showing 20 changed files with 461 additions and 440 deletions.
15 changes: 13 additions & 2 deletions Network/QUIC.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,23 @@ module Network.QUIC (
sendStreamMany,

-- * Information
ConnectionInfo (..),
ConnectionInfo,
getConnectionInfo,
version,
cipher,
alpn,
handshakeMode,
retry,
localSockAddr,
remoteSockAddr,
localCID,
remoteCID,

-- * Statistics
ConnectionStats (..),
ConnectionStats,
getConnectionStats,
txBytes,
rxBytes,

-- * Synchronization
wait0RTTReady,
Expand Down
1 change: 0 additions & 1 deletion Network/QUIC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ module Network.QUIC.Client (
ccVersions,
-- , ccCredentials
ccValidate,
ccAutoMigration,

-- * Resumption
ResumptionInfo,
Expand Down
28 changes: 15 additions & 13 deletions Network/QUIC/Client/Reader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ module Network.QUIC.Client.Reader (
recvClient,
ConnectionControl (..),
controlConnection,
clientSocket,
) where

import Data.List (intersect)
import Network.Socket (getSocketName)
import Network.UDP
import Network.Socket (Socket, close, getSocketName)
import Network.Socket.ByteString (recvFrom)
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E

Expand All @@ -23,11 +24,12 @@ import Network.QUIC.Packet
import Network.QUIC.Parameters
import Network.QUIC.Qlog
import Network.QUIC.Recovery
import Network.QUIC.Socket
import Network.QUIC.Types

-- | readerClient dies when the socket is closed.
readerClient :: UDPSocket -> Connection -> IO ()
readerClient cs0@(UDPSocket s0 _ _) conn = handleLogUnit logAction $ do
readerClient :: Socket -> Connection -> IO ()
readerClient s0 conn = handleLogUnit logAction $ do
wait
loop
where
Expand All @@ -42,10 +44,11 @@ readerClient cs0@(UDPSocket s0 _ _) conn = handleLogUnit logAction $ do
ito <- readMinIdleTimeout conn
mbs <-
timeout ito "readeClient" $
recv cs0
recvFrom s0 2048 -- fixme
case mbs of
Nothing -> close cs0
Just bs -> do
Nothing -> close s0
Just (bs, peersa) -> do
setPeerSockAddr conn peersa
now <- getTimeMicrosecond
let quicBit = greaseQuicBit $ getMyParameters conn
pkts <- decodePackets bs (not quicBit)
Expand Down Expand Up @@ -139,10 +142,9 @@ controlConnection' conn ActiveMigration = do

rebind :: Connection -> Microseconds -> IO ()
rebind conn microseconds = do
cs0 <- getSocket conn
cs <- natRebinding cs0
cs0' <- setSocket conn cs
let reader = readerClient cs conn
peersa <- getPeerSockAddr conn
newSock <- natRebinding peersa
oldSock <- setSocket conn newSock
let reader = readerClient newSock conn
forkIO reader >>= addReader conn
-- Using cs0' just in case.
fire conn microseconds $ close cs0'
fire conn microseconds $ close oldSock
22 changes: 10 additions & 12 deletions Network/QUIC/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ module Network.QUIC.Client.Run (
) where

import qualified Network.Socket as NS
import Network.UDP (UDPSocket (..))
import qualified Network.UDP as UDP
import UnliftIO.Async
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
Expand Down Expand Up @@ -107,13 +105,14 @@ runClient conf client0 isICVN verInfo = do

createClientConnection :: ClientConfig -> VersionInfo -> IO ConnRes
createClientConnection conf@ClientConfig{..} verInfo = do
us@(UDPSocket _ sa _) <-
UDP.clientSocket ccServerName ccPortName (not ccAutoMigration)
(sock, peersa) <- clientSocket ccServerName ccPortName
q <- newRecvQ
sref <- newIORef us
sref <- newIORef sock
psaref <- newIORef peersa
let send = \buf siz -> do
cs <- readIORef sref
UDP.sendBuf cs buf siz
s <- readIORef sref
sa <- readIORef psaref
void $ NS.sendBufTo s buf siz sa
recv = recvClient q
myCID <- newCID
peerCID <- newCID
Expand All @@ -135,6 +134,7 @@ createClientConnection conf@ClientConfig{..} verInfo = do
qLog
ccHooks
sref
psaref
q
send
recv
Expand All @@ -143,16 +143,14 @@ createClientConnection conf@ClientConfig{..} verInfo = do
initializeCoder conn InitialLevel $ initialSecrets ver peerCID
setupCryptoStreams conn -- fixme: cleanup
let pktSiz0 = fromMaybe 0 ccPacketSize
pktSiz = (defaultPacketSize sa `max` pktSiz0) `min` maximumPacketSize sa
pktSiz = (defaultPacketSize peersa `max` pktSiz0) `min` maximumPacketSize peersa
setMaxPacketSize conn pktSiz
setInitialCongestionWindow (connLDCC conn) pktSiz
setAddressValidated conn
let reader = readerClient us conn -- dies when s0 is closed.
let reader = readerClient sock conn -- dies when s0 is closed.
return $ ConnRes conn myAuthCIDs reader

-- | Creating a new socket and execute a path validation
-- with a new connection ID. Typically, this is used
-- for migration in the case where 'ccAutoMigration' is 'False'.
-- But this can also be used even when the value is 'True'.
-- with a new connection ID.
migrate :: Connection -> IO Bool
migrate conn = controlConnection conn ActiveMigration
57 changes: 29 additions & 28 deletions Network/QUIC/Closer.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}

module Network.QUIC.Closer (closure) where

import Foreign.Marshal.Alloc
import Foreign.Ptr
import qualified Network.UDP as UDP
import qualified Network.Socket as NS
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E

Expand Down Expand Up @@ -39,27 +38,35 @@ closure conn ldcc (Left se)

closure' :: Connection -> LDCC -> Frame -> IO ()
closure' conn ldcc frame = do
killReaders conn
let bufsiz = maximumUdpPayloadSize
sendBuf <- mallocBytes bufsiz
recvBuf <- mallocBytes bufsiz
siz <- encodeCC conn (SizedBuffer sendBuf bufsiz) frame
us <- getSocket conn
let clos = do
UDP.close us
-- This is just in case.
-- UDP.close never throw exceptions.
getSocket conn >>= UDP.close
send = UDP.sendBuf us sendBuf siz
recv = UDP.recvBuf us recvBuf bufsiz
hook = onCloseCompleted $ connHooks conn
sock <- getSocket conn
peersa <- getPeerSockAddr conn
-- send
let sbuf@(SizedBuffer sendBuf _) = encryptRes conn
siz <- encodeCC conn sbuf frame
let send = void $ NS.sendBufTo sock sendBuf siz peersa
-- recv and clos
killReaders conn -- client only
(recv, freeRecvBuf, clos) <-
if isServer conn
then return (void $ connRecv conn, return (), return ())
else do
let bufsiz = maximumUdpPayloadSize
recvBuf <- mallocBytes bufsiz
let recv' = void $ NS.recvBuf sock recvBuf bufsiz
free' = free recvBuf
clos' = do
NS.close sock
-- This is just in case.
getSocket conn >>= NS.close
return (recv', free', clos')
-- hook
let hook = onCloseCompleted $ connHooks conn
pto <- getPTO ldcc
void $ forkFinally (closer conn pto send recv hook) $ \e -> do
case e of
Left e' -> connDebugLog conn $ "closure' " <> bhow e'
Right _ -> return ()
free sendBuf
free recvBuf
freeRecvBuf
clos

encodeCC :: Connection -> SizedBuffer -> Frame -> IO Int
Expand Down Expand Up @@ -93,12 +100,8 @@ encodeCC conn res0@(SizedBuffer sendBuf0 bufsiz0) frame = do
else
return 0

closer :: Connection -> Microseconds -> IO () -> IO Int -> IO () -> IO ()
closer _conn (Microseconds pto) send recv hook
#if defined(mingw32_HOST_OS)
| isServer _conn = send
#endif
| otherwise = loop (3 :: Int)
closer :: Connection -> Microseconds -> IO () -> IO () -> IO () -> IO ()
closer _conn (Microseconds pto) send recv hook = loop (3 :: Int)
where
loop 0 = return ()
loop n = do
Expand All @@ -107,14 +110,12 @@ closer _conn (Microseconds pto) send recv hook
mx <- timeout (Microseconds (pto !>>. 1)) "closer 1" recv
case mx of
Nothing -> hook
Just 0 -> return ()
Just _ -> loop (n - 1)
Just () -> loop (n - 1)
skip tmo@(Microseconds duration) base = do
mx <- timeout tmo "closer 2" recv
case mx of
Nothing -> return ()
Just 0 -> return ()
Just _ -> do
Just () -> do
Microseconds elapsed <- getElapsedTimeMicrosecond base
let duration' = duration - elapsed
when (duration' >= 5000) $ skip (Microseconds duration') base
3 changes: 0 additions & 3 deletions Network/QUIC/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ data ClientConfig = ClientConfig
, ccPacketSize :: Maybe Int
-- ^ QUIC packet size (UDP payload size)
, ccDebugLog :: Bool
, ccAutoMigration :: Bool
-- ^ If 'True', use a unconnected socket for auto migration. Otherwise, use a connected socket.
}

-- | The default value for client configuration.
Expand All @@ -107,7 +105,6 @@ defaultClientConfig =
, ccResumption = defaultResumptionInfo
, ccPacketSize = Nothing
, ccDebugLog = False
, ccAutoMigration = True
}

----------------------------------------------------------------
Expand Down
23 changes: 16 additions & 7 deletions Network/QUIC/Connection/Misc.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module Network.QUIC.Connection.Misc (
getSocket,
setSocket,
clearSocket,
getPeerSockAddr,
setPeerSockAddr,
getPeerAuthCIDs,
setPeerAuthCIDs,
getClientDstCID,
Expand All @@ -31,7 +33,7 @@ module Network.QUIC.Connection.Misc (
abortConnection,
) where

import Network.UDP
import Network.Socket (SockAddr, Socket)
import System.Mem.Weak
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
Expand Down Expand Up @@ -64,15 +66,22 @@ getOriginalVersion = chosenVersion . origVersionInfo

----------------------------------------------------------------

getSocket :: Connection -> IO UDPSocket
getSocket Connection{..} = readIORef udpSocket
getSocket :: Connection -> IO Socket
getSocket Connection{..} = readIORef connSocket

setSocket :: Connection -> UDPSocket -> IO UDPSocket
setSocket Connection{..} sock = atomicModifyIORef' udpSocket $
setSocket :: Connection -> Socket -> IO Socket
setSocket Connection{..} sock = atomicModifyIORef' connSocket $
\sock0 -> (sock, sock0)

clearSocket :: Connection -> IO UDPSocket
clearSocket Connection{..} = atomicModifyIORef' udpSocket (undefined,)
-- fixme
clearSocket :: Connection -> IO Socket
clearSocket Connection{..} = atomicModifyIORef' connSocket (undefined,)

getPeerSockAddr :: Connection -> IO SockAddr
getPeerSockAddr Connection{..} = readIORef peerSockAddr

setPeerSockAddr :: Connection -> SockAddr -> IO ()
setPeerSockAddr Connection{..} sa = writeIORef peerSockAddr sa

----------------------------------------------------------------

Expand Down
18 changes: 12 additions & 6 deletions Network/QUIC/Connection/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import Data.X509 (CertificateChain)
import Foreign.Marshal.Alloc
import Foreign.Ptr (nullPtr)
import Network.Control (Rate, RxFlow, TxFlow, newRate, newRxFlow, newTxFlow)
import Network.Socket (SockAddr, Socket)
import Network.TLS.QUIC
import Network.UDP (UDPSocket)
import UnliftIO.Concurrent
import UnliftIO.STM

Expand Down Expand Up @@ -199,7 +199,7 @@ data Connection = Connection
, connRecv :: ~Recv -- ~ for testing
-- Manage
, connRecvQ :: RecvQ
, udpSocket :: ~(IORef UDPSocket)
, connSocket :: IORef Socket
, readers :: IORef (IO ())
, mainThreadId :: ThreadId
, controlRate :: Rate
Expand All @@ -213,6 +213,7 @@ data Connection = Connection
, -- Peer
peerParameters :: IORef Parameters
, peerCIDDB :: TVar CIDDB
, peerSockAddr :: IORef SockAddr
, -- Queues
inputQ :: InputQ
, cryptoQ :: CryptoQ
Expand Down Expand Up @@ -287,12 +288,14 @@ newConnection
-> DebugLogger
-> QLogger
-> Hooks
-> IORef UDPSocket
-> IORef Socket
-> IORef SockAddr
-> RecvQ
-> Send
-> Recv
-> IO Connection
newConnection rl myparams verInfo myAuthCIDs peerAuthCIDs debugLog qLog hooks sref recvQ ~send ~recv = do
newConnection rl myparams verInfo myAuthCIDs peerAuthCIDs debugLog qLog hooks sref psaref recvQ ~send ~recv = do
-- ~ for testing
outQ <- newTQueueIO
let put x = atomically $ writeTQueue outQ $ OutRetrans x
connstate <- newConnState rl
Expand All @@ -314,6 +317,7 @@ newConnection rl myparams verInfo myAuthCIDs peerAuthCIDs debugLog qLog hooks sr
-- Peer
<*> newIORef baseParameters
<*> newTVarIO (newCIDDB peerCID)
<*> return psaref
-- Queues
<*> newTQueueIO
<*> newTQueueIO
Expand Down Expand Up @@ -380,7 +384,8 @@ clientConnection
-> DebugLogger
-> QLogger
-> Hooks
-> IORef UDPSocket
-> IORef Socket
-> IORef SockAddr
-> RecvQ
-> Send
-> Recv
Expand All @@ -396,7 +401,8 @@ serverConnection
-> DebugLogger
-> QLogger
-> Hooks
-> IORef UDPSocket
-> IORef Socket
-> IORef SockAddr
-> RecvQ
-> Send
-> Recv
Expand Down
Loading

0 comments on commit d2ce1b0

Please sign in to comment.