# pipeline_sort: sorts integers read from standard input using a
# dynamically created pipeline of worker processes.  Each worker keeps
# the smallest value it is shown and forwards every other value to the
# next worker in the chain.
resource pipeline_sort()
  op print_array(int a[1:*])
  op sort(var int a[1:*])
  op result(int pos, int value) {send}          # (position tag, value) pairs reported by workers
  optype pipe(int value) {send}                 # channel type carrying values still to be sorted
  op worker(int m) returns cap pipe pi {call}

  # Driver: read a count and that many integers, print them, sort them
  # in place, then print them again.
  process main_routine {
    int count
    writes("number of integers? ")
    read(count)
    int values[1:count]
    write("input integers, separated by whitespace")
    for [k = 1 to count] {
      read(values[k])
    }
    write("original numbers")
    print_array(values)
    sort(values)
    write("sorted numbers")
    print_array(values)
  }

  # Write every element of a, one per write call, from the lower bound
  # to the upper bound.
  proc print_array(a) {
    for [idx = lb(a) to ub(a)] {
      write(a[idx])
    }
  }

  # Rearrange a into non-decreasing order.
  proc sort(a) {
    # Nothing to do when the upper bound is 0.
    if (ub(a) == 0) {
      return
    }

    int lo = lb(a)
    int hi = ub(a)

    # Start the head of the pipeline.  The call returns a capability
    # for the head worker's pipe, through which every element of a is
    # then sent.
    cap pipe head
    head = worker(hi - lo + 1)
    for [idx = lo to hi] {
      send head(a[idx])
    }

    # Collect one (pos, value) report per element.  A worker created
    # with argument m reports pos == m, and its value is stored at
    # index hi + lo - pos.
    for [idx = lo to hi] {
      int pos, value
      receive result(pos, value)
      a[hi + lo - pos] = value
    }
  }

  # One pipeline stage.  Receives m integers on its own pipe, keeps the
  # smallest, and forwards all the others to a successor stage created
  # with m-1.  Once all m values have been seen it sends
  # result(m, smallest).  The capability for this stage's pipe is
  # returned to the caller through pi.
  proc worker(m) returns pi {
    int least          # smallest value received so far
    op pipe inbox

    pi = inbox
    reply              # the caller now holds a capability for inbox

    receive inbox(least)
    if (m > 1) {
      # There are more values coming: create the successor stage first.
      cap pipe downstream
      downstream = worker(m - 1)
      for [step = 1 to m - 1] {
        int incoming
        receive inbox(incoming)
        # Keep the smaller of the two values; pass the larger one on.
        if (least > incoming) {
          least :=: incoming
        }
        send downstream(incoming)
      }
    }
    send result(m, least)   # report the smallest value, tagged with m
  }
end pipeline_sort