Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix startCardanoTestnet #1654

Merged
merged 7 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/Internal/Spawn.purs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@ spawn' cmd args opts mbFilter cont = do
child <- ChildProcess.spawn cmd args opts
let fullCmd = cmd <> foldMap (" " <> _) args
closedAVar <- AVar.empty
interface <- RL.createInterface (stdout child) mempty
stdoutInterfaceRef <- Ref.new Nothing
stderrInterface <- RL.createInterface (stderr child) mempty
flip RL.setLineHandler stderrInterface \str -> do
traceM $ "stderr: " <> str
outputRef <- Ref.new ""
ChildProcess.onClose child \code -> do
RL.close interface
stdoutInterface <- Ref.read stdoutInterfaceRef
traverse_ RL.close stdoutInterface
RL.close stderrInterface
void $ AVar.tryPut code closedAVar
output <- Ref.read outputRef
cont $ Left $ error
Expand All @@ -113,16 +115,18 @@ spawn' cmd args opts mbFilter cont = do
case mbFilter of
Nothing -> cont (pure mp)
Just filter -> do
flip RL.setLineHandler interface
stdoutInterface <- RL.createInterface (stdout child) mempty
Ref.write (Just stdoutInterface) stdoutInterfaceRef
flip RL.setLineHandler stdoutInterface
\str -> do
output <- Ref.modify (_ <> str <> "\n") outputRef
filter { output, line: str } >>= case _ of
Success -> do
clearLineHandler interface
clearLineHandler stdoutInterface
cont (pure mp)
Cancel -> do
kill SIGINT child
clearLineHandler interface
clearLineHandler stdoutInterface
cont $ Left $ error
$ "Process cancelled because output received: "
<> str
Expand Down
12 changes: 6 additions & 6 deletions src/Internal/Testnet/Contract.purs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module Ctl.Internal.Testnet.Contract
import Contract.Prelude

import Cardano.Serialization.Lib (privateKey_generateEd25519) as Csl
import Cardano.Types (NetworkId(TestnetId))
import Cardano.Types.Address (Address, getPaymentCredential, getStakeCredential)
import Cardano.Types.Address (toBech32) as Address
import Cardano.Types.BigInt (BigInt)
Expand Down Expand Up @@ -65,6 +64,7 @@ import Ctl.Internal.Testnet.DistributeFunds (Tx(Tx)) as DistrFunds
import Ctl.Internal.Testnet.Server
( StartedTestnetCluster
, makeClusterContractEnv
, mkLogging
, startTestnetCluster
)
import Ctl.Internal.Testnet.Types (TestnetConfig)
Expand Down Expand Up @@ -242,13 +242,13 @@ startTestnetContractEnv
}
startTestnetContractEnv cfg distr cleanupRef = do
_ <- cleanupOnExit cleanupRef
cluster <- startTestnetCluster cfg cleanupRef
{ env, printLogs, clearLogs } <- makeClusterContractEnv cleanupRef cfg
let env' = env { networkId = TestnetId }
wallets <- mkWallets env' cluster
logging@{ logger } <- liftEffect $ mkLogging cfg
cluster <- startTestnetCluster cfg cleanupRef logger
{ env, printLogs, clearLogs } <- makeClusterContractEnv cleanupRef logging
wallets <- mkWallets env cluster
pure
{ cluster
, env: env'
, env
, wallets
, printLogs
, clearLogs
Expand Down
127 changes: 63 additions & 64 deletions src/Internal/Testnet/Server.purs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ module Ctl.Internal.Testnet.Server
, startOgmios
, startTestnetCluster
, makeClusterContractEnv
, mkLogging
) where

import Contract.Prelude hiding (log)

import Cardano.Types (NetworkId(MainnetId))
import Cardano.Types (NetworkId(TestnetId))
import Cardano.Types.BigNum (maxValue, toString) as BigNum
import Contract.Config (Hooks, defaultSynchronizationParams, defaultTimeParams)
import Contract.Monad (ContractEnv)
Expand Down Expand Up @@ -55,7 +56,6 @@ import Ctl.Internal.Testnet.Utils
, suppressAndLogErrors
, tmpdirUnique
, tryAndLogErrors
, waitFor
, waitForClose
, waitForError
, waitForEvent
Expand All @@ -66,33 +66,36 @@ import Data.Array (head) as Array
import Data.Log.Message (Message)
import Data.Maybe (Maybe(Nothing, Just))
import Data.Set as Set
import Data.String (stripPrefix, trim) as String
import Data.String (split, stripPrefix, trim) as String
import Data.String.CodeUnits (indexOf) as String
import Data.String.Pattern (Pattern(Pattern))
import Data.Time.Duration (Milliseconds(Milliseconds))
import Data.UInt (UInt)
import Data.UInt (toString) as UInt
import Effect.AVar (tryPut) as AVarSync
import Effect.Aff (Aff)
import Effect.Aff as Aff
import Effect.Aff.Class (class MonadAff)
import Effect.Aff.AVar (empty, take) as AVar
import Effect.Aff.Retry
( RetryPolicy
, constantDelay
, limitRetriesByCumulativeDelay
, recovering
)
import Effect.Class (class MonadEffect)
import Effect.Console (log)
import Effect.Exception (Error, error, throw)
import Effect.Ref (Ref)
import Effect.Ref (modify_, new) as Ref
import Foreign.Object as Object
import Node.ChildProcess (defaultSpawnOptions)
import Node.ChildProcess (defaultSpawnOptions, stdout)
import Node.ChildProcess as Node.ChildProcess
import Node.Encoding (Encoding(UTF8))
import Node.FS.Sync (readdir) as FSSync
import Node.FS.Sync as Node.FS
import Node.Path (FilePath)
import Node.Process as Node.Process
import Node.Stream (onDataString)

type Channels a =
{ stderr :: EventSource a
Expand Down Expand Up @@ -208,11 +211,12 @@ startKupo cfg params cleanupRef = do
startTestnetCluster
:: TestnetConfig
-> Ref (Array (Aff Unit))
-> Logger
-> Aff StartedTestnetCluster
startTestnetCluster cfg cleanupRef = do
startTestnetCluster cfg cleanupRef logger = do
{ testnet, channels, workdirAbsolute } <-
annotateError "Could not start cardano-testnet" $
startCardanoTestnet cfg.clusterConfig cleanupRef
startCardanoTestnet cfg.clusterConfig cleanupRef logger

{ paths } <- waitUntil (Milliseconds 4000.0)
$ map hush
Expand Down Expand Up @@ -312,6 +316,7 @@ spawnCardanoTestnet workdir params = do
startCardanoTestnet
:: TestnetClusterConfig
-> TestnetCleanupRef
-> Logger
-> Aff
{ testnet :: ManagedProcess
, channels ::
Expand All @@ -320,52 +325,51 @@ startCardanoTestnet
}
, workdirAbsolute :: FilePath
}
startCardanoTestnet params cleanupRef = annotateError "startCardanoTestnet" do
workdir <- tmpdirUnique "cardano-testnet"
testnet <- scheduleCleanup
cleanupRef
(spawnCardanoTestnet workdir params)
stopProcessWithChildren
channels <- liftEffect $ getChannels testnet
workspace <- waitUntil (Milliseconds 100.0) $ findWorkspaceDir workdir
scheduleWorkspaceCleanup workspace
redirectStreams channels workspace
workspaceFromLogs <- waitForCardanoTestnetWorkspace channels.stdout
when (workspace /= workspaceFromLogs) do
runCleanup cleanupRef
throwError $ error "cardano-testnet workspace mismatch"
attachStdoutMonitors testnet
log "startCardanoTestnet:done"
pure { testnet, workdirAbsolute: workspace, channels }
startCardanoTestnet params cleanupRef logger =
annotateError "startCardanoTestnet" do
workdir <- tmpdirUnique "cardano-testnet"
testnet@(ManagedProcess _ testnetProcess _) <- scheduleCleanup
cleanupRef
(spawnCardanoTestnet workdir params)
stopProcessWithChildren

workspaceFromLogsAvar <- AVar.empty
liftEffect $ onDataString (stdout testnetProcess) UTF8 \str -> do
let lines = String.split (Pattern "\n") str
traverse_
( \line -> do
logger Trace $ "[cardano-testnet:stdout] " <> line
let
mWorkspace = String.stripPrefix (Pattern "Workspace: ") $
String.trim line
maybe (pure unit)
(void <<< flip AVarSync.tryPut workspaceFromLogsAvar)
mWorkspace
)
lines

workspace <- waitUntil (Milliseconds 100.0) $ findWorkspaceDir workdir
-- Schedule a cleanup immediately after the workspace
-- directory is created.
scheduleWorkspaceCleanup workspace
-- Wait for cardano-testnet to output the workspace, indicating
-- that initialization is complete.
workspaceFromLogs <- AVar.take workspaceFromLogsAvar

when (workspace /= workspaceFromLogs) do
runCleanup cleanupRef
-- this error should never happen
throwError $ error "cardano-testnet workspace mismatch"

channels <- liftEffect $ getChannels testnet
attachStdoutMonitors testnet
pure { testnet, workdirAbsolute: workspace, channels }
where
findWorkspaceDir :: forall m. MonadEffect m => FilePath -> m (Maybe FilePath)
findWorkspaceDir workdir =
liftEffect $ map (concatPaths workdir) <<< Array.head <$>
FSSync.readdir workdir

redirectStreams :: StdStreams -> FilePath -> Aff Unit
redirectStreams channels workspace =
void $ redirectChannels channels
{ stdoutTo:
{ log: Just $ workspace <</>> "cardano-testnet.stdout.log"
, console: Nothing
}
, stderrTo:
{ log: Just $ workspace <</>> "cardano-testnet.stderr.log"
, console: Nothing
}
}

waitForCardanoTestnetWorkspace
:: forall m
. MonadAff m
=> EventSource String
-> m FilePath
waitForCardanoTestnetWorkspace =
liftAff
<<< flip waitFor
(String.stripPrefix (Pattern "Workspace: ") <<< String.trim)

attachStdoutMonitors :: ManagedProcess -> Aff Unit
attachStdoutMonitors testnet =
void $ Aff.forkAff $
Expand All @@ -388,7 +392,7 @@ startCardanoTestnet params cleanupRef = annotateError "startCardanoTestnet" do
_ -> true
when shouldCleanup do
addCleanup cleanupRef $ liftEffect do
log $ "Cleaning up cardano-testnet workspace: " <> workspace
logger Trace $ "Cleaning up cardano-testnet workspace: " <> workspace
_rmdirSync workspace

type StdStreams =
Expand Down Expand Up @@ -537,7 +541,7 @@ makeNaiveClusterContractEnv cfg logger customLogger = do
pure
{ backend
, handle: mkQueryHandle cfg backend
, networkId: MainnetId
, networkId: TestnetId
, logLevel: cfg.logLevel
, customLogger: customLogger
, suppressLogs: cfg.suppressLogs
Expand All @@ -555,25 +559,25 @@ makeNaiveClusterContractEnv cfg logger customLogger = do
makeClusterContractEnv
:: forall r
. Ref (Array (Aff Unit))
-> Record (ClusterConfig r)
-> { updatedConfig :: Record (ClusterConfig r)
, logger :: Logger
, customLogger :: Maybe (LogLevel -> Message -> Aff Unit)
, printLogs :: Aff Unit
, clearLogs :: Aff Unit
}
-> Aff
{ env :: ContractEnv
, clearLogs :: Aff Unit
, printLogs :: Aff Unit
}
makeClusterContractEnv cleanupRef cfg = do
{ updatedConfig
, logger
, customLogger
, printLogs
, clearLogs
} <- liftEffect $ mkLogging cfg
makeClusterContractEnv
cleanupRef
{ updatedConfig, logger, customLogger, printLogs, clearLogs } =
cleanupBracket
cleanupRef
(makeNaiveClusterContractEnv updatedConfig logger customLogger)
stopContractEnv
$ pure
<<< { env: _, printLogs, clearLogs }
(pure <<< { env: _, printLogs, clearLogs })

-- | Kill a process and wait for it to stop listening on a specific port.
stopChildProcessWithPort :: UInt -> ManagedProcess -> Aff Unit
Expand All @@ -588,8 +592,3 @@ stopChildProcessWithPort port childProcess = do
defaultRetryPolicy :: RetryPolicy
defaultRetryPolicy = limitRetriesByCumulativeDelay (Milliseconds 3000.00) $
constantDelay (Milliseconds 100.0)

-- replace with Effect.Console.log to debug. Not providing an option at runtime,
-- because it's just for the CTL developers.
log :: forall m. Monad m => String -> m Unit
log _ = pure unit