Fork/Join
The fork/join framework is an implementation of the ‘ExecutorService’ interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the perfromance of your application.
As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.
The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.
#Basic Use
The first step for using the fork/join framework is to write code that performs a segment of the work. Your code should look similar to the following pseudocode:
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
Wrap this code in a ForkJoinTask subclass, typically using one of its more specialized types, either RecursiveTask (which can return a result) or RecursiveAction. After your ForkJoinTask subclass is ready, create the object that represents all the work to be done and pass it to the invoke() method of a ForkJoinPool instance.
#Blurring for Clarity
To help you understand how the fork/join framework works, consider the following example. Suppose that you want to blur an image. The original source image is represented by an array of integers, where each integer contains the color values for a single pixel. The blurred destination image is also represented by an integer array with the same size as the source.
Performing the blur is accomplished by working through the source array one pixel at a time. Each pixel is averaged with its surrounding pixels (the red, green, and blue components are averaged), and the result is placed in the destination array. Since an image is a large array, this process can take a long time. You can take advantage of concurrent processing on multiprocessor systems by implementing the algorithm using the fork/join framework. Here is one possible implementation:
package ink.cyz.learn.java.multiThread.list;
import org.junit.Test;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.File;
import java.util.concurrent.*;
public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
private int mBlurWidth = 15; // Processing window size, should be odd.
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
// Average pixels from source, write results into destination.
protected void computeDirectly() {
int sidePixels = (mBlurWidth - 1) / 2;
for (int index = mStart; index < mStart + mLength; index++) {
// Calculate average.
float rt = 0, gt = 0, bt = 0;
for (int mi = -sidePixels; mi <= sidePixels; mi++) {
int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
int pixel = mSource[mindex];
rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
}
// Re-assemble destination pixel.
int dpixel = (0xff000000)
| (((int) rt) << 16)
| (((int) gt) << 8)
| (((int) bt) << 0);
mDestination[index] = dpixel;
}
}
protected static int sThreshold = 10000;
@Override
protected void compute() {
if (mLength < sThreshold) {
computeDirectly();
return;
}
int split = mLength / 2;
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength - split,
mDestination));
}
// Plumbing follows.
public static void main(String[] args) throws Exception {
String srcName = "c:\\hxb.jpg";
File srcFile = new File(srcName);
BufferedImage image = ImageIO.read(srcFile);
System.out.println("Source image: " + srcName);
BufferedImage blurredImage = blur(image);
String dstName = "c:\\hxb1.jpg";
File dstFile = new File(dstName);
ImageIO.write(blurredImage, "jpg", dstFile);
System.out.println("Output image: " + dstName);
}
public static BufferedImage blur(BufferedImage srcImage) {
int w = srcImage.getWidth();
int h = srcImage.getHeight();
int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w);
int[] dst = new int[src.length];
System.out.println("Array size is " + src.length);
System.out.println("Threshold is " + sThreshold);
int processors = Runtime.getRuntime().availableProcessors();
System.out.println(Integer.toString(processors) + " processor"
+ (processors != 1 ? "s are " : " is ")
+ "available");
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
ForkJoinPool pool = new ForkJoinPool();
long startTime = System.currentTimeMillis();
pool.invoke(fb);
long endTime = System.currentTimeMillis();
System.out.println("Image blur took " + (endTime - startTime) +
" milliseconds.");
BufferedImage dstImage =
new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB);
dstImage.setRGB(0, 0, w, h, dst, 0, w);
return dstImage;
}
}