From ead1b58738559986614bedf2e66248e20884500c Mon Sep 17 00:00:00 2001 From: Axel Mannhardt Date: Mon, 1 Feb 2016 22:18:00 +0100 Subject: [PATCH] add distributing multiple tasks to one node --- .../Task/Distribution/RunComputation.hs | 8 ++- .../Task/Distribution/TaskDistribution.hs | 55 +++++++++++++------ .../Task/TaskSpawning/TaskSpawning.hs | 2 +- task-distribution.cabal | 1 + 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/src/Control/Distributed/Task/Distribution/RunComputation.hs b/src/Control/Distributed/Task/Distribution/RunComputation.hs index 790ccf0..9c2e41a 100644 --- a/src/Control/Distributed/Task/Distribution/RunComputation.hs +++ b/src/Control/Distributed/Task/Distribution/RunComputation.hs @@ -75,7 +75,7 @@ expandDataSpec :: DataSpec -> IO [DataDef] expandDataSpec (HdfsDataSpec path depth filterPrefix) = do putStrLn $ "looking for files at " ++ path paths <- hdfsListFilesInSubdirsFiltering depth filterPrefix path - putStrLn $ "found " ++ (show paths) + putStrLn $ "found these input files: " ++ (show paths) return $ map HdfsData paths expandDataSpec (SimpleDataSpec numDBs) = return $ mkSimpleDataSpecs numDBs where @@ -97,7 +97,7 @@ hdfsListFilesInSubdirsFiltering descendDepth fileNamePrefixFilter path = do initialFilePaths <- HDFS.listFiles path recursiveFiles <- recursiveDescent descendDepth path initialFilePaths logDebug $ "found: " ++ (show recursiveFiles) - return $ maybe recursiveFiles (\prefix -> filter ((prefix `isPrefixOf`) . getFileNamePart) recursiveFiles) fileNamePrefixFilter + return $ map trimSlashes $ maybe recursiveFiles (\prefix -> filter ((prefix `isPrefixOf`) . getFileNamePart) recursiveFiles) fileNamePrefixFilter where getFileNamePart path' = let parts = splitOn "/" path' in if null parts then "" else parts !! (length parts -1) recursiveDescent :: Int -> String -> [String] -> IO [String] @@ -107,3 +107,7 @@ hdfsListFilesInSubdirsFiltering descendDepth fileNamePrefixFilter path = do pathsWithChildren <- mapM (\p -> (HDFS.listFiles p >>= \cs -> return (p, cs))) absolute :: IO [(String, [String])] descended <- mapM (\(p, cs) -> if null cs then return [p] else recursiveDescent (n-1) p cs) pathsWithChildren :: IO [[String]] return $ concat descended + trimSlashes :: String -> String + trimSlashes [] = [] -- hadoop-rpc does not work on /paths//with/double//slashes + trimSlashes ('/':'/':rest) = trimSlashes $ '/':rest + trimSlashes (x:rest) = x:(trimSlashes rest) diff --git a/src/Control/Distributed/Task/Distribution/TaskDistribution.hs b/src/Control/Distributed/Task/Distribution/TaskDistribution.hs index a46b704..bf659f8 100644 --- a/src/Control/Distributed/Task/Distribution/TaskDistribution.hs +++ b/src/Control/Distributed/Task/Distribution/TaskDistribution.hs @@ -21,6 +21,7 @@ import Control.Monad.IO.Class import qualified Data.Binary as B (encode) import Data.ByteString.Lazy (ByteString) import qualified Data.ByteString.Lazy.Char8 as BLC +import qualified Data.Map.Strict as Map import qualified Data.Rank1Dynamic as R1 (toDynamic) import Data.Time.Clock (UTCTime, diffUTCTime, NominalDiffTime, getCurrentTime) @@ -146,7 +147,7 @@ executeOnNodes' taskDef dataDefs resultDef slaveNodes = do masterProcess <- getSelfPid config <- liftIO getConfiguration before <- liftIO getCurrentTime - taskResults <- distributeWorkForNodes masterProcess (_distributionStrategy config) taskDef dataDefs resultDef slaveNodes + taskResults <- distributeWorkForNodes masterProcess 1 (_distributionStrategy config) taskDef dataDefs resultDef slaveNodes after <- liftIO getCurrentTime say $ "total time: " ++ show (diffUTCTime after before) mapM_ say $ map showRunStat $ snd taskResults @@ -168,33 +169,51 @@ showRunStat (n, totalTaskTime, remoteStat) = taskLoadDur = deserializeTimeDiff $ _remoteTaskLoadDuration remoteStat execTaskDur = deserializeTimeDiff $ _remoteExecTaskDuration remoteStat +type NodeOccupancy = Map.Map NodeId Int + +occupyNode :: NodeId -> NodeOccupancy -> NodeOccupancy +occupyNode = Map.adjust (+1) + +unoccupyNode :: NodeId -> NodeOccupancy -> NodeOccupancy +unoccupyNode = Map.adjust decrement + where decrement = flip (-) 1 + {-| Tries to find work for every worker node, looking at all tasks, forgetting the node if no task is found. Note: tries to be open to other result types but assumes a list of results, as these can be concatenated over multiple tasks. Requires result entries to be serializable, not the complete result - confusing this can cause devastatingly misleading compiler complaints about Serializable. |-} -distributeWorkForNodes :: forall entry . (Serializable entry) => ProcessId -> DistributionStrategy -> TaskDef -> [DataDef] -> ResultDef -> [NodeId] -> Process ([entry], [TaskRunStat]) -distributeWorkForNodes masterProcess strategy taskDef dataDefs resultDef allNodes = doItFor ([], 0, allNodes, dataDefs) +distributeWorkForNodes :: forall entry . (Serializable entry) => ProcessId -> Int -> DistributionStrategy -> TaskDef -> [DataDef] -> ResultDef -> [NodeId] -> Process ([entry], [TaskRunStat]) +distributeWorkForNodes masterProcess maxTasksPerNode strategy taskDef dataDefs resultDef allNodes = doItFor ([], 0, foldr (flip Map.insert 0) Map.empty allNodes, dataDefs) where - doItFor :: ([([entry], TaskRunStat)], Int, [NodeId], [DataDef]) -> Process ([entry], [TaskRunStat]) + doItFor :: ([([entry], TaskRunStat)], Int, NodeOccupancy, [DataDef]) -> Process ([entry], [TaskRunStat]) doItFor (collected, 0, _, []) = return $ let (res, stat) = unzip collected in (concat $ reverse res, stat) -- everything processed - doItFor (collected, resultsWaitingOn, freeNodes, []) = collectNextResult collected freeNodes [] resultsWaitingOn -- everything distributed, but still results to be collected - doItFor (collected, resultsWaitingOn, [], undistributedTasks) = collectNextResult collected [] undistributedTasks resultsWaitingOn -- no unoccupied workers, wait results to come back - doItFor (collected, resultsWaitingOn, freeNode:freeNodes, undistributedTasks) = do -- find a suitable task for an unoccupied nodes - (suitableTask, remainingTasks) <- findSuitableTask strategy + doItFor (collected, resultsWaitingOn, nodeOccupancy, []) = collectNextResult collected nodeOccupancy [] resultsWaitingOn -- everything distributed, but still results to be collected + doItFor (collected, resultsWaitingOn, nodeOccupancy, undistributedTasks) + | noFreeNodes nodeOccupancy = liftIO (logInfo $ "no unoccupied workers, wait results to come back: "++show nodeOccupancy) + >> collectNextResult collected nodeOccupancy undistributedTasks resultsWaitingOn -- + | otherwise = let freeNodes = nextFreeNodes nodeOccupancy + freeNode = if null freeNodes then error "no free nodes" else head freeNodes :: NodeId + in do + liftIO $ logInfo $ "finding a suitable task for the next unoccupied node: "++show freeNode++" - occupations: "++show nodeOccupancy + (suitableTask, remainingTasks) <- findSuitableTask strategy freeNode maybe - (doItFor (collected, resultsWaitingOn, freeNodes, remainingTasks)) -- no further work for this node available, discard it for distribution + (doItFor (collected, resultsWaitingOn, nodeOccupancy, remainingTasks)) -- no further work for this node available, discard it for distribution (\t -> do -- regular distribution say $ "spawning on: " ++ (show $ freeNode) spawnSlaveProcess masterProcess taskDef t resultDef freeNode - doItFor (collected, resultsWaitingOn+1, freeNodes, remainingTasks)) + doItFor (collected, resultsWaitingOn+1, occupyNode freeNode nodeOccupancy, remainingTasks)) suitableTask where - findSuitableTask :: DistributionStrategy -> Process (Maybe DataDef, [DataDef]) - findSuitableTask AnywhereIsFine = return $ if null undistributedTasks then (Nothing, []) else (Just (head undistributedTasks), tail undistributedTasks) - findSuitableTask FirstTaskWithData = findSuitableTask' [] undistributedTasks + noFreeNodes = null . nextFreeNodes + nextFreeNodes :: NodeOccupancy -> [NodeId] + nextFreeNodes = Map.keys . Map.filter (< maxTasksPerNode) + findSuitableTask :: DistributionStrategy -> NodeId -> Process (Maybe DataDef, [DataDef]) + findSuitableTask AnywhereIsFine _ = return $ if null undistributedTasks then (Nothing, []) else (Just (head undistributedTasks), tail undistributedTasks) + findSuitableTask FirstTaskWithData freeNode = findSuitableTask' [] undistributedTasks where + findSuitableTask' :: [DataDef] -> [DataDef] -> Process (Maybe DataDef, [DataDef]) findSuitableTask' notSuitable [] = return (Nothing, reverse notSuitable) findSuitableTask' notSuitable (t:rest) = do allNodesSuitableForTask <- findNodesWithData' t @@ -202,14 +221,15 @@ distributeWorkForNodes masterProcess strategy taskDef dataDefs resultDef allNode then return (Just t, reverse notSuitable ++ rest) else findSuitableTask' (t:notSuitable) rest where - findNodesWithData' (HdfsData loc) = liftIO $ findNodesWithData loc allNodes -- TODO this listing is not really efficient for this approach ... + findNodesWithData' :: DataDef -> Process [NodeId] + findNodesWithData' (HdfsData loc) = liftIO $ findNodesWithData loc allNodes -- TODO this listing is not really efficient for this approach, caching necessary? findNodesWithData' (PseudoDB _) = return allNodes -- no data locality strategy for simple pseudo db - collectNextResult collected freeNodes undistributedTasks resultsWaitingOn = do + collectNextResult collected nodeOccupancy undistributedTasks resultsWaitingOn = do say $ "waiting for a result" (taskMetaData, maybeNextResult) <- collectSingle say $ "got result from: " ++ (show $ _slaveNodeId taskMetaData) let updatedResults = maybe collected (:collected) maybeNextResult in -- no restarts for failed tasks for now - doItFor (updatedResults, resultsWaitingOn-1, _slaveNodeId taskMetaData:freeNodes, undistributedTasks) + doItFor (updatedResults, resultsWaitingOn-1, unoccupyNode (_slaveNodeId taskMetaData) nodeOccupancy, undistributedTasks) collectSingle :: forall entry . (Serializable entry) => Process (TaskMetaData, Maybe ([entry], TaskRunStat)) collectSingle = receiveWait [ @@ -299,8 +319,7 @@ handleSlaveResult dataDef resultDef (taskResult, runStat) acceptTime processingD handlePlainResult _ ReturnAsMessage plainResult = return plainResult handlePlainResult _ (HdfsResult _ _) _ = error "storage of a plain result (some distribution methods) to hdfs currently not supported" handlePlainResult _ ReturnOnlyNumResults plainResult = return $ [BLC.pack $ show $ length plainResult] - handleFileResult (HdfsData _) ReturnAsMessage resultFilePath = logWarn ("Reading result from file: "++resultFilePath++", with hdfs input this is probably unnecesary imperformant for larger results") - >> readResultFromFile resultFilePath + handleFileResult (HdfsData _) ReturnAsMessage resultFilePath = logInfo ("Reading result from file: "++resultFilePath) >> readResultFromFile resultFilePath handleFileResult _ ReturnAsMessage resultFilePath = readResultFromFile resultFilePath handleFileResult _ ReturnOnlyNumResults _ = error "not implemented for only returning numbers" handleFileResult (HdfsData path) (HdfsResult outputPrefix outputSuffix) resultFilePath = wrapHdfsAction $ copyToHdfs resultFilePath (outputPrefix++restpath) (filename'++outputSuffix) diff --git a/src/Control/Distributed/Task/TaskSpawning/TaskSpawning.hs b/src/Control/Distributed/Task/TaskSpawning/TaskSpawning.hs index 27c5636..9892f2c 100644 --- a/src/Control/Distributed/Task/TaskSpawning/TaskSpawning.hs +++ b/src/Control/Distributed/Task/TaskSpawning/TaskSpawning.hs @@ -90,7 +90,7 @@ isZippedSuffix = isSuffixOf ".gz" onFirst :: (a -> a') -> (a, b, c) -> (a', b, c) onFirst f (a, b, c) = (f a, b, c) -loadData :: Configuration -> DataDef -> IO TaskResult +loadData :: Configuration -> DataDef -> IO TaskInput loadData config (HdfsData hdfsLocation) = HDS.loadEntries (_hdfsConfig config, hdfsLocation) loadData _ (PseudoDB numDB) = SDS.loadEntries ("resources/pseudo-db/" ++ (show numDB)) -- TODO make relative path configurable? diff --git a/task-distribution.cabal b/task-distribution.cabal index 5d4f4c6..632d350 100644 --- a/task-distribution.cabal +++ b/task-distribution.cabal @@ -65,6 +65,7 @@ library , zlib , hadoop-rpc , vector + , containers default-language: Haskell2010 executable example