Java ForkJoin Parallel


https://github.com/nivance/Java7Demos/blob/master/forkjoin/MergeSort.java

http://www.programmingopiethehokie.com/2014/07/java-7-forkjoin-and-java-8-streams.html
The fork/join framework provides an easy way to parallelize work that can be broken up into smaller parts. Work is distributed to threads in a pool, and idle threads can steal work from busy threads (work stealing).

Fork/join parallelism is a style of parallel programming useful for exploiting the parallelism inherent in divide and conquer algorithms on shared memory multiprocessors.
The idea is quite simple: a larger task can be divided into smaller tasks whose solutions can then be combined. As long as the smaller tasks are independent, they can be executed in parallel.
Example: finding the maximum value in a large array of integers. 

Example: finding the maximum value in a large array of integers. Each task is given three data:
  • array to search
  • start and end indices to search
The computation starts with the creation of a "root task", which represents the overall problem.
rootTask = new Task(arr, 0, arr.length)
Assume that task class has a compute method which searches the specified range. The compute method will check to see how many elements are in the range it needs to search. If the number of elements in the range is below a threshold value, then a sequential computation is performed. Otherwise, the range is split into two sub-ranges containing half of the elements. Subtasks are created to search the sub-ranges. The maximum of the maximum values in the subranges is the maximum value in the overall range.

Task != Thread, Work stealing

Even though tasks behave a lot like threads, we do not want to make then equivalent to threads. Tasks are much more "lightweight" than threads, and in general, a fork/join program should be reasonably efficient even if a large number of tasks are created. For this reason, allocating one thread per task is not a good strategy, because the overhead of creating, running, and terminating threads could create overhead which reduces the possible parallel speedup.

In practice, fork/join frameworks use a thread pool in which a fixed number of threads are created. Each thread has a queue of tasks that are awaiting a chance to execute. When a task is started (forked), it is added to the queue of the thread that is executing its parent task.

Because each thread can be executing only one task at a time, each thread's task queues can accumulate tasks which are not currently executing. Threads that have no tasks allocated to them will attempt to steal a task from a thread whose queue has at least one task - this is called work stealing. By this mechanism, tasks are distributed to all of the threads in the thread pool.

By using a thread pool with work stealing, a fork/join framework can allow a relatively fine-grained division of the problem, but only create the minimum number of threads needed to fully exploit the available CPU cores. Typically, the thread pool will have one thread per available CPU core.
class FindMaxTask extends RecursiveTask<Integer> {

        private int[] arr;
        private int start, end;

        public FindMaxTask(int[] arr, int start, int end) {
                this.arr = arr;
                this.start = start;
                this.end = end;
        }

        @Override
        protected Integer compute() {
                if (end - start <= THRESHOLD) {
                        // number of elements is at or below the threshold - compute directly
                        return computeDirectly();
                } else {
                        // number of elements is above the threshold - split into subtasks
                        int mid = start + (end-start) / 2;
                        FindMaxTask left = new FindMaxTask(arr, start, mid);
                        FindMaxTask right = new FindMaxTask(arr, mid, end);

                        // invoke the tasks in parallel and wait for them to complete
                        invokeAll(left, right);

                        // maximum of overall range is maximum of sub-ranges
                        return Math.max(left.join(), right.join());
                }
        }

        private Integer computeDirectly() {
                int max = Integer.MIN_VALUE;
                for (int i = start; i < end; i++) {
                        if (arr[i] > max) {
                                max = arr[i];
                        }
                }
                return max;
        }
}
Note that the invokeAll method may be called to execute two or more tasks in parallel (waiting for them to complete). The join method waits for a task to complete (if it has not completed yet) and returns the result of its compute method.
To do the overall computation on an entire array, we just need to create a root task and execute it using a ForkJoinPool object:
int[] data = some array of integer values

ForkJoinPool pool = new ForkJoinPool();
FindMaxTask rootTask = new FindMaxTask(data, 0, data.length);
Integer result = pool.invoke(rootTask);

int threshold = data.length / Runtime.getRuntime().availableProcessors();
MergeSortTask rootTask = new MergeSortTask(data, 0, data.length, threshold);

http://cecs.wright.edu/~pmateti/Courses/7900/Lectures/jdk1.8.0_20-samples/sample/forkjoin/mergesort/MergeSort.java
public class MergeSort {
    private final ForkJoinPool pool;

    private static class MergeSortTask extends RecursiveAction {
        private final int[] array;
        private final int low;
        private final int high;
        private static final int THRESHOLD = 8;

        /**
         * Creates a {@code MergeSortTask} containing the array and the bounds of the array
         *
         * @param array the array to sort
         * @param low the lower element to start sorting at
         * @param high the non-inclusive high element to sort to
         */
        protected MergeSortTask(int[] array, int low, int high) {
            this.array = array;
            this.low = low;
            this.high = high;
        }

        @Override
        protected void compute() {
            if (high - low <= THRESHOLD) {
                Arrays.sort(array, low, high);
            } else {
                int middle = low + ((high - low) >> 1);
                // Execute the sub tasks and wait for them to finish
                invokeAll(new MergeSortTask(array, low, middle), new MergeSortTask(array, middle, high));
                // Then merge the results
                merge(middle);
            }
        }

        /**
         * Merges the two sorted arrays this.low, middle - 1 and middle, this.high - 1
         * @param middle the index in the array where the second sorted list begins
         */
        private void merge(int middle) {
            if (array[middle - 1] < array[middle]) {
                return; // the arrays are already correctly sorted, so we can skip the merge
            }
            int[] copy = new int[high - low];
            System.arraycopy(array, low, copy, 0, copy.length);
            int copyLow = 0;
            int copyHigh = high - low;
            int copyMiddle = middle - low;

            for (int i = low, p = copyLow, q = copyMiddle; i < high; i++) {
                if (q >= copyHigh || (p < copyMiddle && copy[p] < copy[q]) ) {
                    array[i] = copy[p++];
                } else {
                    array[i] = copy[q++];
                }
            }
        }
    }

    /**
     * Creates a {@code MergeSort} containing a ForkJoinPool with the indicated parallelism level
     * @param parallelism the parallelism level used
     */
    public MergeSort(int parallelism) {
        pool = new ForkJoinPool(parallelism);
    }

    /**
     * Sorts all the elements of the given array using the ForkJoin framework
     * @param array the array to sort
     */
    public void sort(int[] array) {
        ForkJoinTask<Void> job = pool.submit(new MergeSortTask(array, 0, array.length));
        job.join();
    }
}

http://www.javacodegeeks.com/2011/02/java-forkjoin-parallel-programming.html
03public class FibonacciProblem {
04  
05 public int n;
06
07 public FibonacciProblem(int n) {
08  this.n = n;
09 }
10  
11 public long solve() {
12  return fibonacci(n);
13 }
14  
15 private long fibonacci(int n) {
16  System.out.println("Thread: " +
17   Thread.currentThread().getName() + " calculates " + n);
18  if (n <= 1)
19   return n;
20  else
21   return fibonacci(n-1) + fibonacci(n-2);
22 }
23
24}

public class FibonacciTask extends RecursiveTask<Long> {
06
07 private static final long serialVersionUID = 6136927121059165206L;
08  
09 private static final int THRESHOLD = 5;
10
11 private FibonacciProblem problem;
12 public long result;
13  
14 public FibonacciTask(FibonacciProblem problem) {
15  this.problem = problem;
16 }
17
18 @Override
19 public Long compute() {
20  if (problem.n < THRESHOLD) { // easy problem, don't bother with parallelism
21   result = problem.solve();
22  }
23  else {
24   FibonacciTask worker1 = new FibonacciTask(new FibonacciProblem(problem.n-1));
25   FibonacciTask worker2 = new FibonacciTask(new FibonacciProblem(problem.n-2));
26   worker1.fork();
27   result = worker2.compute() + worker1.join();
28
29  }
30  return result;
31 }
32
33}

03public class FibonacciProblem {
04  
05 public int n;
06
07 public FibonacciProblem(int n) {
08  this.n = n;
09 }
10  
11 public long solve() {
12  return fibonacci(n);
13 }
14  
15 private long fibonacci(int n) {
16  System.out.println("Thread: " +
17   Thread.currentThread().getName() + " calculates " + n);
18  if (n <= 1)
19   return n;
20  else
21   return fibonacci(n-1) + fibonacci(n-2);
22 }
23
24}

http://ricardozuasti.com/2012/java-concurrency-examples-forkjoin-framework/
public class QuickSort<T extends Comparable> extends RecursiveAction {
 
    private List<T> data;
    private int left;
    private int right;
 
    public QuickSort(List<T> data){
        this.data=data;
        this.left = 0;
        this.right = data.size() - 1;
    }
 
    public QuickSort(List<T> data, int left, int right){
        this.data = data;
        this.left = left;
        this.right = right;
    }
 
    @Override
    protected void compute() {
        if (left < right){
            int pivotIndex = left + ((right - left)/2);
 
            pivotIndex = partition(pivotIndex);
 
            invokeAll(new QuickSort(data, left, pivotIndex-1),
                      new QuickSort(data, pivotIndex+1, right));
        }
 
    }
 
    private int partition(int pivotIndex){
        T pivotValue = data.get(pivotIndex);
 
        swap(pivotIndex, right);
 
        int storeIndex = left;
        for (int i=left; i<right; i++){
            if (data.get(i).compareTo(pivotValue) < 0){
                swap(i, storeIndex);
                storeIndex++;
            }
        }
 
        swap(storeIndex, right);
 
        return storeIndex;
    }
 
    private void swap(int i, int j){
        if (i != j){
            T iValue = data.get(i);
 
            data.set(i, data.get(j));
            data.set(j, iValue);
        }
    }
}
http://highperformanceinjava.blogspot.com/2013/01/forkjoin-for-parallel-sorting-and-more.html
protected void compute() {
        if (begin < end) {
            int pivot = OurQuickSort.partition(array, begin, end);
            if (end - begin < THRESHOLD) {
                OurQuickSort.quicksort(array, begin, pivot - 1);
                OurQuickSort.quicksort(array, pivot + 1, end);
            } else {
                invokeAll(
                        new SortAction(array, begin, pivot - 1),
                        new SortAction(array, pivot + 1, end));
            }
        }
    }
http://java.dzone.com/articles/javas-fork-join-framework

http://www.java2s.com/Open-Source/Java_Free_Code/Framework/Download_parallel_sort_Free_Java_Code.htm
Implementation of merge sort and quicksort using the Fork/Join framework (RecursiveAction) of Java 7.

Labels

LeetCode (1432) GeeksforGeeks (1122) LeetCode - Review (1067) Review (882) Algorithm (668) to-do (609) Classic Algorithm (270) Google Interview (237) Classic Interview (222) Dynamic Programming (220) DP (186) Bit Algorithms (145) POJ (141) Math (137) Tree (132) LeetCode - Phone (129) EPI (122) Cracking Coding Interview (119) DFS (115) Difficult Algorithm (115) Lintcode (115) Different Solutions (110) Smart Algorithm (104) Binary Search (96) BFS (91) HackerRank (90) Binary Tree (86) Hard (79) Two Pointers (78) Stack (76) Company-Facebook (75) BST (72) Graph Algorithm (72) Time Complexity (69) Greedy Algorithm (68) Interval (63) Company - Google (62) Geometry Algorithm (61) Interview Corner (61) LeetCode - Extended (61) Union-Find (60) Trie (58) Advanced Data Structure (56) List (56) Priority Queue (53) Codility (52) ComProGuide (50) LeetCode Hard (50) Matrix (50) Bisection (48) Segment Tree (48) Sliding Window (48) USACO (46) Space Optimization (45) Company-Airbnb (41) Greedy (41) Mathematical Algorithm (41) Tree - Post-Order (41) ACM-ICPC (40) Algorithm Interview (40) Data Structure Design (40) Graph (40) Backtracking (39) Data Structure (39) Jobdu (39) Random (39) Codeforces (38) Knapsack (38) LeetCode - DP (38) Recursive Algorithm (38) String Algorithm (38) TopCoder (38) Sort (37) Introduction to Algorithms (36) Pre-Sort (36) Beauty of Programming (35) Must Known (34) Binary Search Tree (33) Follow Up (33) prismoskills (33) Palindrome (32) Permutation (31) Array (30) Google Code Jam (30) HDU (30) Array O(N) (29) Logic Thinking (29) Monotonic Stack (29) Puzzles (29) Code - Detail (27) Company-Zenefits (27) Microsoft 100 - July (27) Queue (27) Binary Indexed Trees (26) TreeMap (26) to-do-must (26) 1point3acres (25) GeeksQuiz (25) Merge Sort (25) Reverse Thinking (25) hihocoder (25) Company - LinkedIn (24) Hash (24) High Frequency (24) Summary (24) Divide and Conquer (23) Proof (23) Game Theory (22) Topological Sort (22) Lintcode - Review (21) Tree - Modification (21) Algorithm Game (20) CareerCup (20) Company - Twitter (20) DFS + Review (20) DP - Relation (20) Brain Teaser (19) DP - Tree (19) Left and Right Array (19) O(N) (19) Sweep Line (19) UVA (19) DP - Bit Masking (18) LeetCode - Thinking (18) KMP (17) LeetCode - TODO (17) Probabilities (17) Simulation (17) String Search (17) Codercareer (16) Company-Uber (16) Iterator (16) Number (16) O(1) Space (16) Shortest Path (16) itint5 (16) DFS+Cache (15) Dijkstra (15) Euclidean GCD (15) Heap (15) LeetCode - Hard (15) Majority (15) Number Theory (15) Rolling Hash (15) Tree Traversal (15) Brute Force (14) Bucket Sort (14) DP - Knapsack (14) DP - Probability (14) Difficult (14) Fast Power Algorithm (14) Pattern (14) Prefix Sum (14) TreeSet (14) Algorithm Videos (13) Amazon Interview (13) Basic Algorithm (13) Codechef (13) Combination (13) Computational Geometry (13) DP - Digit (13) LCA (13) LeetCode - DFS (13) Linked List (13) Long Increasing Sequence(LIS) (13) Math-Divisible (13) Reservoir Sampling (13) mitbbs (13) Algorithm - How To (12) Company - Microsoft (12) DP - Interval (12) DP - Multiple Relation (12) DP - Relation Optimization (12) LeetCode - Classic (12) Level Order Traversal (12) Prime (12) Pruning (12) Reconstruct Tree (12) Thinking (12) X Sum (12) AOJ (11) Bit Mask (11) Company-Snapchat (11) DP - Space Optimization (11) Dequeue (11) Graph DFS (11) MinMax (11) Miscs (11) Princeton (11) Quick Sort (11) Stack - Tree (11) 尺取法 (11) 挑战程序设计竞赛 (11) Coin Change (10) DFS+Backtracking (10) Facebook Hacker Cup (10) Fast Slow Pointers (10) HackerRank Easy (10) Interval Tree (10) Limited Range (10) Matrix - Traverse (10) Monotone Queue (10) SPOJ (10) Starting Point (10) States (10) Stock (10) Theory (10) Tutorialhorizon (10) Kadane - Extended (9) Mathblog (9) Max-Min Flow (9) Maze (9) Median (9) O(32N) (9) Quick Select (9) Stack Overflow (9) System Design (9) Tree - Conversion (9) Use XOR (9) Book Notes (8) Company-Amazon (8) DFS+BFS (8) DP - States (8) Expression (8) Longest Common Subsequence(LCS) (8) One Pass (8) Quadtrees (8) Traversal Once (8) Trie - Suffix (8) 穷竭搜索 (8) Algorithm Problem List (7) All Sub (7) Catalan Number (7) Cycle (7) DP - Cases (7) Facebook Interview (7) Fibonacci Numbers (7) Flood fill (7) Game Nim (7) Graph BFS (7) HackerRank Difficult (7) Hackerearth (7) Inversion (7) Kadane’s Algorithm (7) Manacher (7) Morris Traversal (7) Multiple Data Structures (7) Normalized Key (7) O(XN) (7) Radix Sort (7) Recursion (7) Sampling (7) Suffix Array (7) Tech-Queries (7) Tree - Serialization (7) Tree DP (7) Trie - Bit (7) 蓝桥杯 (7) Algorithm - Brain Teaser (6) BFS - Priority Queue (6) BFS - Unusual (6) Classic Data Structure Impl (6) DP - 2D (6) DP - Monotone Queue (6) DP - Unusual (6) DP-Space Optimization (6) Dutch Flag (6) How To (6) Interviewstreet (6) Knapsack - MultiplePack (6) Local MinMax (6) MST (6) Minimum Spanning Tree (6) Number - Reach (6) Parentheses (6) Pre-Sum (6) Probability (6) Programming Pearls (6) Rabin-Karp (6) Reverse (6) Scan from right (6) Schedule (6) Stream (6) Subset Sum (6) TSP (6) Xpost (6) n00tc0d3r (6) reddit (6) AI (5) Abbreviation (5) Anagram (5) Art Of Programming-July (5) Assumption (5) Bellman Ford (5) Big Data (5) Code - Solid (5) Code Kata (5) Codility-lessons (5) Coding (5) Company - WMware (5) Convex Hull (5) Crazyforcode (5) DFS - Multiple (5) DFS+DP (5) DP - Multi-Dimension (5) DP-Multiple Relation (5) Eulerian Cycle (5) Graph - Unusual (5) Graph Cycle (5) Hash Strategy (5) Immutability (5) Java (5) LogN (5) Manhattan Distance (5) Matrix Chain Multiplication (5) N Queens (5) Pre-Sort: Index (5) Quick Partition (5) Quora (5) Randomized Algorithms (5) Resources (5) Robot (5) SPFA(Shortest Path Faster Algorithm) (5) Shuffle (5) Sieve of Eratosthenes (5) Strongly Connected Components (5) Subarray Sum (5) Sudoku (5) Suffix Tree (5) Swap (5) Threaded (5) Tree - Creation (5) Warshall Floyd (5) Word Search (5) jiuzhang (5)

Popular Posts