Omgo's Blog

October 24, 2010

parallel quicksort with java fork join framework JSR 166y

Filed under: Core Java — aswin @ 9:39 pm
The fork/join framework (JSR 166y) is slated for Java 7. It is absolutely cool, which is no surprise coming from Doug Lea. I had to try something out with it, since the packages compiled for Java 6 are also available.

I tried parallelizing the quicksort algorithm with it, and it looks like it compiles and even produces the right results! The important part here is the use of a Phaser, a resettable CountDownLatch if you will. Basically, you create a Phaser and pass it along to the root (first) RecursiveAction object.
Each RecursiveAction object (in this example, an instance of ParallelQuickSort) increments the Phaser's party count by calling the register() method. When it is done with its work (sorting, or spawning children to do that), it "arrives" and "deregisters" at the Phaser, bringing the count down by one. When all the RecursiveActions spawned by the recursion are done, the root one, which is waiting on the Phaser, resumes and returns to the client, at which point the array has been sorted.
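
The register/arrive/deregister handshake described above can be sketched in isolation. This is a minimal sketch rather than the sorter itself. It assumes Java 7 or later, where the jsr166y classes live in java.util.concurrent, and the names PhaserCountdownDemo, Node and run are made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: a root task waits on a Phaser until every spawned task has arrived. */
final class PhaserCountdownDemo {

    /** A task that forks two children until depth reaches 0 and counts each finished task. */
    static final class Node extends RecursiveAction {
        private final Phaser phaser;
        private final AtomicInteger done;
        private final int depth;
        private final boolean root;

        Node(Phaser phaser, AtomicInteger done, int depth, boolean root) {
            this.phaser = phaser;
            this.done = done;
            this.depth = depth;
            this.root = root;
            // Register in the constructor, i.e. before fork(), while the parent
            // has not yet arrived. The phase therefore cannot advance early.
            phaser.register();
        }

        @Override
        protected void compute() {
            if (depth > 0) {
                new Node(phaser, done, depth - 1, false).fork();
                new Node(phaser, done, depth - 1, false).fork();
            }
            done.incrementAndGet();
            if (root) {
                phaser.arriveAndAwaitAdvance(); // wait until all registered parties arrive
            } else {
                phaser.arriveAndDeregister();   // arrive and leave the party count
            }
        }
    }

    /** Runs a task tree of the given depth; returns the number of tasks finished when the root resumed. */
    static int run(int depth) {
        AtomicInteger done = new AtomicInteger();
        // Parallelism above 1 is an assumption made here so that the single
        // blocked root task never occupies the only worker thread.
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(new Node(new Phaser(), done, depth, true));
            return done.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // A full binary tree of depth 4 has 1 + 2 + 4 + 8 + 16 = 31 tasks.
        System.out.println(run(4));
    }
}
```

Because every task bumps the counter before it arrives, the root should only resume once the counter covers the whole tree.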

Anyway, here is the source for what I have done; any comments and suggestions are welcome.
package test.aswin.concurrency.forkjoin;

import jsr166y.ForkJoinPool;
import jsr166y.Phaser;
import jsr166y.RecursiveAction;

import java.util.Arrays;

/**
 * Example of using JSR 166 Y for a simple parallel quick sort.
 */
public class ParallelQuickSort extends RecursiveAction {
    Phaser phaser;
    int[] arr = null;
    int left;
    int right;

    ParallelQuickSort(Phaser phaser, int[] arr) {
        this(phaser, arr, 0, arr.length - 1);
    }

    ParallelQuickSort(Phaser phaser, int[] arr, int left, int right) {
        this.phaser = phaser;
        this.arr = arr;
        this.left = left;
        this.right = right;
        phaser.register();  //important
    }


    private ParallelQuickSort leftSorter(int pivotI) {
        return new ParallelQuickSort(phaser, arr, left, pivotI - 1);
    }

    private ParallelQuickSort rightSorter(int pivotI) {
        return new ParallelQuickSort(phaser, arr, pivotI, right);
    }

    private void recurSort(int leftI, int rightI) {
        if (rightI - leftI > 7) {
            int pIdx = partition(leftI, rightI, getPivot(arr, leftI, rightI));
            recurSort(leftI, pIdx - 1);
            recurSort(pIdx, rightI);
        } else if (rightI - leftI > 0) {
            insertionSort(leftI, rightI);
        }
    }


    @Override
    protected void compute() {
        if (right - left > 1000) {   // large segment (1000 is an arbitrary threshold I chose): partition, then sort both parts in parallel
            int pIdx = partition(left, right, getPivot(arr, left, right));
            leftSorter(pIdx).fork();
            rightSorter(pIdx).fork();

        } else if (right - left > 7) {  // mid-size segment: sort recursively in this thread
            recurSort(left, right);

        } else if (right - left > 0) {  // only a handful of elements: use a simple insertion sort
            insertionSort(left, right);
        }

        if (isRoot()) { // the root instance (the one that started the sort) waits for all the others
                        // to complete.
            phaser.arriveAndAwaitAdvance();
        } else {  // every non-root instance just arrives and deregisters, without waiting for the others.
            phaser.arriveAndDeregister();
        }
    }

    /** Partition the array segment around the pivot value; returns the first index of the right-hand part. **/
    private int partition(int startI, int endI, int pivot) {
        for (int si = startI - 1, ei = endI + 1; ; ) {
            for (; arr[++si] < pivot;) ;
            for (; ei > startI && arr[--ei] > pivot ; ) ;
            if (si >= ei) {
                return si;
            }
            swap(si, ei);
        }
    }

    private void insertionSort(int leftI, int rightI) {
        for (int i = leftI; i < rightI + 1; i++)
            for (int j = i; j > leftI && arr[j - 1] > arr[j]; j--)
                swap(j, j - 1);

    }

    private void swap(int startI, int endI) {
        int temp = arr[startI];
        arr[startI] = arr[endI];
        arr[endI] = temp;
    }

    /**
     * Check to see if this instance is the root, i.e. the first one used to sort the array.
     * @return true if this instance covers the whole array
     */
    private boolean isRoot() {
        return arr.length == (right - left) + 1;
    }

    /**
     * copied from java.util.Arrays
     */
    private int getPivot(int[] arr, int startI, int endI) {
        int len = (endI - startI) + 1;
        // Choose a partition element, v
        int m = startI + (len >> 1);       // Small arrays, middle element
        if (len > 7) {
            int l = startI;
            int n = startI + len - 1;
            if (len > 40) {        // Big arrays, pseudomedian of 9
                int s = len / 8;
                l = med3(arr, l, l + s, l + 2 * s);
                m = med3(arr, m - s, m, m + s);
                n = med3(arr, n - 2 * s, n - s, n);
            }
            m = med3(arr, l, m, n); // Mid-size, med of 3
        }
        int v = arr[m];
        return v;
    }

    /**
     * copied from java.util.Arrays
     */
    private static int med3(int x[], int a, int b, int c) {
        return (x[a] < x[b] ?
                (x[b] < x[c] ? b : x[a] < x[c] ? c : a) :
                (x[b] > x[c] ? b : x[a] > x[c] ? c : a));
    }

    public static void main(String[] args) throws InterruptedException {
        int[] arr = ArrayUtils.generateRandom(1000000);
        ForkJoinPool pool = new ForkJoinPool();


        for (int i = 0; i < 10; i++) {
            System.out.println("Starting " + i);
            int[] arrclone1 = arr.clone();
            int[] arrclone2 = arr.clone();

            long l = System.nanoTime();
            Arrays.sort(arrclone1);
            System.out.printf("Times for seq %,8d%n", (System.nanoTime() - l));

/*          // already sorted input (ascending or descending) is faster than unsorted input when trying the parallel sort
            for (int left=0,right=arrclone1.length-1; left<right; left++, right--) {
                // exchange the first and last
                int temp = arrclone1[left]; arrclone1[left]  = arrclone1[right]; arrclone1[right] = temp;
            }
*/


            l = System.nanoTime();
            Phaser phaser = new Phaser();
            pool.invoke(new ParallelQuickSort(phaser, arrclone2));
            System.out.printf("Times %,8d%n", (System.nanoTime() - l));
            System.out.println("Is sorted :>" + ArrayUtils.isSorted(arrclone2, true));
        }
    }
}
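
The main method above calls ArrayUtils.generateRandom and ArrayUtils.isSorted, which are not shown in this post. A minimal stand-in might look like the following. The method names are taken from the calls above, but the bodies and the meaning of the boolean flag (assumed here to select ascending order) are guesses, not the original helper. To compile alongside the sorter, it would also need the same package declaration.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical stand-in for the ArrayUtils helper used by main() above.
 * Only the method names come from the post; everything else is assumed.
 */
final class ArrayUtils {
    private ArrayUtils() { }

    /** Returns an array of {@code size} pseudo-random ints. */
    static int[] generateRandom(int size) {
        Random random = new Random();
        int[] result = new int[size];
        for (int i = 0; i < size; i++) {
            result[i] = random.nextInt();
        }
        return result;
    }

    /**
     * Checks that the array is ordered. The flag is assumed to select
     * non-decreasing order (true) or non-increasing order (false).
     */
    static boolean isSorted(int[] arr, boolean ascending) {
        for (int i = 1; i < arr.length; i++) {
            boolean outOfOrder = ascending ? arr[i - 1] > arr[i] : arr[i - 1] < arr[i];
            if (outOfOrder) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] data = generateRandom(10);
        Arrays.sort(data);
        System.out.println(isSorted(data, true));
    }
}
```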
One thing I tried and failed at was parallelizing the partition operation, which is single-threaded in this example. What I wrote for that worked, but it had horrible performance, so I am not including it here. Basically, I had two threads swapping the even- and odd-indexed items, respectively, in parallel. I think the processor locality of the data and keeping the elements synchronized across CPU caches may be the issue, but I am investigating it further and, with the help of the experts out there, may find out why it is performing badly. If anyone wants to help me out with this, please let me know :)


July 14, 2009

Good “observations” on java string references

Filed under: Core Java — Tags: — aswin @ 6:47 pm

There is stuff in core Java that will puzzle you irrespective of the decade-long experience you may have. One such thing, regarding Strings as WeakHashMap keys, is here. Pretty interesting read, I guarantee 🙂
