%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%                                                            %
%  Eden Porting Project, Philipps-Universität Marburg        %
%                                                            %
%                                                            %
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

The following Haskell module defines implementation skeletons for 
the parallel functional language Eden.

\begin{code}

{-# OPTIONS -cpp #-}

module EdenMapSkels (-- process schemes :: <varying types>
                 parMap, farm, ssf, workpool,
                 -- map skeletons :: (a -> b) -> [a] -> [b]
                 map_par, map_farm, map_ssf,
                 -- helper functions to be used externally:
                 -- Int -> [a] -> [[a]] / [[a]] -> [a]
                 -- 
                 unshuffle, shuffle,
                 splitIntoN,             unSplit,
                 chunk,                  unchunk,
                 -- simple sequential list strategies
                 spine, whnfspine
                 ) where

import Eden
import List
import Maybe(maybeToList,mapMaybe)

import System.IO.Unsafe
import Control.Monad

import Control.Concurrent
import Control.Parallel.Strategies(Strategy, seqList, r0, rwhnf, using)
 -- We restrict the import to avoid all GpH strategies which use "par".
import Control.Parallel (pseq) -- use pseq as seq
import Prelude hiding (seq)

-- Local replacement for the Prelude seq (hidden in the import list above):
-- within this module, seq is just another name for pseq.
seq :: a -> b -> b
seq = pseq



\end{code}
\newpage
\section{Eden Map Skeletons}
This section provides essential Eden Skeletons for \verb!map!, as 
described in research papers: ParMap, Farm, Direct Mapping Farm, Workpool.

\subsection{Basic Process Schemes for map Skeletons}
\begin{code}
-- basic: parMap
-- Hands an endless supply of copies of the same worker process,
-- together with the task list, to spawn.
-- NOTE(review): presumably spawn pairs processes and inputs one-to-one,
-- i.e. one process instance per task -- confirm against the Eden module.
parMap :: (Trans a, Trans b) =>
          Process a b     -- worker process
          -> [a]          -- tasks
          -> [b]          -- results
parMap worker inputs = spawn workers inputs
  where workers = repeat worker

-- process farms
-- The task list is split by distr (which is told the number np),
-- the resulting sublists are handed to parMap with the worker process,
-- and the per-sublist result lists are joined again by combine.
farm :: (Trans a, Trans b) =>
        Int                        -- np, passed on to distr
        -> (Int -> [a] -> [[a]])   -- distr: task distribution
        -> ([[b]] -> [b])          -- combine: result combination
        -> Process [a] [b]         -- worker process
        -> [a]                     -- tasks
        -> [b]                     -- results
farm np distr combine p tasks = combine workerResults
  where
    taskLists     = distr np tasks
    workerResults = parMap p taskLists

-- self service farm (alias direct mapping)
-- np processes are spawned, each of them receiving only () as input.
-- The process with index i (0..np-1) is built by applying p to the i-th
-- sublist delivered by the distribution function, so the tasks are part
-- of the process abstraction instead of its input.
ssf :: (Trans a, Trans b) =>
       Int                          -- np: number of processes
       -> (Int -> [a] -> [[a]])     -- task distribution
       -> ([[b]] -> [b])            -- result combination
       -> ([a] -> Process () [b])   -- builds a process from its tasks
       -> [a]                       -- tasks
       -> [b]                       -- results
ssf np distribute combine p xs = combine (spawn workers unitInputs)
  where
    workers    = map (p . tasksFor) [0 .. np - 1]
    unitInputs = replicate np ()
    -- distribute is the parameter here, not the module-level function
    -- of the same name
    tasksFor i = distribute np xs !! i
\end{code}
\subsection{Resulting map Skeletons}
The Process Schemes presented up to now can be used to define 
implementation skeletons which implement a parallel map with the
same type as the sequential one:

\begin{code}
-- Parallel variants of map, all with the type of the sequential map:
--   map_par  wraps f into a process and hands it to parMap
--   map_farm wraps (map f) into a process and runs it through farm, with
--            noPe as the number, unshuffle as distribution and shuffle
--            as combination
--   map_ssf  uses ssf with the same number, distribution and combination;
--            each process gets only () as input and maps f over the
--            task list it was built with
map_par, map_farm, map_ssf :: (Trans a, Trans b) =>
                              (a -> b) -> [a] -> [b]
map_par f    = parMap (process f)
map_farm f   = farm noPe unshuffle shuffle (process (map f))
map_ssf f xs = ssf noPe unshuffle shuffle mkWorker xs
  where mkWorker ts = process (\() -> map f ts)

\end{code}
\subsection{Dynamic task distribution}
The workpool skeleton uses a non-deterministic merge function to achieve
dynamic load balancing.

\begin{code}

-- simple workpool
-- Tasks are handed out according to a request stream.  Every result is
-- tagged with the index of the worker list it came from; the tags of the
-- merged result stream are fed back as new requests.  Before any result
-- is available, prefetch requests for each of the indices 0..np-1 are
-- issued.
-- Arguments: np (number of workers), prefetch, worker process, tasks.
workpool :: (Trans t, Trans r) =>
            Int -> Int -> Process [t] [r] -> [t] -> [r]
workpool np prefetch worker tasks = map snd fromWorkers
  where
    -- all tagged results in one stream (merge is not defined in this file)
    fromWorkers = merge (tagWithPids (parMap worker taskss))
    -- one task list per worker, driven by the request stream
    taskss      = distribute np requests tasks
    requests    = initialReqs ++ newReqs
    initialReqs = concat (replicate prefetch [0 .. np - 1])
    -- each result acts as a request of the worker that produced it
    newReqs     = map fst fromWorkers
\end{code}


--helper
\subsection{Auxiliary Functions}
\begin{code}
-- unshuffle n xs deals the elements of xs round-robin onto n lists:
-- list number i (counting from 0) holds the elements found at the
-- positions i, i+n, i+2n, ... of xs.  For n <= 0 the result is [].
unshuffle :: Int -> [a] -> [[a]]
unshuffle n xs = [ everyNth (drop i xs) | i <- [0 .. n - 1] ]
  where
    -- keep the first element, skip the n-1 following ones, repeat
    everyNth []       = []
    everyNth (y : ys) = y : everyNth (drop (n - 1) ys)


-- simple shuffling (not incremental!)
-- Emits the first elements of all lists, then all second elements, and
-- so on; lists that have run out are skipped.
shuffle :: [[a]] -> [a]
shuffle xss = concat (transpose xss)

{- bresenham n p computes [i1, ..., ip] such that i1 + ... + ip = n
                and | ij - ik | <= 1, for all 1 <= j,k <= p
    (from computer graphics for printing smooth lines)

   A running value starts at n and is advanced by m -> (m `mod` p) + n;
   each entry of the result is the running value `div` p.
   Only the first p entries are demanded, so p <= 0 yields [].
-}
bresenham :: Int -> Int -> [Int]
bresenham n p = take p (map (`div` p) (iterate next n))
  where next m = (m `mod` p) + n


{- Parameterized list splitting:
splitIntoN n xs cuts xs into n consecutive pieces whose lengths are given
by bresenham (length xs) n
             (equal distribution without precondition on length).
The length of xs is needed to compute the piece sizes.
If the piece sizes are used up while elements of xs remain (e.g. n <= 0
with a non-empty xs), an error is raised.
-}
splitIntoN :: Int -> [a] -> [[a]]
splitIntoN n xs = go sizes xs
  where
    sizes = bresenham (length xs) n
    go (k : ks) ys = front : go ks back
      where (front, back) = splitAt k ys
    go []       [] = []
    go []       _  = error "some elements left over"

-- unSplit appends all pieces in their given order
unSplit :: [[a]] -> [a]
unSplit pieces = foldr (++) [] pieces

{-
chunk n xs cuts xs into consecutive pieces of n elements each.
chunk is the simple variant, filling the last list with less elements
             (works best on lists of length k*n)
An empty list yields no pieces at all.
A piece size n <= 0 can never consume a non-empty list (the result would
be an endless list of empty pieces), so it is rejected with an error.
-}
chunk      :: Int -> [a] -> [[a]]
chunk _ [] =  []
chunk n xs
    | n <= 0    = error "chunk: piece size must be positive"
    | otherwise = ys : chunk n zs
    where (ys,zs) = splitAt n xs

-- unchunk appends all pieces in their given order
unchunk :: [[a]] -> [a]
unchunk pieces = foldr (++) [] pieces

\end{code}
\subsubsection{Distribution and Combination functions for Master-Worker-Skeletons}
Task distribution according to worker requests.
\begin{code}
-- distribute np reqs tasks builds np task lists, one for each worker
-- index 0..np-1.  Requests and tasks are consumed pairwise: the k-th task
-- goes to the list of the worker named by the k-th request (a task whose
-- request names no index in 0..np-1 appears in no list).  Every list ends
-- as soon as the requests or the tasks run out.
distribute :: Int -> [Int] -> [t] -> [[t]]
distribute np reqs tasks = map (select reqs tasks) [0 .. np - 1]
  where
    select (r : rs) (t : ts) pe
        | pe == r   = t : rest
        | otherwise = rest
      where rest = select rs ts pe
    select _ _ _ = []

-- tagWithPids pairs every element of the i-th list (counting from 0)
-- with the index i of that list
tagWithPids :: [[r]] -> [[(Int,r)]]
tagWithPids rss = zipWith tag [0 ..] rss
  where tag i rs = map (\r -> (i, r)) rs
\end{code}

mergeByTags joins several sorted lists into one sorted list.
  It uses a binary combination scheme to merge several runs into one;
  this ensures that the smallest runs are merged first.
\begin{code}
-- mergeByTags combines any number of tagged lists with merge2ByTag.
-- For more than two lists, the lists at even positions and the lists at
-- odd positions (counting from 0) are combined recursively, and the two
-- results are merged.
mergeByTags :: [[(Int,r)]] -> [(Int,r)]
mergeByTags runs = case runs of
    []         -> []
    [single]   -> single
    [a, b]     -> merge2ByTag a b
    (_ : odds) -> merge2ByTag (half runs) (half odds)
  where
    half = mergeByTags . everyOther
    -- keep the elements at positions 0, 2, 4, ...
    everyOther (y : _ : rest) = y : everyOther rest
    everyOther short          = short

-- merge2ByTag merges two tagged lists: it repeatedly emits, from the two
-- list heads, the pair with the smaller tag.  If one list is empty, the
-- other one is returned unchanged.  Equal tags at the two heads are
-- treated as an error.
-- NOTE(review): presumably both inputs are expected to be sorted by tag
-- with tags unique across the lists -- confirm against the callers.
merge2ByTag :: Ord i => [(i, r)] -> [(i, r)] -> [(i, r)]
merge2ByTag [] ys = ys
merge2ByTag xs [] = xs
merge2ByTag xs@(x@(i, _) : xs') ys@(y@(j, _) : ys')
    | i < j     = x : merge2ByTag xs' ys
    | i > j     = y : merge2ByTag xs ys'
    | otherwise = error "found tags i == j"
\end{code}

\subsection{Simple sequential List strategies}
\begin{code}
-- spine strategy: unfold a list structure, leaving elements alone
-- (seqList applied to r0)
spine :: Strategy [a]
spine = seqList r0

-- whnfspine strategy: unfold a list structure, evaluating list elements
-- to whnf (seqList applied to rwhnf)
whnfspine :: Strategy [a]
whnfspine = seqList rwhnf
\end{code}