Java并发编程 — 任务执行
1 任务执行模式
使用「请求-线程池」模式而非「请求-线程」模式主要是为了优化过多线程在:
- 线程创建与关闭的资源消耗
- 空闲线程的资源占用以及CPU资源的竞争 等等。通常资源池化是解决诸如此类问题的最通用的方式之一。
1.1 有效的并行
假设任务A执行时间为90ms,任务B为10ms,试想两种情况:
- 串行执行,耗时为100ms+。特点:代码简单编写并且易懂,维护轻量。
- 并行执行,理想是90ms+。特点:代码层面复杂,维护难。系统层面增加了CPU任务协调负担与资源竞争等。
其实,此种情况的并行收益并不明显。
原则:
大量 相互独立且 同类 的任务进行 并发 处理,才能将任务进行有效的分配,进而获得性能的提升。
2 任务执行框架与工具
2.1 ExecutorService vs CompletionService
同样作为异步执行的框架,二者在实现上的不同主要体现在:
- ExecutorService = incoming queue + worker threads
- CompletionService = incoming queue + worker threads + output queue
其中 CompletionService 更大的优点在于对isDone的Task的获取的便利性,而不需要按照既定的顺序去 同步等待结果,举个例子:
- 并行获取4个资源,当有3个返回时就视为成功,并获取结果,cancel掉未返回的结果。
如下是一个使用 CompletionService
的例子(StackOverFlow):
ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
CompletionService<CalcResult> taskCompletionService =
new ExecutorCompletionService<CalcResult>(taskExecutor);
int submittedTasks = 5;
for(int i=0;i< submittedTasks;i++){
taskCompletionService.submit(new CallableTask(
String.valueOf(i),
(i * 10),
((i * 10) + 10 )
));
System.out.println("Task " + String.valueOf(i) + "subitted");
}
for(int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++){
try {
System.out.println("trying to take from Completion service");
Future<CalcResult> result = taskCompletionService.take();
System.out.println("result for a task availble in queue.Trying to get()" );
// above call blocks till atleast one task is completed and results availble for it
// but we dont have to worry which one
// process the result here by doing result.get()
CalcResult l = result.get();
System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));
} catch (InterruptedException e) {
// Something went wrong with a task submitted
System.out.println("Error Interrupted exception");
e.printStackTrace();
} catch (ExecutionException e) {
// Something went wrong with the result
e.printStackTrace();
System.out.println("Error get() threw exception");
}
}
2.2 优雅的任务执行
任务的生命周期不仅包括任务的创建与执行,更包含任务的中断与结束。而针对后者的优雅处理才能体现一个程序员水平,才能让服务更加健壮。
任务取消
通常,任务的取消有两种方式:
- 取消标志位
- 中断 (通常针对可响应中断的阻塞)
任务优雅取消
通常,我们用线程池的方式来管理线程,ExecutorService提供了优雅的线程取消与线程池关闭的接口与策略。
如通过获取的Future.cancel 来取消任务的执行。 通过 shutdown(等待当前存在的任务执行完成) 和 shutdownNow(不等待,返回未执行完毕的任务)的方式优雅关闭线程池。
JVM关闭
当JVM关闭时(通过调用System.exit, 或者Linux中的SIGNAL, 注意 kill -9是不会发送SIGNAL的),可以通过注册JVM钩子来补货关闭信号,并在关闭前执行相应的操作。如:
Thread shutdownThread = ...
Runtime.getRuntime().addShutdownHook(shutdownThread);
还需要注意的一点是,Daemon Thread 是不会阻碍JVM关闭的,当JVM停止时,并不会执行Daemon Thread的 finally方法,不会是释放栈。
2.3 任务执行框架
ExecutorService & ThreadPoolExecutor
ExecutorService是最常用的任务执行框架,但是,在使用它进行任务执行时,有很多点需要关注。
关于池化: 池化的设计思路主要是考虑到如下几点:
- 创建和销毁对象的资源消耗。
- 创建和销毁对象的时间消耗。
- 针对同一资源的统一管理。
任务隔离
隔离是代码编写中一种重要的模式,而任务按照执行耗时、资源竞争等作为分组条件,进行线程池划分是代码设计中需要慎重考虑与设计的,关于这点,可以参考Neflix的’Hystrix’框架。
线程池参数配置
线程数、任务队列和拒绝策略通常是ExecutorService最关注的三点。
(1)线程数。
线程数关系到系统资源的竞争(过多)与服务吞吐量(过少)。系统硬件资源(CPU个数等)和任务属性(资源竞争)是线程数设置的两个重要的参考指标。通常,如果线程数配置的如果不是过于不合理,是感知不到过大区别的,但是一个程序员代码水平的体现通常也是在这些细节上。 线程数的配置参考:线程个数 = CPU个数 x 预估的CPU利用率 x (1 + 线程等待时间/计算时间)
(2) 任务队列。 任务队列有三种可供选择:无界队列,有界队列,同步队列。 通常来讲,使用ArrayBlockingQueue或者定长大小的LinkedBlockingQueue是更好的选择,可以最大限度的避免当服务能力有限或者请求突增情况下的资源耗尽。
在特定情景下,如对任务执行优先级有限制,使用SynchronousQueue或者PriorityBlockingQueue是更好的选择。
(3)拒绝策略。
当队列已满或者在线程池关闭后提交任务都会触发拒绝策略的执行。默认的实现有Abort,Discard等,也可以自己实现策略接口实现自己的局将策略,如日志,异常上报等。
当然也可以通过包装 ExcutorService 为 BoundedExcutorSerivce的方式, 在任务提交前先使用Semaphore或者Guava的RateLimiter获取执行许可的方式来限制任务提交,来最终达到任务拒绝的目的。
编程池实现原理
首先看下ThreadPoolExecutor任务执行的源码。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**
* 1. 如果当前 running的线程数量小于 corePoolSize, 那么新建一个线程并start.
* 如果不小于, 转到2.
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 2. 尝试将任务放进队列, 如果成功, 再次去check 线程池状态,如果线程池已经 stop,那么
* 做rool back, 将刚放入queue的 command 删除, 并执行拒绝策略。
* 放进任务队列失败,执行3.
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 3. 再次检查是否能创建线程执行任务, 如果不能,拒绝策略执行。
*/
else if (!addWorker(command, false))
reject(command);
}
如上的过程,可以让execute过程中,尽量的避免获取全局锁。在1和3过程中是需要获取全解锁。而步骤2 尽量的使用double check的方式。这个流程也尽可能的通过避免锁的使用的方式达到最大的可伸缩性。
3 并发工具
这里分为如下几部分:
- 同步工具
- 并发容器
- 原子变量
3.1 同步工具
状态依赖 所谓的状态依赖,是指任务的执行需要一些先验条件的满足,这些条件就是任务向下执行依赖的状态。 如BlockingQueue offer操作的先验条件是队列未满。
针对这种状态依赖,通常的两种策略是:
- 状态不满足,直接返回。
- 状态不满足,阻塞。当状态满足是被唤醒继续执行。
这里只阐述第二种策略的实现方式
轮询加休眠
public void doThings() throws Exception{
while (true) {
synchronized (this) {
if (isStateSatisfied()) {
// do your things
}
Thread.sleep(100);
}
}
}
缺点:
- 大量业务无关的代码,复杂。
- 休眠时间不好控制,太长影响响应性,太短会加大CPU消耗。
条件队列
条件队列可以让一组线程以某种方式等待条件变为真。对象 内部条件队列 相关的API包含 wait()/notifyAll()/notify()。
public synchronized void doThings() throws Exception {
while (!isStateSatisfied()) {
// wait
wait();
}
// do your things
// notify
notifyAll();
}
类似于显式锁Lock对内部锁synchronized的扩展,Condition也是对内部条件队列的拓展。
AbstractQueuedSynchronizer
AQS是构建锁和synchronizer的框架,像锁ReentrantLock,以及信号量、CountDownLatch、FutureTask等同步器都是 基于AQS框架构建的。
3.2 并发容器
这里只简单的谈谈ConcurrentHashMap。
ConcurrentHashMap的产生是为了解决HashMap的线程不安全性以及HashTable的效率低下。前者在并发的情况下可能造成Entry链表 形成环形,而后者在内部使用synchronized保证线程安全但是效率低下。
ConcurrentHashMap的思路是通过数据分段为锁分段提供条件,已到达最小的锁竞争。
3.3 原子变量与非阻塞同步机制
锁的缺点主要体现在获取不到锁时:
- 线程挂起和恢复的CPU开销
- 线程挂起后需要等待再次被调度,此时的实时性问题
与锁这种悲观的方式比较,更乐观的策略通常是冲突检测。当前的多处理器的操作系统,针对于并发通常提供了底层的原子操作的支持, 如CAS(compare and set)等。
Java提供的原子变量包括AtomicInteger/AtomicLong等变量,通常非阻塞算法与数据结构都是基于如上原子变量以及volatile变量 构建起来的。