在生产环境中,应避免直接创建线程,线程数量必须得到控制。

1. 线程池

为了控制线程,JDK类库提供了一套Executor框架。

executor

线程池是JDK用来管理线程的的静态工厂。上图中ThreadPoolExecutor表示一个线程池。
Executor是一个接口,接口中只有void execute(Runnable command)方法。
ExecutorService也是一个接口,继承ExecutorService,增加了许多使用线程池的公用方法定义。
AbstactExecutorService为ExecutorService接口提供了默认实现。
ThreadPoolExecutor继承AbstactExecutorService抽象类。
Executors类是JDK1.5版本时封装的线程池工厂和工具类,这个类提供了几种默认的线程池类型和默认线程池工厂。
ForkJoinPool是Java7加入的一种用于并行执行任务的框架

1.1 创建线程池

创建线程池的实现其实是实例化ThreadPoolExecutor的过程:

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}

corePoolSize:指定线程池中的线程数量。
maximumPoolSize:线程池中的最大线程数量。
keepAliveTime:当线程池超过corePoolSize时,多余的空闲线程的空闲时间。
unit:keepAliveTime的时间单位。
workQueue:任务队列,被提交但未被执行的任务。
threadFactory工厂:创建线程的工厂,一般默认就可以。
handler:拒绝策略,当线程池满负荷运行,如何拒绝新的任务的策略。

参数解析

keepAliveTime,unit,threadFactory几个参数可根据线程池的任务场景去做简单的变化,在此不再赘述,一般自定义线程池我们的关注点大都在corePoolSize,maximumPoolSize,workQueue,handler四个参数上。

corePoolSize,根据业务的通用场景确定即可。《Java并发编程实践》书中给出了一个估算线程池corePoolSize大小的经验公式:
Ncpu=cpu数量
Ucpu=希望cpu的使用率,0<Ucpu<1
W/C=等待时间与计算时间的比率, CPU计算时间计算方式

Nthreads=Ncpu*Ucpu*(1+W/C)

如果你处理的是阻塞比较多的任务,你可以根据上述公式大致算出需要的线程数量(一般会远远超出当前实例所在服务器的cpu数量);如果是阻塞比较少的任务即cpu计算比重较大的任务,线程的数量可能就会相应的减少一些,避免服务器的超负荷运行。总之线程数不是精确的一个数,只要符合你业务的场景的大概数量就可以。

workQueue任务队列分为有限、无限、同步移交三种阻塞队列,常用的有如下几个:

  • ArrayBlockingQueue: 一个基于数组结构的有界阻塞队列,此队列按照FIFO原则排序。
  • LinkedBlockingQueue: 一个基于链表的阻塞队列,此队列按照FIFO原则排序,吞吐量高于ArrayBlockingQueue。
  • SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须阻塞到另一个线程的移除操作。
  • PriorityBlockingQueue: 具有优先级的无限阻塞队列。

线程池的执行过程:

threadpool-processor

–图摘自《Java并发编程艺术》9.1小节-线程池的实现原理。

队列的大小和maxmumPoolSize息息相关,如果使用无界队列,则maxmumPoolSize也就失效了,如果使用的是有界队列,则当有界队列满了,则新启动线程执行任务。直到最大线程也满了之后执行拒绝策略。

最后就是拒绝策略:

  • AbortPolicy: 直接抛出异常。
  • CallerRunPolicy: 使用调用者的线程执行任务。
  • DiscardOldestPolicy: 丢弃队列里最老的任务,并执行当前任务。
  • DiscardPolicy: 不处理,丢弃。

除此之外还可以实现RejectExecutionHandler接口,自定义拒绝策略。

1.2 Executor提供的几种类型的线程池解析

  1. Executors.newFixedThreadPool(1);

Executors.newFixedThreadPool创建的线程池可以指定核心线程数,但是使用的是无界队列,如果是IO密集型任务,可能导致内存溢出。

  1. Executors.newSingleThreadExecutor();

Executors.newSingleThreadExecutor()创建一个线程的线程池,同样使用无界队列,和newFixedThreadPool的差别仅限于核心线程数。

  1. Executors.newCachedThreadPool();

Executors.newCachedThreadPool()创建的线程池是一个没有队列的存储任务的线程池,线程池最大数量为Integer.MAX_VALUE。所以这个线程池会一直创建新的线程执行任务,可能导致内存溢出。适用于中小数量级的任务,且任务非CPU密集型。

  1. Executors.newScheduledThreadPool(1, Executors.defaultThreadFactory());

Executors.newScheduledThreadPool(1, Executors.defaultThreadFactory())创建可以定时或延时执行任务的线程池,与Timer相比,具有更多的弹性。详解Java定时任务—极客学院

  1. Executors.newWorkStealingPool();

封装的ForkJoinPool线程池,线程数量为当前运行环境的cpu数量,不处理异常,异步模式。下一小节详细介绍。

1.3 ForkJoinPool线程池初探

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成小任务,最终汇总小任务结果后得到大任务结果的框架。

forkjoin

–图片摘自《Java并发编程的艺术》6.4.1 什么是Fork/Join框架 小节。

缺点:在某些情况下,该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。

使用ForkJoinPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static final int THRESHOLD = 2;  // 阈值 private int start;
private int end;
public CountTask(int start, int end) {
this.start = start; this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 执行子任务
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并得到其结果
int leftResult=leftTask.join();
int rightResult=rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算1+2+3+4
CountTask task = new CountTask(1, 4);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {

} catch (ExecutionException e) {

}
}

–示例摘自《Java并发编程的艺术》6.4.4 使用Fork/Join框架 小节。

异常处理

ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码。

1
2
3
if(task.isCompletedAbnormally()) {
System.out.println(task.getException());
}

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

原理解析

ForkJoinPool继承AbstractExecutorService。

ForkJoinPool参数:

  • parallelism线程数量;
  • ForkJoinWorkerThreadFactory是ForkJoin线程工厂,创建ForkJoinWorkerThread的线程类实例;
  • mode是使用FIFO模式(true)还是LIFO模式(false);
  • UncaughtExceptionHandler是异常处理;
  • workerNamePrefix工作线程的名称前缀;

通常来说,我们使用ForkJoinPool时如果不指定线程数量时默认取2047和当前服务器cpu数量中的最小值。简单来说就是创建了一个fork/join线程的线程池。

参考资料

Effective Java 第二版 中文版
实战Java高并发程序设计 葛一鸣,郭超编著
Java并发编程艺术 方腾飞,魏鹏,程晓明 著