Skip to content

Commit

Permalink
clientSubscriptionWorker
Browse files Browse the repository at this point in the history
A subscription worker which works over ClientSnocket.  It is integrated
into data diffusion `Ouroboros.Network.Diffusion`.

* `ouroboros-network`   - compiler and runs all its tests on Windows.
* `ouroboros-consensus` - windows support is tracked in #1082
  • Loading branch information
coot committed Feb 1, 2020
1 parent 7f1d4ee commit fb7696b
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 95 deletions.
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ library
Ouroboros.Network.Subscription
Ouroboros.Network.Subscription.Ip
Ouroboros.Network.Subscription.Dns
Ouroboros.Network.Subscription.Client
Ouroboros.Network.Subscription.Subscriber
Ouroboros.Network.Subscription.PeerState
Ouroboros.Network.Subscription.Worker
Expand Down
28 changes: 15 additions & 13 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import Network.Mux (MuxTrace (..), WithMuxBearer (..))
import Network.Socket (SockAddr, AddrInfo)
import qualified Network.Socket as Socket

import Ouroboros.Network.Snocket (SocketSnocket, ClientSnocket)
import Ouroboros.Network.Snocket (ClientAddress, SocketSnocket, ClientSnocket)
import qualified Ouroboros.Network.Snocket as Snocket

import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
Expand Down Expand Up @@ -57,23 +57,24 @@ import Ouroboros.Network.Subscription.Worker (LocalAddresses (..))
import Ouroboros.Network.Tracers

data DiffusionTracers = DiffusionTracers {
dtIpSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace SockAddr))
dtIpSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace SockAddr))
-- ^ IP subscription tracer
, dtDnsSubscriptionTracer :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
, dtDnsSubscriptionTracer :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
-- ^ DNS subscription tracer
, dtDnsResolverTracer :: Tracer IO (WithDomainName DnsTrace)
, dtDnsResolverTracer :: Tracer IO (WithDomainName DnsTrace)
-- ^ DNS resolver tracer
, dtMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
, dtMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
-- ^ Mux tracer
, dtMuxLocalTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
, dtMuxLocalTracer :: Tracer IO (WithMuxBearer (ConnectionId ClientAddress) MuxTrace)
-- ^ Mux tracer for local clients
, dtHandshakeTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr)
, dtHandshakeTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr)
(TraceSendRecv (Handshake NodeToNodeVersion CBOR.Term)))
-- ^ Handshake protocol tracer
, dtHandshakeLocalTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr)
, dtHandshakeLocalTracer :: Tracer IO (WithMuxBearer (ConnectionId ClientAddress)
(TraceSendRecv (Handshake NodeToClientVersion CBOR.Term)))
-- ^ Handshake protocol tracer for local clients
, dtErrorPolicyTracer :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
, dtErrorPolicyTracer :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
, dtLocalErrorPolicyTracer :: Tracer IO (WithAddr ClientAddress ErrorPolicyTrace)
}


Expand Down Expand Up @@ -122,7 +123,7 @@ data DiffusionApplications = DiffusionApplications {
DictVersion
(OuroborosApplication
'ResponderApp
(ConnectionId SockAddr)
(ConnectionId ClientAddress)
NodeToClientProtocols
IO
ByteString
Expand Down Expand Up @@ -194,6 +195,7 @@ runDataDiffusion tracers
, dtHandshakeTracer
, dtHandshakeLocalTracer
, dtErrorPolicyTracer
, dtLocalErrorPolicyTracer
} = tracers

initiatorLocalAddresses :: LocalAddresses SockAddr
Expand All @@ -220,14 +222,14 @@ runDataDiffusion tracers
remoteErrorPolicy = NodeToNode.remoteNetworkErrorPolicy <> daErrorPolicies
localErrorPolicy = NodeToNode.localNetworkErrorPolicy <> daErrorPolicies

runLocalServer :: SocketSnocket -> NetworkMutableState SockAddr -> IO Void
runLocalServer :: ClientSnocket -> NetworkMutableState ClientAddress -> IO Void
runLocalServer sn networkLocalState =
NodeToClient.withServer
sn
(NetworkServerTracers
dtMuxLocalTracer
dtHandshakeLocalTracer
dtErrorPolicyTracer)
dtLocalErrorPolicyTracer)
networkLocalState
(Snocket.clientAddressFromPath daLocalAddress)
(daLocalResponderApplication applications)
Expand All @@ -251,7 +253,7 @@ runDataDiffusion tracers
-> IO Void
runIpSubscriptionWorker sn networkState = NodeToNode.ipSubscriptionWorker
sn
(NetworkIPSubscriptionTracers
(NetworkSubscriptionTracers
dtMuxTracer
dtHandshakeTracer
dtErrorPolicyTracer
Expand Down
65 changes: 31 additions & 34 deletions ouroboros-network/src/Ouroboros/Network/NodeToClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ module Ouroboros.Network.NodeToClient (
, withServer_V1
, withServer

, NetworkIPSubscriptionTracers (..)
, IPSubscriptionParams
, SubscriptionParams (..)

, NetworkClientSubcriptionTracers
, NetworkSubscriptionTracers (..)
, ClientSubscriptionParams (..)
, ncSubscriptionWorker
, ncSubscriptionWorker_V1

Expand Down Expand Up @@ -59,14 +60,14 @@ module Ouroboros.Network.NodeToClient (
, DecoderFailureOrTooMuchInput
, Handshake
, LocalAddresses (..)
, IPSubscriptionTarget (..)
, SubscriptionTrace (..)
, WithIPList (..)
) where

import qualified Control.Concurrent.Async as Async
import Control.Exception (IOException)
import qualified Data.ByteString.Lazy as BL
import Data.Functor.Identity (Identity (..))
import Data.Functor.Contravariant (contramap)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock
Expand All @@ -78,8 +79,6 @@ import qualified Codec.CBOR.Term as CBOR
import Codec.Serialise (Serialise (..), DeserialiseFailure)
import Codec.SerialiseTerm

import qualified Network.Socket as Socket

import Network.Mux hiding (MiniProtocolLimits(..))
import Network.TypedProtocol.Driver.ByteLimit (DecoderFailureOrTooMuchInput)
import Network.TypedProtocol.Driver (TraceSendRecv (..))
Expand All @@ -94,12 +93,9 @@ import Ouroboros.Network.Protocol.Handshake.Type
import Ouroboros.Network.Protocol.Handshake.Version
import Ouroboros.Network.Snocket
import Ouroboros.Network.Socket
import Ouroboros.Network.Subscription.Ip (IPSubscriptionParams, SubscriptionParams (..))
import qualified Ouroboros.Network.Subscription.Ip as Subscription
import Ouroboros.Network.Subscription.Ip ( IPSubscriptionTarget (..)
, WithIPList (..)
, SubscriptionTrace (..)
)
import Ouroboros.Network.Subscription.Client ( ClientSubscriptionParams (..) )
import qualified Ouroboros.Network.Subscription.Client as Subscription
import Ouroboros.Network.Subscription.Ip (SubscriptionTrace (..))
import Ouroboros.Network.Subscription.Worker (LocalAddresses (..))
import Ouroboros.Network.IOManager

Expand Down Expand Up @@ -268,6 +264,10 @@ withServer_V1 sn tracers networkState addr versionData application =
(DictVersion nodeToClientCodecCBORTerm)
application)

type NetworkClientSubcriptionTracers
= NetworkSubscriptionTracers Identity ClientAddress NodeToClientProtocols NodeToClientVersion


-- | 'ncSubscriptionWorker' which starts given application versions on each
-- established connection.
--
Expand All @@ -276,9 +276,9 @@ ncSubscriptionWorker
( HasInitiator appType ~ True
)
=> ClientSnocket
-> NetworkIPSubscriptionTracers ClientAddress NodeToClientProtocols NodeToClientVersion
-> NetworkClientSubcriptionTracers
-> NetworkMutableState ClientAddress
-> IPSubscriptionParams ()
-> ClientSubscriptionParams ()
-> Versions
NodeToClientVersion
DictVersion
Expand All @@ -290,44 +290,41 @@ ncSubscriptionWorker
-> IO Void
ncSubscriptionWorker
sn
NetworkIPSubscriptionTracers
{ nistSubscriptionTracer
, nistMuxTracer
, nistHandshakeTracer
, nistErrorPolicyTracer
NetworkSubscriptionTracers
{ nsSubscriptionTracer
, nsMuxTracer
, nsHandshakeTracer
, nsErrorPolicyTracer
}
networkState
subscriptionParams
versions
= Subscription.ipSubscriptionWorker
= Subscription.clientSubscriptionWorker
sn
nistSubscriptionTracer
nistErrorPolicyTracer
(Identity `contramap` nsSubscriptionTracer)
nsErrorPolicyTracer
networkState
subscriptionParams
(connectToNode'
sn
cborTermVersionDataCodec
(NetworkConnectTracers nistMuxTracer nistHandshakeTracer)
(NetworkConnectTracers nsMuxTracer nsHandshakeTracer)
versions)


-- | Like 'ncSubscriptionWorker' but specific to 'NodeToClientV_1'.
--
ncSubscriptionWorker_V1
:: forall appType fd addr x y.
( HasInitiator appType ~ True
, fd ~ Socket.Socket
, addr ~ Socket.SockAddr
)
=> Snocket IO fd addr
-> NetworkIPSubscriptionTracers addr NodeToClientProtocols NodeToClientVersion
-> NetworkMutableState addr
-> IPSubscriptionParams ()
:: forall appType x y.
( HasInitiator appType ~ True )
=> ClientSnocket
-> NetworkSubscriptionTracers Identity ClientAddress NodeToClientProtocols NodeToClientVersion
-> NetworkMutableState ClientAddress
-> ClientSubscriptionParams ()
-> NodeToClientVersionData
-> (OuroborosApplication
appType
(ConnectionId addr)
(ConnectionId ClientAddress)
NodeToClientProtocols
IO BL.ByteString x y)
-> IO Void
Expand Down
21 changes: 11 additions & 10 deletions ouroboros-network/src/Ouroboros/Network/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ module Ouroboros.Network.NodeToNode (
-- * Subscription Workers
-- ** IP subscriptin worker
, IPSubscriptionTarget (..)
, NetworkIPSubscriptionTracers (..)
, nullNetworkIPSubscriptionTracers
, NetworkIPSubscriptionTracers
, NetworkSubscriptionTracers (..)
, nullNetworkSubscriptionTracers
, SubscriptionParams (..)
, IPSubscriptionParams
, ipSubscriptionWorker
Expand Down Expand Up @@ -305,25 +306,25 @@ ipSubscriptionWorker
-> IO Void
ipSubscriptionWorker
sn
NetworkIPSubscriptionTracers
{ nistSubscriptionTracer
, nistMuxTracer
, nistHandshakeTracer
, nistErrorPolicyTracer
NetworkSubscriptionTracers
{ nsSubscriptionTracer
, nsMuxTracer
, nsHandshakeTracer
, nsErrorPolicyTracer
}
networkState
subscriptionParams
versions
= Subscription.ipSubscriptionWorker
sn
nistSubscriptionTracer
nistErrorPolicyTracer
nsSubscriptionTracer
nsErrorPolicyTracer
networkState
subscriptionParams
(connectToNode'
sn
cborTermVersionDataCodec
(NetworkConnectTracers nistMuxTracer nistHandshakeTracer)
(NetworkConnectTracers nsMuxTracer nsHandshakeTracer)
versions)


Expand Down
3 changes: 3 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/Snocket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Ouroboros.Network.Snocket
, clientSnocket
, ClientAddress
, clientAddressFromPath
, ClientFD
) where

import Control.Exception
Expand Down Expand Up @@ -381,11 +382,13 @@ namedPipeSnocket iocp name = Snocket {
-- | System dependent ClientSnocket type
#if defined(mingw32_HOST_OS)
type ClientSnocket = HANDLESnocket
type ClientFD = Win32.HANDLE

clientSnocket :: AssociateWithIOCP -> FilePath -> ClientSnocket
clientSnocket = namedPipeSnocket
#else
type ClientSnocket = SocketSnocket
type ClientFD = Socket

clientSnocket :: AssociateWithIOCP -> FilePath -> ClientSnocket
clientSnocket iocp _ = rawSocketSnocket iocp
Expand Down
80 changes: 80 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/Subscription/Client.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{-# LANGUAGE NamedFieldPuns #-}

-- Subscription worker for client applications connecting with 'ClientSnocket'
-- which is using either unix sockets or Windows' named pipes.
--
module Ouroboros.Network.Subscription.Client
( ClientSubscriptionParams (..)
, clientSubscriptionWorker
) where

import Control.Monad.Class.MonadTime
import Control.Tracer

import Data.Void (Void)
import Data.Functor.Identity (Identity (..))

import Ouroboros.Network.Snocket ( ClientAddress
, ClientSnocket
, ClientFD
)
import Ouroboros.Network.ErrorPolicy ( ErrorPolicies
, ErrorPolicyTrace
, WithAddr
, completeApplicationTx
)
import Ouroboros.Network.Socket (NetworkMutableState (..))
import Ouroboros.Network.Subscription.Ip (socketStateChangeTx, mainTx)
import Ouroboros.Network.Subscription.Worker
import Ouroboros.Network.Subscription.Subscriber


data ClientSubscriptionParams a = ClientSubscriptionParams
{ cspAddress :: !ClientAddress
-- ^ unix socket or named pipe address
, cspConnectionAttemptDelay :: !(Maybe DiffTime)
-- ^ delay between connection attempts
, cspErrorPolicies :: !ErrorPolicies
-- ^ error policies for subscription worker
}

-- | Client subscription worker keeps subsribing to the 'ClientAddress' using
-- either unix socket or named pipe.
--
clientSubscriptionWorker
:: ClientSnocket
-> Tracer IO (SubscriptionTrace ClientAddress)
-> Tracer IO (WithAddr ClientAddress ErrorPolicyTrace)
-> NetworkMutableState ClientAddress
-> ClientSubscriptionParams a
-> (ClientFD -> IO a)
-> IO Void
clientSubscriptionWorker snocket
tracer
errorPolicyTracer
NetworkMutableState { nmsConnectionTable, nmsPeerStates }
ClientSubscriptionParams { cspAddress
, cspConnectionAttemptDelay
, cspErrorPolicies
}
k =
worker tracer
errorPolicyTracer
nmsConnectionTable
nmsPeerStates
snocket
WorkerCallbacks
{ wcSocketStateChangeTx = socketStateChangeTx
, wcCompleteApplicationTx = completeApplicationTx cspErrorPolicies
, wcMainTx = mainTx
}
workerParams
k
where
workerParams = WorkerParams {
wpLocalAddresses = Identity cspAddress,
wpSelectAddress = \_ (Identity addr) -> Just addr,
wpConnectionAttemptDelay = const cspConnectionAttemptDelay,
wpSubscriptionTarget = pure (constantSubscriptionTarget cspAddress),
wpValency = 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ dnsSubscriptionWorker' snocket subTracer dnsTracer errorPolicyTracer
(WithDomainName (dstDomain dst) `contramap` dnsTracer)
resolver nmsPeerStates beforeConnectTx dst
, wpValency = dstValency dst
, wpSelectAddress = selectSockAddr
}
spErrorPolicies
main
Expand Down
Loading

0 comments on commit fb7696b

Please sign in to comment.