From c59bb7a117ba1f65484ce241f89c89fb06698205 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 28 Jan 2020 12:59:31 +0100 Subject: [PATCH] clientSubscriptionWorker A subscription worker which works over ClientSnocket. It is integrated into data diffusion `Ouroboros.Network.Diffusion`. * `ouroboros-network` - compiler and runs all its tests on Windows. * `ouroboros-consensus` - windows support is tracked in #1082 --- ouroboros-network/ouroboros-network.cabal | 1 + .../src/Ouroboros/Network/Diffusion.hs | 28 ++++--- .../src/Ouroboros/Network/NodeToClient.hs | 65 +++++++-------- .../src/Ouroboros/Network/NodeToNode.hs | 21 ++--- .../src/Ouroboros/Network/Snocket.hs | 3 + .../Ouroboros/Network/Subscription/Client.hs | 80 +++++++++++++++++++ .../src/Ouroboros/Network/Subscription/Dns.hs | 1 + .../src/Ouroboros/Network/Subscription/Ip.hs | 26 +++--- .../Ouroboros/Network/Subscription/Worker.hs | 25 +++--- .../src/Ouroboros/Network/Tracers.hs | 29 ++++--- ouroboros-network/test/Test/PeerState.hs | 2 +- ouroboros-network/test/Test/Subscription.hs | 1 + 12 files changed, 187 insertions(+), 95 deletions(-) create mode 100644 ouroboros-network/src/Ouroboros/Network/Subscription/Client.hs diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index b51532ab81f..9dc3880f7b1 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -78,6 +78,7 @@ library Ouroboros.Network.Subscription Ouroboros.Network.Subscription.Ip Ouroboros.Network.Subscription.Dns + Ouroboros.Network.Subscription.Client Ouroboros.Network.Subscription.Subscriber Ouroboros.Network.Subscription.PeerState Ouroboros.Network.Subscription.Worker diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs index a228dfee051..53dcc7c9861 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs @@ -28,7 +28,7 @@ import Network.Mux (MuxTrace (..), WithMuxBearer (..)) import Network.Socket (SockAddr, AddrInfo) import qualified Network.Socket as Socket -import Ouroboros.Network.Snocket (SocketSnocket, LocalSnocket) +import Ouroboros.Network.Snocket (LocalAddress, SocketSnocket, LocalSnocket) import qualified Ouroboros.Network.Snocket as Snocket import Ouroboros.Network.Protocol.Handshake.Type (Handshake) @@ -57,23 +57,24 @@ import Ouroboros.Network.Subscription.Worker (LocalAddresses (..)) import Ouroboros.Network.Tracers data DiffusionTracers = DiffusionTracers { - dtIpSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace SockAddr)) + dtIpSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace SockAddr)) -- ^ IP subscription tracer - , dtDnsSubscriptionTracer :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr)) + , dtDnsSubscriptionTracer :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr)) -- ^ DNS subscription tracer - , dtDnsResolverTracer :: Tracer IO (WithDomainName DnsTrace) + , dtDnsResolverTracer :: Tracer IO (WithDomainName DnsTrace) -- ^ DNS resolver tracer - , dtMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace) + , dtMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace) -- ^ Mux tracer - , dtMuxLocalTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace) + , dtMuxLocalTracer :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace) -- ^ Mux tracer for local clients - , dtHandshakeTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) + , dtHandshakeTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake NodeToNodeVersion CBOR.Term))) -- ^ Handshake protocol tracer - , dtHandshakeLocalTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) + , dtHandshakeLocalTracer :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress) (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))) -- ^ Handshake protocol tracer for local clients - , dtErrorPolicyTracer :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace) + , dtErrorPolicyTracer :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace) + , dtLocalErrorPolicyTracer :: Tracer IO (WithAddr LocalAddress ErrorPolicyTrace) } @@ -122,7 +123,7 @@ data DiffusionApplications = DiffusionApplications { DictVersion (OuroborosApplication 'ResponderApp - (ConnectionId SockAddr) + (ConnectionId LocalAddress) NodeToClientProtocols IO ByteString @@ -194,6 +195,7 @@ runDataDiffusion tracers , dtHandshakeTracer , dtHandshakeLocalTracer , dtErrorPolicyTracer + , dtLocalErrorPolicyTracer } = tracers initiatorLocalAddresses :: LocalAddresses SockAddr @@ -220,14 +222,14 @@ runDataDiffusion tracers remoteErrorPolicy = NodeToNode.remoteNetworkErrorPolicy <> daErrorPolicies localErrorPolicy = NodeToNode.localNetworkErrorPolicy <> daErrorPolicies - runLocalServer :: SocketSnocket -> NetworkMutableState SockAddr -> IO Void + runLocalServer :: LocalSnocket -> NetworkMutableState LocalAddress -> IO Void runLocalServer sn networkLocalState = NodeToClient.withServer sn (NetworkServerTracers dtMuxLocalTracer dtHandshakeLocalTracer - dtErrorPolicyTracer) + dtLocalErrorPolicyTracer) networkLocalState (Snocket.localAddressFromPath daLocalAddress) (daLocalResponderApplication applications) @@ -251,7 +253,7 @@ runDataDiffusion tracers -> IO Void runIpSubscriptionWorker sn networkState = NodeToNode.ipSubscriptionWorker sn - (NetworkIPSubscriptionTracers + (NetworkSubscriptionTracers dtMuxTracer dtHandshakeTracer dtErrorPolicyTracer diff --git a/ouroboros-network/src/Ouroboros/Network/NodeToClient.hs b/ouroboros-network/src/Ouroboros/Network/NodeToClient.hs index 0c8395e8257..807f5db7142 100644 --- a/ouroboros-network/src/Ouroboros/Network/NodeToClient.hs +++ b/ouroboros-network/src/Ouroboros/Network/NodeToClient.hs @@ -29,9 +29,10 @@ module Ouroboros.Network.NodeToClient ( , withServer_V1 , withServer - , NetworkIPSubscriptionTracers (..) - , IPSubscriptionParams - , SubscriptionParams (..) + + , NetworkClientSubcriptionTracers + , NetworkSubscriptionTracers (..) + , ClientSubscriptionParams (..) , ncSubscriptionWorker , ncSubscriptionWorker_V1 @@ -60,14 +61,14 @@ module Ouroboros.Network.NodeToClient ( , DecoderFailureOrTooMuchInput , Handshake , LocalAddresses (..) - , IPSubscriptionTarget (..) , SubscriptionTrace (..) - , WithIPList (..) ) where import qualified Control.Concurrent.Async as Async import Control.Exception (IOException) import qualified Data.ByteString.Lazy as BL +import Data.Functor.Identity (Identity (..)) +import Data.Functor.Contravariant (contramap) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock @@ -79,8 +80,6 @@ import qualified Codec.CBOR.Term as CBOR import Codec.Serialise (Serialise (..), DeserialiseFailure) import Codec.SerialiseTerm -import qualified Network.Socket as Socket - import Network.Mux hiding (MiniProtocolLimits(..)) import Network.TypedProtocol.Driver.ByteLimit (DecoderFailureOrTooMuchInput) import Network.TypedProtocol.Driver (TraceSendRecv (..)) @@ -95,12 +94,9 @@ import Ouroboros.Network.Protocol.Handshake.Type import Ouroboros.Network.Protocol.Handshake.Version import Ouroboros.Network.Snocket import Ouroboros.Network.Socket -import Ouroboros.Network.Subscription.Ip (IPSubscriptionParams, SubscriptionParams (..)) -import qualified Ouroboros.Network.Subscription.Ip as Subscription -import Ouroboros.Network.Subscription.Ip ( IPSubscriptionTarget (..) - , WithIPList (..) - , SubscriptionTrace (..) - ) +import Ouroboros.Network.Subscription.Client ( ClientSubscriptionParams (..) ) +import qualified Ouroboros.Network.Subscription.Client as Subscription +import Ouroboros.Network.Subscription.Ip (SubscriptionTrace (..)) import Ouroboros.Network.Subscription.Worker (LocalAddresses (..)) import Ouroboros.Network.IOManager @@ -269,6 +265,10 @@ withServer_V1 sn tracers networkState addr versionData application = (DictVersion nodeToClientCodecCBORTerm) application) +type NetworkClientSubcriptionTracers + = NetworkSubscriptionTracers Identity LocalAddress NodeToClientProtocols NodeToClientVersion + + -- | 'ncSubscriptionWorker' which starts given application versions on each -- established connection. -- @@ -277,9 +277,9 @@ ncSubscriptionWorker ( HasInitiator appType ~ True ) => LocalSnocket - -> NetworkIPSubscriptionTracers LocalAddress NodeToClientProtocols NodeToClientVersion + -> NetworkClientSubcriptionTracers -> NetworkMutableState LocalAddress - -> IPSubscriptionParams () + -> ClientSubscriptionParams () -> Versions NodeToClientVersion DictVersion @@ -291,44 +291,41 @@ ncSubscriptionWorker -> IO Void ncSubscriptionWorker sn - NetworkIPSubscriptionTracers - { nistSubscriptionTracer - , nistMuxTracer - , nistHandshakeTracer - , nistErrorPolicyTracer + NetworkSubscriptionTracers + { nsSubscriptionTracer + , nsMuxTracer + , nsHandshakeTracer + , nsErrorPolicyTracer } networkState subscriptionParams versions - = Subscription.ipSubscriptionWorker + = Subscription.clientSubscriptionWorker sn - nistSubscriptionTracer - nistErrorPolicyTracer + (Identity `contramap` nsSubscriptionTracer) + nsErrorPolicyTracer networkState subscriptionParams (connectToNode' sn cborTermVersionDataCodec - (NetworkConnectTracers nistMuxTracer nistHandshakeTracer) + (NetworkConnectTracers nsMuxTracer nsHandshakeTracer) versions) -- | Like 'ncSubscriptionWorker' but specific to 'NodeToClientV_1'. -- ncSubscriptionWorker_V1 - :: forall appType fd addr x y. - ( HasInitiator appType ~ True - , fd ~ Socket.Socket - , addr ~ Socket.SockAddr - ) - => Snocket IO fd addr - -> NetworkIPSubscriptionTracers addr NodeToClientProtocols NodeToClientVersion - -> NetworkMutableState addr - -> IPSubscriptionParams () + :: forall appType x y. + ( HasInitiator appType ~ True ) + => LocalSnocket + -> NetworkSubscriptionTracers Identity LocalAddress NodeToClientProtocols NodeToClientVersion + -> NetworkMutableState LocalAddress + -> ClientSubscriptionParams () -> NodeToClientVersionData -> (OuroborosApplication appType - (ConnectionId addr) + (ConnectionId LocalAddress) NodeToClientProtocols IO BL.ByteString x y) -> IO Void diff --git a/ouroboros-network/src/Ouroboros/Network/NodeToNode.hs b/ouroboros-network/src/Ouroboros/Network/NodeToNode.hs index f0edf658ce6..5d75c0be235 100644 --- a/ouroboros-network/src/Ouroboros/Network/NodeToNode.hs +++ b/ouroboros-network/src/Ouroboros/Network/NodeToNode.hs @@ -33,8 +33,9 @@ module Ouroboros.Network.NodeToNode ( -- * Subscription Workers -- ** IP subscriptin worker , IPSubscriptionTarget (..) - , NetworkIPSubscriptionTracers (..) - , nullNetworkIPSubscriptionTracers + , NetworkIPSubscriptionTracers + , NetworkSubscriptionTracers (..) + , nullNetworkSubscriptionTracers , SubscriptionParams (..) , IPSubscriptionParams , ipSubscriptionWorker @@ -294,25 +295,25 @@ ipSubscriptionWorker -> IO Void ipSubscriptionWorker sn - NetworkIPSubscriptionTracers - { nistSubscriptionTracer - , nistMuxTracer - , nistHandshakeTracer - , nistErrorPolicyTracer + NetworkSubscriptionTracers + { nsSubscriptionTracer + , nsMuxTracer + , nsHandshakeTracer + , nsErrorPolicyTracer } networkState subscriptionParams versions = Subscription.ipSubscriptionWorker sn - nistSubscriptionTracer - nistErrorPolicyTracer + nsSubscriptionTracer + nsErrorPolicyTracer networkState subscriptionParams (connectToNode' sn cborTermVersionDataCodec - (NetworkConnectTracers nistMuxTracer nistHandshakeTracer) + (NetworkConnectTracers nsMuxTracer nsHandshakeTracer) versions) diff --git a/ouroboros-network/src/Ouroboros/Network/Snocket.hs b/ouroboros-network/src/Ouroboros/Network/Snocket.hs index 5de96b94cb6..ae5664adc9e 100644 --- a/ouroboros-network/src/Ouroboros/Network/Snocket.hs +++ b/ouroboros-network/src/Ouroboros/Network/Snocket.hs @@ -18,6 +18,7 @@ module Ouroboros.Network.Snocket , LocalSnocket , localSnocket , LocalAddress + , LocalFD , localAddressFromPath ) where @@ -381,11 +382,13 @@ namedPipeSnocket iocp name = Snocket { -- | System dependent LocalSnocket type #if defined(mingw32_HOST_OS) type LocalSnocket = HANDLESnocket +type LocalFD = Win32.HANDLE localSnocket :: AssociateWithIOCP -> FilePath -> LocalSnocket localSnocket = namedPipeSnocket #else type LocalSnocket = SocketSnocket +type LocalFD = Socket localSnocket :: AssociateWithIOCP -> FilePath -> LocalSnocket localSnocket iocp _ = rawSocketSnocket iocp diff --git a/ouroboros-network/src/Ouroboros/Network/Subscription/Client.hs b/ouroboros-network/src/Ouroboros/Network/Subscription/Client.hs new file mode 100644 index 00000000000..44c68205059 --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/Subscription/Client.hs @@ -0,0 +1,80 @@ +{-# LANGUAGE NamedFieldPuns #-} + +-- Subscription worker for client applications connecting with 'LocalSnocket' +-- which is using either unix sockets or Windows' named pipes. +-- +module Ouroboros.Network.Subscription.Client + ( ClientSubscriptionParams (..) + , clientSubscriptionWorker + ) where + +import Control.Monad.Class.MonadTime +import Control.Tracer + +import Data.Void (Void) +import Data.Functor.Identity (Identity (..)) + +import Ouroboros.Network.Snocket ( LocalAddress + , LocalSnocket + , LocalFD + ) +import Ouroboros.Network.ErrorPolicy ( ErrorPolicies + , ErrorPolicyTrace + , WithAddr + , completeApplicationTx + ) +import Ouroboros.Network.Socket (NetworkMutableState (..)) +import Ouroboros.Network.Subscription.Ip (socketStateChangeTx, mainTx) +import Ouroboros.Network.Subscription.Worker +import Ouroboros.Network.Subscription.Subscriber + + +data ClientSubscriptionParams a = ClientSubscriptionParams + { cspAddress :: !LocalAddress + -- ^ unix socket or named pipe address + , cspConnectionAttemptDelay :: !(Maybe DiffTime) + -- ^ delay between connection attempts + , cspErrorPolicies :: !ErrorPolicies + -- ^ error policies for subscription worker + } + +-- | Client subscription worker keeps subsribing to the 'LocalAddress' using +-- either unix socket or named pipe. +-- +clientSubscriptionWorker + :: LocalSnocket + -> Tracer IO (SubscriptionTrace LocalAddress) + -> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace) + -> NetworkMutableState LocalAddress + -> ClientSubscriptionParams a + -> (LocalFD -> IO a) + -> IO Void +clientSubscriptionWorker snocket + tracer + errorPolicyTracer + NetworkMutableState { nmsConnectionTable, nmsPeerStates } + ClientSubscriptionParams { cspAddress + , cspConnectionAttemptDelay + , cspErrorPolicies + } + k = + worker tracer + errorPolicyTracer + nmsConnectionTable + nmsPeerStates + snocket + WorkerCallbacks + { wcSocketStateChangeTx = socketStateChangeTx + , wcCompleteApplicationTx = completeApplicationTx cspErrorPolicies + , wcMainTx = mainTx + } + workerParams + k + where + workerParams = WorkerParams { + wpLocalAddresses = Identity cspAddress, + wpSelectAddress = \_ (Identity addr) -> Just addr, + wpConnectionAttemptDelay = const cspConnectionAttemptDelay, + wpSubscriptionTarget = pure (constantSubscriptionTarget cspAddress), + wpValency = 1 + } diff --git a/ouroboros-network/src/Ouroboros/Network/Subscription/Dns.hs b/ouroboros-network/src/Ouroboros/Network/Subscription/Dns.hs index b6ee09a6df5..453f15e9e71 100644 --- a/ouroboros-network/src/Ouroboros/Network/Subscription/Dns.hs +++ b/ouroboros-network/src/Ouroboros/Network/Subscription/Dns.hs @@ -251,6 +251,7 @@ dnsSubscriptionWorker' snocket subTracer dnsTracer errorPolicyTracer (WithDomainName (dstDomain dst) `contramap` dnsTracer) resolver nmsPeerStates beforeConnectTx dst , wpValency = dstValency dst + , wpSelectAddress = selectSockAddr } spErrorPolicies main diff --git a/ouroboros-network/src/Ouroboros/Network/Subscription/Ip.hs b/ouroboros-network/src/Ouroboros/Network/Subscription/Ip.hs index aa0a7d3141f..698b4dd9871 100644 --- a/ouroboros-network/src/Ouroboros/Network/Subscription/Ip.hs +++ b/ouroboros-network/src/Ouroboros/Network/Subscription/Ip.hs @@ -27,6 +27,9 @@ module Ouroboros.Network.Subscription.Ip , completeApplicationTx , socketStateChangeTx , mainTx + + -- * Utilitity functions + , selectSockAddr ) where @@ -107,12 +110,21 @@ ipSubscriptionWorker snocket subscriptionTracer errorPolicyTracer wpSubscriptionTarget = pure $ ipSubscriptionTarget subscriptionTracer' nmsPeerStates (ispIps spSubscriptionTarget), - wpValency = ispValency spSubscriptionTarget + wpValency = ispValency spSubscriptionTarget, + wpSelectAddress = selectSockAddr } subscriptionTracer' = (WithIPList spLocalAddresses (ispIps spSubscriptionTarget) `contramap` subscriptionTracer) +selectSockAddr :: Socket.SockAddr + -> LocalAddresses Socket.SockAddr + -> Maybe Socket.SockAddr +selectSockAddr Socket.SockAddrInet{} (LocalAddresses (Just localAddr) _ _ ) = Just localAddr +selectSockAddr Socket.SockAddrInet6{} (LocalAddresses _ (Just localAddr) _ ) = Just localAddr +selectSockAddr Socket.SockAddrUnix{} (LocalAddresses _ _ (Just localAddr) ) = Just localAddr +selectSockAddr _ _ = Nothing + ipSubscriptionTarget :: forall m addr. ( MonadSTM m @@ -180,7 +192,7 @@ subscriptionWorker -> Tracer IO (SubscriptionTrace Socket.SockAddr) -> Tracer IO (WithAddr Socket.SockAddr ErrorPolicyTrace) -> NetworkMutableState Socket.SockAddr - -> WorkerParams IO Socket.SockAddr + -> WorkerParams IO LocalAddresses Socket.SockAddr -> ErrorPolicies -> Main IO (PeerStates IO Socket.SockAddr) x -- ^ main callback @@ -205,18 +217,8 @@ subscriptionWorker snocket , wcMainTx = main } workerParams - selectAddress k - where - selectAddress :: Socket.SockAddr - -> LocalAddresses Socket.SockAddr - -> Maybe Socket.SockAddr - selectAddress Socket.SockAddrInet{} (LocalAddresses (Just localAddr) _ _ ) = Just localAddr - selectAddress Socket.SockAddrInet6{} (LocalAddresses _ (Just localAddr) _ ) = Just localAddr - selectAddress Socket.SockAddrUnix{} (LocalAddresses _ _ (Just localAddr) ) = Just localAddr - selectAddress _ _ = Nothing - data WithIPList a = WithIPList { wilSrc :: !(LocalAddresses Socket.SockAddr) , wilDsts :: ![Socket.SockAddr] diff --git a/ouroboros-network/src/Ouroboros/Network/Subscription/Worker.hs b/ouroboros-network/src/Ouroboros/Network/Subscription/Worker.hs index faa5171d06a..b567f7189c3 100644 --- a/ouroboros-network/src/Ouroboros/Network/Subscription/Worker.hs +++ b/ouroboros-network/src/Ouroboros/Network/Subscription/Worker.hs @@ -193,7 +193,7 @@ data ConnectResult = -- | Traverse 'SubscriptionTarget's in an infinite loop. -- subscriptionLoop - :: forall m s sock addr a x. + :: forall m s sock localAddrs addr a x. ( MonadAsync m , MonadFork m , MonadMask m @@ -216,8 +216,7 @@ subscriptionLoop -> Snocket m sock addr -> WorkerCallbacks m s addr a x - -> WorkerParams m addr - -> (addr -> LocalAddresses addr -> Maybe addr) + -> WorkerParams m localAddrs addr -- ^ given a remote address, pick the local one -> (sock -> m a) -- ^ application @@ -231,8 +230,8 @@ subscriptionLoop , wpConnectionAttemptDelay = connectionAttemptDelay , wpSubscriptionTarget = subscriptionTargets , wpValency = valency + , wpSelectAddress } - selectAddress k = do valencyVar <- atomically $ newValencyCounter tbl valency @@ -310,7 +309,7 @@ subscriptionLoop r <- refConnection tbl remoteAddr valencyVar case r of ConnectionTableCreate -> - case selectAddress remoteAddr localAddresses of + case wpSelectAddress remoteAddr localAddresses of Nothing -> traceWith tr (SubscriptionTraceUnsupportedRemoteAddr remoteAddr) @@ -515,8 +514,11 @@ data WorkerCallbacks m s addr a t = WorkerCallbacks { -- | Worker parameters -- -data WorkerParams m addr = WorkerParams { - wpLocalAddresses :: LocalAddresses addr, +data WorkerParams m localAddrs addr = WorkerParams { + wpLocalAddresses :: localAddrs addr, + -- ^ local addresses of the server + wpSelectAddress :: addr -> localAddrs addr -> Maybe addr, + -- ^ given remote addr pick the local address wpConnectionAttemptDelay :: addr -> Maybe DiffTime, -- ^ delay after a connection attempt to 'addr' wpSubscriptionTarget :: m (SubscriptionTarget m addr), @@ -532,7 +534,7 @@ data WorkerParams m addr = WorkerParams { -- 'orElse', PR #432. -- worker - :: forall s sock addr a x. + :: forall s sock localAddrs addr a x. ( Ord addr , Show addr ) @@ -544,18 +546,17 @@ worker -> Snocket IO sock addr -> WorkerCallbacks IO s addr a x - -> WorkerParams IO addr - -> (addr -> LocalAddresses addr -> Maybe addr) + -> WorkerParams IO localAddrs addr -> (sock -> IO a) -- ^ application -> IO x -worker tr errTrace tbl sVar snocket workerCallbacks@WorkerCallbacks {wcCompleteApplicationTx, wcMainTx} workerParams selectAddress k = do +worker tr errTrace tbl sVar snocket workerCallbacks@WorkerCallbacks {wcCompleteApplicationTx, wcMainTx } workerParams k = do resQ <- newResultQ threadsVar <- atomically $ newTVar Set.empty withAsync (subscriptionLoop tr tbl resQ sVar threadsVar snocket - workerCallbacks workerParams selectAddress k) $ \_ -> + workerCallbacks workerParams k) $ \_ -> mainLoop errTrace resQ threadsVar sVar wcCompleteApplicationTx wcMainTx `finally` killThreads threadsVar where diff --git a/ouroboros-network/src/Ouroboros/Network/Tracers.hs b/ouroboros-network/src/Ouroboros/Network/Tracers.hs index 1fb9d39c91c..f13c5ca9650 100644 --- a/ouroboros-network/src/Ouroboros/Network/Tracers.hs +++ b/ouroboros-network/src/Ouroboros/Network/Tracers.hs @@ -1,6 +1,7 @@ module Ouroboros.Network.Tracers - ( NetworkIPSubscriptionTracers (..) - , nullNetworkIPSubscriptionTracers + ( NetworkSubscriptionTracers (..) + , NetworkIPSubscriptionTracers + , nullNetworkSubscriptionTracers , NetworkDNSSubscriptionTracers (..) , nullNetworkDNSSubscriptionTracers ) where @@ -20,29 +21,31 @@ import Ouroboros.Network.Subscription.Dns -- | IP subscription tracers. -- -data NetworkIPSubscriptionTracers addr ptcl vNumber = NetworkIPSubscriptionTracers { - nistMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace), +data NetworkSubscriptionTracers withIPList addr ptcl vNumber = NetworkSubscriptionTracers { + nsMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace), -- ^ low level mux-network tracer, which logs mux sdu (send and received) -- and other low level multiplexing events. - nistHandshakeTracer :: Tracer IO (WithMuxBearer (ConnectionId addr) + nsHandshakeTracer :: Tracer IO (WithMuxBearer (ConnectionId addr) (TraceSendRecv (Handshake vNumber CBOR.Term))), -- ^ handshake protocol tracer; it is important for analysing version -- negotation mismatches. - nistErrorPolicyTracer :: Tracer IO (WithAddr addr ErrorPolicyTrace), + nsErrorPolicyTracer :: Tracer IO (WithAddr addr ErrorPolicyTrace), -- ^ error policy tracer; must not be 'nullTracer', otherwise all the -- exceptions which are not matched by any error policy will be caught -- and not logged or rethrown. - nistSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace addr)) + nsSubscriptionTracer :: Tracer IO (withIPList (SubscriptionTrace addr)) -- ^ subscription tracers; it is infrequent it should not be 'nullTracer' -- by default. } -nullNetworkIPSubscriptionTracers :: NetworkIPSubscriptionTracers addr ptcl vNumber -nullNetworkIPSubscriptionTracers = NetworkIPSubscriptionTracers { - nistMuxTracer = nullTracer, - nistHandshakeTracer = nullTracer, - nistErrorPolicyTracer = nullTracer, - nistSubscriptionTracer = nullTracer +type NetworkIPSubscriptionTracers addr ptcl vNumber = NetworkSubscriptionTracers WithIPList addr ptcl vNumber + +nullNetworkSubscriptionTracers :: NetworkSubscriptionTracers withIPList addr ptcl vNumber +nullNetworkSubscriptionTracers = NetworkSubscriptionTracers { + nsMuxTracer = nullTracer, + nsHandshakeTracer = nullTracer, + nsErrorPolicyTracer = nullTracer, + nsSubscriptionTracer = nullTracer } -- | DNS subscription tracers. diff --git a/ouroboros-network/test/Test/PeerState.hs b/ouroboros-network/test/Test/PeerState.hs index bc463df045d..eb3c64cf69c 100644 --- a/ouroboros-network/test/Test/PeerState.hs +++ b/ouroboros-network/test/Test/PeerState.hs @@ -377,12 +377,12 @@ prop_subscriptionWorker laIpv6 = Just localAddr, laUnix = Nothing }, + wpSelectAddress = \_ LocalAddresses {laIpv4, laIpv6} -> getFirst (First laIpv4 <> First laIpv6), wpConnectionAttemptDelay = const Nothing, wpSubscriptionTarget = pure $ ipSubscriptionTarget nullTracer peerStatesVar [remoteAddr], wpValency = 1 } - (\_ LocalAddresses {laIpv4, laIpv6} -> getFirst (First laIpv4 <> First laIpv6)) (\sock -> app sock `finally` (void $ atomically $ tryPutTMVar doneVar ())) diff --git a/ouroboros-network/test/Test/Subscription.hs b/ouroboros-network/test/Test/Subscription.hs index cf60262ddcb..70b52c86dab 100644 --- a/ouroboros-network/test/Test/Subscription.hs +++ b/ouroboros-network/test/Test/Subscription.hs @@ -726,6 +726,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ withIOManager $ \iocp -> do (NetworkMutableState tbl peerStatesVar) WorkerParams { wpLocalAddresses = LocalAddresses (Just localAddr) Nothing Nothing, + wpSelectAddress = selectSockAddr, wpConnectionAttemptDelay = \_ -> Just minConnectionAttemptDelay, wpSubscriptionTarget = pure $ listSubscriptionTarget [remoteAddr], wpValency = 1