Skip to content

Commit

Permalink
haddock documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Axel Mannhardt committed Mar 17, 2016
1 parent 9ac391e commit 41f5e34
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import qualified System.Log.Logger as L

import Control.Distributed.Task.Util.Logging (initLogging)

-- | Sets up hslogger with a part logfile, part stdout configuration.
initDefaultLogging :: String -> IO ()
initDefaultLogging suffix = do
-- progName <- getExecutablePath
Expand Down
27 changes: 22 additions & 5 deletions src/Control/Distributed/Task/Distribution/RunComputation.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{-|
Defines a higher level interface to running calculations. Resolves HDFS input paths.
|-}
module Control.Distributed.Task.Distribution.RunComputation (
MasterOptions(..),
TaskSpec(..),
Expand All @@ -19,32 +22,46 @@ import Control.Distributed.Task.Types.TaskTypes
import Control.Distributed.Task.Util.Configuration
import Control.Distributed.Task.Util.Logging

-- | The definition of a distributed calculation.
data MasterOptions = MasterOptions {
-- | the master hostname
_host :: String,
-- | the master port
_port :: Int,
-- | the task logic
_taskSpec :: TaskSpec,
-- | which data to process
_dataSpecs :: DataSpec,
-- | how to process the result
_resultSpec :: ResultSpec
}

{-
ObjectCodeModuleDeployment:
- the function here is ignored, it only forces the compilation of the contained module
- this could contain configurations for the object code file path etc. in the future
-}
-- | Task logic definition, most modes expect task mode support, see RemoteExecutionSupport.
data TaskSpec
-- | build the given string as module remotely (restrictions apply)
= SourceCodeSpec String
-- | run this binary as task
| FullBinaryDeployment
-- | serialize the given function in the context of the given program, run both as task (restrictions apply)
| SerializedThunk (TaskInput -> TaskResult)
-- | only transport some of the generated object code and relink remotely (restrictions apply) - the function here is ignored, it only forces the compilation of the contained module
| ObjectCodeModuleDeployment (TaskInput -> TaskResult)
-- | definition of input data
data DataSpec
-- | simple test data, the path is configured, amount of files can be limited
= SimpleDataSpec Int
-- | use given HDFS as starting directory, descend a number of directories from there and take all files starting with the filter prefix (if any given)
| HdfsDataSpec HdfsPath Int (Maybe String)
-- | what to do with the result
data ResultSpec
-- | process all results with the given method
= CollectOnMaster ([TaskResult] -> IO ())
-- | store the results in HDFS, in the given directory(1), with the given suffix (2), based on the input path.
| StoreInHdfs String String
-- | do nothing, for testing purposes only
| Discard

-- | Run a computation.
runMaster :: MasterOptions -> IO ()
runMaster (MasterOptions masterHost masterPort taskSpec dataSpec resultSpec) = do
taskDef <- buildTaskDef taskSpec
Expand Down
20 changes: 19 additions & 1 deletion src/Control/Distributed/Task/Distribution/TaskDistribution.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{-|
Contains all node communication (using Cloud Haskell). This includes distribution logic.
-}
module Control.Distributed.Task.Distribution.TaskDistribution (
startSlaveNode,
executeDistributed,
Expand Down Expand Up @@ -88,7 +91,7 @@ __remoteTable =

slaveTaskClosure :: TaskTransport -> S.Closure (Process ())
slaveTaskClosure =
-- $(mkClosure 'slaveTask)
-- \$(mkClosure 'slaveTask)
-- ======>
((S.closure
(slaveTask__static
Expand Down Expand Up @@ -121,13 +124,19 @@ rtable = __remoteTable $ initRemoteTable

type NodeConfig = (String, Int)

{-|
Start a slave listening on given hostname, port.
-}
startSlaveNode :: NodeConfig -> IO ()
startSlaveNode (host, port) = do
initDefaultLogging (show port)
backend <- initializeBackend host (show port) rtable
putStrLn "initializing slave"
startSlave backend

{-|
Run a calculation on all accessible slaves. This is a low-level method, look at the RunComputaiton module for a nicer interface.
-}
executeDistributed :: NodeConfig -> TaskDef -> [DataDef] -> ResultDef -> ([TaskResult] -> IO ())-> IO ()
executeDistributed (host, port) taskDef dataDefs resultDef resultProcessor = do
backend <- initializeBackend host (show port) rtable
Expand Down Expand Up @@ -325,10 +334,16 @@ handlePrepareSlave :: Int -> ByteString -> Process PrepareSlaveResponse
handlePrepareSlave hash content = liftIO (RemoteStore.put hash content) >> return PreparationFinished

-- simple tasks
{-|
List all accessible slaves.
-}
showSlaveNodes :: NodeConfig -> IO ()
showSlaveNodes config = withSlaveNodes config (
\slaveNodes -> putStrLn ("Slave nodes: " ++ show slaveNodes))

{-|
List all accessible slaves that have at least a single block of the specified path stored physically.
-}
showSlaveNodesWithData :: NodeConfig -> String -> IO ()
showSlaveNodesWithData slaveConfig hdfsFilePath = withSlaveNodes slaveConfig (
\slaveNodes -> do
Expand All @@ -340,6 +355,9 @@ withSlaveNodes (host, port) action = liftIO $ do
backend <- initializeBackend host (show port) initRemoteTable
startMaster backend (\slaveNodes -> liftIO (action slaveNodes))

{-|
Convenience method to stop all accessible slaves remotely.
-}
shutdownSlaveNodes :: NodeConfig -> IO ()
shutdownSlaveNodes (host, port) = do
backend <- initializeBackend host (show port) rtable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import Control.Distributed.Task.Types.TaskTypes
import Control.Distributed.Task.Util.Configuration
import Control.Distributed.Task.Util.Logging

-- | Combines all defined task mode hooks.
withRemoteExecutionSupport :: (TaskInput -> TaskResult) -> IO () -> IO ()
withRemoteExecutionSupport fn = withSerializedThunkRemoteExecutionSupport . withFullBinaryRemoteExecutionSupport fn

-- | Provides support for fullbinary task mode.
withFullBinaryRemoteExecutionSupport :: (TaskInput -> TaskResult) -> IO () -> IO ()
withFullBinaryRemoteExecutionSupport fn mainAction = do
args <- getArgs
Expand All @@ -32,6 +34,7 @@ withFullBinaryRemoteExecutionSupport fn mainAction = do
else mainAction
_ -> mainAction

-- | Provides support for serialized thunk task mode.
withSerializedThunkRemoteExecutionSupport :: IO () -> IO ()
withSerializedThunkRemoteExecutionSupport mainAction = do
args <- getArgs
Expand Down
1 change: 1 addition & 0 deletions src/Control/Distributed/Task/Util/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ logTrace _ = return () --simpleLog L.debugM
simpleLog :: (String -> String -> IO ()) -> String -> IO ()
simpleLog levelLogger = levelLogger L.rootLoggerName

-- | Sets up hslogger.
initLogging :: L.Priority -> L.Priority -> FilePath -> IO ()
initLogging stdoutLogLevel fileLogLevel logfile = do
L.updateGlobalLogger L.rootLoggerName (L.removeHandler)
Expand Down

0 comments on commit 41f5e34

Please sign in to comment.