%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % % % Eden Porting Project, Philipps-Universität Marburg % % % % % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% The following Haskell module defines implementation skeletons for the parallel functional language Eden. \begin{code} {-# OPTIONS -cpp #-} module EdenMapSkels (-- process schemes :: parMap, farm, ssf, workpool, -- map skeletons :: (a -> b) -> [a] -> [b] map_par, map_farm, map_ssf, -- helper functions to be used externally: -- Int -> [a] -> [[a]] / [[a]] -> [a] -- unshuffle, shuffle, splitIntoN, unSplit, chunk, unchunk, -- simple sequential list strategies spine, whnfspine ) where import Eden import List import Maybe(maybeToList,mapMaybe) import System.IO.Unsafe import Control.Monad import Control.Concurrent import Control.Parallel.Strategies(Strategy, seqList, r0, rwhnf, using) -- We restrict the import to avoid all GpH strategies which use "par". import Control.Parallel (pseq) -- use pseq as seq import Prelude hiding (seq) seq = pseq \end{code} \newpage \section{Eden Map Skeletons} This section provides essential Eden Skeletons for \verb!map!, as described in research papers: ParMap, Farm, Direct Mapping Farm, Workpool. 
\subsection{Basic Process Schemes for map Skeletons}

\begin{code}
-- basic: parMap
-- Pairs an unbounded supply of the same process abstraction with the
-- task list and hands both to spawn.
parMap :: (Trans a, Trans b) =>
          (Process a b)   -- worker process
       -> [a]             -- tasks
       -> [b]             -- results
parMap worker inputs = spawn workers inputs
  where
    workers = repeat worker

-- process farms
-- The task list is split by the distribution function (which receives np),
-- every sublist is handed to its own worker through parMap, and the
-- partial result lists are joined by the combination function.
farm :: (Trans a, Trans b) =>
        Int                     -- np, passed to the distribution function
     -> (Int -> [a] -> [[a]])   -- distribute
     -> ([[b]] -> [b])          -- combine
     -> (Process [a] [b])       -- worker process
     -> [a] -> [b]              -- what to do
farm np scatter gather worker tasks = gather partialResults
  where
    partialResults = parMap worker (scatter np tasks)

-- self service farm (alias direct mapping)
-- Each of the np workers is built from its own share of the input
-- (share i is the i-th sublist produced by the distribution function),
-- so the only value passed to a worker at instantiation is ().
ssf :: (Trans a, Trans b) =>
       Int                       -- np: number of workers
    -> (Int -> [a] -> [[a]])     -- distribute
    -> ([[b]] -> [b])            -- combine
    -> ([a] -> Process () [b])   -- builds a worker from its task share
    -> [a] -> [b]
ssf np scatter gather mkWorker xs = gather (spawn workers triggers)
  where
    workers  = map (mkWorker . share) [0 .. np - 1]
    triggers = replicate np ()
    share i  = scatter np xs !! i
\end{code}

\subsection{Resulting map Skeletons}

The Process Schemes presented up to now can be used to define
implementation skeletons which implement a parallel map with the same
type as the sequential one:

\begin{code}
map_par, map_farm, map_ssf :: (Trans a, Trans b) => (a -> b) -> [a] -> [b]
-- one process per list element
map_par f = parMap (process f)
-- noPe worker processes, tasks dealt out by unshuffle, results
-- recombined by shuffle
map_farm f = farm noPe unshuffle shuffle (process (map f))
-- like map_farm, but every worker is created with its tasks built in
map_ssf f xs = ssf noPe unshuffle shuffle mkWorker xs
  where
    mkWorker share = process (\() -> map f share)
\end{code}

\subsection{Dynamic task distribution}

The workpool skeleton uses a non-deterministic merge function to achieve
dynamic load balancing.
\begin{code}
-- simple workpool
-- The request list starts with prefetch rounds of the worker ids
-- 0 .. np-1; after that, the worker id attached to every incoming result
-- is appended as a request for one further task.  Results are returned
-- in the order in which merge delivers them, with the ids stripped off.
workpool :: (Trans t, Trans r) =>
            Int               -- np: number of workers
         -> Int               -- prefetch: initial requests per worker
         -> Process [t] [r]   -- worker process
         -> [t]               -- tasks
         -> [r]               -- results
workpool np prefetch worker tasks = map snd fromWorkers
  where
    fromWorkers = merge (tagWithPids (parMap worker taskss))
    taskss      = distribute np (initialReqs ++ newReqs) tasks
    initialReqs = concat (replicate prefetch [0 .. np - 1])
    newReqs     = map fst fromWorkers
\end{code}

% helper functions
\subsection{Auxiliary Functions}

\begin{code}
-- Round-robin distribution: sublist i (counting from 0) holds the
-- elements at positions i, i+n, i+2n, ... of the input.
-- The result has n sublists; for n <= 0 it is the empty list.
unshuffle :: Int -> [a] -> [[a]]
unshuffle n xs = [everyNth (drop i xs) | i <- [0 .. n - 1]]
  where
    -- only reached when n >= 1, because the index range above is
    -- empty otherwise
    everyNth []       = []
    everyNth (y : ys) = y : everyNth (drop (n - 1) ys)

-- simple shuffling (not incremental!)
shuffle :: [[a]] -> [a]
shuffle = concat . transpose

{-
bresenham computes [i1, ..., ip] such that i1 + ... + ip = n
and | ij - ik | <= 1, for all 1 <= j,k <= p
(from computer graphics for printing smooth lines)
The result is empty for p <= 0.
-}
bresenham :: Int -> Int -> [Int]
bresenham n p = take p (bresenham1 n)
  where
    bresenham1 m = (m `div` p) : bresenham1 ((m `mod` p) + n)

{-
Parameterized list splitting:
splitIntoN distributes one list on n lists with
Bresenham distribution (equal distribution without
precondition on length)
The length of the input list is computed first, so the whole spine of
the list is needed before the first sublist is produced.
A non-empty list together with n <= 0 ends in the
"some elements left over" error, because there are no sublists to fill.
-}
splitIntoN :: Int -> [a] -> [[a]]
splitIntoN n xs = go sizes xs
  where
    sizes = bresenham (length xs) n
    go [] []        = []
    go [] _         = error "some elements left over"
    go (t : ts) ys  = hs : go ts rest
      where
        (hs, rest) = splitAt t ys

-- inverse of splitIntoN
unSplit :: [[a]] -> [a]
unSplit = concat

{-
chunk is the simple variant, filling the last list with less elements
(works best on lists of length k*n)
An empty input gives [] for every n.  A chunk size n <= 0 on a non-empty
list is rejected with an error: splitAt would hand back the whole list
unchanged, so the recursion would never make progress.
-}
chunk :: Int -> [a] -> [[a]]
chunk _ [] = []
chunk n xs
  | n <= 0    = error "chunk: chunk size must be positive"
  | otherwise = ys : chunk n zs
  where
    (ys, zs) = splitAt n xs

-- inverse of chunk
unchunk :: [[a]] -> [a]
unchunk = concat
\end{code}

\subsubsection{Distribution and Combination functions for Master-Worker-Skeletons}

Task distribution according to worker requests.
\begin{code}
-- Request-driven task distribution: the k-th task goes to the worker
-- named by the k-th request.  The result holds one task list per worker
-- id 0 .. np-1.  A worker's list ends as soon as either the requests or
-- the tasks run out; the request list is inspected before the task list.
distribute :: Int -> [Int] -> [t] -> [[t]]
distribute np reqs tasks = map (tasksFor reqs tasks) [0 .. np - 1]
  where
    tasksFor (req : moreReqs) (task : moreTasks) worker
      | worker == req = task : rest
      | otherwise     = rest
      where
        rest = tasksFor moreReqs moreTasks worker
    tasksFor _ _ _ = []

-- Pair every element of the i-th list (counting from 0) with the index i.
tagWithPids :: [[r]] -> [[(Int,r)]]
tagWithPids = zipWith tag [0 ..]
  where
    tag pid = map (\res -> (pid, res))
\end{code}

mergeByTags joins sorted lists into one sorted list. It uses a binary
combination scheme to merge several runs into one; this ensures that
the smallest runs are merged first.

\begin{code}
mergeByTags :: [[(Int,r)]] -> [(Int,r)]
mergeByTags runs = case runs of
    []           -> []
    [single]     -> single
    [left,right] -> merge2ByTag left right
    -- three or more runs: merge the runs at even positions and the runs
    -- at odd positions separately, then merge the two results
    _            -> merge2ByTag (mergeHalf runs) (mergeHalf (tail runs))
  where
    mergeHalf = mergeByTags . takeEach 2

-- Keep the first element and then every n-th element after it.
takeEach :: Int -> [a] -> [a]
takeEach _ [] = []
takeEach step whole@(first : _) = first : takeEach step (drop step whole)

-- Merge two lists that are sorted by tag.  Meeting the same tag at the
-- head of both lists is treated as an error.
merge2ByTag :: Ord k => [(k,r)] -> [(k,r)] -> [(k,r)]
merge2ByTag [] right = right
merge2ByTag left [] = left
merge2ByTag left@(l@(tagL,_) : ls) right@(r@(tagR,_) : rs)
  | tagL < tagR = l : merge2ByTag ls right
  | tagL > tagR = r : merge2ByTag left rs
  | otherwise   = error "found tags i == j"
\end{code}

\subsection{Simple sequential List strategies}

\begin{code}
-- spine strategy: unfold a list structure, leaving elements alone
spine :: Strategy [a]
spine = seqList r0

-- whnfspine strategy: unfold a list structure, evaluating list elements
-- to whnf
whnfspine :: Strategy [a]
whnfspine = seqList rwhnf
\end{code}