Documentation

The Java™ Tutorials
Hide TOC
Fork/Join
Trail: Essential Java Classes
Lesson: Concurrency
Section: High Level Concurrency Objects
Subsection: Executors

Fork/Join

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. fork/join框架是ExecutorService接口的实现,它帮助您利用多个处理器。It is designed for work that can be broken into smaller pieces recursively. 它是为可以递归地分解成更小的片段的工作而设计的。The goal is to use all the available processing power to enhance the performance of your application.目标是使用所有可用的处理能力来提高应用程序的性能。

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. 与任何ExecutorService实现一样,fork/join框架将任务分发给线程池中的工作线程。The fork/join framework is distinct because it uses a work-stealing algorithm. fork/join框架是不同的,因为它使用了工作窃取算法。Worker threads that run out of things to do can steal tasks from other threads that are still busy.没有事情可做的工作线程可以从其他仍然繁忙的线程窃取任务。

The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. fork/join框架的中心是ForkJoinPool类,它是AbstractExecutorService类的扩展。ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.ForkJoinPool实现了核心工作窃取算法,可以执行ForkJoinTask流程。

Basic Use基本用途

The first step for using the fork/join framework is to write code that performs a segment of the work. 使用fork/join框架的第一步是编写执行工作部分的代码。Your code should look similar to the following pseudocode:您的代码应类似于以下伪代码:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

Wrap this code in a ForkJoinTask subclass, typically using one of its more specialized types, either RecursiveTask (which can return a result) or RecursiveAction.将此代码包装在ForkJoinTask子类中,通常使用其更专门化的类型之一,即RecursiveTask(可以返回结果)或RecursiveAction

After your ForkJoinTask subclass is ready, create the object that represents all the work to be done and pass it to the invoke() method of a ForkJoinPool instance.在您的ForkJoinTask子类准备就绪后,创建表示要完成的所有工作的对象,并将其传递给ForkJoinPool实例的invoke()方法。

Blurring for Clarity为了清晰而模糊

To help you understand how the fork/join framework works, consider the following example. 为了帮助您理解Foo/Engin框架是如何工作的,请考虑下面的示例。Suppose that you want to blur an image. 假设要模糊图像。The original source image is represented by an array of integers, where each integer contains the color values for a single pixel. 原始源图像由整数数组表示,其中每个整数包含单个像素的颜色值。The blurred destination image is also represented by an integer array with the same size as the source.模糊的目标图像也由与源图像大小相同的整数数组表示。

Performing the blur is accomplished by working through the source array one pixel at a time. 执行模糊是通过一次通过源阵列一个像素来完成的。Each pixel is averaged with its surrounding pixels (the red, green, and blue components are averaged), and the result is placed in the destination array. 每个像素与其周围的像素平均(红色、绿色和蓝色分量平均),结果放置在目标阵列中。Since an image is a large array, this process can take a long time. 由于图像是一个大型阵列,因此此过程可能需要很长时间。You can take advantage of concurrent processing on multiprocessor systems by implementing the algorithm using the fork/join framework. 通过使用fork/join框架实现算法,可以利用多处理器系统上的并发处理。Here is one possible implementation:以下是一种可能的实现:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >> 8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >> 0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
  
  ...

Now you implement the abstract compute() method, which either performs the blur directly or splits it into two smaller tasks. 现在实现了抽象compute()方法,该方法要么直接执行模糊,要么将其拆分为两个较小的任务。A simple array length threshold helps determine whether the work is performed or split.简单的数组长度阈值有助于确定是执行工作还是拆分工作。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

If the previous methods are in a subclass of the RecursiveAction class, then setting up the task to run in a ForkJoinPool is straightforward, and involves the following steps:如果前面的方法位于RecursiveAction类的子类中,那么将任务设置为在ForkJoinPool中运行是很简单的,并涉及以下步骤:

  1. Create a task that represents all of the work to be done.创建一个代表所有要完成的工作的任务。

    // source image pixels are in src
    // destination image pixels are in dst
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  2. Create the ForkJoinPool that will run the task.创建将运行任务的ForkJoinPool

    ForkJoinPool pool = new ForkJoinPool();
  3. Run the task.运行任务。

    pool.invoke(fb);

For the full source code, including some extra code that creates the destination image file, see the ForkBlur example.有关完整的源代码,包括创建目标图像文件的一些额外代码,请参阅ForkBlur示例。

Standard Implementations标准实现

Besides using the fork/join framework to implement custom algorithms for tasks to be performed concurrently on a multiprocessor system (such as the ForkBlur.java example in the previous section), there are some generally useful features in Java SE which are already implemented using the fork/join framework. 除了使用fork/join框架为要在多处理器系统上并发执行的任务实现自定义算法(如前一节中的ForkBlur.java示例),java SE中还有一些常用的功能,这些功能已经使用fork/join框架实现。One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. Java SE 8中引入的一个这样的实现被java.util.Arrays类用于其parallelSort()方法。These methods are similar to sort(), but leverage concurrency via the fork/join framework. 这些方法类似于sort(),但通过fork/join框架利用并发性。Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems. 在多处理器系统上运行时,大型阵列的并行排序比顺序排序快。However, how exactly the fork/join framework is leveraged by these methods is outside the scope of the Java Tutorials. 但是,这些方法究竟如何利用fork/join框架超出了Java教程的范围。For this information, see the Java API documentation.有关此信息,请参阅Java API文档。

Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release. fork/join框架的另一个实现由java.util.streams包中的方法使用,该包是计划在JavaSE8版本中发布的项目Lambda的一部分。For more information, see the Lambda Expressions section.有关详细信息,请参阅Lambda表达式部分。


Previous page: Thread Pools
Next page: Concurrent Collections