-
Notifications
You must be signed in to change notification settings - Fork 87
/
Node.hs
394 lines (356 loc) · 13.1 KB
/
Node.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE UndecidableInstances #-}
-- | Top-level module to run a single Hydra node.
--
-- Checkout [Hydra
-- Documentation](https://hydra.family/head-protocol/dev/architecture)
-- for some details about the overall architecture of the `Node`.
module Hydra.Node where
import Hydra.Prelude
import Control.Concurrent.Class.MonadSTM (
MonadLabelledSTM,
labelTVarIO,
newTVarIO,
stateTVar,
writeTVar,
)
import Control.Monad.Trans.Writer (execWriter, tell)
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.Server (Server, sendOutput)
import Hydra.Cardano.Api (AsType (AsPaymentKey, AsSigningKey, AsVerificationKey), getVerificationKey)
import Hydra.Chain (
Chain (..),
ChainEvent (..),
ChainStateHistory,
PostTxError,
)
import Hydra.Chain.ChainState (ChainStateType, IsChainState)
import Hydra.Chain.Direct.Util (readFileTextEnvelopeThrow)
import Hydra.Events (EventId, EventSink (..), EventSource (..), StateEvent (..), getEventId, putEventsToSinks, stateChanged)
import Hydra.HeadLogic (
Effect (..),
HeadState (..),
IdleState (..),
Input (..),
Outcome (..),
aggregateState,
defaultTTL,
recoverChainStateHistory,
recoverState,
)
import Hydra.HeadLogic qualified as HeadLogic
import Hydra.HeadLogic.Outcome (StateChanged (..))
import Hydra.HeadLogic.State (getHeadParameters)
import Hydra.Ledger (Ledger)
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (Network (..), NetworkCallback (..))
import Hydra.Network.Message (Message, NetworkEvent (..))
import Hydra.Node.InputQueue (InputQueue (..), Queued (..), createInputQueue)
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Options (ChainConfig (..), DirectChainConfig (..), RunOptions (..), defaultContestationPeriod)
import Hydra.Tx (HasParty (..), HeadParameters (..), Party (..), deriveParty)
import Hydra.Tx.Crypto (AsType (AsHydraKey))
import Hydra.Tx.Environment (Environment (..))
import Hydra.Tx.IsTx (ArbitraryIsTx)
import Hydra.Tx.Utils (verificationKeyToOnChainId)
-- * Environment Handling
-- | Intialize the 'Environment' from command line options.
initEnvironment :: RunOptions -> IO Environment
initEnvironment options = do
sk <- readFileTextEnvelopeThrow (AsSigningKey AsHydraKey) hydraSigningKey
otherParties <- mapM loadParty hydraVerificationKeys
participants <- getParticipants
pure $
Environment
{ party = deriveParty sk
, signingKey = sk
, otherParties
, participants
, contestationPeriod
}
where
-- XXX: This is mostly a cardano-specific initialization step of loading
-- --cardano-verification-key options and deriving 'OnChainId's from it. We should be able to call out to the various chain layer
getParticipants =
case chainConfig of
Offline{} -> pure []
Direct
DirectChainConfig
{ cardanoVerificationKeys
, cardanoSigningKey
} -> do
ownSigningKey <- readFileTextEnvelopeThrow (AsSigningKey AsPaymentKey) cardanoSigningKey
otherVerificationKeys <- mapM (readFileTextEnvelopeThrow (AsVerificationKey AsPaymentKey)) cardanoVerificationKeys
pure $ verificationKeyToOnChainId <$> (getVerificationKey ownSigningKey : otherVerificationKeys)
contestationPeriod = case chainConfig of
Offline{} -> defaultContestationPeriod
Direct DirectChainConfig{contestationPeriod = cp} -> cp
loadParty p =
Party <$> readFileTextEnvelopeThrow (AsVerificationKey AsHydraKey) p
RunOptions
{ hydraSigningKey
, hydraVerificationKeys
, chainConfig
} = options
-- | Checks that command line options match a given 'HeadState'. This function
-- takes 'Environment' because it is derived from 'RunOptions' via
-- 'initEnvironment'.
--
-- Throws: 'ParameterMismatch' when state not matching the environment.
checkHeadState ::
MonadThrow m =>
Tracer m (HydraNodeLog tx) ->
Environment ->
HeadState tx ->
m ()
checkHeadState tracer env headState = do
unless (null paramsMismatch) $ do
traceWith tracer (Misconfiguration paramsMismatch)
throwIO $ ParameterMismatch paramsMismatch
where
paramsMismatch =
maybe [] validateParameters $ getHeadParameters headState
validateParameters HeadParameters{contestationPeriod = loadedCp, parties} =
execWriter $ do
when (loadedCp /= configuredCp) $
tell [ContestationPeriodMismatch{loadedCp, configuredCp}]
let loadedParties = sort parties
configuredParties = sort (party : otherParties)
when (loadedParties /= configuredParties) $
tell [PartiesMismatch{loadedParties, configuredParties}]
Environment{contestationPeriod = configuredCp, otherParties, party} = env
-- * Create and run a hydra node
-- | A draft version of the 'HydraNode' that holds state, but is not yet
-- connected (see 'connect'). This is commonly created by the 'hydrate' smart
-- constructor.
data DraftHydraNode tx m = DraftHydraNode
{ tracer :: Tracer m (HydraNodeLog tx)
, env :: Environment
, ledger :: Ledger tx
, nodeState :: NodeState tx m
, inputQueue :: InputQueue m (Input tx)
, eventSource :: EventSource (StateEvent tx) m
, eventSinks :: [EventSink (StateEvent tx) m]
, -- XXX: This is an odd field in here, but needed for the chain layer to
-- bootstrap. Maybe move to NodeState or make it differently accessible?
chainStateHistory :: ChainStateHistory tx
}
instance HasParty (DraftHydraNode tx m) where
getParty DraftHydraNode{env} = getParty env
-- | Hydrate a 'DraftHydraNode' by loading events from source, re-aggregate node
-- state and sending events to sinks while doing so.
hydrate ::
(MonadDelay m, MonadLabelledSTM m, MonadAsync m, MonadThrow m, IsChainState tx) =>
Tracer m (HydraNodeLog tx) ->
Environment ->
Ledger tx ->
ChainStateType tx ->
EventSource (StateEvent tx) m ->
[EventSink (StateEvent tx) m] ->
m (DraftHydraNode tx m)
hydrate tracer env ledger initialChainState eventSource eventSinks = do
events <- getEvents eventSource
let lastSeenEventId = getEventId . last <$> nonEmpty events
traceWith tracer LoadedState{numberOfEvents = fromIntegral $ length events}
let headState = recoverState initialState (stateChanged <$> events)
chainStateHistory = recoverChainStateHistory initialChainState (stateChanged <$> events)
-- Check whether the loaded state matches our configuration (env)
checkHeadState tracer env headState
-- (Re-)submit events to sinks; de-duplication is handled by the sinks
putEventsToSinks eventSinks events
nodeState <- createNodeState lastSeenEventId headState
inputQueue <- createInputQueue
pure
DraftHydraNode
{ tracer
, env
, ledger
, nodeState
, inputQueue
, eventSource
, eventSinks
, chainStateHistory
}
where
initialState = Idle IdleState{chainState = initialChainState}
wireChainInput :: DraftHydraNode tx m -> (ChainEvent tx -> m ())
wireChainInput node = enqueue . ChainInput
where
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node
wireClientInput :: DraftHydraNode tx m -> (ClientInput tx -> m ())
wireClientInput node = enqueue . ClientInput
where
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node
wireNetworkInput :: DraftHydraNode tx m -> NetworkCallback (NetworkEvent (Message tx)) m
wireNetworkInput node =
NetworkCallback{deliver = enqueue . NetworkInput defaultTTL}
where
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node
-- | Connect chain, network and API to a hydrated 'DraftHydraNode' to get a fully
-- connected 'HydraNode'.
connect ::
Monad m =>
Chain tx m ->
Network m (Message tx) ->
Server tx m ->
DraftHydraNode tx m ->
m (HydraNode tx m)
connect chain network server node =
pure HydraNode{tracer, env, ledger, nodeState, inputQueue, eventSource, eventSinks, oc = chain, hn = network, server}
where
DraftHydraNode{tracer, env, ledger, nodeState, inputQueue, eventSource, eventSinks} = node
-- | Fully connected hydra node with everything wired in.
data HydraNode tx m = HydraNode
{ tracer :: Tracer m (HydraNodeLog tx)
, env :: Environment
, ledger :: Ledger tx
, nodeState :: NodeState tx m
, inputQueue :: InputQueue m (Input tx)
, eventSource :: EventSource (StateEvent tx) m
, eventSinks :: [EventSink (StateEvent tx) m]
, oc :: Chain tx m
, hn :: Network m (Message tx)
, server :: Server tx m
}
runHydraNode ::
( MonadCatch m
, MonadAsync m
, IsChainState tx
) =>
HydraNode tx m ->
m ()
runHydraNode node =
-- NOTE(SN): here we could introduce concurrent head processing, e.g. with
-- something like 'forM_ [0..1] $ async'
forever $ stepHydraNode node
stepHydraNode ::
( MonadCatch m
, MonadAsync m
, IsChainState tx
) =>
HydraNode tx m ->
m ()
stepHydraNode node = do
i@Queued{queuedId, queuedItem} <- dequeue
traceWith tracer $ BeginInput{by = party, inputId = queuedId, input = queuedItem}
outcome <- atomically $ processNextInput node queuedItem
traceWith tracer (LogicOutcome party outcome)
case outcome of
Continue{stateChanges, effects} -> do
processStateChanges node stateChanges
processEffects node tracer queuedId effects
Wait{stateChanges} -> do
processStateChanges node stateChanges
maybeReenqueue i
Error{} -> pure ()
traceWith tracer EndInput{by = party, inputId = queuedId}
where
maybeReenqueue q@Queued{queuedId, queuedItem} =
case queuedItem of
NetworkInput ttl msg
| ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) msg}
_ -> traceWith tracer $ DroppedFromQueue{inputId = queuedId, input = queuedItem}
Environment{party} = env
HydraNode{tracer, inputQueue = InputQueue{dequeue, reenqueue}, env} = node
-- | The time to wait between re-enqueuing a 'Wait' outcome from 'HeadLogic'.
waitDelay :: DiffTime
waitDelay = 0.1
-- | Monadic interface around 'Hydra.Logic.update'.
processNextInput ::
IsChainState tx =>
HydraNode tx m ->
Input tx ->
STM m (Outcome tx)
processNextInput HydraNode{nodeState, ledger, env} e =
modifyHeadState $ \s ->
let outcome = computeOutcome s e
in (outcome, aggregateState s outcome)
where
NodeState{modifyHeadState} = nodeState
computeOutcome = HeadLogic.update env ledger
processStateChanges :: MonadSTM m => HydraNode tx m -> [StateChanged tx] -> m ()
processStateChanges node stateChanges = do
events <- atomically . forM stateChanges $ \stateChanged -> do
eventId <- getNextEventId
pure StateEvent{eventId, stateChanged}
putEventsToSinks eventSinks events
where
HydraNode
{ eventSinks
, nodeState = NodeState{getNextEventId}
} = node
processEffects ::
( MonadAsync m
, MonadCatch m
, IsChainState tx
) =>
HydraNode tx m ->
Tracer m (HydraNodeLog tx) ->
Word64 ->
[Effect tx] ->
m ()
processEffects node tracer inputId effects = do
mapM_ processEffect $ zip effects [0 ..]
where
processEffect (effect, effectId) = do
traceWith tracer $ BeginEffect party inputId effectId effect
case effect of
ClientEffect i -> sendOutput server i
NetworkEffect msg -> broadcast hn msg
OnChainEffect{postChainTx} ->
postTx postChainTx
`catch` \(postTxError :: PostTxError tx) ->
enqueue . ChainInput $ PostTxError{postChainTx, postTxError}
traceWith tracer $ EndEffect party inputId effectId
HydraNode
{ hn
, oc = Chain{postTx}
, server
, inputQueue = InputQueue{enqueue}
, env = Environment{party}
} = node
-- ** Manage state
-- | Handle to access and modify the state in the Hydra Node.
data NodeState tx m = NodeState
{ modifyHeadState :: forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
, queryHeadState :: STM m (HeadState tx)
, getNextEventId :: STM m EventId
}
-- | Initialize a new 'NodeState'.
createNodeState ::
MonadLabelledSTM m =>
-- | Last seen 'EventId'.
Maybe EventId ->
HeadState tx ->
m (NodeState tx m)
createNodeState lastSeenEventId initialState = do
nextEventIdV <- newTVarIO $ maybe 0 (+ 1) lastSeenEventId
labelTVarIO nextEventIdV "next-event-id"
hs <- newTVarIO initialState
labelTVarIO hs "head-state"
pure
NodeState
{ modifyHeadState = stateTVar hs
, queryHeadState = readTVar hs
, getNextEventId = do
eventId <- readTVar nextEventIdV
writeTVar nextEventIdV $ eventId + 1
pure eventId
}
-- * Logging
data HydraNodeLog tx
= BeginInput {by :: Party, inputId :: Word64, input :: Input tx}
| EndInput {by :: Party, inputId :: Word64}
| BeginEffect {by :: Party, inputId :: Word64, effectId :: Word32, effect :: Effect tx}
| EndEffect {by :: Party, inputId :: Word64, effectId :: Word32}
| LogicOutcome {by :: Party, outcome :: Outcome tx}
| DroppedFromQueue {inputId :: Word64, input :: Input tx}
| LoadedState {numberOfEvents :: Word64}
| Misconfiguration {misconfigurationErrors :: [ParamMismatch]}
deriving stock (Generic)
deriving stock instance IsChainState tx => Eq (HydraNodeLog tx)
deriving stock instance IsChainState tx => Show (HydraNodeLog tx)
deriving anyclass instance IsChainState tx => ToJSON (HydraNodeLog tx)
instance (ArbitraryIsTx tx, IsChainState tx) => Arbitrary (HydraNodeLog tx) where
arbitrary = genericArbitrary
shrink = genericShrink