add distributing multiple tasks to one node
Feb 1, 2016
Expand Up @@ -75,7 +75,7 @@ expandDataSpec :: DataSpec -> IO [DataDef]
expandDataSpec (HdfsDataSpec path depth filterPrefix) = do
putStrLn $ "looking for files at " ++ path
paths <- hdfsListFilesInSubdirsFiltering depth filterPrefix path
putStrLn $ "found " ++ (show paths)
putStrLn $ "found these input files: " ++ (show paths)
return $ map HdfsData paths
expandDataSpec (SimpleDataSpec numDBs) = return $ mkSimpleDataSpecs numDBs
Expand All @@ -97,7 +97,7 @@ hdfsListFilesInSubdirsFiltering descendDepth fileNamePrefixFilter path = do
initialFilePaths <- HDFS.listFiles path
recursiveFiles <- recursiveDescent descendDepth path initialFilePaths
logDebug $ "found: " ++ (show recursiveFiles)
return $ maybe recursiveFiles (\prefix -> filter ((prefix `isPrefixOf`) . getFileNamePart) recursiveFiles) fileNamePrefixFilter
return $ map trimSlashes $ maybe recursiveFiles (\prefix -> filter ((prefix `isPrefixOf`) . getFileNamePart) recursiveFiles) fileNamePrefixFilter
getFileNamePart path' = let parts = splitOn "/" path' in if null parts then "" else parts !! (length parts -1)
recursiveDescent :: Int -> String -> [String] -> IO [String]
Expand All @@ -107,3 +107,7 @@ hdfsListFilesInSubdirsFiltering descendDepth fileNamePrefixFilter path = do
pathsWithChildren <- mapM (\p -> (HDFS.listFiles p >>= \cs -> return (p, cs))) absolute :: IO [(String, [String])]
descended <- mapM (\(p, cs) -> if null cs then return [p] else recursiveDescent (n-1) p cs) pathsWithChildren :: IO [[String]]
return $ concat descended
trimSlashes :: String -> String
trimSlashes [] = [] -- hadoop-rpc does not work on /paths//with/double//slashes
trimSlashes ('/':'/':rest) = trimSlashes $ '/':rest
trimSlashes (x:rest) = x:(trimSlashes rest)
55 changes: 37 additions & 18 deletions src/Control/Distributed/Task/Distribution/TaskDistribution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Control.Monad.IO.Class
import qualified Data.Binary as B (encode)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Map.Strict as Map
import qualified Data.Rank1Dynamic as R1 (toDynamic)
import Data.Time.Clock (UTCTime, diffUTCTime, NominalDiffTime, getCurrentTime)

Expand Down Expand Up @@ -146,7 +147,7 @@ executeOnNodes' taskDef dataDefs resultDef slaveNodes = do
masterProcess <- getSelfPid
config <- liftIO getConfiguration
before <- liftIO getCurrentTime
taskResults <- distributeWorkForNodes masterProcess (_distributionStrategy config) taskDef dataDefs resultDef slaveNodes
taskResults <- distributeWorkForNodes masterProcess 1 (_distributionStrategy config) taskDef dataDefs resultDef slaveNodes
after <- liftIO getCurrentTime
say $ "total time: " ++ show (diffUTCTime after before)
mapM_ say $ map showRunStat $ snd taskResults
Expand All @@ -168,48 +169,67 @@ showRunStat (n, totalTaskTime, remoteStat) =
taskLoadDur = deserializeTimeDiff $ _remoteTaskLoadDuration remoteStat
execTaskDur = deserializeTimeDiff $ _remoteExecTaskDuration remoteStat

type NodeOccupancy = Map.Map NodeId Int

occupyNode :: NodeId -> NodeOccupancy -> NodeOccupancy
occupyNode = Map.adjust (+1)

unoccupyNode :: NodeId -> NodeOccupancy -> NodeOccupancy
unoccupyNode = Map.adjust decrement
where decrement = flip (-) 1

Tries to find work for every worker node, looking at all tasks, forgetting the node if no task is found.
Note: tries to be open to other result types but assumes a list of results, as these can be concatenated over multiple tasks. Requires result entries to be serializable, not the
complete result - confusing this can cause devastatingly misleading compiler complaints about Serializable.
distributeWorkForNodes :: forall entry . (Serializable entry) => ProcessId -> DistributionStrategy -> TaskDef -> [DataDef] -> ResultDef -> [NodeId] -> Process ([entry], [TaskRunStat])
distributeWorkForNodes masterProcess strategy taskDef dataDefs resultDef allNodes = doItFor ([], 0, allNodes, dataDefs)
distributeWorkForNodes :: forall entry . (Serializable entry) => ProcessId -> Int -> DistributionStrategy -> TaskDef -> [DataDef] -> ResultDef -> [NodeId] -> Process ([entry], [TaskRunStat])
distributeWorkForNodes masterProcess maxTasksPerNode strategy taskDef dataDefs resultDef allNodes = doItFor ([], 0, foldr (flip Map.insert 0) Map.empty allNodes, dataDefs)
doItFor :: ([([entry], TaskRunStat)], Int, [NodeId], [DataDef]) -> Process ([entry], [TaskRunStat])
doItFor :: ([([entry], TaskRunStat)], Int, NodeOccupancy, [DataDef]) -> Process ([entry], [TaskRunStat])
doItFor (collected, 0, _, []) = return $ let (res, stat) = unzip collected in (concat $ reverse res, stat) -- everything processed
doItFor (collected, resultsWaitingOn, freeNodes, []) = collectNextResult collected freeNodes [] resultsWaitingOn -- everything distributed, but still results to be collected
doItFor (collected, resultsWaitingOn, [], undistributedTasks) = collectNextResult collected [] undistributedTasks resultsWaitingOn -- no unoccupied workers, wait results to come back
doItFor (collected, resultsWaitingOn, freeNode:freeNodes, undistributedTasks) = do -- find a suitable task for an unoccupied nodes
(suitableTask, remainingTasks) <- findSuitableTask strategy
doItFor (collected, resultsWaitingOn, nodeOccupancy, []) = collectNextResult collected nodeOccupancy [] resultsWaitingOn -- everything distributed, but still results to be collected
doItFor (collected, resultsWaitingOn, nodeOccupancy, undistributedTasks)
| noFreeNodes nodeOccupancy = liftIO (logInfo $ "no unoccupied workers, wait results to come back: "++show nodeOccupancy)
>> collectNextResult collected nodeOccupancy undistributedTasks resultsWaitingOn --
| otherwise = let freeNodes = nextFreeNodes nodeOccupancy
freeNode = if null freeNodes then error "no free nodes" else head freeNodes :: NodeId
in do
liftIO $ logInfo $ "finding a suitable task for the next unoccupied node: "++show freeNode++" - occupations: "++show nodeOccupancy
(suitableTask, remainingTasks) <- findSuitableTask strategy freeNode
(doItFor (collected, resultsWaitingOn, freeNodes, remainingTasks)) -- no further work for this node available, discard it for distribution
(doItFor (collected, resultsWaitingOn, nodeOccupancy, remainingTasks)) -- no further work for this node available, discard it for distribution
(\t -> do -- regular distribution
say $ "spawning on: " ++ (show $ freeNode)
spawnSlaveProcess masterProcess taskDef t resultDef freeNode
doItFor (collected, resultsWaitingOn+1, freeNodes, remainingTasks))
doItFor (collected, resultsWaitingOn+1, occupyNode freeNode nodeOccupancy, remainingTasks))
findSuitableTask :: DistributionStrategy -> Process (Maybe DataDef, [DataDef])
findSuitableTask AnywhereIsFine = return $ if null undistributedTasks then (Nothing, []) else (Just (head undistributedTasks), tail undistributedTasks)
findSuitableTask FirstTaskWithData = findSuitableTask' [] undistributedTasks
noFreeNodes = null . nextFreeNodes
nextFreeNodes :: NodeOccupancy -> [NodeId]
nextFreeNodes = Map.keys . Map.filter (< maxTasksPerNode)
findSuitableTask :: DistributionStrategy -> NodeId -> Process (Maybe DataDef, [DataDef])
findSuitableTask AnywhereIsFine _ = return $ if null undistributedTasks then (Nothing, []) else (Just (head undistributedTasks), tail undistributedTasks)
findSuitableTask FirstTaskWithData freeNode = findSuitableTask' [] undistributedTasks
findSuitableTask' :: [DataDef] -> [DataDef] -> Process (Maybe DataDef, [DataDef])
findSuitableTask' notSuitable [] = return (Nothing, reverse notSuitable)
findSuitableTask' notSuitable (t:rest) = do
allNodesSuitableForTask <- findNodesWithData' t
if any (==freeNode) allNodesSuitableForTask
then return (Just t, reverse notSuitable ++ rest)
else findSuitableTask' (t:notSuitable) rest
findNodesWithData' (HdfsData loc) = liftIO $ findNodesWithData loc allNodes -- TODO this listing is not really efficient for this approach ...
findNodesWithData' :: DataDef -> Process [NodeId]
findNodesWithData' (HdfsData loc) = liftIO $ findNodesWithData loc allNodes -- TODO this listing is not really efficient for this approach, caching necessary?
findNodesWithData' (PseudoDB _) = return allNodes -- no data locality strategy for simple pseudo db
collectNextResult collected freeNodes undistributedTasks resultsWaitingOn = do
collectNextResult collected nodeOccupancy undistributedTasks resultsWaitingOn = do
say $ "waiting for a result"
(taskMetaData, maybeNextResult) <- collectSingle
say $ "got result from: " ++ (show $ _slaveNodeId taskMetaData)
let updatedResults = maybe collected (:collected) maybeNextResult in -- no restarts for failed tasks for now
doItFor (updatedResults, resultsWaitingOn-1, _slaveNodeId taskMetaData:freeNodes, undistributedTasks)
doItFor (updatedResults, resultsWaitingOn-1, unoccupyNode (_slaveNodeId taskMetaData) nodeOccupancy, undistributedTasks)

collectSingle :: forall entry . (Serializable entry) => Process (TaskMetaData, Maybe ([entry], TaskRunStat))
collectSingle = receiveWait [
Expand Down Expand Up @@ -299,8 +319,7 @@ handleSlaveResult dataDef resultDef (taskResult, runStat) acceptTime processingD
handlePlainResult _ ReturnAsMessage plainResult = return plainResult
handlePlainResult _ (HdfsResult _ _) _ = error "storage of a plain result (some distribution methods) to hdfs currently not supported"
handlePlainResult _ ReturnOnlyNumResults plainResult = return $ [BLC.pack $ show $ length plainResult]
handleFileResult (HdfsData _) ReturnAsMessage resultFilePath = logWarn ("Reading result from file: "++resultFilePath++", with hdfs input this is probably unnecesary imperformant for larger results")
>> readResultFromFile resultFilePath
handleFileResult (HdfsData _) ReturnAsMessage resultFilePath = logInfo ("Reading result from file: "++resultFilePath) >> readResultFromFile resultFilePath
handleFileResult _ ReturnAsMessage resultFilePath = readResultFromFile resultFilePath
handleFileResult _ ReturnOnlyNumResults _ = error "not implemented for only returning numbers"
handleFileResult (HdfsData path) (HdfsResult outputPrefix outputSuffix) resultFilePath = wrapHdfsAction $ copyToHdfs resultFilePath (outputPrefix++restpath) (filename'++outputSuffix)
2 changes: 1 addition & 1 deletion src/Control/Distributed/Task/TaskSpawning/TaskSpawning.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ isZippedSuffix = isSuffixOf ".gz"
onFirst :: (a -> a') -> (a, b, c) -> (a', b, c)
onFirst f (a, b, c) = (f a, b, c)

loadData :: Configuration -> DataDef -> IO TaskResult
loadData :: Configuration -> DataDef -> IO TaskInput
loadData config (HdfsData hdfsLocation) = HDS.loadEntries (_hdfsConfig config, hdfsLocation)
loadData _ (PseudoDB numDB) = SDS.loadEntries ("resources/pseudo-db/" ++ (show numDB)) -- TODO make relative path configurable?

1 change: 1 addition & 0 deletions task-distribution.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ library
, zlib
, hadoop-rpc
, vector
, containers
default-language: Haskell2010

executable example
